From 58171c7555a163039fb8ea10f85fa9b45dd8e825 Mon Sep 17 00:00:00 2001 From: Till Wegmueller Date: Sat, 14 Feb 2026 20:39:36 +0100 Subject: [PATCH] Add periodic reconciliation, node health checker, and graceful pod termination MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three high-priority reliability features that close gaps identified in AUDIT.md: 1. Periodic reconciliation: PodController now runs reconcile_all() every 30s via a tokio::time::interval branch in the select! loop, detecting zone crashes between events. 2. Node health checker: New NodeHealthChecker polls node heartbeats every 15s and marks nodes with stale heartbeats (>40s) as NotReady with reason NodeStatusUnknown, preserving last_transition_time correctly. 3. Graceful pod termination: DELETE sets deletion_timestamp and phase=Terminating instead of immediate removal. Controller drives a state machine (shutdown → halt on grace expiry → deprovision → finalize) with periodic reconcile advancing it. New POST .../finalize endpoint performs actual storage removal. 
Co-Authored-By: Claude Opus 4.6 --- Cargo.lock | 1 + crates/reddwarf-apiserver/Cargo.toml | 1 + .../reddwarf-apiserver/src/handlers/pods.rs | 178 +++++++++ crates/reddwarf-apiserver/src/server.rs | 4 + crates/reddwarf-runtime/src/api_client.rs | 30 ++ crates/reddwarf-runtime/src/controller.rs | 340 ++++++++++++++++- crates/reddwarf-runtime/src/lib.rs | 2 + crates/reddwarf-runtime/src/node_health.rs | 352 ++++++++++++++++++ crates/reddwarf-runtime/src/types.rs | 4 +- crates/reddwarf/src/main.rs | 18 +- 10 files changed, 924 insertions(+), 6 deletions(-) create mode 100644 crates/reddwarf-runtime/src/node_health.rs diff --git a/Cargo.lock b/Cargo.lock index 0200451..c24f206 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1488,6 +1488,7 @@ version = "0.1.0" dependencies = [ "axum", "axum-server", + "chrono", "futures-util", "hyper", "json-patch", diff --git a/crates/reddwarf-apiserver/Cargo.toml b/crates/reddwarf-apiserver/Cargo.toml index 7de069e..7b45baa 100644 --- a/crates/reddwarf-apiserver/Cargo.toml +++ b/crates/reddwarf-apiserver/Cargo.toml @@ -25,6 +25,7 @@ tracing-subscriber = { workspace = true } uuid = { workspace = true } tokio-stream = { workspace = true } futures-util = { workspace = true } +chrono = { workspace = true } json-patch = "3.0" rcgen = { workspace = true } rustls = { workspace = true } diff --git a/crates/reddwarf-apiserver/src/handlers/pods.rs b/crates/reddwarf-apiserver/src/handlers/pods.rs index 622df9c..7b970a3 100644 --- a/crates/reddwarf-apiserver/src/handlers/pods.rs +++ b/crates/reddwarf-apiserver/src/handlers/pods.rs @@ -14,6 +14,8 @@ use reddwarf_storage::KeyEncoder; use std::sync::Arc; use tracing::info; +const DEFAULT_TERMINATION_GRACE_PERIOD: i64 = 30; + /// GET /api/v1/namespaces/{namespace}/pods/{name} pub async fn get_pod( State(state): State>, @@ -94,12 +96,64 @@ pub async fn replace_pod( } /// DELETE /api/v1/namespaces/{namespace}/pods/{name} +/// +/// Initiates graceful termination: sets deletion_timestamp and 
phase=Terminating +/// instead of immediately removing the pod from storage. The controller will +/// drive the zone shutdown state machine and call finalize_pod() when cleanup +/// is complete. pub async fn delete_pod( State(state): State>, Path((namespace, name)): Path<(String, String)>, ) -> Result { info!("Deleting pod: {}/{}", namespace, name); + let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod"); + let key = ResourceKey::new(gvk, namespace.clone(), name.clone()); + + let mut pod: Pod = get_resource(&state, &key).await?; + + // Idempotent: if deletion_timestamp is already set, return current state + if pod.metadata.deletion_timestamp.is_some() { + info!( + "Pod {}/{} already has deletion_timestamp set, returning current state", + namespace, name + ); + return Ok(ApiResponse::ok(pod).into_response()); + } + + // Set deletion metadata + pod.metadata.deletion_timestamp = Some( + reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(chrono::Utc::now()), + ); + + // Grace period from spec, defaulting to 30s + let grace_period = pod + .spec + .as_ref() + .and_then(|s| s.termination_grace_period_seconds) + .unwrap_or(DEFAULT_TERMINATION_GRACE_PERIOD); + pod.metadata.deletion_grace_period_seconds = Some(grace_period); + + // Set phase to Terminating + let status = pod.status.get_or_insert_with(Default::default); + status.phase = Some("Terminating".to_string()); + + // Update resource — emits a MODIFIED event (not DELETED) + let updated = update_resource(&state, pod).await?; + + Ok(ApiResponse::ok(updated).into_response()) +} + +/// POST /api/v1/namespaces/{namespace}/pods/{name}/finalize +/// +/// Performs the actual storage removal of a pod. Called by the controller after +/// zone cleanup is complete. 
+pub async fn finalize_pod( + State(state): State>, + Path((namespace, name)): Path<(String, String)>, +) -> Result { + info!("Finalizing pod: {}/{}", namespace, name); + let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod"); let key = ResourceKey::new(gvk, namespace, name.clone()); @@ -303,4 +357,128 @@ mod tests { assert!(matches!(event.event_type, WatchEventType::Modified)); assert_eq!(event.resource_key.name, "event-test"); } + + #[tokio::test] + async fn test_delete_pod_sets_deletion_timestamp() { + let state = setup_state().await; + + let pod = make_test_pod("graceful-pod", "default"); + create_resource(&*state, pod).await.unwrap(); + + // Simulate what delete_pod handler does: read, set deletion_timestamp, update + let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod"); + let key = ResourceKey::new(gvk.clone(), "default", "graceful-pod"); + + let mut pod: Pod = get_resource(&*state, &key).await.unwrap(); + assert!(pod.metadata.deletion_timestamp.is_none()); + + // Set deletion metadata + pod.metadata.deletion_timestamp = Some( + reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time( + chrono::Utc::now(), + ), + ); + pod.metadata.deletion_grace_period_seconds = Some(30); + let status = pod.status.get_or_insert_with(Default::default); + status.phase = Some("Terminating".to_string()); + + let updated = update_resource(&*state, pod).await.unwrap(); + + // Pod should still exist in storage + let retrieved: Pod = get_resource(&*state, &key).await.unwrap(); + assert!(retrieved.metadata.deletion_timestamp.is_some()); + assert_eq!(retrieved.metadata.deletion_grace_period_seconds, Some(30)); + assert_eq!( + retrieved.status.as_ref().unwrap().phase.as_deref(), + Some("Terminating") + ); + + // Should have emitted a MODIFIED event (not DELETED) + assert!(updated.resource_version().is_some()); + } + + #[tokio::test] + async fn test_delete_pod_idempotent() { + let state = setup_state().await; + + let pod = make_test_pod("idem-pod", 
"default"); + create_resource(&*state, pod).await.unwrap(); + + let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod"); + let key = ResourceKey::new(gvk, "default", "idem-pod"); + + // First delete: set deletion_timestamp + let mut pod: Pod = get_resource(&*state, &key).await.unwrap(); + pod.metadata.deletion_timestamp = Some( + reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time( + chrono::Utc::now(), + ), + ); + pod.metadata.deletion_grace_period_seconds = Some(30); + pod.status + .get_or_insert_with(Default::default) + .phase = Some("Terminating".to_string()); + update_resource(&*state, pod).await.unwrap(); + + // Second "delete" attempt should see deletion_timestamp is already set + let pod2: Pod = get_resource(&*state, &key).await.unwrap(); + assert!(pod2.metadata.deletion_timestamp.is_some()); + + // Pod should still exist in storage (not removed) + assert_eq!( + pod2.status.as_ref().unwrap().phase.as_deref(), + Some("Terminating") + ); + } + + #[tokio::test] + async fn test_finalize_removes_pod_from_storage() { + let state = setup_state().await; + + let pod = make_test_pod("finalize-pod", "default"); + create_resource(&*state, pod).await.unwrap(); + + let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod"); + let key = ResourceKey::new(gvk, "default", "finalize-pod"); + + // Pod should exist + let _: Pod = get_resource(&*state, &key).await.unwrap(); + + // Finalize (actual storage removal) + delete_resource(&state, &key).await.unwrap(); + + // Pod should be gone + let result: std::result::Result = get_resource(&*state, &key).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_delete_fires_modified_not_deleted_event() { + let state = setup_state().await; + + let pod = make_test_pod("event-del-pod", "default"); + create_resource(&*state, pod).await.unwrap(); + + // Subscribe to events + let mut rx = state.subscribe(); + + let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod"); + let key = 
ResourceKey::new(gvk, "default", "event-del-pod"); + + let mut pod: Pod = get_resource(&*state, &key).await.unwrap(); + pod.metadata.deletion_timestamp = Some( + reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time( + chrono::Utc::now(), + ), + ); + pod.status + .get_or_insert_with(Default::default) + .phase = Some("Terminating".to_string()); + update_resource(&*state, pod).await.unwrap(); + + // Should get a MODIFIED event + let event = rx.recv().await.unwrap(); + assert!(matches!(event.event_type, WatchEventType::Modified)); + assert_eq!(event.resource_key.name, "event-del-pod"); + } } diff --git a/crates/reddwarf-apiserver/src/server.rs b/crates/reddwarf-apiserver/src/server.rs index 1dd437e..18c76a1 100644 --- a/crates/reddwarf-apiserver/src/server.rs +++ b/crates/reddwarf-apiserver/src/server.rs @@ -72,6 +72,10 @@ impl ApiServer { "/api/v1/namespaces/{namespace}/pods/{name}/status", axum::routing::put(update_pod_status), ) + .route( + "/api/v1/namespaces/{namespace}/pods/{name}/finalize", + axum::routing::post(finalize_pod), + ) .route("/api/v1/pods", get(list_pods)) // Nodes .route("/api/v1/nodes", get(list_nodes).post(create_node)) diff --git a/crates/reddwarf-runtime/src/api_client.rs b/crates/reddwarf-runtime/src/api_client.rs index f185aa9..02d50ba 100644 --- a/crates/reddwarf-runtime/src/api_client.rs +++ b/crates/reddwarf-runtime/src/api_client.rs @@ -227,6 +227,36 @@ impl ApiClient { .map_err(|e| RuntimeError::internal_error(format!("Failed to parse node: {}", e))) } + /// POST /api/v1/namespaces/{namespace}/pods/{name}/finalize + /// + /// Called by the controller after zone cleanup is complete to remove the pod + /// from API server storage. 
+ pub async fn finalize_pod(&self, namespace: &str, name: &str) -> Result<()> { + let url = format!( + "{}/api/v1/namespaces/{}/pods/{}/finalize", + self.base_url, namespace, name + ); + debug!("POST {}", url); + + let resp = self + .client + .post(&url) + .send() + .await + .map_err(|e| RuntimeError::internal_error(format!("HTTP request failed: {}", e)))?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + return Err(RuntimeError::internal_error(format!( + "POST finalize pod failed with status {}: {}", + status, body + ))); + } + + Ok(()) + } + pub fn base_url(&self) -> &str { &self.base_url } diff --git a/crates/reddwarf-runtime/src/controller.rs b/crates/reddwarf-runtime/src/controller.rs index 3df1e33..6944f1c 100644 --- a/crates/reddwarf-runtime/src/controller.rs +++ b/crates/reddwarf-runtime/src/controller.rs @@ -3,9 +3,11 @@ use crate::error::{Result, RuntimeError}; use crate::network::{vnic_name_for_pod, Ipam}; use crate::traits::ZoneRuntime; use crate::types::*; +use chrono::Utc; use k8s_openapi::api::core::v1::{Pod, PodCondition, PodStatus}; use reddwarf_core::{ResourceEvent, ResourceQuantities, WatchEventType}; use std::sync::Arc; +use std::time::Duration; use tokio::sync::broadcast; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; @@ -25,6 +27,8 @@ pub struct PodControllerConfig { pub etherstub_name: String, /// Pod CIDR (e.g., "10.88.0.0/16") pub pod_cidr: String, + /// Interval between periodic full reconciliation cycles + pub reconcile_interval: Duration, } /// Pod controller that watches for Pod events and drives zone lifecycle @@ -69,6 +73,9 @@ impl PodController { } let mut rx = self.event_tx.subscribe(); + let mut reconcile_tick = tokio::time::interval(self.config.reconcile_interval); + // Consume the first tick — we just did reconcile_all() above + reconcile_tick.tick().await; loop { tokio::select! 
{ @@ -76,6 +83,12 @@ impl PodController { info!("Pod controller shutting down"); return Ok(()); } + _ = reconcile_tick.tick() => { + debug!("Periodic reconcile tick"); + if let Err(e) = self.reconcile_all().await { + error!("Periodic reconcile failed: {}", e); + } + } result = rx.recv() => { match result { Ok(event) => { @@ -185,6 +198,11 @@ impl PodController { return Ok(()); } + // If the pod has a deletion_timestamp, drive the termination state machine + if pod.metadata.deletion_timestamp.is_some() { + return self.handle_termination(pod).await; + } + // Check current phase let phase = pod .status @@ -321,7 +339,12 @@ impl PodController { Ok(()) } - /// Handle pod deletion — deprovision the zone and release IP + /// Handle pod deletion — deprovision the zone and release IP. + /// + /// If the pod has a `deletion_timestamp`, the graceful termination state + /// machine (`handle_termination`) is responsible for cleanup, so this method + /// becomes a no-op. Otherwise (e.g. a direct storage delete that bypasses + /// the graceful path), fall back to the original immediate cleanup. pub async fn handle_delete(&self, pod: &Pod) -> Result<()> { let pod_name = pod .metadata @@ -330,6 +353,15 @@ impl PodController { .ok_or_else(|| RuntimeError::internal_error("Pod has no name"))?; let namespace = pod.metadata.namespace.as_deref().unwrap_or("default"); + // If deletion_timestamp is set, handle_termination is driving cleanup + if pod.metadata.deletion_timestamp.is_some() { + debug!( + "Pod {}/{} has deletion_timestamp, skipping handle_delete (handled by termination state machine)", + namespace, pod_name + ); + return Ok(()); + } + // Only deprovision pods assigned to this node if let Some(spec) = &pod.spec { if let Some(node_name) = &spec.node_name { @@ -365,6 +397,142 @@ impl PodController { Ok(()) } + /// Drive the graceful termination state machine for a pod with + /// `deletion_timestamp` set. + /// + /// | Zone State | Grace Expired? 
| Action | + /// |-----------------|----------------|--------------------------------------------| + /// | Running | No | shutdown_zone() (graceful) | + /// | Running | Yes | halt_zone() (force kill) | + /// | ShuttingDown | No | Wait (next reconcile will re-check) | + /// | ShuttingDown | Yes | halt_zone() (force kill) | + /// | Stopped/Absent | — | deprovision(), release IP, finalize_pod() | + async fn handle_termination(&self, pod: &Pod) -> Result<()> { + let pod_name = pod + .metadata + .name + .as_deref() + .ok_or_else(|| RuntimeError::internal_error("Pod has no name"))?; + let namespace = pod.metadata.namespace.as_deref().unwrap_or("default"); + let zone_name = pod_zone_name(namespace, pod_name); + + let grace_expired = self.is_grace_period_expired(pod); + + // Query actual zone state + let zone_state = match self.runtime.get_zone_state(&zone_name).await { + Ok(state) => state, + Err(RuntimeError::ZoneNotFound { .. }) => ZoneState::Absent, + Err(e) => { + warn!( + "Could not check zone state for {}: {} — treating as absent", + zone_name, e + ); + ZoneState::Absent + } + }; + + debug!( + "Termination state machine for pod {}/{}: zone={}, grace_expired={}", + namespace, pod_name, zone_state, grace_expired + ); + + match zone_state { + ZoneState::Running => { + if grace_expired { + warn!( + "Grace period expired for pod {}/{}, force halting zone {}", + namespace, pod_name, zone_name + ); + if let Err(e) = self.runtime.halt_zone(&zone_name).await { + warn!("Failed to halt zone {}: {}", zone_name, e); + } + // Deprovision will happen on next reconcile when zone is stopped + } else { + info!( + "Initiating graceful shutdown for zone {} (pod {}/{})", + zone_name, namespace, pod_name + ); + if let Err(e) = self.runtime.shutdown_zone(&zone_name).await { + warn!("Failed to shut down zone {}: {}", zone_name, e); + } + // Next reconcile will re-check the zone state + } + } + ZoneState::ShuttingDown => { + if grace_expired { + warn!( + "Grace period expired while zone {} 
was shutting down, force halting", + zone_name + ); + if let Err(e) = self.runtime.halt_zone(&zone_name).await { + warn!("Failed to halt zone {}: {}", zone_name, e); + } + } else { + debug!( + "Zone {} is gracefully shutting down, waiting for next reconcile", + zone_name + ); + } + } + // Zone is stopped or absent — full cleanup + _ => { + info!( + "Zone {} is stopped/absent, cleaning up pod {}/{}", + zone_name, namespace, pod_name + ); + + // Try to deprovision (cleans up datasets, network, etc.) + // Build a minimal zone config for deprovision — use existing IP if allocated + let zone_config_result = self.pod_to_zone_config(pod); + if let Ok(zone_config) = zone_config_result { + if let Err(e) = self.runtime.deprovision(&zone_config).await { + // Zone may already be fully cleaned up — that's fine + debug!( + "Deprovision for zone {} returned error (may be expected): {}", + zone_name, e + ); + } + } + + // Release the IP allocation + if let Err(e) = self.ipam.release(namespace, pod_name) { + debug!( + "Failed to release IP for pod {}/{}: {} (may already be released)", + namespace, pod_name, e + ); + } + + // Finalize — remove the pod from API server storage + if let Err(e) = self.api_client.finalize_pod(namespace, pod_name).await { + error!( + "Failed to finalize pod {}/{}: {}", + namespace, pod_name, e + ); + } else { + info!("Pod {}/{} finalized and removed", namespace, pod_name); + } + } + } + + Ok(()) + } + + /// Check whether the pod's grace period has expired + fn is_grace_period_expired(&self, pod: &Pod) -> bool { + let deletion_ts = match &pod.metadata.deletion_timestamp { + Some(t) => t.0, + None => return false, + }; + + let grace_secs = pod + .metadata + .deletion_grace_period_seconds + .unwrap_or(30); + + let deadline = deletion_ts + chrono::Duration::seconds(grace_secs); + Utc::now() >= deadline + } + /// Convert a Pod spec to a ZoneConfig with per-pod VNIC and IP fn pod_to_zone_config(&self, pod: &Pod) -> Result { let pod_name = pod @@ -510,6 +678,7 
@@ mod tests { use k8s_openapi::api::core::v1::{Container, PodSpec}; use reddwarf_storage::RedbBackend; use std::net::Ipv4Addr; + use std::time::Duration; use tempfile::tempdir; fn make_test_controller() -> (PodController, tempfile::TempDir) { @@ -532,6 +701,7 @@ mod tests { default_brand: ZoneBrand::Reddwarf, etherstub_name: "reddwarf0".to_string(), pod_cidr: "10.88.0.0/16".to_string(), + reconcile_interval: Duration::from_secs(30), }; let controller = PodController::new(runtime, api_client, event_tx, config, ipam); @@ -803,4 +973,172 @@ mod tests { assert_eq!(zone_config.cpu_cap, None); assert_eq!(zone_config.memory_cap, None); } + + #[test] + fn test_grace_period_not_expired() { + let (controller, _dir) = make_test_controller(); + + let mut pod = Pod::default(); + pod.metadata.name = Some("grace-pod".to_string()); + pod.metadata.deletion_timestamp = Some( + k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(Utc::now()), + ); + pod.metadata.deletion_grace_period_seconds = Some(30); + + assert!(!controller.is_grace_period_expired(&pod)); + } + + #[test] + fn test_grace_period_expired() { + let (controller, _dir) = make_test_controller(); + + let mut pod = Pod::default(); + pod.metadata.name = Some("expired-pod".to_string()); + // Set deletion_timestamp 60 seconds in the past + pod.metadata.deletion_timestamp = Some( + k8s_openapi::apimachinery::pkg::apis::meta::v1::Time( + Utc::now() - chrono::Duration::seconds(60), + ), + ); + pod.metadata.deletion_grace_period_seconds = Some(30); + + assert!(controller.is_grace_period_expired(&pod)); + } + + #[test] + fn test_grace_period_no_deletion_timestamp() { + let (controller, _dir) = make_test_controller(); + + let pod = Pod::default(); + assert!(!controller.is_grace_period_expired(&pod)); + } + + #[tokio::test] + async fn test_handle_termination_absent_zone_calls_finalize() { + let (controller, _dir) = make_test_controller(); + + // Build a pod with deletion_timestamp that is assigned to our node + let mut pod = 
Pod::default(); + pod.metadata.name = Some("term-pod".to_string()); + pod.metadata.namespace = Some("default".to_string()); + pod.metadata.deletion_timestamp = Some( + k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(Utc::now()), + ); + pod.metadata.deletion_grace_period_seconds = Some(30); + pod.spec = Some(PodSpec { + node_name: Some("node1".to_string()), + containers: vec![Container { + name: "web".to_string(), + command: Some(vec!["/bin/sh".to_string()]), + ..Default::default() + }], + ..Default::default() + }); + + // Zone doesn't exist in the mock runtime → should be treated as Absent + // handle_termination will try to deprovision (fail, that's ok) and then + // finalize (will fail since no API server, but the path is exercised) + let result = controller.handle_termination(&pod).await; + // The function should complete Ok (finalize failure is logged, not returned) + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_handle_termination_running_zone_graceful_shutdown() { + let (controller, _dir) = make_test_controller(); + + // First, provision a zone so it's Running + let mut pod = Pod::default(); + pod.metadata.name = Some("running-term".to_string()); + pod.metadata.namespace = Some("default".to_string()); + pod.spec = Some(PodSpec { + node_name: Some("node1".to_string()), + containers: vec![Container { + name: "web".to_string(), + command: Some(vec!["/bin/sh".to_string()]), + ..Default::default() + }], + ..Default::default() + }); + + // Provision the zone through the runtime + let zone_config = controller.pod_to_zone_config(&pod).unwrap(); + controller.runtime.provision(&zone_config).await.unwrap(); + + // Verify it's running + let zone_name = pod_zone_name("default", "running-term"); + let state = controller.runtime.get_zone_state(&zone_name).await.unwrap(); + assert_eq!(state, ZoneState::Running); + + // Set deletion_timestamp (grace period NOT expired) + pod.metadata.deletion_timestamp = Some( + 
k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(Utc::now()), + ); + pod.metadata.deletion_grace_period_seconds = Some(30); + + // Handle termination — should call shutdown_zone + let result = controller.handle_termination(&pod).await; + assert!(result.is_ok()); + + // After shutdown, MockRuntime transitions zone to Installed + let state = controller.runtime.get_zone_state(&zone_name).await.unwrap(); + assert_eq!(state, ZoneState::Installed); + } + + #[tokio::test] + async fn test_handle_delete_skips_when_deletion_timestamp_set() { + let (controller, _dir) = make_test_controller(); + + let mut pod = Pod::default(); + pod.metadata.name = Some("skip-pod".to_string()); + pod.metadata.namespace = Some("default".to_string()); + pod.metadata.deletion_timestamp = Some( + k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(Utc::now()), + ); + pod.spec = Some(PodSpec { + node_name: Some("node1".to_string()), + containers: vec![Container { + name: "web".to_string(), + command: Some(vec!["/bin/sh".to_string()]), + ..Default::default() + }], + ..Default::default() + }); + + // handle_delete should return Ok immediately — skipping deprovision + let result = controller.handle_delete(&pod).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_reconcile_with_deletion_timestamp_uses_termination() { + let (controller, _dir) = make_test_controller(); + + // Build a pod with deletion_timestamp, assigned to our node, phase Terminating + let mut pod = Pod::default(); + pod.metadata.name = Some("recon-term".to_string()); + pod.metadata.namespace = Some("default".to_string()); + pod.metadata.deletion_timestamp = Some( + k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(Utc::now()), + ); + pod.metadata.deletion_grace_period_seconds = Some(30); + pod.spec = Some(PodSpec { + node_name: Some("node1".to_string()), + containers: vec![Container { + name: "web".to_string(), + command: Some(vec!["/bin/sh".to_string()]), + ..Default::default() + }], + ..Default::default() 
+ }); + pod.status = Some(PodStatus { + phase: Some("Terminating".to_string()), + ..Default::default() + }); + + // reconcile() should detect deletion_timestamp and call handle_termination + // Zone is absent → will try to finalize (will fail, but logged) + let result = controller.reconcile(&pod).await; + assert!(result.is_ok()); + } } diff --git a/crates/reddwarf-runtime/src/lib.rs b/crates/reddwarf-runtime/src/lib.rs index 84680f4..dfb691a 100644 --- a/crates/reddwarf-runtime/src/lib.rs +++ b/crates/reddwarf-runtime/src/lib.rs @@ -11,6 +11,7 @@ pub mod illumos; pub mod mock; pub mod network; pub mod node_agent; +pub mod node_health; pub mod storage; pub mod traits; pub mod types; @@ -35,6 +36,7 @@ pub use storage::{MockStorageEngine, StorageEngine, VolumeInfo}; pub use api_client::ApiClient; pub use controller::{PodController, PodControllerConfig}; pub use node_agent::{NodeAgent, NodeAgentConfig}; +pub use node_health::{NodeHealthChecker, NodeHealthCheckerConfig}; // Conditionally re-export illumos runtime #[cfg(target_os = "illumos")] diff --git a/crates/reddwarf-runtime/src/node_health.rs b/crates/reddwarf-runtime/src/node_health.rs new file mode 100644 index 0000000..18bc1cc --- /dev/null +++ b/crates/reddwarf-runtime/src/node_health.rs @@ -0,0 +1,352 @@ +use crate::api_client::ApiClient; +use crate::error::Result; +use chrono::Utc; +use k8s_openapi::api::core::v1::{Node, NodeCondition}; +use std::sync::Arc; +use std::time::Duration; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, info, warn}; + +/// Configuration for the node health checker +#[derive(Debug, Clone)] +pub struct NodeHealthCheckerConfig { + /// Interval between health checks + pub check_interval: Duration, + /// How long since the last heartbeat before a node is considered stale + pub heartbeat_timeout: Duration, +} + +impl Default for NodeHealthCheckerConfig { + fn default() -> Self { + Self { + check_interval: Duration::from_secs(15), + // 4x the default heartbeat 
interval (10s) = 40s + heartbeat_timeout: Duration::from_secs(40), + } + } +} + +/// Periodically checks node heartbeats and marks stale nodes as NotReady +pub struct NodeHealthChecker { + api_client: Arc, + config: NodeHealthCheckerConfig, +} + +impl NodeHealthChecker { + pub fn new(api_client: Arc, config: NodeHealthCheckerConfig) -> Self { + Self { api_client, config } + } + + /// Run the health checker loop + pub async fn run(&self, token: CancellationToken) -> Result<()> { + info!( + "Starting node health checker (interval: {:?}, timeout: {:?})", + self.config.check_interval, self.config.heartbeat_timeout + ); + + let mut interval = tokio::time::interval(self.config.check_interval); + // Consume the first immediate tick — nodes just registered + interval.tick().await; + + loop { + tokio::select! { + _ = token.cancelled() => { + info!("Node health checker shutting down"); + return Ok(()); + } + _ = interval.tick() => { + if let Err(e) = self.check_all_nodes().await { + error!("Node health check failed: {}", e); + } + } + } + } + } + + /// Check all nodes for stale heartbeats + async fn check_all_nodes(&self) -> Result<()> { + debug!("Running node health check"); + + let body = self.api_client.get_json("/api/v1/nodes").await?; + let items = body["items"].as_array().cloned().unwrap_or_default(); + + for item in items { + let node: Node = match serde_json::from_value(item) { + Ok(n) => n, + Err(e) => { + warn!("Failed to parse node from list: {}", e); + continue; + } + }; + + let node_name = match node.metadata.name.as_deref() { + Some(n) => n, + None => continue, + }; + + if let Err(e) = self.check_node(node_name, &node).await { + warn!("Failed to check node {}: {}", node_name, e); + } + } + + Ok(()) + } + + /// Check a single node's heartbeat and mark it NotReady if stale + async fn check_node(&self, node_name: &str, node: &Node) -> Result<()> { + let conditions = match node + .status + .as_ref() + .and_then(|s| s.conditions.as_ref()) + { + Some(c) => c, + None 
=> { + debug!("Node {} has no conditions, skipping", node_name); + return Ok(()); + } + }; + + let ready_condition = match conditions.iter().find(|c| c.type_ == "Ready") { + Some(c) => c, + None => { + debug!("Node {} has no Ready condition, skipping", node_name); + return Ok(()); + } + }; + + // Skip if already marked NotReady by us (avoid re-updating) + if ready_condition.status == "False" + && ready_condition.reason.as_deref() == Some("NodeStatusUnknown") + { + debug!( + "Node {} already marked NotReady by health checker, skipping", + node_name + ); + return Ok(()); + } + + // Check heartbeat staleness + let last_heartbeat = match &ready_condition.last_heartbeat_time { + Some(t) => t.0, + None => { + debug!("Node {} has no last_heartbeat_time, skipping", node_name); + return Ok(()); + } + }; + + let elapsed = Utc::now() - last_heartbeat; + let timeout = chrono::Duration::from_std(self.config.heartbeat_timeout) + .unwrap_or(chrono::Duration::seconds(40)); + + if elapsed <= timeout { + debug!( + "Node {} heartbeat is fresh ({}s ago)", + node_name, + elapsed.num_seconds() + ); + return Ok(()); + } + + // Node is stale — mark it NotReady + warn!( + "Node {} heartbeat is stale ({}s ago, timeout {}s) — marking NotReady", + node_name, + elapsed.num_seconds(), + timeout.num_seconds() + ); + + let mut updated_node = node.clone(); + + // Preserve last_transition_time if the status was already False, + // otherwise set it to now + let last_transition_time = if ready_condition.status == "False" { + ready_condition.last_transition_time.clone() + } else { + Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time( + Utc::now(), + )) + }; + + let new_condition = NodeCondition { + type_: "Ready".to_string(), + status: "False".to_string(), + reason: Some("NodeStatusUnknown".to_string()), + message: Some(format!( + "Node heartbeat not received for {}s (timeout: {}s)", + elapsed.num_seconds(), + timeout.num_seconds() + )), + last_heartbeat_time: 
ready_condition.last_heartbeat_time.clone(), + last_transition_time, + }; + + // Replace the Ready condition in the node status + if let Some(status) = updated_node.status.as_mut() { + if let Some(conditions) = status.conditions.as_mut() { + if let Some(ready) = conditions.iter_mut().find(|c| c.type_ == "Ready") { + *ready = new_condition; + } + } + } + + self.api_client + .update_node_status(node_name, &updated_node) + .await?; + + info!("Node {} marked as NotReady (stale heartbeat)", node_name); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use k8s_openapi::api::core::v1::{NodeCondition, NodeStatus}; + use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ObjectMeta, Time}; + + fn make_node(name: &str, ready_status: &str, heartbeat_age_secs: i64) -> Node { + let heartbeat_time = Utc::now() - chrono::Duration::seconds(heartbeat_age_secs); + Node { + metadata: ObjectMeta { + name: Some(name.to_string()), + ..Default::default() + }, + status: Some(NodeStatus { + conditions: Some(vec![NodeCondition { + type_: "Ready".to_string(), + status: ready_status.to_string(), + reason: Some("KubeletReady".to_string()), + message: Some("node agent is healthy".to_string()), + last_heartbeat_time: Some(Time(heartbeat_time)), + last_transition_time: Some(Time( + Utc::now() - chrono::Duration::seconds(3600), + )), + }]), + ..Default::default() + }), + ..Default::default() + } + } + + fn make_stale_notready_node(name: &str, heartbeat_age_secs: i64) -> Node { + let heartbeat_time = Utc::now() - chrono::Duration::seconds(heartbeat_age_secs); + Node { + metadata: ObjectMeta { + name: Some(name.to_string()), + ..Default::default() + }, + status: Some(NodeStatus { + conditions: Some(vec![NodeCondition { + type_: "Ready".to_string(), + status: "False".to_string(), + reason: Some("NodeStatusUnknown".to_string()), + message: Some("Node heartbeat not received".to_string()), + last_heartbeat_time: Some(Time(heartbeat_time)), + last_transition_time: Some(Time( + Utc::now() - 
chrono::Duration::seconds(100), + )), + }]), + ..Default::default() + }), + ..Default::default() + } + } + + /// Fresh heartbeat should result in no status change (check_node returns Ok + /// and does not attempt an API call) + #[tokio::test] + async fn test_fresh_heartbeat_is_noop() { + let api_client = Arc::new(ApiClient::new("http://127.0.0.1:6443")); + let config = NodeHealthCheckerConfig { + check_interval: Duration::from_secs(15), + heartbeat_timeout: Duration::from_secs(40), + }; + let checker = NodeHealthChecker::new(api_client, config); + + // 10 seconds ago — well within the 40s timeout + let node = make_node("fresh-node", "True", 10); + + // Should succeed without making any API call (a call would fail since there is no real API server) + let result = checker.check_node("fresh-node", &node).await; + assert!(result.is_ok()); + } + + /// Stale heartbeat on a Ready node should attempt to mark it NotReady. + /// Since we have no real API server the update call will fail, proving the + /// checker detected the staleness and tried to act. 
+ #[tokio::test] + async fn test_stale_heartbeat_triggers_update() { + let api_client = Arc::new(ApiClient::new("http://127.0.0.1:6443")); + let config = NodeHealthCheckerConfig { + check_interval: Duration::from_secs(15), + heartbeat_timeout: Duration::from_secs(40), + }; + let checker = NodeHealthChecker::new(api_client, config); + + // 60 seconds ago — exceeds the 40s timeout + let node = make_node("stale-node", "True", 60); + + // Should fail because there's no real API server — but that proves it + // detected the staleness and attempted to update the node + let result = checker.check_node("stale-node", &node).await; + assert!(result.is_err()); + } + + /// A node already marked NotReady with reason "NodeStatusUnknown" should be + /// skipped (no redundant update) + #[tokio::test] + async fn test_already_notready_is_skipped() { + let api_client = Arc::new(ApiClient::new("http://127.0.0.1:6443")); + let config = NodeHealthCheckerConfig { + check_interval: Duration::from_secs(15), + heartbeat_timeout: Duration::from_secs(40), + }; + let checker = NodeHealthChecker::new(api_client, config); + + // 120 seconds stale but already marked by us + let node = make_stale_notready_node("dead-node", 120); + + // Should return Ok without attempting an API call + let result = checker.check_node("dead-node", &node).await; + assert!(result.is_ok()); + } + + /// A node already False for a different reason must still be updated, keeping its + /// last_transition_time (only the update attempt is asserted; the time is not checked) + #[tokio::test] + async fn test_transition_time_preserved_when_already_false() { + // Build a node that has status=False but with a different reason + let heartbeat_time = Utc::now() - chrono::Duration::seconds(60); + let original_transition_time = Utc::now() - chrono::Duration::seconds(300); + let node = Node { + metadata: ObjectMeta { + name: Some("failing-node".to_string()), + ..Default::default() + }, + status: Some(NodeStatus { + conditions: Some(vec![NodeCondition { + type_: "Ready".to_string(), + status: 
"False".to_string(), + reason: Some("SomeOtherReason".to_string()), + message: Some("something else".to_string()), + last_heartbeat_time: Some(Time(heartbeat_time)), + last_transition_time: Some(Time(original_transition_time)), + }]), + ..Default::default() + }), + ..Default::default() + }; + + let api_client = Arc::new(ApiClient::new("http://127.0.0.1:6443")); + let config = NodeHealthCheckerConfig { + check_interval: Duration::from_secs(15), + heartbeat_timeout: Duration::from_secs(40), + }; + let checker = NodeHealthChecker::new(api_client, config); + + // Will fail at the API call, but we can verify the logic by checking that + // the code path was entered (it didn't skip due to already-notready check) + let result = checker.check_node("failing-node", &node).await; + assert!(result.is_err()); // proves it tried to update (different reason) + } +} diff --git a/crates/reddwarf-runtime/src/types.rs b/crates/reddwarf-runtime/src/types.rs index 45e54c8..02c2513 100644 --- a/crates/reddwarf-runtime/src/types.rs +++ b/crates/reddwarf-runtime/src/types.rs @@ -47,7 +47,7 @@ impl ZoneState { ZoneState::Installed => "Pending", ZoneState::Ready => "Pending", ZoneState::Running => "Running", - ZoneState::ShuttingDown => "Succeeded", + ZoneState::ShuttingDown => "Terminating", ZoneState::Down => "Failed", ZoneState::Absent => "Unknown", } @@ -268,7 +268,7 @@ mod tests { assert_eq!(ZoneState::Installed.to_pod_phase(), "Pending"); assert_eq!(ZoneState::Ready.to_pod_phase(), "Pending"); assert_eq!(ZoneState::Running.to_pod_phase(), "Running"); - assert_eq!(ZoneState::ShuttingDown.to_pod_phase(), "Succeeded"); + assert_eq!(ZoneState::ShuttingDown.to_pod_phase(), "Terminating"); assert_eq!(ZoneState::Down.to_pod_phase(), "Failed"); assert_eq!(ZoneState::Absent.to_pod_phase(), "Unknown"); } diff --git a/crates/reddwarf/src/main.rs b/crates/reddwarf/src/main.rs index 5e1e60c..e2248b0 100644 --- a/crates/reddwarf/src/main.rs +++ b/crates/reddwarf/src/main.rs @@ -2,8 +2,9 @@ use 
clap::{Parser, Subcommand}; use reddwarf_apiserver::{ApiError, ApiServer, AppState, Config as ApiConfig, TlsMode}; use reddwarf_core::Namespace; use reddwarf_runtime::{ - ApiClient, Ipam, MockRuntime, MockStorageEngine, NodeAgent, NodeAgentConfig, PodController, - PodControllerConfig, StorageEngine, StoragePoolConfig, ZoneBrand, + ApiClient, Ipam, MockRuntime, MockStorageEngine, NodeAgent, NodeAgentConfig, + NodeHealthChecker, NodeHealthCheckerConfig, PodController, PodControllerConfig, StorageEngine, + StoragePoolConfig, ZoneBrand, }; use reddwarf_scheduler::scheduler::SchedulerConfig; use reddwarf_scheduler::Scheduler; @@ -318,6 +319,7 @@ async fn run_agent( default_brand: ZoneBrand::Reddwarf, etherstub_name: etherstub_name.to_string(), pod_cidr: pod_cidr.to_string(), + reconcile_interval: std::time::Duration::from_secs(30), }; let controller = PodController::new( @@ -336,7 +338,7 @@ async fn run_agent( // 6. Spawn node agent let node_agent_config = NodeAgentConfig::new(node_name.to_string(), api_url); - let node_agent = NodeAgent::new(api_client, node_agent_config); + let node_agent = NodeAgent::new(api_client.clone(), node_agent_config); let agent_token = token.clone(); let node_agent_handle = tokio::spawn(async move { if let Err(e) = node_agent.run(agent_token).await { @@ -344,6 +346,15 @@ async fn run_agent( } }); + // 7. Spawn node health checker + let health_checker = NodeHealthChecker::new(api_client, NodeHealthCheckerConfig::default()); + let health_token = token.clone(); + let health_handle = tokio::spawn(async move { + if let Err(e) = health_checker.run(health_token).await { + error!("Node health checker error: {}", e); + } + }); + info!( "All components started. API server on {}, node name: {}, pod CIDR: {}", bind, node_name, pod_cidr @@ -362,6 +373,7 @@ async fn run_agent( scheduler_handle, controller_handle, node_agent_handle, + health_handle, ); }) .await;