Add periodic reconciliation, node health checker, and graceful pod termination

Three high-priority reliability features that close gaps identified in AUDIT.md:

1. Periodic reconciliation: PodController now runs reconcile_all() every 30s
   via a tokio::time::interval branch in the select! loop, detecting zone
   crashes between events.

2. Node health checker: New NodeHealthChecker polls node heartbeats every 15s
   and marks nodes with stale heartbeats (>40s) as NotReady with reason
   NodeStatusUnknown, preserving last_transition_time correctly.

3. Graceful pod termination: DELETE sets deletion_timestamp and phase=Terminating
   instead of immediate removal. Controller drives a state machine (shutdown →
   halt on grace expiry → deprovision → finalize) with periodic reconcile
   advancing it. New POST .../finalize endpoint performs actual storage removal.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Till Wegmueller 2026-02-14 20:39:36 +01:00
parent cb6ca8cd3c
commit 58171c7555
No known key found for this signature in database
10 changed files with 924 additions and 6 deletions

1
Cargo.lock generated
View file

@ -1488,6 +1488,7 @@ version = "0.1.0"
dependencies = [
"axum",
"axum-server",
"chrono",
"futures-util",
"hyper",
"json-patch",

View file

@ -25,6 +25,7 @@ tracing-subscriber = { workspace = true }
uuid = { workspace = true }
tokio-stream = { workspace = true }
futures-util = { workspace = true }
chrono = { workspace = true }
json-patch = "3.0"
rcgen = { workspace = true }
rustls = { workspace = true }

View file

@ -14,6 +14,8 @@ use reddwarf_storage::KeyEncoder;
use std::sync::Arc;
use tracing::info;
// Default termination grace period, in seconds, applied when a pod spec does
// not set `termination_grace_period_seconds` (see delete_pod below).
const DEFAULT_TERMINATION_GRACE_PERIOD: i64 = 30;
/// GET /api/v1/namespaces/{namespace}/pods/{name}
pub async fn get_pod(
State(state): State<Arc<AppState>>,
@ -94,12 +96,64 @@ pub async fn replace_pod(
}
/// DELETE /api/v1/namespaces/{namespace}/pods/{name}
///
/// Starts graceful termination rather than removing the pod outright: the
/// handler stamps `deletion_timestamp`, records the grace period, and flips
/// the phase to Terminating. The controller then drives the zone shutdown
/// state machine and eventually calls the finalize endpoint to perform the
/// real storage removal.
pub async fn delete_pod(
    State(state): State<Arc<AppState>>,
    Path((namespace, name)): Path<(String, String)>,
) -> Result<Response> {
    info!("Deleting pod: {}/{}", namespace, name);

    let key = ResourceKey::new(
        GroupVersionKind::from_api_version_kind("v1", "Pod"),
        namespace.clone(),
        name.clone(),
    );
    let mut pod: Pod = get_resource(&state, &key).await?;

    // Idempotency guard: a repeated DELETE while termination is in flight
    // just echoes the pod back unchanged.
    if pod.metadata.deletion_timestamp.is_some() {
        info!(
            "Pod {}/{} already has deletion_timestamp set, returning current state",
            namespace, name
        );
        return Ok(ApiResponse::ok(pod).into_response());
    }

    // Stamp the deletion metadata.
    pod.metadata.deletion_timestamp = Some(
        reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(chrono::Utc::now()),
    );

    // Grace period comes from the spec when present, else the 30s default.
    pod.metadata.deletion_grace_period_seconds = Some(
        pod.spec
            .as_ref()
            .and_then(|s| s.termination_grace_period_seconds)
            .unwrap_or(DEFAULT_TERMINATION_GRACE_PERIOD),
    );

    // Mark the pod as Terminating so watchers can see the transition.
    pod.status.get_or_insert_with(Default::default).phase = Some("Terminating".to_string());

    // Persisting via update (not delete) emits a MODIFIED event rather than
    // a DELETED one — the controller relies on this distinction.
    let updated = update_resource(&state, pod).await?;
    Ok(ApiResponse::ok(updated).into_response())
}
/// POST /api/v1/namespaces/{namespace}/pods/{name}/finalize
///
/// Performs the actual storage removal of a pod. Called by the controller after
/// zone cleanup is complete.
pub async fn finalize_pod(
State(state): State<Arc<AppState>>,
Path((namespace, name)): Path<(String, String)>,
) -> Result<Response> {
info!("Finalizing pod: {}/{}", namespace, name);
let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod");
let key = ResourceKey::new(gvk, namespace, name.clone());
@ -303,4 +357,128 @@ mod tests {
assert!(matches!(event.event_type, WatchEventType::Modified));
assert_eq!(event.resource_key.name, "event-test");
}
// Graceful delete must mark the pod (deletion_timestamp, grace period, phase)
// while leaving it present in storage.
#[tokio::test]
async fn test_delete_pod_sets_deletion_timestamp() {
    let state = setup_state().await;
    let pod = make_test_pod("graceful-pod", "default");
    create_resource(&*state, pod).await.unwrap();

    // Simulate what delete_pod handler does: read, set deletion_timestamp, update
    let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod");
    let key = ResourceKey::new(gvk.clone(), "default", "graceful-pod");
    let mut pod: Pod = get_resource(&*state, &key).await.unwrap();
    // A freshly created pod must not already be marked for deletion.
    assert!(pod.metadata.deletion_timestamp.is_none());

    // Set deletion metadata
    pod.metadata.deletion_timestamp = Some(
        reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
            chrono::Utc::now(),
        ),
    );
    pod.metadata.deletion_grace_period_seconds = Some(30);
    let status = pod.status.get_or_insert_with(Default::default);
    status.phase = Some("Terminating".to_string());
    let updated = update_resource(&*state, pod).await.unwrap();

    // Pod should still exist in storage
    let retrieved: Pod = get_resource(&*state, &key).await.unwrap();
    assert!(retrieved.metadata.deletion_timestamp.is_some());
    assert_eq!(retrieved.metadata.deletion_grace_period_seconds, Some(30));
    assert_eq!(
        retrieved.status.as_ref().unwrap().phase.as_deref(),
        Some("Terminating")
    );
    // Should have emitted a MODIFIED event (not DELETED) — a bumped
    // resource_version is the observable side of the update path.
    assert!(updated.resource_version().is_some());
}
// A second DELETE while termination is in flight must observe the existing
// deletion_timestamp and leave the pod untouched in storage.
#[tokio::test]
async fn test_delete_pod_idempotent() {
    let state = setup_state().await;
    let pod = make_test_pod("idem-pod", "default");
    create_resource(&*state, pod).await.unwrap();

    let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod");
    let key = ResourceKey::new(gvk, "default", "idem-pod");

    // First delete: set deletion_timestamp
    let mut pod: Pod = get_resource(&*state, &key).await.unwrap();
    pod.metadata.deletion_timestamp = Some(
        reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
            chrono::Utc::now(),
        ),
    );
    pod.metadata.deletion_grace_period_seconds = Some(30);
    pod.status
        .get_or_insert_with(Default::default)
        .phase = Some("Terminating".to_string());
    update_resource(&*state, pod).await.unwrap();

    // Second "delete" attempt should see deletion_timestamp is already set
    let pod2: Pod = get_resource(&*state, &key).await.unwrap();
    assert!(pod2.metadata.deletion_timestamp.is_some());
    // Pod should still exist in storage (not removed)
    assert_eq!(
        pod2.status.as_ref().unwrap().phase.as_deref(),
        Some("Terminating")
    );
}
// Finalization is the step that actually removes the pod from storage, in
// contrast to DELETE which only marks it Terminating.
#[tokio::test]
async fn test_finalize_removes_pod_from_storage() {
    let state = setup_state().await;
    let pod = make_test_pod("finalize-pod", "default");
    create_resource(&*state, pod).await.unwrap();

    let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod");
    let key = ResourceKey::new(gvk, "default", "finalize-pod");

    // Pod should exist
    let _: Pod = get_resource(&*state, &key).await.unwrap();

    // Finalize (actual storage removal)
    delete_resource(&state, &key).await.unwrap();

    // Pod should be gone
    let result: std::result::Result<Pod, _> = get_resource(&*state, &key).await;
    assert!(result.is_err());
}
// Marking a pod for graceful deletion must surface as a MODIFIED watch event;
// watchers should only ever see DELETED after finalization.
#[tokio::test]
async fn test_delete_fires_modified_not_deleted_event() {
    let state = setup_state().await;
    let pod = make_test_pod("event-del-pod", "default");
    create_resource(&*state, pod).await.unwrap();

    // Subscribe to events
    let mut rx = state.subscribe();

    let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod");
    let key = ResourceKey::new(gvk, "default", "event-del-pod");
    let mut pod: Pod = get_resource(&*state, &key).await.unwrap();
    pod.metadata.deletion_timestamp = Some(
        reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
            chrono::Utc::now(),
        ),
    );
    pod.status
        .get_or_insert_with(Default::default)
        .phase = Some("Terminating".to_string());
    update_resource(&*state, pod).await.unwrap();

    // Should get a MODIFIED event
    let event = rx.recv().await.unwrap();
    assert!(matches!(event.event_type, WatchEventType::Modified));
    assert_eq!(event.resource_key.name, "event-del-pod");
}
}

View file

@ -72,6 +72,10 @@ impl ApiServer {
"/api/v1/namespaces/{namespace}/pods/{name}/status",
axum::routing::put(update_pod_status),
)
.route(
"/api/v1/namespaces/{namespace}/pods/{name}/finalize",
axum::routing::post(finalize_pod),
)
.route("/api/v1/pods", get(list_pods))
// Nodes
.route("/api/v1/nodes", get(list_nodes).post(create_node))

View file

@ -227,6 +227,36 @@ impl ApiClient {
.map_err(|e| RuntimeError::internal_error(format!("Failed to parse node: {}", e)))
}
/// POST /api/v1/namespaces/{namespace}/pods/{name}/finalize
///
/// Called by the controller after zone cleanup is complete to remove the pod
/// from API server storage.
///
/// Returns an internal error if the request cannot be sent or the server
/// responds with a non-success status (the response body is included in the
/// error message to aid debugging).
pub async fn finalize_pod(&self, namespace: &str, name: &str) -> Result<()> {
    let url = format!(
        "{}/api/v1/namespaces/{}/pods/{}/finalize",
        self.base_url, namespace, name
    );
    debug!("POST {}", url);

    let response = self
        .client
        .post(&url)
        .send()
        .await
        .map_err(|e| RuntimeError::internal_error(format!("HTTP request failed: {}", e)))?;

    let status = response.status();
    if status.is_success() {
        return Ok(());
    }

    // Pull the body into the error so failures are diagnosable from logs.
    let body = response.text().await.unwrap_or_default();
    Err(RuntimeError::internal_error(format!(
        "POST finalize pod failed with status {}: {}",
        status, body
    )))
}
pub fn base_url(&self) -> &str {
&self.base_url
}

View file

@ -3,9 +3,11 @@ use crate::error::{Result, RuntimeError};
use crate::network::{vnic_name_for_pod, Ipam};
use crate::traits::ZoneRuntime;
use crate::types::*;
use chrono::Utc;
use k8s_openapi::api::core::v1::{Pod, PodCondition, PodStatus};
use reddwarf_core::{ResourceEvent, ResourceQuantities, WatchEventType};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
@ -25,6 +27,8 @@ pub struct PodControllerConfig {
pub etherstub_name: String,
/// Pod CIDR (e.g., "10.88.0.0/16")
pub pod_cidr: String,
/// Interval between periodic full reconciliation cycles
pub reconcile_interval: Duration,
}
/// Pod controller that watches for Pod events and drives zone lifecycle
@ -69,6 +73,9 @@ impl PodController {
}
let mut rx = self.event_tx.subscribe();
let mut reconcile_tick = tokio::time::interval(self.config.reconcile_interval);
// Consume the first tick — we just did reconcile_all() above
reconcile_tick.tick().await;
loop {
tokio::select! {
@ -76,6 +83,12 @@ impl PodController {
info!("Pod controller shutting down");
return Ok(());
}
_ = reconcile_tick.tick() => {
debug!("Periodic reconcile tick");
if let Err(e) = self.reconcile_all().await {
error!("Periodic reconcile failed: {}", e);
}
}
result = rx.recv() => {
match result {
Ok(event) => {
@ -185,6 +198,11 @@ impl PodController {
return Ok(());
}
// If the pod has a deletion_timestamp, drive the termination state machine
if pod.metadata.deletion_timestamp.is_some() {
return self.handle_termination(pod).await;
}
// Check current phase
let phase = pod
.status
@ -321,7 +339,12 @@ impl PodController {
Ok(())
}
/// Handle pod deletion — deprovision the zone and release IP
/// Handle pod deletion — deprovision the zone and release IP.
///
/// If the pod has a `deletion_timestamp`, the graceful termination state
/// machine (`handle_termination`) is responsible for cleanup, so this method
/// becomes a no-op. Otherwise (e.g. a direct storage delete that bypasses
/// the graceful path), fall back to the original immediate cleanup.
pub async fn handle_delete(&self, pod: &Pod) -> Result<()> {
let pod_name = pod
.metadata
@ -330,6 +353,15 @@ impl PodController {
.ok_or_else(|| RuntimeError::internal_error("Pod has no name"))?;
let namespace = pod.metadata.namespace.as_deref().unwrap_or("default");
// If deletion_timestamp is set, handle_termination is driving cleanup
if pod.metadata.deletion_timestamp.is_some() {
debug!(
"Pod {}/{} has deletion_timestamp, skipping handle_delete (handled by termination state machine)",
namespace, pod_name
);
return Ok(());
}
// Only deprovision pods assigned to this node
if let Some(spec) = &pod.spec {
if let Some(node_name) = &spec.node_name {
@ -365,6 +397,142 @@ impl PodController {
Ok(())
}
/// Drive the graceful termination state machine for a pod with
/// `deletion_timestamp` set.
///
/// Each invocation performs at most one transition and then returns; the
/// periodic reconcile loop re-invokes it until cleanup completes.
///
/// | Zone State      | Grace Expired? | Action                                     |
/// |-----------------|----------------|--------------------------------------------|
/// | Running         | No             | shutdown_zone() (graceful)                 |
/// | Running         | Yes            | halt_zone() (force kill)                   |
/// | ShuttingDown    | No             | Wait (next reconcile will re-check)        |
/// | ShuttingDown    | Yes            | halt_zone() (force kill)                   |
/// | Stopped/Absent  | —              | deprovision(), release IP, finalize_pod()  |
async fn handle_termination(&self, pod: &Pod) -> Result<()> {
    let pod_name = pod
        .metadata
        .name
        .as_deref()
        .ok_or_else(|| RuntimeError::internal_error("Pod has no name"))?;
    let namespace = pod.metadata.namespace.as_deref().unwrap_or("default");
    let zone_name = pod_zone_name(namespace, pod_name);
    let grace_expired = self.is_grace_period_expired(pod);

    // Query actual zone state
    let zone_state = match self.runtime.get_zone_state(&zone_name).await {
        Ok(state) => state,
        Err(RuntimeError::ZoneNotFound { .. }) => ZoneState::Absent,
        Err(e) => {
            // NOTE(review): any non-NotFound lookup error is also treated as
            // Absent, which advances straight to cleanup/finalize — confirm a
            // transient runtime error cannot prematurely finalize a live pod.
            warn!(
                "Could not check zone state for {}: {} — treating as absent",
                zone_name, e
            );
            ZoneState::Absent
        }
    };

    debug!(
        "Termination state machine for pod {}/{}: zone={}, grace_expired={}",
        namespace, pod_name, zone_state, grace_expired
    );

    match zone_state {
        ZoneState::Running => {
            if grace_expired {
                warn!(
                    "Grace period expired for pod {}/{}, force halting zone {}",
                    namespace, pod_name, zone_name
                );
                // Halt failures are logged, not propagated: the next reconcile
                // retries from the observed zone state.
                if let Err(e) = self.runtime.halt_zone(&zone_name).await {
                    warn!("Failed to halt zone {}: {}", zone_name, e);
                }
                // Deprovision will happen on next reconcile when zone is stopped
            } else {
                info!(
                    "Initiating graceful shutdown for zone {} (pod {}/{})",
                    zone_name, namespace, pod_name
                );
                if let Err(e) = self.runtime.shutdown_zone(&zone_name).await {
                    warn!("Failed to shut down zone {}: {}", zone_name, e);
                }
                // Next reconcile will re-check the zone state
            }
        }
        ZoneState::ShuttingDown => {
            if grace_expired {
                warn!(
                    "Grace period expired while zone {} was shutting down, force halting",
                    zone_name
                );
                if let Err(e) = self.runtime.halt_zone(&zone_name).await {
                    warn!("Failed to halt zone {}: {}", zone_name, e);
                }
            } else {
                debug!(
                    "Zone {} is gracefully shutting down, waiting for next reconcile",
                    zone_name
                );
            }
        }
        // Catch-all: every state other than Running/ShuttingDown (stopped,
        // installed, down, absent, ...) — the zone no longer needs draining,
        // so perform full cleanup and finalize.
        _ => {
            info!(
                "Zone {} is stopped/absent, cleaning up pod {}/{}",
                zone_name, namespace, pod_name
            );

            // Try to deprovision (cleans up datasets, network, etc.)
            // Build a minimal zone config for deprovision — use existing IP if allocated
            let zone_config_result = self.pod_to_zone_config(pod);
            if let Ok(zone_config) = zone_config_result {
                if let Err(e) = self.runtime.deprovision(&zone_config).await {
                    // Zone may already be fully cleaned up — that's fine
                    debug!(
                        "Deprovision for zone {} returned error (may be expected): {}",
                        zone_name, e
                    );
                }
            }

            // Release the IP allocation
            if let Err(e) = self.ipam.release(namespace, pod_name) {
                debug!(
                    "Failed to release IP for pod {}/{}: {} (may already be released)",
                    namespace, pod_name, e
                );
            }

            // Finalize — remove the pod from API server storage. Failure is
            // logged (not returned) so the next periodic reconcile retries.
            if let Err(e) = self.api_client.finalize_pod(namespace, pod_name).await {
                error!(
                    "Failed to finalize pod {}/{}: {}",
                    namespace, pod_name, e
                );
            } else {
                info!("Pod {}/{} finalized and removed", namespace, pod_name);
            }
        }
    }
    Ok(())
}
/// Check whether the pod's grace period has expired.
///
/// Returns `false` when the pod has no `deletion_timestamp` (it is not
/// terminating). Otherwise the deadline is
/// `deletion_timestamp + deletion_grace_period_seconds` and the period is
/// considered expired once `Utc::now()` reaches the deadline (inclusive).
fn is_grace_period_expired(&self, pod: &Pod) -> bool {
    // Fallback when the pod metadata carries no grace period. Keep in sync
    // with the API server's DEFAULT_TERMINATION_GRACE_PERIOD (30s), which
    // stamps this field on DELETE — the fallback only matters for pods that
    // bypassed the graceful-delete handler.
    const DEFAULT_GRACE_PERIOD_SECONDS: i64 = 30;

    let deletion_ts = match &pod.metadata.deletion_timestamp {
        Some(t) => t.0,
        None => return false,
    };
    let grace_secs = pod
        .metadata
        .deletion_grace_period_seconds
        .unwrap_or(DEFAULT_GRACE_PERIOD_SECONDS);
    let deadline = deletion_ts + chrono::Duration::seconds(grace_secs);
    Utc::now() >= deadline
}
/// Convert a Pod spec to a ZoneConfig with per-pod VNIC and IP
fn pod_to_zone_config(&self, pod: &Pod) -> Result<ZoneConfig> {
let pod_name = pod
@ -510,6 +678,7 @@ mod tests {
use k8s_openapi::api::core::v1::{Container, PodSpec};
use reddwarf_storage::RedbBackend;
use std::net::Ipv4Addr;
use std::time::Duration;
use tempfile::tempdir;
fn make_test_controller() -> (PodController, tempfile::TempDir) {
@ -532,6 +701,7 @@ mod tests {
default_brand: ZoneBrand::Reddwarf,
etherstub_name: "reddwarf0".to_string(),
pod_cidr: "10.88.0.0/16".to_string(),
reconcile_interval: Duration::from_secs(30),
};
let controller = PodController::new(runtime, api_client, event_tx, config, ipam);
@ -803,4 +973,172 @@ mod tests {
assert_eq!(zone_config.cpu_cap, None);
assert_eq!(zone_config.memory_cap, None);
}
// Deletion stamped "now" with a 30s grace period must not count as expired.
#[test]
fn test_grace_period_not_expired() {
    let (controller, _dir) = make_test_controller();
    let mut pod = Pod::default();
    pod.metadata.name = Some("grace-pod".to_string());
    pod.metadata.deletion_timestamp = Some(
        k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(Utc::now()),
    );
    pod.metadata.deletion_grace_period_seconds = Some(30);
    assert!(!controller.is_grace_period_expired(&pod));
}
// A deletion stamped 60s ago with a 30s grace period is past its deadline.
#[test]
fn test_grace_period_expired() {
    let (controller, _dir) = make_test_controller();
    let mut pod = Pod::default();
    pod.metadata.name = Some("expired-pod".to_string());
    // Set deletion_timestamp 60 seconds in the past
    pod.metadata.deletion_timestamp = Some(
        k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
            Utc::now() - chrono::Duration::seconds(60),
        ),
    );
    pod.metadata.deletion_grace_period_seconds = Some(30);
    assert!(controller.is_grace_period_expired(&pod));
}
// A pod that is not terminating (no deletion_timestamp) never reports an
// expired grace period.
#[test]
fn test_grace_period_no_deletion_timestamp() {
    let (controller, _dir) = make_test_controller();
    let pod = Pod::default();
    assert!(!controller.is_grace_period_expired(&pod));
}
// With no backing zone, handle_termination should fall into the cleanup arm
// (deprovision → release IP → finalize) and swallow downstream failures.
#[tokio::test]
async fn test_handle_termination_absent_zone_calls_finalize() {
    let (controller, _dir) = make_test_controller();

    // Build a pod with deletion_timestamp that is assigned to our node
    let mut pod = Pod::default();
    pod.metadata.name = Some("term-pod".to_string());
    pod.metadata.namespace = Some("default".to_string());
    pod.metadata.deletion_timestamp = Some(
        k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(Utc::now()),
    );
    pod.metadata.deletion_grace_period_seconds = Some(30);
    pod.spec = Some(PodSpec {
        node_name: Some("node1".to_string()),
        containers: vec![Container {
            name: "web".to_string(),
            command: Some(vec!["/bin/sh".to_string()]),
            ..Default::default()
        }],
        ..Default::default()
    });

    // Zone doesn't exist in the mock runtime → should be treated as Absent
    // handle_termination will try to deprovision (fail, that's ok) and then
    // finalize (will fail since no API server, but the path is exercised)
    let result = controller.handle_termination(&pod).await;
    // The function should complete Ok (finalize failure is logged, not returned)
    assert!(result.is_ok());
}
// A Running zone whose grace period has not expired should receive a
// graceful shutdown (not a force halt).
#[tokio::test]
async fn test_handle_termination_running_zone_graceful_shutdown() {
    let (controller, _dir) = make_test_controller();

    // First, provision a zone so it's Running
    let mut pod = Pod::default();
    pod.metadata.name = Some("running-term".to_string());
    pod.metadata.namespace = Some("default".to_string());
    pod.spec = Some(PodSpec {
        node_name: Some("node1".to_string()),
        containers: vec![Container {
            name: "web".to_string(),
            command: Some(vec!["/bin/sh".to_string()]),
            ..Default::default()
        }],
        ..Default::default()
    });

    // Provision the zone through the runtime
    let zone_config = controller.pod_to_zone_config(&pod).unwrap();
    controller.runtime.provision(&zone_config).await.unwrap();

    // Verify it's running
    let zone_name = pod_zone_name("default", "running-term");
    let state = controller.runtime.get_zone_state(&zone_name).await.unwrap();
    assert_eq!(state, ZoneState::Running);

    // Set deletion_timestamp (grace period NOT expired)
    pod.metadata.deletion_timestamp = Some(
        k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(Utc::now()),
    );
    pod.metadata.deletion_grace_period_seconds = Some(30);

    // Handle termination — should call shutdown_zone
    let result = controller.handle_termination(&pod).await;
    assert!(result.is_ok());

    // After shutdown, MockRuntime transitions zone to Installed
    let state = controller.runtime.get_zone_state(&zone_name).await.unwrap();
    assert_eq!(state, ZoneState::Installed);
}
// handle_delete must defer to the termination state machine (no-op) when
// the pod already carries a deletion_timestamp.
#[tokio::test]
async fn test_handle_delete_skips_when_deletion_timestamp_set() {
    let (controller, _dir) = make_test_controller();

    let mut pod = Pod::default();
    pod.metadata.name = Some("skip-pod".to_string());
    pod.metadata.namespace = Some("default".to_string());
    pod.metadata.deletion_timestamp = Some(
        k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(Utc::now()),
    );
    pod.spec = Some(PodSpec {
        node_name: Some("node1".to_string()),
        containers: vec![Container {
            name: "web".to_string(),
            command: Some(vec!["/bin/sh".to_string()]),
            ..Default::default()
        }],
        ..Default::default()
    });

    // handle_delete should return Ok immediately — skipping deprovision
    let result = controller.handle_delete(&pod).await;
    assert!(result.is_ok());
}
// reconcile() must route a pod with a deletion_timestamp into the
// termination state machine rather than the normal phase handling.
#[tokio::test]
async fn test_reconcile_with_deletion_timestamp_uses_termination() {
    let (controller, _dir) = make_test_controller();

    // Build a pod with deletion_timestamp, assigned to our node, phase Terminating
    let mut pod = Pod::default();
    pod.metadata.name = Some("recon-term".to_string());
    pod.metadata.namespace = Some("default".to_string());
    pod.metadata.deletion_timestamp = Some(
        k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(Utc::now()),
    );
    pod.metadata.deletion_grace_period_seconds = Some(30);
    pod.spec = Some(PodSpec {
        node_name: Some("node1".to_string()),
        containers: vec![Container {
            name: "web".to_string(),
            command: Some(vec!["/bin/sh".to_string()]),
            ..Default::default()
        }],
        ..Default::default()
    });
    pod.status = Some(PodStatus {
        phase: Some("Terminating".to_string()),
        ..Default::default()
    });

    // reconcile() should detect deletion_timestamp and call handle_termination
    // Zone is absent → will try to finalize (will fail, but logged)
    let result = controller.reconcile(&pod).await;
    assert!(result.is_ok());
}
}

View file

@ -11,6 +11,7 @@ pub mod illumos;
pub mod mock;
pub mod network;
pub mod node_agent;
pub mod node_health;
pub mod storage;
pub mod traits;
pub mod types;
@ -35,6 +36,7 @@ pub use storage::{MockStorageEngine, StorageEngine, VolumeInfo};
pub use api_client::ApiClient;
pub use controller::{PodController, PodControllerConfig};
pub use node_agent::{NodeAgent, NodeAgentConfig};
pub use node_health::{NodeHealthChecker, NodeHealthCheckerConfig};
// Conditionally re-export illumos runtime
#[cfg(target_os = "illumos")]

View file

@ -0,0 +1,352 @@
use crate::api_client::ApiClient;
use crate::error::Result;
use chrono::Utc;
use k8s_openapi::api::core::v1::{Node, NodeCondition};
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
/// Configuration for the node health checker
#[derive(Debug, Clone)]
pub struct NodeHealthCheckerConfig {
    /// Interval between health checks
    pub check_interval: Duration,
    /// How long since the last heartbeat before a node is considered stale
    pub heartbeat_timeout: Duration,
}

impl Default for NodeHealthCheckerConfig {
    /// Defaults: check every 15s; consider a node stale after 40s without a
    /// heartbeat (4x the node agent's presumed 10s heartbeat interval —
    /// TODO confirm against NodeAgentConfig).
    fn default() -> Self {
        Self {
            check_interval: Duration::from_secs(15),
            // 4x the default heartbeat interval (10s) = 40s
            heartbeat_timeout: Duration::from_secs(40),
        }
    }
}
/// Periodically checks node heartbeats and marks stale nodes as NotReady
pub struct NodeHealthChecker {
    // Client used to list nodes and push status updates back to the API server.
    api_client: Arc<ApiClient>,
    // Polling cadence and staleness threshold (see NodeHealthCheckerConfig).
    config: NodeHealthCheckerConfig,
}
impl NodeHealthChecker {
/// Construct a health checker that talks to the API server via `api_client`,
/// using the given polling interval and staleness threshold.
pub fn new(api_client: Arc<ApiClient>, config: NodeHealthCheckerConfig) -> Self {
    Self { api_client, config }
}
/// Run the health checker loop.
///
/// Polls all nodes every `check_interval` until `token` is cancelled, at
/// which point it returns `Ok(())`. Individual check failures are logged
/// and never abort the loop.
pub async fn run(&self, token: CancellationToken) -> Result<()> {
    info!(
        "Starting node health checker (interval: {:?}, timeout: {:?})",
        self.config.check_interval, self.config.heartbeat_timeout
    );

    let mut ticker = tokio::time::interval(self.config.check_interval);
    // tokio intervals fire immediately on the first tick; swallow it so we
    // don't probe nodes that only just registered.
    ticker.tick().await;

    loop {
        tokio::select! {
            _ = ticker.tick() => {
                if let Err(e) = self.check_all_nodes().await {
                    error!("Node health check failed: {}", e);
                }
            }
            _ = token.cancelled() => {
                info!("Node health checker shutting down");
                return Ok(());
            }
        }
    }
}
/// Check all nodes for stale heartbeats.
///
/// Lists nodes from the API server and runs `check_node` on each one.
/// Per-node parse or check failures are logged and skipped so one bad node
/// cannot block checks on the others.
async fn check_all_nodes(&self) -> Result<()> {
    debug!("Running node health check");

    let body = self.api_client.get_json("/api/v1/nodes").await?;
    // A missing or non-array "items" field is treated as an empty node list.
    let items = match body["items"].as_array() {
        Some(arr) => arr.clone(),
        None => Vec::new(),
    };

    for item in items {
        let parsed: std::result::Result<Node, _> = serde_json::from_value(item);
        let node = match parsed {
            Ok(n) => n,
            Err(e) => {
                warn!("Failed to parse node from list: {}", e);
                continue;
            }
        };

        // Nodes without a name cannot be addressed; skip them.
        let Some(node_name) = node.metadata.name.as_deref() else {
            continue;
        };

        if let Err(e) = self.check_node(node_name, &node).await {
            warn!("Failed to check node {}: {}", node_name, e);
        }
    }
    Ok(())
}
/// Check a single node's heartbeat and mark it NotReady if stale.
///
/// Reads the node's "Ready" condition and, when `last_heartbeat_time` is
/// older than `config.heartbeat_timeout`, rewrites the condition to
/// status=False with reason NodeStatusUnknown and pushes the update via the
/// API client. Nodes without conditions, without a Ready condition, without
/// a heartbeat timestamp, or already marked by this checker are skipped.
/// Returns an error only when the status update itself fails.
async fn check_node(&self, node_name: &str, node: &Node) -> Result<()> {
    // A node with no status/conditions has nothing to judge staleness by.
    let conditions = match node
        .status
        .as_ref()
        .and_then(|s| s.conditions.as_ref())
    {
        Some(c) => c,
        None => {
            debug!("Node {} has no conditions, skipping", node_name);
            return Ok(());
        }
    };

    let ready_condition = match conditions.iter().find(|c| c.type_ == "Ready") {
        Some(c) => c,
        None => {
            debug!("Node {} has no Ready condition, skipping", node_name);
            return Ok(());
        }
    };

    // Skip if already marked NotReady by us (avoid re-updating)
    if ready_condition.status == "False"
        && ready_condition.reason.as_deref() == Some("NodeStatusUnknown")
    {
        debug!(
            "Node {} already marked NotReady by health checker, skipping",
            node_name
        );
        return Ok(());
    }

    // Check heartbeat staleness
    let last_heartbeat = match &ready_condition.last_heartbeat_time {
        Some(t) => t.0,
        None => {
            debug!("Node {} has no last_heartbeat_time, skipping", node_name);
            return Ok(());
        }
    };

    let elapsed = Utc::now() - last_heartbeat;
    // from_std only fails for out-of-range durations; the 40s fallback then
    // matches the default config — NOTE(review): it will not match a
    // non-default heartbeat_timeout, though the case is practically unreachable.
    let timeout = chrono::Duration::from_std(self.config.heartbeat_timeout)
        .unwrap_or(chrono::Duration::seconds(40));

    if elapsed <= timeout {
        debug!(
            "Node {} heartbeat is fresh ({}s ago)",
            node_name,
            elapsed.num_seconds()
        );
        return Ok(());
    }

    // Node is stale — mark it NotReady
    warn!(
        "Node {} heartbeat is stale ({}s ago, timeout {}s) — marking NotReady",
        node_name,
        elapsed.num_seconds(),
        timeout.num_seconds()
    );

    let mut updated_node = node.clone();

    // Preserve last_transition_time if the status was already False,
    // otherwise set it to now
    let last_transition_time = if ready_condition.status == "False" {
        ready_condition.last_transition_time.clone()
    } else {
        Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
            Utc::now(),
        ))
    };

    let new_condition = NodeCondition {
        type_: "Ready".to_string(),
        status: "False".to_string(),
        reason: Some("NodeStatusUnknown".to_string()),
        message: Some(format!(
            "Node heartbeat not received for {}s (timeout: {}s)",
            elapsed.num_seconds(),
            timeout.num_seconds()
        )),
        // Keep the stale heartbeat timestamp so observers can see when the
        // node was last heard from.
        last_heartbeat_time: ready_condition.last_heartbeat_time.clone(),
        last_transition_time,
    };

    // Replace the Ready condition in the node status
    if let Some(status) = updated_node.status.as_mut() {
        if let Some(conditions) = status.conditions.as_mut() {
            if let Some(ready) = conditions.iter_mut().find(|c| c.type_ == "Ready") {
                *ready = new_condition;
            }
        }
    }

    self.api_client
        .update_node_status(node_name, &updated_node)
        .await?;
    info!("Node {} marked as NotReady (stale heartbeat)", node_name);
    Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use k8s_openapi::api::core::v1::{NodeCondition, NodeStatus};
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ObjectMeta, Time};

    // Build a node whose Ready condition has the given status, a heartbeat
    // `heartbeat_age_secs` in the past, and a reason ("KubeletReady") that
    // is NOT the health checker's own marker.
    fn make_node(name: &str, ready_status: &str, heartbeat_age_secs: i64) -> Node {
        let heartbeat_time = Utc::now() - chrono::Duration::seconds(heartbeat_age_secs);
        Node {
            metadata: ObjectMeta {
                name: Some(name.to_string()),
                ..Default::default()
            },
            status: Some(NodeStatus {
                conditions: Some(vec![NodeCondition {
                    type_: "Ready".to_string(),
                    status: ready_status.to_string(),
                    reason: Some("KubeletReady".to_string()),
                    message: Some("node agent is healthy".to_string()),
                    last_heartbeat_time: Some(Time(heartbeat_time)),
                    last_transition_time: Some(Time(
                        Utc::now() - chrono::Duration::seconds(3600),
                    )),
                }]),
                ..Default::default()
            }),
            ..Default::default()
        }
    }

    // Build a node already marked by the health checker: status=False with
    // reason "NodeStatusUnknown" — the combination check_node skips.
    fn make_stale_notready_node(name: &str, heartbeat_age_secs: i64) -> Node {
        let heartbeat_time = Utc::now() - chrono::Duration::seconds(heartbeat_age_secs);
        Node {
            metadata: ObjectMeta {
                name: Some(name.to_string()),
                ..Default::default()
            },
            status: Some(NodeStatus {
                conditions: Some(vec![NodeCondition {
                    type_: "Ready".to_string(),
                    status: "False".to_string(),
                    reason: Some("NodeStatusUnknown".to_string()),
                    message: Some("Node heartbeat not received".to_string()),
                    last_heartbeat_time: Some(Time(heartbeat_time)),
                    last_transition_time: Some(Time(
                        Utc::now() - chrono::Duration::seconds(100),
                    )),
                }]),
                ..Default::default()
            }),
            ..Default::default()
        }
    }

    /// Fresh heartbeat should result in no status change (check_node returns Ok
    /// and does not attempt an API call)
    #[tokio::test]
    async fn test_fresh_heartbeat_is_noop() {
        // NOTE(review): uses an unreachable endpoint — relies on nothing
        // listening on 127.0.0.1:6443 in the test environment.
        let api_client = Arc::new(ApiClient::new("http://127.0.0.1:6443"));
        let config = NodeHealthCheckerConfig {
            check_interval: Duration::from_secs(15),
            heartbeat_timeout: Duration::from_secs(40),
        };
        let checker = NodeHealthChecker::new(api_client, config);

        // 10 seconds ago — well within the 40s timeout
        let node = make_node("fresh-node", "True", 10);

        // Should succeed without making any API call (would fail since no server)
        let result = checker.check_node("fresh-node", &node).await;
        assert!(result.is_ok());
    }

    /// Stale heartbeat on a Ready node should attempt to mark it NotReady.
    /// Since we have no real API server the update call will fail, proving the
    /// checker detected the staleness and tried to act.
    #[tokio::test]
    async fn test_stale_heartbeat_triggers_update() {
        let api_client = Arc::new(ApiClient::new("http://127.0.0.1:6443"));
        let config = NodeHealthCheckerConfig {
            check_interval: Duration::from_secs(15),
            heartbeat_timeout: Duration::from_secs(40),
        };
        let checker = NodeHealthChecker::new(api_client, config);

        // 60 seconds ago — exceeds the 40s timeout
        let node = make_node("stale-node", "True", 60);

        // Should fail because there's no real API server — but that proves it
        // detected the staleness and attempted to update the node
        let result = checker.check_node("stale-node", &node).await;
        assert!(result.is_err());
    }

    /// A node already marked NotReady with reason "NodeStatusUnknown" should be
    /// skipped (no redundant update)
    #[tokio::test]
    async fn test_already_notready_is_skipped() {
        let api_client = Arc::new(ApiClient::new("http://127.0.0.1:6443"));
        let config = NodeHealthCheckerConfig {
            check_interval: Duration::from_secs(15),
            heartbeat_timeout: Duration::from_secs(40),
        };
        let checker = NodeHealthChecker::new(api_client, config);

        // 120 seconds stale but already marked by us
        let node = make_stale_notready_node("dead-node", 120);

        // Should return Ok without attempting an API call
        let result = checker.check_node("dead-node", &node).await;
        assert!(result.is_ok());
    }

    /// last_transition_time should be preserved when a node is already False
    /// (but not yet with our reason)
    #[tokio::test]
    async fn test_transition_time_preserved_when_already_false() {
        // Build a node that has status=False but with a different reason
        let heartbeat_time = Utc::now() - chrono::Duration::seconds(60);
        let original_transition_time = Utc::now() - chrono::Duration::seconds(300);
        let node = Node {
            metadata: ObjectMeta {
                name: Some("failing-node".to_string()),
                ..Default::default()
            },
            status: Some(NodeStatus {
                conditions: Some(vec![NodeCondition {
                    type_: "Ready".to_string(),
                    status: "False".to_string(),
                    reason: Some("SomeOtherReason".to_string()),
                    message: Some("something else".to_string()),
                    last_heartbeat_time: Some(Time(heartbeat_time)),
                    last_transition_time: Some(Time(original_transition_time)),
                }]),
                ..Default::default()
            }),
            ..Default::default()
        };

        let api_client = Arc::new(ApiClient::new("http://127.0.0.1:6443"));
        let config = NodeHealthCheckerConfig {
            check_interval: Duration::from_secs(15),
            heartbeat_timeout: Duration::from_secs(40),
        };
        let checker = NodeHealthChecker::new(api_client, config);

        // Will fail at the API call, but we can verify the logic by checking that
        // the code path was entered (it didn't skip due to already-notready check)
        let result = checker.check_node("failing-node", &node).await;
        assert!(result.is_err()); // proves it tried to update (different reason)
    }
}

View file

@ -47,7 +47,7 @@ impl ZoneState {
ZoneState::Installed => "Pending",
ZoneState::Ready => "Pending",
ZoneState::Running => "Running",
ZoneState::ShuttingDown => "Succeeded",
ZoneState::ShuttingDown => "Terminating",
ZoneState::Down => "Failed",
ZoneState::Absent => "Unknown",
}
@ -268,7 +268,7 @@ mod tests {
assert_eq!(ZoneState::Installed.to_pod_phase(), "Pending");
assert_eq!(ZoneState::Ready.to_pod_phase(), "Pending");
assert_eq!(ZoneState::Running.to_pod_phase(), "Running");
assert_eq!(ZoneState::ShuttingDown.to_pod_phase(), "Succeeded");
assert_eq!(ZoneState::ShuttingDown.to_pod_phase(), "Terminating");
assert_eq!(ZoneState::Down.to_pod_phase(), "Failed");
assert_eq!(ZoneState::Absent.to_pod_phase(), "Unknown");
}

View file

@ -2,8 +2,9 @@ use clap::{Parser, Subcommand};
use reddwarf_apiserver::{ApiError, ApiServer, AppState, Config as ApiConfig, TlsMode};
use reddwarf_core::Namespace;
use reddwarf_runtime::{
ApiClient, Ipam, MockRuntime, MockStorageEngine, NodeAgent, NodeAgentConfig, PodController,
PodControllerConfig, StorageEngine, StoragePoolConfig, ZoneBrand,
ApiClient, Ipam, MockRuntime, MockStorageEngine, NodeAgent, NodeAgentConfig,
NodeHealthChecker, NodeHealthCheckerConfig, PodController, PodControllerConfig, StorageEngine,
StoragePoolConfig, ZoneBrand,
};
use reddwarf_scheduler::scheduler::SchedulerConfig;
use reddwarf_scheduler::Scheduler;
@ -318,6 +319,7 @@ async fn run_agent(
default_brand: ZoneBrand::Reddwarf,
etherstub_name: etherstub_name.to_string(),
pod_cidr: pod_cidr.to_string(),
reconcile_interval: std::time::Duration::from_secs(30),
};
let controller = PodController::new(
@ -336,7 +338,7 @@ async fn run_agent(
// 6. Spawn node agent
let node_agent_config = NodeAgentConfig::new(node_name.to_string(), api_url);
let node_agent = NodeAgent::new(api_client, node_agent_config);
let node_agent = NodeAgent::new(api_client.clone(), node_agent_config);
let agent_token = token.clone();
let node_agent_handle = tokio::spawn(async move {
if let Err(e) = node_agent.run(agent_token).await {
@ -344,6 +346,15 @@ async fn run_agent(
}
});
// 7. Spawn node health checker
let health_checker = NodeHealthChecker::new(api_client, NodeHealthCheckerConfig::default());
let health_token = token.clone();
let health_handle = tokio::spawn(async move {
if let Err(e) = health_checker.run(health_token).await {
error!("Node health checker error: {}", e);
}
});
info!(
"All components started. API server on {}, node name: {}, pod CIDR: {}",
bind, node_name, pod_cidr
@ -362,6 +373,7 @@ async fn run_agent(
scheduler_handle,
controller_handle,
node_agent_handle,
health_handle,
);
})
.await;