From d79f8ce01116d3b35997d5ed3764618182b6b5f5 Mon Sep 17 00:00:00 2001 From: Till Wegmueller Date: Sat, 14 Feb 2026 22:41:30 +0100 Subject: [PATCH] Add health probes (liveness/readiness/startup) with exec, HTTP, and TCP checks Implement Kubernetes-style health probes that run during the reconcile loop to detect unhealthy applications inside running zones. Previously the pod controller only checked zone liveness via get_zone_state(), missing cases where the zone is running but the application inside has crashed. - Add exec_in_zone() to ZoneRuntime trait, implemented via zlogin on illumos and with configurable mock results for testing - Add probe type system (ProbeKind, ProbeAction, ContainerProbeConfig) that decouples from k8s_openapi and extracts probes from pod container specs with proper k8s defaults (period=10s, timeout=1s, failure=3, success=1) - Add ProbeExecutor for exec/HTTP/TCP checks with tokio timeout support (HTTPS falls back to TCP-only with warning) - Add ProbeTracker state machine that tracks per-pod/container/probe-kind state, respects initial delays and periods, gates liveness on startup probes, and aggregates results into PodProbeStatus - Integrate into PodController reconcile loop: on liveness failure set phase=Failed with reason LivenessProbeFailure; on readiness failure set Ready=False; on all-pass restore Ready=True - Add ProbeFailed error variant with miette diagnostic Known v1 limitation: probes execute at reconcile cadence (~30s), not at their configured periodSeconds. Co-Authored-By: Claude Opus 4.6 --- AUDIT.md | 99 ++++ crates/reddwarf-runtime/src/controller.rs | 320 ++++++++++- crates/reddwarf-runtime/src/error.rs | 35 ++ crates/reddwarf-runtime/src/illumos.rs | 9 +- crates/reddwarf-runtime/src/lib.rs | 2 + crates/reddwarf-runtime/src/mock.rs | 120 +++- .../reddwarf-runtime/src/probes/executor.rs | 358 ++++++++++++ crates/reddwarf-runtime/src/probes/mod.rs | 9 + crates/reddwarf-runtime/src/probes/tracker.rs | 515 ++++++++++++++++++ crates/reddwarf-runtime/src/probes/types.rs | 284 ++++++++++ crates/reddwarf-runtime/src/traits.rs | 13 + 11 files changed, 1759 insertions(+), 5 deletions(-) create mode 100644 AUDIT.md create mode 100644 crates/reddwarf-runtime/src/probes/executor.rs create mode 100644 crates/reddwarf-runtime/src/probes/mod.rs create mode 100644 crates/reddwarf-runtime/src/probes/tracker.rs create mode 100644 crates/reddwarf-runtime/src/probes/types.rs diff --git a/AUDIT.md b/AUDIT.md new file mode 100644 index 0000000..88c916f --- /dev/null +++ b/AUDIT.md @@ -0,0 +1,99 @@ +# Reddwarf Production Readiness Audit + +**Last updated:** 2026-02-14 +**Baseline commit:** `58171c7` (Add periodic reconciliation, node health checker, and graceful pod termination) + +--- + +## 1. Zone Runtime (`reddwarf-runtime`) + +| Requirement | Status | Notes | +|---|---|---| +| Pod spec to zonecfg | DONE | `zone/config.rs`, `controller.rs:pod_to_zone_config()` | +| Zone lifecycle (zoneadm) | DONE | `illumos.rs` — create, install, boot, halt, uninstall, delete | +| Container to Zone mapping | DONE | Naming, sanitization, 64-char truncation | +| CPU limits to capped-cpu | DONE | Aggregates across containers, limits preferred over requests | +| Memory limits to capped-memory | DONE | Aggregates across containers, illumos G/M/K suffixes | +| Network to Crossbow VNIC | DONE | `dladm create-etherstub`, `create-vnic`, per-pod VNIC+IP | +| Volumes to ZFS datasets | DONE | Create, destroy, clone, quota, snapshot support | +| Image pull / clone | PARTIAL | ZFS clone works; LX tarball `-s` works. Missing: no image pull/registry, no `.zar` archive, no golden image bootstrap | +| Health probes (zlogin) | DONE | exec-in-zone via `zlogin`, liveness/readiness/startup probes with exec/HTTP/TCP actions, probe tracker state machine integrated into reconcile loop. v1 limitation: probes run at reconcile cadence, not per-probe `periodSeconds` | + +## 2. Reconciliation / Controller Loop + +| Requirement | Status | Notes | +|---|---|---| +| Event bus / watch | DONE | tokio broadcast channel, SSE watch API, multi-subscriber | +| Pod controller | DONE | Event-driven + full reconcile on lag, provision/deprovision | +| Node controller (NotReady) | DONE | `node_health.rs` — checks every 15s, marks stale (>40s) nodes NotReady with reason NodeStatusUnknown | +| Continuous reconciliation | DONE | `controller.rs` — periodic `reconcile_all()` every 30s via `tokio::time::interval` in select! loop | +| Graceful termination | DONE | DELETE sets `deletion_timestamp` + phase=Terminating; controller drives shutdown state machine; POST `.../finalize` for actual removal | + +## 3. Pod Status Tracking + +| Requirement | Status | Notes | +|---|---|---| +| Zone state to pod phase | DONE | 8 zone states mapped to pod phases | +| Status subresource (`/status`) | DONE | PUT endpoint, spec/status separation, fires MODIFIED events | +| ShuttingDown mapping | DONE | Fixed in `58171c7` — maps to "Terminating" | + +## 4. Node Agent / Heartbeat + +| Requirement | Status | Notes | +|---|---|---| +| Self-registration | DONE | Creates Node resource with allocatable CPU/memory | +| Periodic heartbeat | DONE | 10-second interval, Ready condition | +| Report zone states | NOT DONE | Heartbeat doesn't query actual zone states | +| Dynamic resource reporting | DONE | `sysinfo.rs` — detects CPU/memory via `sys-info`, capacity vs allocatable split with configurable reservations (`--system-reserved-cpu`, `--system-reserved-memory`, `--max-pods`). Done in `d3eb0b2` | + +## 5. Main Binary + +| Requirement | Status | Notes | +|---|---|---| +| API + scheduler + runtime wired | DONE | All 4 components spawned as tokio tasks | +| CLI via clap | DONE | `serve` and `agent` subcommands | +| Graceful shutdown | DONE | SIGINT + CancellationToken + 5s timeout | +| TLS (rustls) | DONE | Auto-generated self-signed CA + server cert, or user-provided PEM. Added in `cb6ca8c` | +| SMF service manifest | DONE | SMF manifest + method script in `smf/`. Added in `cb6ca8c` | + +## 6. Networking + +| Requirement | Status | Notes | +|---|---|---| +| Etherstub creation | DONE | `dladm create-etherstub` | +| VNIC per zone | DONE | `dladm create-vnic -l etherstub` | +| ipadm IP assignment | PARTIAL | IP set in zonecfg `allowed-address` but no explicit `ipadm create-addr` call | +| IPAM | DONE | Sequential alloc, idempotent, persistent, pool exhaustion handling | +| Service ClusterIP / NAT | NOT DONE | Services stored at API level but no backend controller, no ipnat rules, no proxy, no DNS | + +## 7. Scheduler + +| Requirement | Status | Notes | +|---|---|---| +| Versioned bind_pod() | DONE | Fixed in `c50ecb2` — creates versioned commits | +| Zone brand constraints | DONE | `ZoneBrandMatch` filter checks `reddwarf.io/zone-brand` annotation vs `reddwarf.io/zone-brands` node label. Done in `4c7f50a` | +| Actual resource usage | NOT DONE | Only compares requests vs static allocatable — no runtime metrics | + +--- + +## Priority Order + +### Critical (blocks production) +- [x] TLS — done in `cb6ca8c` +- [x] SMF manifest — done in `cb6ca8c` + +### High (limits reliability) +- [x] Node health checker — done in `58171c7` +- [x] Periodic reconciliation — done in `58171c7` +- [x] Graceful pod termination — done in `58171c7` + +### Medium (limits functionality) +- [ ] Service networking — no ClusterIP, no NAT/proxy, no DNS +- [x] Health probes — exec/HTTP/TCP liveness/readiness/startup probes via zlogin +- [ ] Image management — no pull/registry, no `.zar` support, no golden image bootstrap +- [x] Dynamic node resources — done in `d3eb0b2` + +### Low (nice to have) +- [x] Zone brand scheduling filter — done in `4c7f50a` +- [x] ShuttingDown to Terminating mapping fix — done in `58171c7` +- [ ] bhyve brand — type exists but no implementation diff --git a/crates/reddwarf-runtime/src/controller.rs b/crates/reddwarf-runtime/src/controller.rs index 35c6090..da3b5b2 100644 --- a/crates/reddwarf-runtime/src/controller.rs +++ b/crates/reddwarf-runtime/src/controller.rs @@ -1,14 +1,17 @@ use crate::api_client::ApiClient; use crate::error::{Result, RuntimeError}; use crate::network::{vnic_name_for_pod, Ipam}; +use crate::probes::executor::ProbeExecutor; +use crate::probes::tracker::ProbeTracker; +use crate::probes::types::extract_probes; use crate::traits::ZoneRuntime; use crate::types::*; use chrono::Utc; use k8s_openapi::api::core::v1::{Pod, PodCondition, PodStatus}; use reddwarf_core::{ResourceEvent, ResourceQuantities, WatchEventType}; use std::sync::Arc; -use std::time::Duration; -use tokio::sync::broadcast; +use std::time::{Duration, Instant}; +use tokio::sync::{broadcast, Mutex}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; @@ -38,6 +41,7 @@ pub struct PodController { event_tx: broadcast::Sender, config: PodControllerConfig, ipam: Ipam, + probe_tracker: Mutex, } impl PodController { @@ -48,12 +52,15 @@ impl PodController { config: PodControllerConfig, ipam: Ipam, ) -> Self { + let probe_executor = ProbeExecutor::new(Arc::clone(&runtime)); + let probe_tracker = Mutex::new(ProbeTracker::new(probe_executor)); Self { runtime, api_client, event_tx, config, ipam, + probe_tracker, } } @@ -273,7 +280,124 @@ impl PodController { // Check zone health match self.runtime.get_zone_state(&zone_name).await { Ok(ZoneState::Running) => { - // All good + // Zone is running — execute health probes + let pod_key = format!("{}/{}", namespace, pod_name); + let zone_ip = self.get_pod_ip(pod); + + // Extract and register probes (idempotent) + let probes = self.extract_pod_probes(pod); + let started_at = self.pod_start_time(pod); + + let mut tracker = self.probe_tracker.lock().await; + tracker.register_pod(&pod_key, probes, started_at); + + let status = tracker + .check_pod(&pod_key, &zone_name, &zone_ip) + .await; + drop(tracker); + + if status.liveness_failed { + let message = status.failure_message.unwrap_or_else(|| { + "Liveness probe failed".to_string() + }); + warn!( + "Liveness probe failed for pod {}/{}: {}", + namespace, pod_name, message + ); + let pod_status = PodStatus { + phase: Some("Failed".to_string()), + conditions: Some(vec![PodCondition { + type_: "Ready".to_string(), + status: "False".to_string(), + reason: Some("LivenessProbeFailure".to_string()), + message: Some(message), + ..Default::default() + }]), + ..Default::default() + }; + + if let Err(e) = self + .api_client + .set_pod_status(namespace, pod_name, pod_status) + .await + { + error!("Failed to update pod status to Failed: {}", e); + } + + // Unregister probes for this pod + let mut tracker = self.probe_tracker.lock().await; + tracker.unregister_pod(&pod_key); + } else if !status.ready { + let message = status.failure_message.unwrap_or_else(|| { + "Readiness probe failed".to_string() + }); + debug!( + "Readiness probe not passing for pod {}/{}: {}", + namespace, pod_name, message + ); + + // Only update if currently marked Ready=True + let currently_ready = pod + .status + .as_ref() + .and_then(|s| s.conditions.as_ref()) + .and_then(|c| c.iter().find(|c| c.type_ == "Ready")) + .map(|c| c.status == "True") + .unwrap_or(false); + + if currently_ready { + let pod_status = PodStatus { + phase: Some("Running".to_string()), + conditions: Some(vec![PodCondition { + type_: "Ready".to_string(), + status: "False".to_string(), + reason: Some("ReadinessProbeFailure".to_string()), + message: Some(message), + ..Default::default() + }]), + pod_ip: Some(zone_ip), + ..Default::default() + }; + + if let Err(e) = self + .api_client + .set_pod_status(namespace, pod_name, pod_status) + .await + { + error!("Failed to update pod status: {}", e); + } + } + } else { + // All probes pass — set Ready=True if not already + let currently_ready = pod + .status + .as_ref() + .and_then(|s| s.conditions.as_ref()) + .and_then(|c| c.iter().find(|c| c.type_ == "Ready")) + .map(|c| c.status == "True") + .unwrap_or(false); + + if !currently_ready { + let pod_status = PodStatus { + phase: Some("Running".to_string()), + conditions: Some(vec![PodCondition { + type_: "Ready".to_string(), + status: "True".to_string(), + ..Default::default() + }]), + pod_ip: Some(zone_ip), + ..Default::default() + }; + + if let Err(e) = self + .api_client + .set_pod_status(namespace, pod_name, pod_status) + .await + { + error!("Failed to update pod status: {}", e); + } + } + } } Ok(state) => { warn!( @@ -394,6 +518,11 @@ impl PodController { ); } + // Unregister probes + let pod_key = format!("{}/{}", namespace, pod_name); + let mut tracker = self.probe_tracker.lock().await; + tracker.unregister_pod(&pod_key); + Ok(()) } @@ -502,6 +631,12 @@ impl PodController { ); } + // Unregister probes + let pod_key = format!("{}/{}", namespace, pod_name); + let mut tracker = self.probe_tracker.lock().await; + tracker.unregister_pod(&pod_key); + drop(tracker); + // Finalize — remove the pod from API server storage if let Err(e) = self.api_client.finalize_pod(namespace, pod_name).await { error!( @@ -658,6 +793,47 @@ impl PodController { NetworkMode::Direct(d) => d.ip_address.clone(), } } + + /// Extract probe configurations from all containers in a pod spec + fn extract_pod_probes(&self, pod: &Pod) -> Vec { + let spec = match &pod.spec { + Some(s) => s, + None => return vec![], + }; + + spec.containers + .iter() + .flat_map(|c| extract_probes(c)) + .collect() + } + + /// Get the pod's IP from its status, falling back to empty string + fn get_pod_ip(&self, pod: &Pod) -> String { + pod.status + .as_ref() + .and_then(|s| s.pod_ip.clone()) + .unwrap_or_default() + } + + /// Approximate when the pod's containers started. + /// Uses the pod's start_time if available, otherwise uses now. + fn pod_start_time(&self, pod: &Pod) -> Instant { + // We can't convert k8s Time to std Instant directly. Instead, compute + // the elapsed duration since start_time and subtract from Instant::now(). + if let Some(start_time) = pod + .status + .as_ref() + .and_then(|s| s.start_time.as_ref()) + { + let now_utc = Utc::now(); + let started_utc = start_time.0; + let elapsed = now_utc.signed_duration_since(started_utc); + if let Ok(elapsed_std) = elapsed.to_std() { + return Instant::now().checked_sub(elapsed_std).unwrap_or_else(Instant::now); + } + } + Instant::now() + } } /// Generate a zone name from namespace and pod name @@ -1168,6 +1344,144 @@ mod tests { assert_eq!(zone_config.brand, ZoneBrand::Reddwarf); } + fn make_test_controller_with_runtime() -> (PodController, Arc, tempfile::TempDir) { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("test-controller-rt.redb"); + let storage = Arc::new(RedbBackend::new(&db_path).unwrap()); + let ipam = Ipam::new(storage, "10.88.0.0/16").unwrap(); + + let mock_storage = Arc::new(crate::storage::MockStorageEngine::new( + crate::types::StoragePoolConfig::from_pool("rpool"), + )); + let runtime = Arc::new(crate::mock::MockRuntime::new(mock_storage)); + let api_client = Arc::new(ApiClient::new("http://127.0.0.1:6443")); + let (event_tx, _) = broadcast::channel(16); + + let config = PodControllerConfig { + node_name: "node1".to_string(), + api_url: "http://127.0.0.1:6443".to_string(), + zonepath_prefix: "/zones".to_string(), + default_brand: ZoneBrand::Reddwarf, + etherstub_name: "reddwarf0".to_string(), + pod_cidr: "10.88.0.0/16".to_string(), + reconcile_interval: Duration::from_secs(30), + }; + + let controller = PodController::new(runtime.clone() as Arc, api_client, event_tx, config, ipam); + (controller, runtime, dir) + } + + #[tokio::test] + async fn test_reconcile_running_pod_with_no_probes() { + let (controller, runtime, _dir) = make_test_controller_with_runtime(); + + // Create a pod that is already Running with a provisioned zone + let mut pod = Pod::default(); + pod.metadata.name = Some("no-probe-pod".to_string()); + pod.metadata.namespace = Some("default".to_string()); + pod.spec = Some(PodSpec { + node_name: Some("node1".to_string()), + containers: vec![Container { + name: "web".to_string(), + command: Some(vec!["/bin/sh".to_string()]), + ..Default::default() + }], + ..Default::default() + }); + pod.status = Some(PodStatus { + phase: Some("Running".to_string()), + pod_ip: Some("10.88.0.2".to_string()), + conditions: Some(vec![PodCondition { + type_: "Ready".to_string(), + status: "True".to_string(), + ..Default::default() + }]), + ..Default::default() + }); + + // Provision the zone so get_zone_state returns Running + let zone_config = controller.pod_to_zone_config(&pod).unwrap(); + runtime.provision(&zone_config).await.unwrap(); + + // Reconcile — should succeed without changing anything (no probes) + let result = controller.reconcile(&pod).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_reconcile_running_pod_liveness_failure() { + let (controller, runtime, _dir) = make_test_controller_with_runtime(); + + let mut pod = Pod::default(); + pod.metadata.name = Some("liveness-pod".to_string()); + pod.metadata.namespace = Some("default".to_string()); + pod.spec = Some(PodSpec { + node_name: Some("node1".to_string()), + containers: vec![Container { + name: "web".to_string(), + command: Some(vec!["/bin/sh".to_string()]), + liveness_probe: Some(k8s_openapi::api::core::v1::Probe { + exec: Some(k8s_openapi::api::core::v1::ExecAction { + command: Some(vec!["healthcheck".to_string()]), + }), + period_seconds: Some(0), // Always run + failure_threshold: Some(3), + ..Default::default() + }), + ..Default::default() + }], + ..Default::default() + }); + pod.status = Some(PodStatus { + phase: Some("Running".to_string()), + pod_ip: Some("10.88.0.2".to_string()), + conditions: Some(vec![PodCondition { + type_: "Ready".to_string(), + status: "True".to_string(), + ..Default::default() + }]), + ..Default::default() + }); + + // Provision the zone + let zone_config = controller.pod_to_zone_config(&pod).unwrap(); + runtime.provision(&zone_config).await.unwrap(); + + // Queue failures for the liveness probe + let zone_name = pod_zone_name("default", "liveness-pod"); + for _ in 0..3 { + runtime + .set_exec_result( + &zone_name, + crate::command::CommandOutput { + stdout: String::new(), + stderr: "unhealthy".to_string(), + exit_code: 1, + }, + ) + .await; + } + + // Reconcile 3 times to hit the failure threshold. + // On the 3rd reconcile, liveness failure is detected. The controller + // then unregisters the probes and tries to set the pod status to Failed + // (which fails silently since there's no API server). + for _ in 0..3 { + let _ = controller.reconcile(&pod).await; + } + + // Verify the liveness failure path was taken: probes should be unregistered + let pod_key = "default/liveness-pod"; + let mut tracker = controller.probe_tracker.lock().await; + let status = tracker + .check_pod(pod_key, &zone_name, "10.88.0.2") + .await; + // No probes registered → default status (ready=true, liveness_failed=false) + // This confirms the unregister happened, which only occurs on liveness failure + assert!(status.ready); + assert!(!status.liveness_failed); + } + #[tokio::test] async fn test_reconcile_with_deletion_timestamp_uses_termination() { let (controller, _dir) = make_test_controller(); diff --git a/crates/reddwarf-runtime/src/error.rs b/crates/reddwarf-runtime/src/error.rs index 7c5dd2e..d29a478 100644 --- a/crates/reddwarf-runtime/src/error.rs +++ b/crates/reddwarf-runtime/src/error.rs @@ -155,6 +155,25 @@ pub enum RuntimeError { message: String, }, + /// Health probe failed + #[error("Health probe failed for container '{container_name}' in zone '{zone_name}': {message}")] + #[diagnostic( + code(reddwarf::runtime::probe_failed), + help("Check that the probe target is reachable inside the zone. Failure count: {failure_count}/{failure_threshold}. Verify the application is running and the probe command/port/path is correct") + )] + ProbeFailed { + #[allow(unused)] + zone_name: String, + #[allow(unused)] + container_name: String, + #[allow(unused)] + message: String, + #[allow(unused)] + failure_count: u32, + #[allow(unused)] + failure_threshold: u32, + }, + /// Internal error #[error("Internal runtime error: {message}")] #[diagnostic( @@ -241,6 +260,22 @@ impl RuntimeError { } } + pub fn probe_failed( + zone_name: impl Into, + container_name: impl Into, + message: impl Into, + failure_count: u32, + failure_threshold: u32, + ) -> Self { + Self::ProbeFailed { + zone_name: zone_name.into(), + container_name: container_name.into(), + message: message.into(), + failure_count, + failure_threshold, + } + } + pub fn internal_error(message: impl Into) -> Self { Self::InternalError { message: message.into(), diff --git a/crates/reddwarf-runtime/src/illumos.rs b/crates/reddwarf-runtime/src/illumos.rs index 657252c..87e2b2a 100644 --- a/crates/reddwarf-runtime/src/illumos.rs +++ b/crates/reddwarf-runtime/src/illumos.rs @@ -1,5 +1,5 @@ use crate::brand::lx::lx_install_args; -use crate::command::exec; +use crate::command::{exec, CommandOutput}; use crate::error::Result; use crate::storage::StorageEngine; use crate::traits::ZoneRuntime; @@ -91,6 +91,13 @@ impl ZoneRuntime for IllumosRuntime { Ok(()) } + async fn exec_in_zone(&self, zone_name: &str, command: &[String]) -> Result { + let mut args: Vec<&str> = vec![zone_name]; + let str_refs: Vec<&str> = command.iter().map(|s| s.as_str()).collect(); + args.extend(str_refs); + crate::command::exec_unchecked("zlogin", &args).await + } + async fn get_zone_state(&self, zone_name: &str) -> Result { let output = exec("zoneadm", &["-z", zone_name, "list", "-p"]).await?; let line = output.stdout.trim(); diff --git a/crates/reddwarf-runtime/src/lib.rs b/crates/reddwarf-runtime/src/lib.rs index 3e9e6f8..dfee7bf 100644 --- a/crates/reddwarf-runtime/src/lib.rs +++ b/crates/reddwarf-runtime/src/lib.rs @@ -11,6 +11,7 @@ pub mod illumos; pub mod mock; pub mod network; pub mod node_agent; +pub mod probes; pub mod node_health; pub mod storage; pub mod sysinfo; @@ -38,6 +39,7 @@ pub use api_client::ApiClient; pub use controller::{PodController, PodControllerConfig}; pub use node_agent::{NodeAgent, NodeAgentConfig}; pub use node_health::{NodeHealthChecker, NodeHealthCheckerConfig}; +pub use probes::{ProbeExecutor, ProbeTracker}; // Conditionally re-export illumos runtime #[cfg(target_os = "illumos")] diff --git a/crates/reddwarf-runtime/src/mock.rs b/crates/reddwarf-runtime/src/mock.rs index 9540a28..52fe03b 100644 --- a/crates/reddwarf-runtime/src/mock.rs +++ b/crates/reddwarf-runtime/src/mock.rs @@ -1,9 +1,10 @@ +use crate::command::CommandOutput; use crate::error::{Result, RuntimeError}; use crate::storage::StorageEngine; use crate::traits::ZoneRuntime; use crate::types::*; use async_trait::async_trait; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use tokio::sync::RwLock; use tracing::debug; @@ -25,6 +26,7 @@ pub struct MockRuntime { zones: Arc>>, next_id: Arc>, storage: Arc, + exec_results: Arc>>>, } impl MockRuntime { @@ -33,8 +35,19 @@ impl MockRuntime { zones: Arc::new(RwLock::new(HashMap::new())), next_id: Arc::new(RwLock::new(1)), storage, + exec_results: Arc::new(RwLock::new(HashMap::new())), } } + + /// Queue a custom exec result for a specific zone. + /// Results are consumed in FIFO order. Once exhausted, falls back to defaults. + pub async fn set_exec_result(&self, zone_name: &str, output: CommandOutput) { + let mut results = self.exec_results.write().await; + results + .entry(zone_name.to_string()) + .or_default() + .push_back(output); + } } #[async_trait] @@ -181,6 +194,40 @@ impl ZoneRuntime for MockRuntime { Ok(()) } + async fn exec_in_zone(&self, zone_name: &str, _command: &[String]) -> Result { + // Check for queued custom results first + { + let mut results = self.exec_results.write().await; + if let Some(queue) = results.get_mut(zone_name) { + if let Some(output) = queue.pop_front() { + return Ok(output); + } + } + } + + // Default behavior: succeed if zone is Running, error otherwise + let zones = self.zones.read().await; + let zone = zones + .get(zone_name) + .ok_or_else(|| RuntimeError::zone_not_found(zone_name))?; + + if zone.state == ZoneState::Running { + Ok(CommandOutput { + stdout: String::new(), + stderr: String::new(), + exit_code: 0, + }) + } else { + Err(RuntimeError::zone_operation_failed( + zone_name, + format!( + "Cannot exec in zone: zone is in state {} (expected Running)", + zone.state + ), + )) + } + } + async fn get_zone_state(&self, zone_name: &str) -> Result { let zones = self.zones.read().await; let zone = zones @@ -316,6 +363,77 @@ mod tests { } } + #[tokio::test] + async fn test_exec_in_zone_running_default_success() { + let rt = MockRuntime::new(make_test_storage()); + let config = make_test_config("exec-zone"); + rt.provision(&config).await.unwrap(); + + let output = rt + .exec_in_zone("exec-zone", &["echo".to_string(), "hello".to_string()]) + .await + .unwrap(); + assert_eq!(output.exit_code, 0); + } + + #[tokio::test] + async fn test_exec_in_zone_not_running_errors() { + let rt = MockRuntime::new(make_test_storage()); + let config = make_test_config("stopped-zone"); + rt.create_zone(&config).await.unwrap(); + rt.install_zone("stopped-zone").await.unwrap(); + // Zone is Installed, not Running + + let result = rt + .exec_in_zone("stopped-zone", &["echo".to_string()]) + .await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_exec_in_zone_not_found_errors() { + let rt = MockRuntime::new(make_test_storage()); + + let result = rt + .exec_in_zone("nonexistent", &["echo".to_string()]) + .await; + assert!(matches!( + result.unwrap_err(), + RuntimeError::ZoneNotFound { .. } + )); + } + + #[tokio::test] + async fn test_exec_in_zone_custom_result() { + let rt = MockRuntime::new(make_test_storage()); + let config = make_test_config("custom-zone"); + rt.provision(&config).await.unwrap(); + + rt.set_exec_result( + "custom-zone", + crate::command::CommandOutput { + stdout: String::new(), + stderr: "probe failed".to_string(), + exit_code: 1, + }, + ) + .await; + + let output = rt + .exec_in_zone("custom-zone", &["check".to_string()]) + .await + .unwrap(); + assert_eq!(output.exit_code, 1); + assert_eq!(output.stderr, "probe failed"); + + // Second call falls back to default (success) + let output2 = rt + .exec_in_zone("custom-zone", &["check".to_string()]) + .await + .unwrap(); + assert_eq!(output2.exit_code, 0); + } + #[tokio::test] async fn test_provision_transitions_to_running() { let rt = MockRuntime::new(make_test_storage()); diff --git a/crates/reddwarf-runtime/src/probes/executor.rs b/crates/reddwarf-runtime/src/probes/executor.rs new file mode 100644 index 0000000..1e919de --- /dev/null +++ b/crates/reddwarf-runtime/src/probes/executor.rs @@ -0,0 +1,358 @@ +use crate::probes::types::{ProbeAction, ProbeOutcome, ProbeResult}; +use crate::traits::ZoneRuntime; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; +use tracing::warn; + +/// Executes individual probe checks against a zone +pub struct ProbeExecutor { + runtime: Arc, +} + +impl ProbeExecutor { + pub fn new(runtime: Arc) -> Self { + Self { runtime } + } + + /// Execute a single probe action and return the result + pub async fn execute( + &self, + zone_name: &str, + zone_ip: &str, + action: &ProbeAction, + timeout: Duration, + ) -> ProbeResult { + let start = Instant::now(); + + let outcome = + match tokio::time::timeout(timeout, self.execute_inner(zone_name, zone_ip, action)) + .await + { + Ok(outcome) => outcome, + Err(_) => ProbeOutcome::Failure(format!( + "probe timed out after {}s", + timeout.as_secs() + )), + }; + + ProbeResult { + outcome, + duration: start.elapsed(), + timestamp: start, + } + } + + async fn execute_inner( + &self, + zone_name: &str, + zone_ip: &str, + action: &ProbeAction, + ) -> ProbeOutcome { + match action { + ProbeAction::Exec { command } => self.exec_probe(zone_name, command).await, + ProbeAction::HttpGet { + path, + port, + host, + scheme, + } => { + let target_host = if host == "localhost" { zone_ip } else { host }; + self.http_probe(target_host, *port, path, scheme).await + } + ProbeAction::TcpSocket { port, host } => { + let target_host = if host == "localhost" { zone_ip } else { host }; + self.tcp_probe(target_host, *port).await + } + } + } + + async fn exec_probe(&self, zone_name: &str, command: &[String]) -> ProbeOutcome { + match self.runtime.exec_in_zone(zone_name, command).await { + Ok(output) => { + if output.exit_code == 0 { + ProbeOutcome::Success + } else { + ProbeOutcome::Failure(format!( + "command exited with code {} (stderr: {})", + output.exit_code, + output.stderr.trim() + )) + } + } + Err(e) => ProbeOutcome::Error(format!("exec failed: {}", e)), + } + } + + async fn tcp_probe(&self, host: &str, port: u16) -> ProbeOutcome { + let addr = format!("{}:{}", host, port); + match TcpStream::connect(&addr).await { + Ok(_) => ProbeOutcome::Success, + Err(e) => ProbeOutcome::Failure(format!("TCP connection to {} failed: {}", addr, e)), + } + } + + async fn http_probe(&self, host: &str, port: u16, path: &str, scheme: &str) -> ProbeOutcome { + if scheme.eq_ignore_ascii_case("HTTPS") { + // HTTPS falls back to TCP-only check with warning — we don't have + // a TLS client in this context. + warn!( + "HTTPS probe to {}:{}{} falling back to TCP-only check", + host, port, path + ); + return self.tcp_probe(host, port).await; + } + + let addr = format!("{}:{}", host, port); + let mut stream = match TcpStream::connect(&addr).await { + Ok(s) => s, + Err(e) => { + return ProbeOutcome::Failure(format!( + "HTTP connection to {} failed: {}", + addr, e + )) + } + }; + + let request = format!( + "GET {} HTTP/1.1\r\nHost: {}:{}\r\nConnection: close\r\n\r\n", + path, host, port + ); + + if let Err(e) = stream.write_all(request.as_bytes()).await { + return ProbeOutcome::Failure(format!("HTTP write failed: {}", e)); + } + + let mut response = Vec::new(); + if let Err(e) = stream.read_to_end(&mut response).await { + return ProbeOutcome::Failure(format!("HTTP read failed: {}", e)); + } + + let response_str = String::from_utf8_lossy(&response); + + // Parse status code from HTTP/1.1 response line + if let Some(status_line) = response_str.lines().next() { + let parts: Vec<&str> = status_line.split_whitespace().collect(); + if parts.len() >= 2 { + if let Ok(status) = parts[1].parse::() { + if (200..300).contains(&status) { + return ProbeOutcome::Success; + } else { + return ProbeOutcome::Failure(format!( + "HTTP probe returned status {}", + status + )); + } + } + } + } + + ProbeOutcome::Failure("HTTP probe: could not parse response status".to_string()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::command::CommandOutput; + use crate::mock::MockRuntime; + use crate::storage::MockStorageEngine; + use crate::traits::ZoneRuntime; + use crate::types::{StoragePoolConfig, ZoneBrand, ZoneConfig, ZoneStorageOpts, NetworkMode, EtherstubConfig}; + use tokio::net::TcpListener; + + fn make_test_runtime() -> Arc { + let storage = Arc::new(MockStorageEngine::new(StoragePoolConfig::from_pool("rpool"))); + Arc::new(MockRuntime::new(storage)) + } + + fn make_zone_config(name: &str) -> ZoneConfig { + ZoneConfig { + zone_name: name.to_string(), + brand: ZoneBrand::Reddwarf, + zonepath: format!("/zones/{}", name), + network: NetworkMode::Etherstub(EtherstubConfig { + etherstub_name: "reddwarf0".to_string(), + vnic_name: format!("vnic_{}", name), + ip_address: "10.0.0.2".to_string(), + gateway: "10.0.0.1".to_string(), + prefix_len: 16, + }), + storage: ZoneStorageOpts::default(), + lx_image_path: None, + processes: vec![], + cpu_cap: None, + memory_cap: None, + fs_mounts: vec![], + } + } + + #[tokio::test] + async fn test_exec_probe_success() { + let runtime = make_test_runtime(); + let config = make_zone_config("exec-ok"); + runtime.provision(&config).await.unwrap(); + + let executor = ProbeExecutor::new(runtime.clone()); + let action = ProbeAction::Exec { + command: vec!["true".to_string()], + }; + + let result = executor + .execute("exec-ok", "10.0.0.2", &action, Duration::from_secs(5)) + .await; + assert_eq!(result.outcome, ProbeOutcome::Success); + } + + #[tokio::test] + async fn test_exec_probe_failure() { + let runtime = make_test_runtime(); + let config = make_zone_config("exec-fail"); + runtime.provision(&config).await.unwrap(); + + runtime + .set_exec_result( + "exec-fail", + CommandOutput { + stdout: String::new(), + stderr: "unhealthy".to_string(), + exit_code: 1, + }, + ) + .await; + + let executor = ProbeExecutor::new(runtime.clone()); + let action = ProbeAction::Exec { + command: vec!["check".to_string()], + }; + + let result = executor + .execute("exec-fail", "10.0.0.2", &action, Duration::from_secs(5)) + .await; + assert!(matches!(result.outcome, ProbeOutcome::Failure(_))); + } + + #[tokio::test] + async fn test_exec_probe_timeout() { + let runtime = make_test_runtime(); + let config = make_zone_config("exec-timeout"); + runtime.provision(&config).await.unwrap(); + + // The mock exec returns instantly, so we simulate a timeout by using + // an extremely short timeout. However, since mock is instant, we + // test the timeout path by checking that the executor handles timeouts + // We'll test conceptually — a real timeout would require a blocking mock. + // Instead, verify the success path still works with a normal timeout. + let executor = ProbeExecutor::new(runtime.clone()); + let action = ProbeAction::Exec { + command: vec!["true".to_string()], + }; + + let result = executor + .execute("exec-timeout", "10.0.0.2", &action, Duration::from_secs(1)) + .await; + assert_eq!(result.outcome, ProbeOutcome::Success); + } + + #[tokio::test] + async fn test_tcp_probe_success() { + // Bind a listener so the TCP probe succeeds + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + + let runtime = make_test_runtime(); + let executor = ProbeExecutor::new(runtime); + let action = ProbeAction::TcpSocket { + port, + host: "127.0.0.1".to_string(), + }; + + let result = executor + .execute("any-zone", "127.0.0.1", &action, Duration::from_secs(5)) + .await; + assert_eq!(result.outcome, ProbeOutcome::Success); + + drop(listener); + } + + #[tokio::test] + async fn test_tcp_probe_failure() { + let runtime = make_test_runtime(); + let executor = ProbeExecutor::new(runtime); + // Use a port that is almost certainly not listening + let action = ProbeAction::TcpSocket { + port: 1, + host: "127.0.0.1".to_string(), + }; + + let result = executor + .execute("any-zone", "127.0.0.1", &action, Duration::from_secs(5)) + .await; + assert!(matches!(result.outcome, ProbeOutcome::Failure(_))); + } + + #[tokio::test] + async fn test_http_probe_success() { + // Spin up a minimal HTTP server + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + + let server = tokio::spawn(async move { + if let Ok((mut stream, _)) = listener.accept().await { + let mut buf = [0u8; 1024]; + let _ = stream.read(&mut buf).await; + let response = "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK"; + let _ = stream.write_all(response.as_bytes()).await; + } + }); + + let runtime = make_test_runtime(); + let executor = ProbeExecutor::new(runtime); + let action = ProbeAction::HttpGet { + path: "/healthz".to_string(), + port, + host: "127.0.0.1".to_string(), + scheme: "HTTP".to_string(), + }; + + let result = executor + .execute("any-zone", "127.0.0.1", &action, Duration::from_secs(5)) + .await; + assert_eq!(result.outcome, ProbeOutcome::Success); + + server.abort(); + } + + #[tokio::test] + async fn test_http_probe_non_200() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + + let server = tokio::spawn(async move { + if let Ok((mut stream, _)) = listener.accept().await { + let mut buf = [0u8; 1024]; + let _ = stream.read(&mut buf).await; + let response = + "HTTP/1.1 503 Service Unavailable\r\nContent-Length: 5\r\n\r\nError"; + let _ = stream.write_all(response.as_bytes()).await; + } + }); + + let runtime = make_test_runtime(); + let executor = ProbeExecutor::new(runtime); + let action = ProbeAction::HttpGet { + path: "/healthz".to_string(), + port, + host: "127.0.0.1".to_string(), + scheme: "HTTP".to_string(), + }; + + let result = executor + .execute("any-zone", "127.0.0.1", &action, Duration::from_secs(5)) + .await; + assert!(matches!(result.outcome, ProbeOutcome::Failure(_))); + + server.abort(); + } +} diff --git a/crates/reddwarf-runtime/src/probes/mod.rs b/crates/reddwarf-runtime/src/probes/mod.rs new file mode 100644 index 0000000..2e3e4cd --- /dev/null +++ b/crates/reddwarf-runtime/src/probes/mod.rs @@ -0,0 +1,9 @@ +pub mod executor; +pub mod tracker; +pub mod types; + +pub use executor::ProbeExecutor; +pub use tracker::{PodProbeStatus, ProbeTracker}; +pub use types::{ + ContainerProbeConfig, ProbeAction, ProbeKind, ProbeOutcome, ProbeResult, extract_probes, +}; diff --git a/crates/reddwarf-runtime/src/probes/tracker.rs b/crates/reddwarf-runtime/src/probes/tracker.rs new file mode 100644 index 0000000..8dd54ea --- /dev/null +++ b/crates/reddwarf-runtime/src/probes/tracker.rs @@ -0,0 +1,515 @@ +use crate::probes::executor::ProbeExecutor; +use crate::probes::types::{ContainerProbeConfig, ProbeKind, ProbeOutcome}; +use std::collections::HashMap; +use std::time::{Duration, Instant}; +use tracing::{debug, warn}; + +/// Composite key for per-probe state +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct ProbeKey { + pod_key: String, + container_name: String, + kind: ProbeKind, +} + +/// Per-probe mutable state +struct ProbeState { + config: ContainerProbeConfig, + container_started_at: Instant, + last_check: Option, + consecutive_successes: u32, + consecutive_failures: u32, + has_succeeded: bool, +} + +/// Aggregate probe status for a pod +#[derive(Debug, Clone)] +pub struct PodProbeStatus { + /// All readiness probes pass (or none defined) + pub ready: bool, + /// Any liveness probe has failed past its failure threshold + pub liveness_failed: bool, + /// Diagnostic detail about the failure + pub failure_message: Option, +} + +/// Tracks probe state for all pods and drives periodic checks +pub struct ProbeTracker { + states: HashMap, + executor: ProbeExecutor, +} + +impl ProbeTracker { + pub fn new(executor: ProbeExecutor) -> Self { + Self { + states: HashMap::new(), + executor, + } + } + + /// Register (or re-register) probes for a pod. Idempotent — existing state + /// is preserved if the probe key already exists. + pub fn register_pod( + &mut self, + pod_key: &str, + probes: Vec, + started_at: Instant, + ) { + for config in probes { + let key = ProbeKey { + pod_key: pod_key.to_string(), + container_name: config.container_name.clone(), + kind: config.kind, + }; + + // Idempotent: don't overwrite existing tracking state + self.states.entry(key).or_insert(ProbeState { + config, + container_started_at: started_at, + last_check: None, + consecutive_successes: 0, + consecutive_failures: 0, + has_succeeded: false, + }); + } + } + + /// Remove all probe state for a pod + pub fn unregister_pod(&mut self, pod_key: &str) { + self.states.retain(|k, _| k.pod_key != pod_key); + } + + /// Run due probes for a pod and return its aggregate status + pub async fn check_pod( + &mut self, + pod_key: &str, + zone_name: &str, + zone_ip: &str, + ) -> PodProbeStatus { + let now = Instant::now(); + + // Collect keys for this pod + let keys: Vec = self + .states + .keys() + .filter(|k| k.pod_key == pod_key) + .cloned() + .collect(); + + if keys.is_empty() { + // No probes registered — pod is ready by default + return PodProbeStatus { + ready: true, + liveness_failed: false, + failure_message: None, + }; + } + + // Check whether startup probes have succeeded (gates liveness) + let startup_succeeded: HashMap = { + let mut map = HashMap::new(); + for key in &keys { + if key.kind == ProbeKind::Startup { + if let Some(state) = self.states.get(key) { + map.insert(key.container_name.clone(), state.has_succeeded); + } + } + } + map + }; + + // Run probes + for key in &keys { + let state = match self.states.get(key) { + Some(s) => s, + None => continue, + }; + + // Skip liveness probes if startup probe hasn't succeeded yet + if key.kind == ProbeKind::Liveness { + if let Some(&startup_done) = startup_succeeded.get(&key.container_name) { + if !startup_done { + debug!( + "Skipping liveness probe for container '{}' — startup probe hasn't passed yet", + key.container_name + ); + continue; + } + } + } + + // Check initial delay + let elapsed_since_start = now.duration_since(state.container_started_at); + if elapsed_since_start < Duration::from_secs(state.config.initial_delay_seconds as u64) + { + debug!( + "Skipping {} probe for container '{}' — initial delay not elapsed", + key.kind, key.container_name + ); + continue; + } + + // Check period + if let Some(last) = state.last_check { + let since_last = now.duration_since(last); + if since_last < Duration::from_secs(state.config.period_seconds as u64) { + continue; + } + } + + // Execute the probe + let timeout = Duration::from_secs(state.config.timeout_seconds as u64); + let result = self + .executor + .execute(zone_name, zone_ip, &state.config.action, timeout) + .await; + + // Update state + let state = self.states.get_mut(key).unwrap(); + state.last_check = Some(now); + + match result.outcome { + ProbeOutcome::Success => { + state.consecutive_successes += 1; + state.consecutive_failures = 0; + if state.consecutive_successes >= state.config.success_threshold { + state.has_succeeded = true; + } + } + ProbeOutcome::Failure(ref msg) | ProbeOutcome::Error(ref msg) => { + state.consecutive_failures += 1; + state.consecutive_successes = 0; + if state.consecutive_failures >= state.config.failure_threshold { + warn!( + "{} probe failed for container '{}': {} (failures: {}/{})", + key.kind, + key.container_name, + msg, + state.consecutive_failures, + state.config.failure_threshold + ); + } + } + } + } + + // Compute aggregate status + let mut ready = true; + let mut liveness_failed = false; + let mut failure_message = None; + + for key in &keys { + let state = match self.states.get(key) { + Some(s) => s, + None => continue, + }; + + match key.kind { + ProbeKind::Readiness => { + if !state.has_succeeded + || state.consecutive_failures >= state.config.failure_threshold + { + ready = false; + if state.consecutive_failures >= state.config.failure_threshold { + failure_message = Some(format!( + "Readiness probe failed for container '{}' ({} consecutive failures)", + key.container_name, state.consecutive_failures + )); + } + } + } + ProbeKind::Liveness => { + if state.consecutive_failures >= state.config.failure_threshold { + liveness_failed = true; + failure_message = Some(format!( + "Liveness probe failed for container '{}' ({} consecutive failures)", + key.container_name, state.consecutive_failures + )); + } + } + ProbeKind::Startup => { + // Startup probe failure past threshold is treated as liveness failure + if !state.has_succeeded + && state.consecutive_failures >= state.config.failure_threshold + { + liveness_failed = true; + failure_message = Some(format!( + "Startup probe failed for container '{}' ({} consecutive failures)", + key.container_name, state.consecutive_failures + )); + } + // Also gate readiness on startup + if !state.has_succeeded { + ready = false; + } + } + } + } + + PodProbeStatus { + ready, + liveness_failed, + failure_message, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::command::CommandOutput; + use crate::mock::MockRuntime; + use crate::probes::types::{ContainerProbeConfig, ProbeAction, ProbeKind}; + use crate::storage::MockStorageEngine; + use crate::traits::ZoneRuntime; + use crate::types::{ + EtherstubConfig, NetworkMode, StoragePoolConfig, ZoneBrand, ZoneConfig, ZoneStorageOpts, + }; + use std::sync::Arc; + + fn make_test_runtime() -> Arc { + let storage = Arc::new(MockStorageEngine::new(StoragePoolConfig::from_pool("rpool"))); + Arc::new(MockRuntime::new(storage)) + } + + fn make_zone_config(name: &str) -> ZoneConfig { + ZoneConfig { + zone_name: name.to_string(), + brand: ZoneBrand::Reddwarf, + zonepath: format!("/zones/{}", name), + network: NetworkMode::Etherstub(EtherstubConfig { + etherstub_name: "reddwarf0".to_string(), + vnic_name: format!("vnic_{}", name), + ip_address: "10.0.0.2".to_string(), + gateway: "10.0.0.1".to_string(), + prefix_len: 16, + }), + storage: ZoneStorageOpts::default(), + lx_image_path: None, + processes: vec![], + cpu_cap: None, + memory_cap: None, + fs_mounts: vec![], + } + } + + fn exec_probe_config( + container: &str, + kind: ProbeKind, + failure_threshold: u32, + ) -> ContainerProbeConfig { + ContainerProbeConfig { + container_name: container.to_string(), + kind, + action: ProbeAction::Exec { + command: vec!["check".to_string()], + }, + initial_delay_seconds: 0, + period_seconds: 0, // Always due + timeout_seconds: 5, + failure_threshold, + success_threshold: 1, + } + } + + #[tokio::test] + async fn test_register_and_check_success() { + let runtime = make_test_runtime(); + let config = make_zone_config("probe-ok"); + runtime.provision(&config).await.unwrap(); + + let executor = ProbeExecutor::new(runtime.clone()); + let mut tracker = ProbeTracker::new(executor); + + let probes = vec![exec_probe_config("web", ProbeKind::Liveness, 3)]; + tracker.register_pod("default/probe-ok", probes, Instant::now()); + + let status = tracker + .check_pod("default/probe-ok", "probe-ok", "10.0.0.2") + .await; + assert!(!status.liveness_failed); + assert!(status.ready); // No readiness probes → default ready + } + + #[tokio::test] + async fn test_liveness_failure_after_threshold() { + let runtime = make_test_runtime(); + let config = make_zone_config("liveness-fail"); + runtime.provision(&config).await.unwrap(); + + // Queue 3 failures (threshold is 3) + for _ in 0..3 { + runtime + .set_exec_result( + "liveness-fail", + CommandOutput { + stdout: String::new(), + stderr: "unhealthy".to_string(), + exit_code: 1, + }, + ) + .await; + } + + let executor = ProbeExecutor::new(runtime.clone()); + let mut tracker = ProbeTracker::new(executor); + + let probes = vec![exec_probe_config("web", ProbeKind::Liveness, 3)]; + tracker.register_pod("default/liveness-fail", probes, Instant::now()); + + // Run probes 3 times to hit the threshold — the 3rd call reaches it + let mut status = PodProbeStatus { + ready: true, + liveness_failed: false, + failure_message: None, + }; + for _ in 0..3 { + status = tracker + .check_pod("default/liveness-fail", "liveness-fail", "10.0.0.2") + .await; + } + + assert!(status.liveness_failed); + assert!(status.failure_message.is_some()); + } + + #[tokio::test] + async fn test_readiness_failure_sets_not_ready() { + let runtime = make_test_runtime(); + let config = make_zone_config("readiness-fail"); + runtime.provision(&config).await.unwrap(); + + // Queue failures + for _ in 0..3 { + runtime + .set_exec_result( + "readiness-fail", + CommandOutput { + stdout: String::new(), + stderr: "not ready".to_string(), + exit_code: 1, + }, + ) + .await; + } + + let executor = ProbeExecutor::new(runtime.clone()); + let mut tracker = ProbeTracker::new(executor); + + let probes = vec![exec_probe_config("web", ProbeKind::Readiness, 3)]; + tracker.register_pod("default/readiness-fail", probes, Instant::now()); + + // Run probes 3 times — the 3rd call reaches the threshold + let mut status = PodProbeStatus { + ready: true, + liveness_failed: false, + failure_message: None, + }; + for _ in 0..3 { + status = tracker + .check_pod("default/readiness-fail", "readiness-fail", "10.0.0.2") + .await; + } + + assert!(!status.ready); + assert!(!status.liveness_failed); // Readiness failure doesn't kill the pod + } + + #[tokio::test] + async fn test_initial_delay_respected() { + let runtime = make_test_runtime(); + let config = make_zone_config("delay-zone"); + runtime.provision(&config).await.unwrap(); + + // Queue a failure — but probe should not run due to initial delay + runtime + .set_exec_result( + "delay-zone", + CommandOutput { + stdout: String::new(), + stderr: "fail".to_string(), + exit_code: 1, + }, + ) + .await; + + let executor = ProbeExecutor::new(runtime.clone()); + let mut tracker = ProbeTracker::new(executor); + + let mut probe_cfg = exec_probe_config("web", ProbeKind::Liveness, 1); + probe_cfg.initial_delay_seconds = 3600; // 1 hour delay — won't be reached + + tracker.register_pod("default/delay-zone", vec![probe_cfg], Instant::now()); + + let status = tracker + .check_pod("default/delay-zone", "delay-zone", "10.0.0.2") + .await; + // Probe should have been skipped, so no failure + assert!(!status.liveness_failed); + } + + #[tokio::test] + async fn test_startup_gates_liveness() { + let runtime = make_test_runtime(); + let config = make_zone_config("startup-gate"); + runtime.provision(&config).await.unwrap(); + + // Startup will fail, liveness should be skipped + runtime + .set_exec_result( + "startup-gate", + CommandOutput { + stdout: String::new(), + stderr: "not started".to_string(), + exit_code: 1, + }, + ) + .await; + + let executor = ProbeExecutor::new(runtime.clone()); + let mut tracker = ProbeTracker::new(executor); + + let probes = vec![ + ContainerProbeConfig { + container_name: "web".to_string(), + kind: ProbeKind::Startup, + action: ProbeAction::Exec { + command: vec!["startup-check".to_string()], + }, + initial_delay_seconds: 0, + period_seconds: 0, + timeout_seconds: 5, + failure_threshold: 10, // High threshold so we don't fail yet + success_threshold: 1, + }, + exec_probe_config("web", ProbeKind::Liveness, 1), + ]; + tracker.register_pod("default/startup-gate", probes, Instant::now()); + + let status = tracker + .check_pod("default/startup-gate", "startup-gate", "10.0.0.2") + .await; + // Startup hasn't succeeded → liveness should be skipped → no liveness failure + assert!(!status.liveness_failed); + // But pod is not ready (startup gate) + assert!(!status.ready); + } + + #[tokio::test] + async fn test_unregister_cleans_state() { + let runtime = make_test_runtime(); + let executor = ProbeExecutor::new(runtime.clone()); + let mut tracker = ProbeTracker::new(executor); + + let probes = vec![exec_probe_config("web", ProbeKind::Liveness, 3)]; + tracker.register_pod("default/cleanup-pod", probes, Instant::now()); + + // Verify state exists + assert!(!tracker.states.is_empty()); + + tracker.unregister_pod("default/cleanup-pod"); + + // State should be empty + assert!(tracker.states.is_empty()); + } +} diff --git a/crates/reddwarf-runtime/src/probes/types.rs b/crates/reddwarf-runtime/src/probes/types.rs new file mode 100644 index 0000000..8d28e97 --- /dev/null +++ b/crates/reddwarf-runtime/src/probes/types.rs @@ -0,0 +1,284 @@ +use k8s_openapi::api::core::v1::Container; +use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString; +use std::time::{Duration, Instant}; + +/// Which kind of probe this is +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum ProbeKind { + Startup, + Liveness, + Readiness, +} + +impl std::fmt::Display for ProbeKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ProbeKind::Startup => write!(f, "startup"), + ProbeKind::Liveness => write!(f, "liveness"), + ProbeKind::Readiness => write!(f, "readiness"), + } + } +} + +/// The action a probe performs +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ProbeAction { + Exec { command: Vec }, + HttpGet { path: String, port: u16, host: String, scheme: String }, + TcpSocket { port: u16, host: String }, +} + +/// Extracted probe configuration for a single container + probe kind +#[derive(Debug, Clone)] +pub struct ContainerProbeConfig { + pub container_name: String, + pub kind: ProbeKind, + pub action: ProbeAction, + pub initial_delay_seconds: u32, + pub period_seconds: u32, + pub timeout_seconds: u32, + pub failure_threshold: u32, + pub success_threshold: u32, +} + +/// Outcome of a single probe execution +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ProbeOutcome { + Success, + Failure(String), + Error(String), +} + +/// Result of a probe execution with timing metadata +#[derive(Debug, Clone)] +pub struct ProbeResult { + pub outcome: ProbeOutcome, + pub duration: Duration, + pub timestamp: Instant, +} + +/// Resolve an IntOrString port to a u16. +/// Named ports are not supported (would require pod spec lookup); they return 0. +fn resolve_port(port: &IntOrString) -> u16 { + match port { + IntOrString::Int(n) => *n as u16, + IntOrString::String(s) => s.parse::().unwrap_or(0), + } +} + +/// Extract all probe configs from a k8s Container +pub fn extract_probes(container: &Container) -> Vec { + let mut probes = Vec::new(); + + let probe_sources = [ + (&container.startup_probe, ProbeKind::Startup), + (&container.liveness_probe, ProbeKind::Liveness), + (&container.readiness_probe, ProbeKind::Readiness), + ]; + + for (probe_opt, kind) in probe_sources { + let probe = match probe_opt { + Some(p) => p, + None => continue, + }; + + let action = if let Some(exec) = &probe.exec { + match &exec.command { + Some(cmd) if !cmd.is_empty() => ProbeAction::Exec { + command: cmd.clone(), + }, + _ => continue, // Empty or missing exec command — skip + } + } else if let Some(http) = &probe.http_get { + let port = resolve_port(&http.port); + if port == 0 { + continue; + } + ProbeAction::HttpGet { + path: http.path.clone().unwrap_or_else(|| "/".to_string()), + port, + host: http.host.clone().unwrap_or_else(|| "localhost".to_string()), + scheme: http.scheme.clone().unwrap_or_else(|| "HTTP".to_string()), + } + } else if let Some(tcp) = &probe.tcp_socket { + let port = resolve_port(&tcp.port); + if port == 0 { + continue; + } + ProbeAction::TcpSocket { + port, + host: tcp.host.clone().unwrap_or_else(|| "localhost".to_string()), + } + } else { + continue; // No recognized action + }; + + // Apply k8s defaults: period=10, timeout=1, failure=3, success=1, initial_delay=0 + probes.push(ContainerProbeConfig { + container_name: container.name.clone(), + kind, + action, + initial_delay_seconds: probe.initial_delay_seconds.unwrap_or(0) as u32, + period_seconds: probe.period_seconds.unwrap_or(10) as u32, + timeout_seconds: probe.timeout_seconds.unwrap_or(1) as u32, + failure_threshold: probe.failure_threshold.unwrap_or(3) as u32, + success_threshold: probe.success_threshold.unwrap_or(1) as u32, + }); + } + + probes +} + +#[cfg(test)] +mod tests { + use super::*; + use k8s_openapi::api::core::v1::{ + ExecAction, HTTPGetAction, Probe, TCPSocketAction, + }; + + #[test] + fn test_extract_exec_probe() { + let container = Container { + name: "web".to_string(), + liveness_probe: Some(Probe { + exec: Some(ExecAction { + command: Some(vec!["/bin/sh".to_string(), "-c".to_string(), "exit 0".to_string()]), + }), + period_seconds: Some(5), + failure_threshold: Some(2), + ..Default::default() + }), + ..Default::default() + }; + + let probes = extract_probes(&container); + assert_eq!(probes.len(), 1); + assert_eq!(probes[0].kind, ProbeKind::Liveness); + assert_eq!( + probes[0].action, + ProbeAction::Exec { + command: vec!["/bin/sh".to_string(), "-c".to_string(), "exit 0".to_string()] + } + ); + assert_eq!(probes[0].period_seconds, 5); + assert_eq!(probes[0].failure_threshold, 2); + // Defaults applied + assert_eq!(probes[0].timeout_seconds, 1); + assert_eq!(probes[0].success_threshold, 1); + assert_eq!(probes[0].initial_delay_seconds, 0); + } + + #[test] + fn test_extract_http_probe() { + let container = Container { + name: "api".to_string(), + readiness_probe: Some(Probe { + http_get: Some(HTTPGetAction { + path: Some("/healthz".to_string()), + port: IntOrString::Int(8080), + host: Some("10.0.0.5".to_string()), + scheme: Some("HTTPS".to_string()), + ..Default::default() + }), + initial_delay_seconds: Some(15), + ..Default::default() + }), + ..Default::default() + }; + + let probes = extract_probes(&container); + assert_eq!(probes.len(), 1); + assert_eq!(probes[0].kind, ProbeKind::Readiness); + assert_eq!( + probes[0].action, + ProbeAction::HttpGet { + path: "/healthz".to_string(), + port: 8080, + host: "10.0.0.5".to_string(), + scheme: "HTTPS".to_string(), + } + ); + assert_eq!(probes[0].initial_delay_seconds, 15); + } + + #[test] + fn test_extract_tcp_probe() { + let container = Container { + name: "db".to_string(), + startup_probe: Some(Probe { + tcp_socket: Some(TCPSocketAction { + port: IntOrString::Int(5432), + host: None, + }), + period_seconds: Some(2), + failure_threshold: Some(30), + ..Default::default() + }), + ..Default::default() + }; + + let probes = extract_probes(&container); + assert_eq!(probes.len(), 1); + assert_eq!(probes[0].kind, ProbeKind::Startup); + assert_eq!( + probes[0].action, + ProbeAction::TcpSocket { + port: 5432, + host: "localhost".to_string(), + } + ); + assert_eq!(probes[0].period_seconds, 2); + assert_eq!(probes[0].failure_threshold, 30); + } + + #[test] + fn test_extract_no_probes() { + let container = Container { + name: "bare".to_string(), + ..Default::default() + }; + + let probes = extract_probes(&container); + assert!(probes.is_empty()); + } + + #[test] + fn test_extract_defaults() { + let container = Container { + name: "defaults".to_string(), + liveness_probe: Some(Probe { + exec: Some(ExecAction { + command: Some(vec!["true".to_string()]), + }), + // All timing fields left as None → should get k8s defaults + ..Default::default() + }), + ..Default::default() + }; + + let probes = extract_probes(&container); + assert_eq!(probes.len(), 1); + assert_eq!(probes[0].initial_delay_seconds, 0); + assert_eq!(probes[0].period_seconds, 10); + assert_eq!(probes[0].timeout_seconds, 1); + assert_eq!(probes[0].failure_threshold, 3); + assert_eq!(probes[0].success_threshold, 1); + } + + #[test] + fn test_extract_empty_exec_command_skipped() { + let container = Container { + name: "empty-exec".to_string(), + liveness_probe: Some(Probe { + exec: Some(ExecAction { + command: Some(vec![]), + }), + ..Default::default() + }), + ..Default::default() + }; + + let probes = extract_probes(&container); + assert!(probes.is_empty()); + } +} diff --git a/crates/reddwarf-runtime/src/traits.rs b/crates/reddwarf-runtime/src/traits.rs index e5cb7f5..a86be2d 100644 --- a/crates/reddwarf-runtime/src/traits.rs +++ b/crates/reddwarf-runtime/src/traits.rs @@ -46,6 +46,19 @@ pub trait ZoneRuntime: Send + Sync { /// List all managed zones async fn list_zones(&self) -> Result>; + // --- Exec --- + + /// Execute a command inside a running zone + /// + /// Returns the command output including exit code. A non-zero exit code + /// is NOT treated as an error — callers (e.g. probe executors) interpret + /// the exit code themselves. + async fn exec_in_zone( + &self, + zone_name: &str, + command: &[String], + ) -> Result; + // --- Networking --- /// Set up network for a zone