Add health probes (liveness/readiness/startup) with exec, HTTP, and TCP checks

Implement Kubernetes-style health probes that run during the reconcile loop
to detect unhealthy applications inside running zones. Previously the pod
controller only checked zone liveness via get_zone_state(), missing cases
where the zone is running but the application inside has crashed.

- Add exec_in_zone() to ZoneRuntime trait, implemented via zlogin on illumos
  and with configurable mock results for testing
- Add probe type system (ProbeKind, ProbeAction, ContainerProbeConfig) that
  decouples from k8s_openapi and extracts probes from pod container specs
  with proper k8s defaults (period=10s, timeout=1s, failure=3, success=1)
- Add ProbeExecutor for exec/HTTP/TCP checks with tokio timeout support
  (HTTPS falls back to TCP-only with warning)
- Add ProbeTracker state machine that tracks per-pod/container/probe-kind
  state, respects initial delays and periods, gates liveness on startup
  probes, and aggregates results into PodProbeStatus
- Integrate into PodController reconcile loop: on liveness failure set
  phase=Failed with reason LivenessProbeFailure; on readiness failure set
  Ready=False; on all-pass restore Ready=True
- Add ProbeFailed error variant with miette diagnostic

Known v1 limitation: probes execute at reconcile cadence (~30s), not at
their configured periodSeconds.
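
The defaulting described above can be sketched as a small conversion step; the type names (`RawProbe`, `ProbeTiming`) are illustrative stand-ins for the spec-side and config-side types, not the crate's actual identifiers:

```rust
// Hypothetical sketch: applying Kubernetes probe defaults when turning an
// Option-heavy spec into a concrete timing config
// (period=10s, timeout=1s, failure=3, success=1, initial delay=0s).
#[derive(Default)]
struct RawProbe {
    period_seconds: Option<u32>,
    timeout_seconds: Option<u32>,
    failure_threshold: Option<u32>,
    success_threshold: Option<u32>,
    initial_delay_seconds: Option<u32>,
}

struct ProbeTiming {
    period_seconds: u32,
    timeout_seconds: u32,
    failure_threshold: u32,
    success_threshold: u32,
    initial_delay_seconds: u32,
}

fn with_k8s_defaults(raw: &RawProbe) -> ProbeTiming {
    ProbeTiming {
        period_seconds: raw.period_seconds.unwrap_or(10),
        timeout_seconds: raw.timeout_seconds.unwrap_or(1),
        failure_threshold: raw.failure_threshold.unwrap_or(3),
        success_threshold: raw.success_threshold.unwrap_or(1),
        initial_delay_seconds: raw.initial_delay_seconds.unwrap_or(0),
    }
}
```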

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Till Wegmueller 2026-02-14 22:41:30 +01:00
parent 4c7f50a7a0
commit d79f8ce011
11 changed files with 1759 additions and 5 deletions

AUDIT.md

@@ -0,0 +1,99 @@
# Reddwarf Production Readiness Audit
**Last updated:** 2026-02-14
**Baseline commit:** `58171c7` (Add periodic reconciliation, node health checker, and graceful pod termination)
---
## 1. Zone Runtime (`reddwarf-runtime`)
| Requirement | Status | Notes |
|---|---|---|
| Pod spec to zonecfg | DONE | `zone/config.rs`, `controller.rs:pod_to_zone_config()` |
| Zone lifecycle (zoneadm) | DONE | `illumos.rs` — create, install, boot, halt, uninstall, delete |
| Container to Zone mapping | DONE | Naming, sanitization, 64-char truncation |
| CPU limits to capped-cpu | DONE | Aggregates across containers, limits preferred over requests |
| Memory limits to capped-memory | DONE | Aggregates across containers, illumos G/M/K suffixes |
| Network to Crossbow VNIC | DONE | `dladm create-etherstub`, `create-vnic`, per-pod VNIC+IP |
| Volumes to ZFS datasets | DONE | Create, destroy, clone, quota, snapshot support |
| Image pull / clone | PARTIAL | ZFS clone works; LX tarball `-s` works. Missing: image pull/registry, `.zar` archive support, golden image bootstrap |
| Health probes (zlogin) | DONE | exec-in-zone via `zlogin`, liveness/readiness/startup probes with exec/HTTP/TCP actions, probe tracker state machine integrated into reconcile loop. v1 limitation: probes run at reconcile cadence, not per-probe `periodSeconds` |
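
The probe tracker state machine noted above centers on threshold counting: a probe only flips state after enough consecutive observations. A minimal sketch of that logic (names like `ThresholdState` and `observe` are illustrative, not the crate's actual API):

```rust
// Hypothetical sketch of threshold-based probe state, assuming the
// common Kubernetes semantics: flip to unhealthy only after
// `failure_threshold` consecutive failures, and back to healthy after
// `success_threshold` consecutive successes.
struct ThresholdState {
    consecutive_failures: u32,
    consecutive_successes: u32,
    failure_threshold: u32,
    success_threshold: u32,
    healthy: bool,
}

impl ThresholdState {
    /// Record one probe result; returns the current health verdict.
    fn observe(&mut self, success: bool) -> bool {
        if success {
            self.consecutive_successes += 1;
            self.consecutive_failures = 0;
            if self.consecutive_successes >= self.success_threshold {
                self.healthy = true;
            }
        } else {
            self.consecutive_failures += 1;
            self.consecutive_successes = 0;
            if self.consecutive_failures >= self.failure_threshold {
                self.healthy = false;
            }
        }
        self.healthy
    }
}
```

With the k8s defaults (failure=3, success=1), two isolated failures do not change the verdict; the third consecutive one does, and a single success restores it.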
## 2. Reconciliation / Controller Loop
| Requirement | Status | Notes |
|---|---|---|
| Event bus / watch | DONE | tokio broadcast channel, SSE watch API, multi-subscriber |
| Pod controller | DONE | Event-driven + full reconcile on lag, provision/deprovision |
| Node controller (NotReady) | DONE | `node_health.rs` — checks every 15s, marks stale (>40s) nodes NotReady with reason NodeStatusUnknown |
| Continuous reconciliation | DONE | `controller.rs` — periodic `reconcile_all()` every 30s via `tokio::time::interval` in select! loop |
| Graceful termination | DONE | DELETE sets `deletion_timestamp` + phase=Terminating; controller drives shutdown state machine; POST `.../finalize` for actual removal |
## 3. Pod Status Tracking
| Requirement | Status | Notes |
|---|---|---|
| Zone state to pod phase | DONE | 8 zone states mapped to pod phases |
| Status subresource (`/status`) | DONE | PUT endpoint, spec/status separation, fires MODIFIED events |
| ShuttingDown mapping | DONE | Fixed in `58171c7` — maps to "Terminating" |
## 4. Node Agent / Heartbeat
| Requirement | Status | Notes |
|---|---|---|
| Self-registration | DONE | Creates Node resource with allocatable CPU/memory |
| Periodic heartbeat | DONE | 10-second interval, Ready condition |
| Report zone states | NOT DONE | Heartbeat doesn't query actual zone states |
| Dynamic resource reporting | DONE | `sysinfo.rs` — detects CPU/memory via `sys-info`, capacity vs allocatable split with configurable reservations (`--system-reserved-cpu`, `--system-reserved-memory`, `--max-pods`). Done in `d3eb0b2` |
## 5. Main Binary
| Requirement | Status | Notes |
|---|---|---|
| API + scheduler + runtime wired | DONE | All 4 components spawned as tokio tasks |
| CLI via clap | DONE | `serve` and `agent` subcommands |
| Graceful shutdown | DONE | SIGINT + CancellationToken + 5s timeout |
| TLS (rustls) | DONE | Auto-generated self-signed CA + server cert, or user-provided PEM. Added in `cb6ca8c` |
| SMF service manifest | DONE | SMF manifest + method script in `smf/`. Added in `cb6ca8c` |
## 6. Networking
| Requirement | Status | Notes |
|---|---|---|
| Etherstub creation | DONE | `dladm create-etherstub` |
| VNIC per zone | DONE | `dladm create-vnic -l etherstub` |
| ipadm IP assignment | PARTIAL | IP set in zonecfg `allowed-address` but no explicit `ipadm create-addr` call |
| IPAM | DONE | Sequential alloc, idempotent, persistent, pool exhaustion handling |
| Service ClusterIP / NAT | NOT DONE | Services stored at API level but no backend controller, no ipnat rules, no proxy, no DNS |
## 7. Scheduler
| Requirement | Status | Notes |
|---|---|---|
| Versioned bind_pod() | DONE | Fixed in `c50ecb2` — creates versioned commits |
| Zone brand constraints | DONE | `ZoneBrandMatch` filter checks `reddwarf.io/zone-brand` annotation vs `reddwarf.io/zone-brands` node label. Done in `4c7f50a` |
| Actual resource usage | NOT DONE | Only compares requests vs static allocatable — no runtime metrics |
---
## Priority Order
### Critical (blocks production)
- [x] TLS — done in `cb6ca8c`
- [x] SMF manifest — done in `cb6ca8c`
### High (limits reliability)
- [x] Node health checker — done in `58171c7`
- [x] Periodic reconciliation — done in `58171c7`
- [x] Graceful pod termination — done in `58171c7`
### Medium (limits functionality)
- [ ] Service networking — no ClusterIP, no NAT/proxy, no DNS
- [x] Health probes — exec/HTTP/TCP liveness/readiness/startup probes via zlogin
- [ ] Image management — no pull/registry, no `.zar` support, no golden image bootstrap
- [x] Dynamic node resources — done in `d3eb0b2`
### Low (nice to have)
- [x] Zone brand scheduling filter — done in `4c7f50a`
- [x] ShuttingDown to Terminating mapping fix — done in `58171c7`
- [ ] bhyve brand — type exists but no implementation


@@ -1,14 +1,17 @@
use crate::api_client::ApiClient;
use crate::error::{Result, RuntimeError};
use crate::network::{vnic_name_for_pod, Ipam};
use crate::probes::executor::ProbeExecutor;
use crate::probes::tracker::ProbeTracker;
use crate::probes::types::extract_probes;
use crate::traits::ZoneRuntime;
use crate::types::*;
use chrono::Utc;
use k8s_openapi::api::core::v1::{Pod, PodCondition, PodStatus};
use reddwarf_core::{ResourceEvent, ResourceQuantities, WatchEventType};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, Mutex};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
@@ -38,6 +41,7 @@ pub struct PodController {
event_tx: broadcast::Sender<ResourceEvent>,
config: PodControllerConfig,
ipam: Ipam,
probe_tracker: Mutex<ProbeTracker>,
}
impl PodController {
@@ -48,12 +52,15 @@ impl PodController {
config: PodControllerConfig,
ipam: Ipam,
) -> Self {
let probe_executor = ProbeExecutor::new(Arc::clone(&runtime));
let probe_tracker = Mutex::new(ProbeTracker::new(probe_executor));
Self {
runtime,
api_client,
event_tx,
config,
ipam,
probe_tracker,
}
}
@@ -273,7 +280,124 @@ impl PodController {
// Check zone health
match self.runtime.get_zone_state(&zone_name).await {
Ok(ZoneState::Running) => {
// All good
// Zone is running — execute health probes
let pod_key = format!("{}/{}", namespace, pod_name);
let zone_ip = self.get_pod_ip(pod);
// Extract and register probes (idempotent)
let probes = self.extract_pod_probes(pod);
let started_at = self.pod_start_time(pod);
let mut tracker = self.probe_tracker.lock().await;
tracker.register_pod(&pod_key, probes, started_at);
let status = tracker
.check_pod(&pod_key, &zone_name, &zone_ip)
.await;
drop(tracker);
if status.liveness_failed {
let message = status.failure_message.unwrap_or_else(|| {
"Liveness probe failed".to_string()
});
warn!(
"Liveness probe failed for pod {}/{}: {}",
namespace, pod_name, message
);
let pod_status = PodStatus {
phase: Some("Failed".to_string()),
conditions: Some(vec![PodCondition {
type_: "Ready".to_string(),
status: "False".to_string(),
reason: Some("LivenessProbeFailure".to_string()),
message: Some(message),
..Default::default()
}]),
..Default::default()
};
if let Err(e) = self
.api_client
.set_pod_status(namespace, pod_name, pod_status)
.await
{
error!("Failed to update pod status to Failed: {}", e);
}
// Unregister probes for this pod
let mut tracker = self.probe_tracker.lock().await;
tracker.unregister_pod(&pod_key);
} else if !status.ready {
let message = status.failure_message.unwrap_or_else(|| {
"Readiness probe failed".to_string()
});
debug!(
"Readiness probe not passing for pod {}/{}: {}",
namespace, pod_name, message
);
// Only update if currently marked Ready=True
let currently_ready = pod
.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
.and_then(|c| c.iter().find(|c| c.type_ == "Ready"))
.map(|c| c.status == "True")
.unwrap_or(false);
if currently_ready {
let pod_status = PodStatus {
phase: Some("Running".to_string()),
conditions: Some(vec![PodCondition {
type_: "Ready".to_string(),
status: "False".to_string(),
reason: Some("ReadinessProbeFailure".to_string()),
message: Some(message),
..Default::default()
}]),
pod_ip: Some(zone_ip),
..Default::default()
};
if let Err(e) = self
.api_client
.set_pod_status(namespace, pod_name, pod_status)
.await
{
error!("Failed to update pod status: {}", e);
}
}
} else {
// All probes pass — set Ready=True if not already
let currently_ready = pod
.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
.and_then(|c| c.iter().find(|c| c.type_ == "Ready"))
.map(|c| c.status == "True")
.unwrap_or(false);
if !currently_ready {
let pod_status = PodStatus {
phase: Some("Running".to_string()),
conditions: Some(vec![PodCondition {
type_: "Ready".to_string(),
status: "True".to_string(),
..Default::default()
}]),
pod_ip: Some(zone_ip),
..Default::default()
};
if let Err(e) = self
.api_client
.set_pod_status(namespace, pod_name, pod_status)
.await
{
error!("Failed to update pod status: {}", e);
}
}
}
}
Ok(state) => {
warn!(
@@ -394,6 +518,11 @@ impl PodController {
);
}
// Unregister probes
let pod_key = format!("{}/{}", namespace, pod_name);
let mut tracker = self.probe_tracker.lock().await;
tracker.unregister_pod(&pod_key);
Ok(())
}
@@ -502,6 +631,12 @@ impl PodController {
);
}
// Unregister probes
let pod_key = format!("{}/{}", namespace, pod_name);
let mut tracker = self.probe_tracker.lock().await;
tracker.unregister_pod(&pod_key);
drop(tracker);
// Finalize — remove the pod from API server storage
if let Err(e) = self.api_client.finalize_pod(namespace, pod_name).await {
error!(
@@ -658,6 +793,47 @@ impl PodController {
NetworkMode::Direct(d) => d.ip_address.clone(),
}
}
/// Extract probe configurations from all containers in a pod spec
fn extract_pod_probes(&self, pod: &Pod) -> Vec<crate::probes::types::ContainerProbeConfig> {
let spec = match &pod.spec {
Some(s) => s,
None => return vec![],
};
spec.containers
.iter()
.flat_map(|c| extract_probes(c))
.collect()
}
/// Get the pod's IP from its status, falling back to empty string
fn get_pod_ip(&self, pod: &Pod) -> String {
pod.status
.as_ref()
.and_then(|s| s.pod_ip.clone())
.unwrap_or_default()
}
/// Approximate when the pod's containers started.
/// Uses the pod's start_time if available, otherwise uses now.
fn pod_start_time(&self, pod: &Pod) -> Instant {
// We can't convert k8s Time to std Instant directly. Instead, compute
// the elapsed duration since start_time and subtract from Instant::now().
if let Some(start_time) = pod
.status
.as_ref()
.and_then(|s| s.start_time.as_ref())
{
let now_utc = Utc::now();
let started_utc = start_time.0;
let elapsed = now_utc.signed_duration_since(started_utc);
if let Ok(elapsed_std) = elapsed.to_std() {
return Instant::now().checked_sub(elapsed_std).unwrap_or_else(Instant::now);
}
}
Instant::now()
}
}
/// Generate a zone name from namespace and pod name
@@ -1168,6 +1344,144 @@ mod tests {
assert_eq!(zone_config.brand, ZoneBrand::Reddwarf);
}
fn make_test_controller_with_runtime() -> (PodController, Arc<crate::mock::MockRuntime>, tempfile::TempDir) {
let dir = tempdir().unwrap();
let db_path = dir.path().join("test-controller-rt.redb");
let storage = Arc::new(RedbBackend::new(&db_path).unwrap());
let ipam = Ipam::new(storage, "10.88.0.0/16").unwrap();
let mock_storage = Arc::new(crate::storage::MockStorageEngine::new(
crate::types::StoragePoolConfig::from_pool("rpool"),
));
let runtime = Arc::new(crate::mock::MockRuntime::new(mock_storage));
let api_client = Arc::new(ApiClient::new("http://127.0.0.1:6443"));
let (event_tx, _) = broadcast::channel(16);
let config = PodControllerConfig {
node_name: "node1".to_string(),
api_url: "http://127.0.0.1:6443".to_string(),
zonepath_prefix: "/zones".to_string(),
default_brand: ZoneBrand::Reddwarf,
etherstub_name: "reddwarf0".to_string(),
pod_cidr: "10.88.0.0/16".to_string(),
reconcile_interval: Duration::from_secs(30),
};
let controller = PodController::new(runtime.clone() as Arc<dyn ZoneRuntime>, api_client, event_tx, config, ipam);
(controller, runtime, dir)
}
#[tokio::test]
async fn test_reconcile_running_pod_with_no_probes() {
let (controller, runtime, _dir) = make_test_controller_with_runtime();
// Create a pod that is already Running with a provisioned zone
let mut pod = Pod::default();
pod.metadata.name = Some("no-probe-pod".to_string());
pod.metadata.namespace = Some("default".to_string());
pod.spec = Some(PodSpec {
node_name: Some("node1".to_string()),
containers: vec![Container {
name: "web".to_string(),
command: Some(vec!["/bin/sh".to_string()]),
..Default::default()
}],
..Default::default()
});
pod.status = Some(PodStatus {
phase: Some("Running".to_string()),
pod_ip: Some("10.88.0.2".to_string()),
conditions: Some(vec![PodCondition {
type_: "Ready".to_string(),
status: "True".to_string(),
..Default::default()
}]),
..Default::default()
});
// Provision the zone so get_zone_state returns Running
let zone_config = controller.pod_to_zone_config(&pod).unwrap();
runtime.provision(&zone_config).await.unwrap();
// Reconcile — should succeed without changing anything (no probes)
let result = controller.reconcile(&pod).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_reconcile_running_pod_liveness_failure() {
let (controller, runtime, _dir) = make_test_controller_with_runtime();
let mut pod = Pod::default();
pod.metadata.name = Some("liveness-pod".to_string());
pod.metadata.namespace = Some("default".to_string());
pod.spec = Some(PodSpec {
node_name: Some("node1".to_string()),
containers: vec![Container {
name: "web".to_string(),
command: Some(vec!["/bin/sh".to_string()]),
liveness_probe: Some(k8s_openapi::api::core::v1::Probe {
exec: Some(k8s_openapi::api::core::v1::ExecAction {
command: Some(vec!["healthcheck".to_string()]),
}),
period_seconds: Some(0), // Always run
failure_threshold: Some(3),
..Default::default()
}),
..Default::default()
}],
..Default::default()
});
pod.status = Some(PodStatus {
phase: Some("Running".to_string()),
pod_ip: Some("10.88.0.2".to_string()),
conditions: Some(vec![PodCondition {
type_: "Ready".to_string(),
status: "True".to_string(),
..Default::default()
}]),
..Default::default()
});
// Provision the zone
let zone_config = controller.pod_to_zone_config(&pod).unwrap();
runtime.provision(&zone_config).await.unwrap();
// Queue failures for the liveness probe
let zone_name = pod_zone_name("default", "liveness-pod");
for _ in 0..3 {
runtime
.set_exec_result(
&zone_name,
crate::command::CommandOutput {
stdout: String::new(),
stderr: "unhealthy".to_string(),
exit_code: 1,
},
)
.await;
}
// Reconcile 3 times to hit the failure threshold.
// On the 3rd reconcile, liveness failure is detected. The controller
// then unregisters the probes and tries to set the pod status to Failed
// (which fails silently since there's no API server).
for _ in 0..3 {
let _ = controller.reconcile(&pod).await;
}
// Verify the liveness failure path was taken: probes should be unregistered
let pod_key = "default/liveness-pod";
let mut tracker = controller.probe_tracker.lock().await;
let status = tracker
.check_pod(pod_key, &zone_name, "10.88.0.2")
.await;
// No probes registered → default status (ready=true, liveness_failed=false)
// This confirms the unregister happened, which only occurs on liveness failure
assert!(status.ready);
assert!(!status.liveness_failed);
}
#[tokio::test]
async fn test_reconcile_with_deletion_timestamp_uses_termination() {
let (controller, _dir) = make_test_controller();


@@ -155,6 +155,25 @@ pub enum RuntimeError {
message: String,
},
/// Health probe failed
#[error("Health probe failed for container '{container_name}' in zone '{zone_name}': {message}")]
#[diagnostic(
code(reddwarf::runtime::probe_failed),
help("Check that the probe target is reachable inside the zone. Failure count: {failure_count}/{failure_threshold}. Verify the application is running and the probe command/port/path is correct")
)]
ProbeFailed {
#[allow(unused)]
zone_name: String,
#[allow(unused)]
container_name: String,
#[allow(unused)]
message: String,
#[allow(unused)]
failure_count: u32,
#[allow(unused)]
failure_threshold: u32,
},
/// Internal error
#[error("Internal runtime error: {message}")]
#[diagnostic(
@@ -241,6 +260,22 @@ impl RuntimeError {
}
}
pub fn probe_failed(
zone_name: impl Into<String>,
container_name: impl Into<String>,
message: impl Into<String>,
failure_count: u32,
failure_threshold: u32,
) -> Self {
Self::ProbeFailed {
zone_name: zone_name.into(),
container_name: container_name.into(),
message: message.into(),
failure_count,
failure_threshold,
}
}
pub fn internal_error(message: impl Into<String>) -> Self {
Self::InternalError {
message: message.into(),


@@ -1,5 +1,5 @@
use crate::brand::lx::lx_install_args;
use crate::command::exec;
use crate::command::{exec, CommandOutput};
use crate::error::Result;
use crate::storage::StorageEngine;
use crate::traits::ZoneRuntime;
@@ -91,6 +91,13 @@ impl ZoneRuntime for IllumosRuntime {
Ok(())
}
async fn exec_in_zone(&self, zone_name: &str, command: &[String]) -> Result<CommandOutput> {
let mut args: Vec<&str> = vec![zone_name];
let str_refs: Vec<&str> = command.iter().map(|s| s.as_str()).collect();
args.extend(str_refs);
crate::command::exec_unchecked("zlogin", &args).await
}
async fn get_zone_state(&self, zone_name: &str) -> Result<ZoneState> {
let output = exec("zoneadm", &["-z", zone_name, "list", "-p"]).await?;
let line = output.stdout.trim();


@@ -11,6 +11,7 @@ pub mod illumos;
pub mod mock;
pub mod network;
pub mod node_agent;
pub mod probes;
pub mod node_health;
pub mod storage;
pub mod sysinfo;
@@ -38,6 +39,7 @@ pub use api_client::ApiClient;
pub use controller::{PodController, PodControllerConfig};
pub use node_agent::{NodeAgent, NodeAgentConfig};
pub use node_health::{NodeHealthChecker, NodeHealthCheckerConfig};
pub use probes::{ProbeExecutor, ProbeTracker};
// Conditionally re-export illumos runtime
#[cfg(target_os = "illumos")]


@@ -1,9 +1,10 @@
use crate::command::CommandOutput;
use crate::error::{Result, RuntimeError};
use crate::storage::StorageEngine;
use crate::traits::ZoneRuntime;
use crate::types::*;
use async_trait::async_trait;
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::debug;
@@ -25,6 +26,7 @@ pub struct MockRuntime {
zones: Arc<RwLock<HashMap<String, MockZone>>>,
next_id: Arc<RwLock<i32>>,
storage: Arc<dyn StorageEngine>,
exec_results: Arc<RwLock<HashMap<String, VecDeque<CommandOutput>>>>,
}
impl MockRuntime {
@@ -33,8 +35,19 @@ impl MockRuntime {
zones: Arc::new(RwLock::new(HashMap::new())),
next_id: Arc::new(RwLock::new(1)),
storage,
exec_results: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Queue a custom exec result for a specific zone.
/// Results are consumed in FIFO order. Once exhausted, falls back to defaults.
pub async fn set_exec_result(&self, zone_name: &str, output: CommandOutput) {
let mut results = self.exec_results.write().await;
results
.entry(zone_name.to_string())
.or_default()
.push_back(output);
}
}
#[async_trait]
@@ -181,6 +194,40 @@ impl ZoneRuntime for MockRuntime {
Ok(())
}
async fn exec_in_zone(&self, zone_name: &str, _command: &[String]) -> Result<CommandOutput> {
// Check for queued custom results first
{
let mut results = self.exec_results.write().await;
if let Some(queue) = results.get_mut(zone_name) {
if let Some(output) = queue.pop_front() {
return Ok(output);
}
}
}
// Default behavior: succeed if zone is Running, error otherwise
let zones = self.zones.read().await;
let zone = zones
.get(zone_name)
.ok_or_else(|| RuntimeError::zone_not_found(zone_name))?;
if zone.state == ZoneState::Running {
Ok(CommandOutput {
stdout: String::new(),
stderr: String::new(),
exit_code: 0,
})
} else {
Err(RuntimeError::zone_operation_failed(
zone_name,
format!(
"Cannot exec in zone: zone is in state {} (expected Running)",
zone.state
),
))
}
}
async fn get_zone_state(&self, zone_name: &str) -> Result<ZoneState> {
let zones = self.zones.read().await;
let zone = zones
@@ -316,6 +363,77 @@ mod tests {
}
}
#[tokio::test]
async fn test_exec_in_zone_running_default_success() {
let rt = MockRuntime::new(make_test_storage());
let config = make_test_config("exec-zone");
rt.provision(&config).await.unwrap();
let output = rt
.exec_in_zone("exec-zone", &["echo".to_string(), "hello".to_string()])
.await
.unwrap();
assert_eq!(output.exit_code, 0);
}
#[tokio::test]
async fn test_exec_in_zone_not_running_errors() {
let rt = MockRuntime::new(make_test_storage());
let config = make_test_config("stopped-zone");
rt.create_zone(&config).await.unwrap();
rt.install_zone("stopped-zone").await.unwrap();
// Zone is Installed, not Running
let result = rt
.exec_in_zone("stopped-zone", &["echo".to_string()])
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_exec_in_zone_not_found_errors() {
let rt = MockRuntime::new(make_test_storage());
let result = rt
.exec_in_zone("nonexistent", &["echo".to_string()])
.await;
assert!(matches!(
result.unwrap_err(),
RuntimeError::ZoneNotFound { .. }
));
}
#[tokio::test]
async fn test_exec_in_zone_custom_result() {
let rt = MockRuntime::new(make_test_storage());
let config = make_test_config("custom-zone");
rt.provision(&config).await.unwrap();
rt.set_exec_result(
"custom-zone",
crate::command::CommandOutput {
stdout: String::new(),
stderr: "probe failed".to_string(),
exit_code: 1,
},
)
.await;
let output = rt
.exec_in_zone("custom-zone", &["check".to_string()])
.await
.unwrap();
assert_eq!(output.exit_code, 1);
assert_eq!(output.stderr, "probe failed");
// Second call falls back to default (success)
let output2 = rt
.exec_in_zone("custom-zone", &["check".to_string()])
.await
.unwrap();
assert_eq!(output2.exit_code, 0);
}
#[tokio::test]
async fn test_provision_transitions_to_running() {
let rt = MockRuntime::new(make_test_storage());


@@ -0,0 +1,358 @@
use crate::probes::types::{ProbeAction, ProbeOutcome, ProbeResult};
use crate::traits::ZoneRuntime;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tracing::warn;
/// Executes individual probe checks against a zone
pub struct ProbeExecutor {
runtime: Arc<dyn ZoneRuntime>,
}
impl ProbeExecutor {
pub fn new(runtime: Arc<dyn ZoneRuntime>) -> Self {
Self { runtime }
}
/// Execute a single probe action and return the result
pub async fn execute(
&self,
zone_name: &str,
zone_ip: &str,
action: &ProbeAction,
timeout: Duration,
) -> ProbeResult {
let start = Instant::now();
let outcome =
match tokio::time::timeout(timeout, self.execute_inner(zone_name, zone_ip, action))
.await
{
Ok(outcome) => outcome,
Err(_) => ProbeOutcome::Failure(format!(
"probe timed out after {}s",
timeout.as_secs()
)),
};
ProbeResult {
outcome,
duration: start.elapsed(),
timestamp: start,
}
}
async fn execute_inner(
&self,
zone_name: &str,
zone_ip: &str,
action: &ProbeAction,
) -> ProbeOutcome {
match action {
ProbeAction::Exec { command } => self.exec_probe(zone_name, command).await,
ProbeAction::HttpGet {
path,
port,
host,
scheme,
} => {
let target_host = if host == "localhost" { zone_ip } else { host };
self.http_probe(target_host, *port, path, scheme).await
}
ProbeAction::TcpSocket { port, host } => {
let target_host = if host == "localhost" { zone_ip } else { host };
self.tcp_probe(target_host, *port).await
}
}
}
async fn exec_probe(&self, zone_name: &str, command: &[String]) -> ProbeOutcome {
match self.runtime.exec_in_zone(zone_name, command).await {
Ok(output) => {
if output.exit_code == 0 {
ProbeOutcome::Success
} else {
ProbeOutcome::Failure(format!(
"command exited with code {} (stderr: {})",
output.exit_code,
output.stderr.trim()
))
}
}
Err(e) => ProbeOutcome::Error(format!("exec failed: {}", e)),
}
}
async fn tcp_probe(&self, host: &str, port: u16) -> ProbeOutcome {
let addr = format!("{}:{}", host, port);
match TcpStream::connect(&addr).await {
Ok(_) => ProbeOutcome::Success,
Err(e) => ProbeOutcome::Failure(format!("TCP connection to {} failed: {}", addr, e)),
}
}
async fn http_probe(&self, host: &str, port: u16, path: &str, scheme: &str) -> ProbeOutcome {
if scheme.eq_ignore_ascii_case("HTTPS") {
// HTTPS falls back to TCP-only check with warning — we don't have
// a TLS client in this context.
warn!(
"HTTPS probe to {}:{}{} falling back to TCP-only check",
host, port, path
);
return self.tcp_probe(host, port).await;
}
let addr = format!("{}:{}", host, port);
let mut stream = match TcpStream::connect(&addr).await {
Ok(s) => s,
Err(e) => {
return ProbeOutcome::Failure(format!(
"HTTP connection to {} failed: {}",
addr, e
))
}
};
let request = format!(
"GET {} HTTP/1.1\r\nHost: {}:{}\r\nConnection: close\r\n\r\n",
path, host, port
);
if let Err(e) = stream.write_all(request.as_bytes()).await {
return ProbeOutcome::Failure(format!("HTTP write failed: {}", e));
}
let mut response = Vec::new();
if let Err(e) = stream.read_to_end(&mut response).await {
return ProbeOutcome::Failure(format!("HTTP read failed: {}", e));
}
let response_str = String::from_utf8_lossy(&response);
// Parse status code from HTTP/1.1 response line
if let Some(status_line) = response_str.lines().next() {
let parts: Vec<&str> = status_line.split_whitespace().collect();
if parts.len() >= 2 {
if let Ok(status) = parts[1].parse::<u16>() {
if (200..300).contains(&status) {
return ProbeOutcome::Success;
} else {
return ProbeOutcome::Failure(format!(
"HTTP probe returned status {}",
status
));
}
}
}
}
ProbeOutcome::Failure("HTTP probe: could not parse response status".to_string())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::command::CommandOutput;
use crate::mock::MockRuntime;
use crate::storage::MockStorageEngine;
use crate::traits::ZoneRuntime;
use crate::types::{StoragePoolConfig, ZoneBrand, ZoneConfig, ZoneStorageOpts, NetworkMode, EtherstubConfig};
use tokio::net::TcpListener;
fn make_test_runtime() -> Arc<MockRuntime> {
let storage = Arc::new(MockStorageEngine::new(StoragePoolConfig::from_pool("rpool")));
Arc::new(MockRuntime::new(storage))
}
fn make_zone_config(name: &str) -> ZoneConfig {
ZoneConfig {
zone_name: name.to_string(),
brand: ZoneBrand::Reddwarf,
zonepath: format!("/zones/{}", name),
network: NetworkMode::Etherstub(EtherstubConfig {
etherstub_name: "reddwarf0".to_string(),
vnic_name: format!("vnic_{}", name),
ip_address: "10.0.0.2".to_string(),
gateway: "10.0.0.1".to_string(),
prefix_len: 16,
}),
storage: ZoneStorageOpts::default(),
lx_image_path: None,
processes: vec![],
cpu_cap: None,
memory_cap: None,
fs_mounts: vec![],
}
}
#[tokio::test]
async fn test_exec_probe_success() {
let runtime = make_test_runtime();
let config = make_zone_config("exec-ok");
runtime.provision(&config).await.unwrap();
let executor = ProbeExecutor::new(runtime.clone());
let action = ProbeAction::Exec {
command: vec!["true".to_string()],
};
let result = executor
.execute("exec-ok", "10.0.0.2", &action, Duration::from_secs(5))
.await;
assert_eq!(result.outcome, ProbeOutcome::Success);
}
#[tokio::test]
async fn test_exec_probe_failure() {
let runtime = make_test_runtime();
let config = make_zone_config("exec-fail");
runtime.provision(&config).await.unwrap();
runtime
.set_exec_result(
"exec-fail",
CommandOutput {
stdout: String::new(),
stderr: "unhealthy".to_string(),
exit_code: 1,
},
)
.await;
let executor = ProbeExecutor::new(runtime.clone());
let action = ProbeAction::Exec {
command: vec!["check".to_string()],
};
let result = executor
.execute("exec-fail", "10.0.0.2", &action, Duration::from_secs(5))
.await;
assert!(matches!(result.outcome, ProbeOutcome::Failure(_)));
}
#[tokio::test]
async fn test_exec_probe_timeout() {
let runtime = make_test_runtime();
let config = make_zone_config("exec-timeout");
runtime.provision(&config).await.unwrap();
// The mock exec returns instantly, so the timeout branch can't actually
// fire here; triggering it would require a blocking mock. Instead,
// verify that the executor's timeout wrapper passes an instant probe
// through unchanged when it completes within the deadline.
let executor = ProbeExecutor::new(runtime.clone());
let action = ProbeAction::Exec {
command: vec!["true".to_string()],
};
let result = executor
.execute("exec-timeout", "10.0.0.2", &action, Duration::from_secs(1))
.await;
assert_eq!(result.outcome, ProbeOutcome::Success);
}
#[tokio::test]
async fn test_tcp_probe_success() {
// Bind a listener so the TCP probe succeeds
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
let runtime = make_test_runtime();
let executor = ProbeExecutor::new(runtime);
let action = ProbeAction::TcpSocket {
port,
host: "127.0.0.1".to_string(),
};
let result = executor
.execute("any-zone", "127.0.0.1", &action, Duration::from_secs(5))
.await;
assert_eq!(result.outcome, ProbeOutcome::Success);
drop(listener);
}
#[tokio::test]
async fn test_tcp_probe_failure() {
let runtime = make_test_runtime();
let executor = ProbeExecutor::new(runtime);
// Use a port that is almost certainly not listening
let action = ProbeAction::TcpSocket {
port: 1,
host: "127.0.0.1".to_string(),
};
let result = executor
.execute("any-zone", "127.0.0.1", &action, Duration::from_secs(5))
.await;
assert!(matches!(result.outcome, ProbeOutcome::Failure(_)));
}
#[tokio::test]
async fn test_http_probe_success() {
// Spin up a minimal HTTP server
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
let server = tokio::spawn(async move {
if let Ok((mut stream, _)) = listener.accept().await {
let mut buf = [0u8; 1024];
let _ = stream.read(&mut buf).await;
let response = "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK";
let _ = stream.write_all(response.as_bytes()).await;
}
});
let runtime = make_test_runtime();
let executor = ProbeExecutor::new(runtime);
let action = ProbeAction::HttpGet {
path: "/healthz".to_string(),
port,
host: "127.0.0.1".to_string(),
scheme: "HTTP".to_string(),
};
let result = executor
.execute("any-zone", "127.0.0.1", &action, Duration::from_secs(5))
.await;
assert_eq!(result.outcome, ProbeOutcome::Success);
server.abort();
}
#[tokio::test]
async fn test_http_probe_non_200() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
let server = tokio::spawn(async move {
if let Ok((mut stream, _)) = listener.accept().await {
let mut buf = [0u8; 1024];
let _ = stream.read(&mut buf).await;
let response =
"HTTP/1.1 503 Service Unavailable\r\nContent-Length: 5\r\n\r\nError";
let _ = stream.write_all(response.as_bytes()).await;
}
});
let runtime = make_test_runtime();
let executor = ProbeExecutor::new(runtime);
let action = ProbeAction::HttpGet {
path: "/healthz".to_string(),
port,
host: "127.0.0.1".to_string(),
scheme: "HTTP".to_string(),
};
let result = executor
.execute("any-zone", "127.0.0.1", &action, Duration::from_secs(5))
.await;
assert!(matches!(result.outcome, ProbeOutcome::Failure(_)));
server.abort();
}
}
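The two HTTP tests above pin down the executor's pass/fail criterion from the outside: a 200 passes, a 503 fails. A self-contained sketch of that check, assuming the usual Kubernetes httpGet convention (any status in 200..400 is healthy) since the executor's exact cutoff is not visible in this chunk; `status_code` and `is_http_success` are illustrative names, not part of this diff:

```rust
/// Parse the status code out of an HTTP/1.x status line,
/// e.g. "HTTP/1.1 200 OK" yields Some(200).
fn status_code(status_line: &str) -> Option<u16> {
    status_line.split_whitespace().nth(1)?.parse().ok()
}

/// A probe passes iff the status is in 200..400, mirroring the
/// Kubernetes httpGet convention (2xx/3xx healthy, everything else not).
fn is_http_success(status_line: &str) -> bool {
    matches!(status_code(status_line), Some(c) if (200..400).contains(&c))
}

fn main() {
    assert!(is_http_success("HTTP/1.1 200 OK"));
    assert!(!is_http_success("HTTP/1.1 503 Service Unavailable"));
    assert!(!is_http_success("not an http response"));
    println!("ok");
}
```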


@@ -0,0 +1,9 @@
pub mod executor;
pub mod tracker;
pub mod types;
pub use executor::ProbeExecutor;
pub use tracker::{PodProbeStatus, ProbeTracker};
pub use types::{
ContainerProbeConfig, ProbeAction, ProbeKind, ProbeOutcome, ProbeResult, extract_probes,
};


@@ -0,0 +1,515 @@
use crate::probes::executor::ProbeExecutor;
use crate::probes::types::{ContainerProbeConfig, ProbeKind, ProbeOutcome};
use std::collections::HashMap;
use std::time::{Duration, Instant};
use tracing::{debug, warn};
/// Composite key for per-probe state
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ProbeKey {
pod_key: String,
container_name: String,
kind: ProbeKind,
}
/// Per-probe mutable state
struct ProbeState {
config: ContainerProbeConfig,
container_started_at: Instant,
last_check: Option<Instant>,
consecutive_successes: u32,
consecutive_failures: u32,
has_succeeded: bool,
}
/// Aggregate probe status for a pod
#[derive(Debug, Clone)]
pub struct PodProbeStatus {
/// All readiness probes pass (or none defined)
pub ready: bool,
/// Any liveness probe has failed past its failure threshold
pub liveness_failed: bool,
/// Diagnostic detail about the failure
pub failure_message: Option<String>,
}
/// Tracks probe state for all pods and drives periodic checks
pub struct ProbeTracker {
states: HashMap<ProbeKey, ProbeState>,
executor: ProbeExecutor,
}
impl ProbeTracker {
pub fn new(executor: ProbeExecutor) -> Self {
Self {
states: HashMap::new(),
executor,
}
}
/// Register (or re-register) probes for a pod. Idempotent — existing state
/// is preserved if the probe key already exists.
pub fn register_pod(
&mut self,
pod_key: &str,
probes: Vec<ContainerProbeConfig>,
started_at: Instant,
) {
for config in probes {
let key = ProbeKey {
pod_key: pod_key.to_string(),
container_name: config.container_name.clone(),
kind: config.kind,
};
// Idempotent: don't overwrite existing tracking state
self.states.entry(key).or_insert(ProbeState {
config,
container_started_at: started_at,
last_check: None,
consecutive_successes: 0,
consecutive_failures: 0,
has_succeeded: false,
});
}
}
/// Remove all probe state for a pod
pub fn unregister_pod(&mut self, pod_key: &str) {
self.states.retain(|k, _| k.pod_key != pod_key);
}
/// Run due probes for a pod and return its aggregate status
pub async fn check_pod(
&mut self,
pod_key: &str,
zone_name: &str,
zone_ip: &str,
) -> PodProbeStatus {
let now = Instant::now();
// Collect keys for this pod
let keys: Vec<ProbeKey> = self
.states
.keys()
.filter(|k| k.pod_key == pod_key)
.cloned()
.collect();
if keys.is_empty() {
// No probes registered — pod is ready by default
return PodProbeStatus {
ready: true,
liveness_failed: false,
failure_message: None,
};
}
// Check whether startup probes have succeeded (gates liveness)
let startup_succeeded: HashMap<String, bool> = {
let mut map = HashMap::new();
for key in &keys {
if key.kind == ProbeKind::Startup {
if let Some(state) = self.states.get(key) {
map.insert(key.container_name.clone(), state.has_succeeded);
}
}
}
map
};
// Run probes
for key in &keys {
let state = match self.states.get(key) {
Some(s) => s,
None => continue,
};
// Skip liveness probes if startup probe hasn't succeeded yet
if key.kind == ProbeKind::Liveness {
if let Some(&startup_done) = startup_succeeded.get(&key.container_name) {
if !startup_done {
debug!(
"Skipping liveness probe for container '{}' — startup probe hasn't passed yet",
key.container_name
);
continue;
}
}
}
// Check initial delay
let elapsed_since_start = now.duration_since(state.container_started_at);
if elapsed_since_start < Duration::from_secs(state.config.initial_delay_seconds as u64)
{
debug!(
"Skipping {} probe for container '{}' — initial delay not elapsed",
key.kind, key.container_name
);
continue;
}
// Check period
if let Some(last) = state.last_check {
let since_last = now.duration_since(last);
if since_last < Duration::from_secs(state.config.period_seconds as u64) {
continue;
}
}
// Execute the probe
let timeout = Duration::from_secs(state.config.timeout_seconds as u64);
let result = self
.executor
.execute(zone_name, zone_ip, &state.config.action, timeout)
.await;
// Update state
let state = self.states.get_mut(key).unwrap();
state.last_check = Some(now);
match result.outcome {
ProbeOutcome::Success => {
state.consecutive_successes += 1;
state.consecutive_failures = 0;
if state.consecutive_successes >= state.config.success_threshold {
state.has_succeeded = true;
}
}
ProbeOutcome::Failure(ref msg) | ProbeOutcome::Error(ref msg) => {
state.consecutive_failures += 1;
state.consecutive_successes = 0;
if state.consecutive_failures >= state.config.failure_threshold {
warn!(
"{} probe failed for container '{}': {} (failures: {}/{})",
key.kind,
key.container_name,
msg,
state.consecutive_failures,
state.config.failure_threshold
);
}
}
}
}
// Compute aggregate status
let mut ready = true;
let mut liveness_failed = false;
let mut failure_message = None;
for key in &keys {
let state = match self.states.get(key) {
Some(s) => s,
None => continue,
};
match key.kind {
ProbeKind::Readiness => {
if !state.has_succeeded
|| state.consecutive_failures >= state.config.failure_threshold
{
ready = false;
if state.consecutive_failures >= state.config.failure_threshold {
failure_message = Some(format!(
"Readiness probe failed for container '{}' ({} consecutive failures)",
key.container_name, state.consecutive_failures
));
}
}
}
ProbeKind::Liveness => {
if state.consecutive_failures >= state.config.failure_threshold {
liveness_failed = true;
failure_message = Some(format!(
"Liveness probe failed for container '{}' ({} consecutive failures)",
key.container_name, state.consecutive_failures
));
}
}
ProbeKind::Startup => {
// Startup probe failure past threshold is treated as liveness failure
if !state.has_succeeded
&& state.consecutive_failures >= state.config.failure_threshold
{
liveness_failed = true;
failure_message = Some(format!(
"Startup probe failed for container '{}' ({} consecutive failures)",
key.container_name, state.consecutive_failures
));
}
// Also gate readiness on startup
if !state.has_succeeded {
ready = false;
}
}
}
}
PodProbeStatus {
ready,
liveness_failed,
failure_message,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::command::CommandOutput;
use crate::mock::MockRuntime;
use crate::probes::types::{ContainerProbeConfig, ProbeAction, ProbeKind};
use crate::storage::MockStorageEngine;
use crate::traits::ZoneRuntime;
use crate::types::{
EtherstubConfig, NetworkMode, StoragePoolConfig, ZoneBrand, ZoneConfig, ZoneStorageOpts,
};
use std::sync::Arc;
fn make_test_runtime() -> Arc<MockRuntime> {
let storage = Arc::new(MockStorageEngine::new(StoragePoolConfig::from_pool("rpool")));
Arc::new(MockRuntime::new(storage))
}
fn make_zone_config(name: &str) -> ZoneConfig {
ZoneConfig {
zone_name: name.to_string(),
brand: ZoneBrand::Reddwarf,
zonepath: format!("/zones/{}", name),
network: NetworkMode::Etherstub(EtherstubConfig {
etherstub_name: "reddwarf0".to_string(),
vnic_name: format!("vnic_{}", name),
ip_address: "10.0.0.2".to_string(),
gateway: "10.0.0.1".to_string(),
prefix_len: 16,
}),
storage: ZoneStorageOpts::default(),
lx_image_path: None,
processes: vec![],
cpu_cap: None,
memory_cap: None,
fs_mounts: vec![],
}
}
fn exec_probe_config(
container: &str,
kind: ProbeKind,
failure_threshold: u32,
) -> ContainerProbeConfig {
ContainerProbeConfig {
container_name: container.to_string(),
kind,
action: ProbeAction::Exec {
command: vec!["check".to_string()],
},
initial_delay_seconds: 0,
period_seconds: 0, // Always due
timeout_seconds: 5,
failure_threshold,
success_threshold: 1,
}
}
#[tokio::test]
async fn test_register_and_check_success() {
let runtime = make_test_runtime();
let config = make_zone_config("probe-ok");
runtime.provision(&config).await.unwrap();
let executor = ProbeExecutor::new(runtime.clone());
let mut tracker = ProbeTracker::new(executor);
let probes = vec![exec_probe_config("web", ProbeKind::Liveness, 3)];
tracker.register_pod("default/probe-ok", probes, Instant::now());
let status = tracker
.check_pod("default/probe-ok", "probe-ok", "10.0.0.2")
.await;
assert!(!status.liveness_failed);
assert!(status.ready); // No readiness probes → default ready
}
#[tokio::test]
async fn test_liveness_failure_after_threshold() {
let runtime = make_test_runtime();
let config = make_zone_config("liveness-fail");
runtime.provision(&config).await.unwrap();
// Queue 3 failures (threshold is 3)
for _ in 0..3 {
runtime
.set_exec_result(
"liveness-fail",
CommandOutput {
stdout: String::new(),
stderr: "unhealthy".to_string(),
exit_code: 1,
},
)
.await;
}
let executor = ProbeExecutor::new(runtime.clone());
let mut tracker = ProbeTracker::new(executor);
let probes = vec![exec_probe_config("web", ProbeKind::Liveness, 3)];
tracker.register_pod("default/liveness-fail", probes, Instant::now());
// Run probes 3 times to hit the threshold — the 3rd call reaches it
let mut status = PodProbeStatus {
ready: true,
liveness_failed: false,
failure_message: None,
};
for _ in 0..3 {
status = tracker
.check_pod("default/liveness-fail", "liveness-fail", "10.0.0.2")
.await;
}
assert!(status.liveness_failed);
assert!(status.failure_message.is_some());
}
#[tokio::test]
async fn test_readiness_failure_sets_not_ready() {
let runtime = make_test_runtime();
let config = make_zone_config("readiness-fail");
runtime.provision(&config).await.unwrap();
// Queue failures
for _ in 0..3 {
runtime
.set_exec_result(
"readiness-fail",
CommandOutput {
stdout: String::new(),
stderr: "not ready".to_string(),
exit_code: 1,
},
)
.await;
}
let executor = ProbeExecutor::new(runtime.clone());
let mut tracker = ProbeTracker::new(executor);
let probes = vec![exec_probe_config("web", ProbeKind::Readiness, 3)];
tracker.register_pod("default/readiness-fail", probes, Instant::now());
// Run probes 3 times — the 3rd call reaches the threshold
let mut status = PodProbeStatus {
ready: true,
liveness_failed: false,
failure_message: None,
};
for _ in 0..3 {
status = tracker
.check_pod("default/readiness-fail", "readiness-fail", "10.0.0.2")
.await;
}
assert!(!status.ready);
assert!(!status.liveness_failed); // Readiness failure doesn't kill the pod
}
#[tokio::test]
async fn test_initial_delay_respected() {
let runtime = make_test_runtime();
let config = make_zone_config("delay-zone");
runtime.provision(&config).await.unwrap();
// Queue a failure — but probe should not run due to initial delay
runtime
.set_exec_result(
"delay-zone",
CommandOutput {
stdout: String::new(),
stderr: "fail".to_string(),
exit_code: 1,
},
)
.await;
let executor = ProbeExecutor::new(runtime.clone());
let mut tracker = ProbeTracker::new(executor);
let mut probe_cfg = exec_probe_config("web", ProbeKind::Liveness, 1);
probe_cfg.initial_delay_seconds = 3600; // 1 hour delay — won't be reached
tracker.register_pod("default/delay-zone", vec![probe_cfg], Instant::now());
let status = tracker
.check_pod("default/delay-zone", "delay-zone", "10.0.0.2")
.await;
// Probe should have been skipped, so no failure
assert!(!status.liveness_failed);
}
#[tokio::test]
async fn test_startup_gates_liveness() {
let runtime = make_test_runtime();
let config = make_zone_config("startup-gate");
runtime.provision(&config).await.unwrap();
// Startup will fail, liveness should be skipped
runtime
.set_exec_result(
"startup-gate",
CommandOutput {
stdout: String::new(),
stderr: "not started".to_string(),
exit_code: 1,
},
)
.await;
let executor = ProbeExecutor::new(runtime.clone());
let mut tracker = ProbeTracker::new(executor);
let probes = vec![
ContainerProbeConfig {
container_name: "web".to_string(),
kind: ProbeKind::Startup,
action: ProbeAction::Exec {
command: vec!["startup-check".to_string()],
},
initial_delay_seconds: 0,
period_seconds: 0,
timeout_seconds: 5,
failure_threshold: 10, // High threshold so we don't fail yet
success_threshold: 1,
},
exec_probe_config("web", ProbeKind::Liveness, 1),
];
tracker.register_pod("default/startup-gate", probes, Instant::now());
let status = tracker
.check_pod("default/startup-gate", "startup-gate", "10.0.0.2")
.await;
// Startup hasn't succeeded → liveness should be skipped → no liveness failure
assert!(!status.liveness_failed);
// But pod is not ready (startup gate)
assert!(!status.ready);
}
#[tokio::test]
async fn test_unregister_cleans_state() {
let runtime = make_test_runtime();
let executor = ProbeExecutor::new(runtime.clone());
let mut tracker = ProbeTracker::new(executor);
let probes = vec![exec_probe_config("web", ProbeKind::Liveness, 3)];
tracker.register_pod("default/cleanup-pod", probes, Instant::now());
// Verify state exists
assert!(!tracker.states.is_empty());
tracker.unregister_pod("default/cleanup-pod");
// State should be empty
assert!(tracker.states.is_empty());
}
}
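The threshold bookkeeping ProbeTracker implements above reduces to a small state machine: each observation bumps one streak and zeroes the other, and the thresholds compare against the current streak. A standalone sketch of that rule (the `Streaks` type is illustrative; the real tracker additionally latches `has_succeeded` once the success threshold is reached):

```rust
/// Consecutive-result counters, k8s-style: a success resets the
/// failure streak and vice versa.
#[derive(Default)]
struct Streaks {
    successes: u32,
    failures: u32,
}

impl Streaks {
    fn observe(&mut self, success: bool) {
        if success {
            self.successes += 1;
            self.failures = 0;
        } else {
            self.failures += 1;
            self.successes = 0;
        }
    }
    fn failed(&self, failure_threshold: u32) -> bool {
        self.failures >= failure_threshold
    }
    fn succeeded(&self, success_threshold: u32) -> bool {
        self.successes >= success_threshold
    }
}

fn main() {
    let mut s = Streaks::default();
    s.observe(false);
    s.observe(false);
    assert!(!s.failed(3)); // two failures, threshold is three
    s.observe(true); // a success resets the failure streak
    s.observe(false);
    s.observe(false);
    s.observe(false);
    assert!(s.failed(3));
    println!("ok");
}
```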


@@ -0,0 +1,284 @@
use k8s_openapi::api::core::v1::Container;
use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
use std::time::{Duration, Instant};
/// Which kind of probe this is
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ProbeKind {
Startup,
Liveness,
Readiness,
}
impl std::fmt::Display for ProbeKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ProbeKind::Startup => write!(f, "startup"),
ProbeKind::Liveness => write!(f, "liveness"),
ProbeKind::Readiness => write!(f, "readiness"),
}
}
}
/// The action a probe performs
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProbeAction {
Exec { command: Vec<String> },
HttpGet { path: String, port: u16, host: String, scheme: String },
TcpSocket { port: u16, host: String },
}
/// Extracted probe configuration for a single container + probe kind
#[derive(Debug, Clone)]
pub struct ContainerProbeConfig {
pub container_name: String,
pub kind: ProbeKind,
pub action: ProbeAction,
pub initial_delay_seconds: u32,
pub period_seconds: u32,
pub timeout_seconds: u32,
pub failure_threshold: u32,
pub success_threshold: u32,
}
/// Outcome of a single probe execution
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProbeOutcome {
Success,
Failure(String),
Error(String),
}
/// Result of a probe execution with timing metadata
#[derive(Debug, Clone)]
pub struct ProbeResult {
pub outcome: ProbeOutcome,
pub duration: Duration,
pub timestamp: Instant,
}
/// Resolve an IntOrString port to a u16.
/// Named ports are not supported (they would require a pod spec lookup) and
/// resolve to 0, as do out-of-range integers; extract_probes skips port 0.
fn resolve_port(port: &IntOrString) -> u16 {
    match port {
        IntOrString::Int(n) => u16::try_from(*n).unwrap_or(0),
        IntOrString::String(s) => s.parse::<u16>().unwrap_or(0),
    }
}
/// Extract all probe configs from a k8s Container
pub fn extract_probes(container: &Container) -> Vec<ContainerProbeConfig> {
let mut probes = Vec::new();
let probe_sources = [
(&container.startup_probe, ProbeKind::Startup),
(&container.liveness_probe, ProbeKind::Liveness),
(&container.readiness_probe, ProbeKind::Readiness),
];
for (probe_opt, kind) in probe_sources {
let probe = match probe_opt {
Some(p) => p,
None => continue,
};
let action = if let Some(exec) = &probe.exec {
match &exec.command {
Some(cmd) if !cmd.is_empty() => ProbeAction::Exec {
command: cmd.clone(),
},
_ => continue, // Empty or missing exec command — skip
}
} else if let Some(http) = &probe.http_get {
let port = resolve_port(&http.port);
if port == 0 {
continue;
}
ProbeAction::HttpGet {
path: http.path.clone().unwrap_or_else(|| "/".to_string()),
port,
host: http.host.clone().unwrap_or_else(|| "localhost".to_string()),
scheme: http.scheme.clone().unwrap_or_else(|| "HTTP".to_string()),
}
} else if let Some(tcp) = &probe.tcp_socket {
let port = resolve_port(&tcp.port);
if port == 0 {
continue;
}
ProbeAction::TcpSocket {
port,
host: tcp.host.clone().unwrap_or_else(|| "localhost".to_string()),
}
} else {
continue; // No recognized action
};
// Apply k8s defaults: period=10, timeout=1, failure=3, success=1, initial_delay=0
probes.push(ContainerProbeConfig {
container_name: container.name.clone(),
kind,
action,
initial_delay_seconds: probe.initial_delay_seconds.unwrap_or(0) as u32,
period_seconds: probe.period_seconds.unwrap_or(10) as u32,
timeout_seconds: probe.timeout_seconds.unwrap_or(1) as u32,
failure_threshold: probe.failure_threshold.unwrap_or(3) as u32,
success_threshold: probe.success_threshold.unwrap_or(1) as u32,
});
}
probes
}
#[cfg(test)]
mod tests {
use super::*;
use k8s_openapi::api::core::v1::{
ExecAction, HTTPGetAction, Probe, TCPSocketAction,
};
#[test]
fn test_extract_exec_probe() {
let container = Container {
name: "web".to_string(),
liveness_probe: Some(Probe {
exec: Some(ExecAction {
command: Some(vec!["/bin/sh".to_string(), "-c".to_string(), "exit 0".to_string()]),
}),
period_seconds: Some(5),
failure_threshold: Some(2),
..Default::default()
}),
..Default::default()
};
let probes = extract_probes(&container);
assert_eq!(probes.len(), 1);
assert_eq!(probes[0].kind, ProbeKind::Liveness);
assert_eq!(
probes[0].action,
ProbeAction::Exec {
command: vec!["/bin/sh".to_string(), "-c".to_string(), "exit 0".to_string()]
}
);
assert_eq!(probes[0].period_seconds, 5);
assert_eq!(probes[0].failure_threshold, 2);
// Defaults applied
assert_eq!(probes[0].timeout_seconds, 1);
assert_eq!(probes[0].success_threshold, 1);
assert_eq!(probes[0].initial_delay_seconds, 0);
}
#[test]
fn test_extract_http_probe() {
let container = Container {
name: "api".to_string(),
readiness_probe: Some(Probe {
http_get: Some(HTTPGetAction {
path: Some("/healthz".to_string()),
port: IntOrString::Int(8080),
host: Some("10.0.0.5".to_string()),
scheme: Some("HTTPS".to_string()),
..Default::default()
}),
initial_delay_seconds: Some(15),
..Default::default()
}),
..Default::default()
};
let probes = extract_probes(&container);
assert_eq!(probes.len(), 1);
assert_eq!(probes[0].kind, ProbeKind::Readiness);
assert_eq!(
probes[0].action,
ProbeAction::HttpGet {
path: "/healthz".to_string(),
port: 8080,
host: "10.0.0.5".to_string(),
scheme: "HTTPS".to_string(),
}
);
assert_eq!(probes[0].initial_delay_seconds, 15);
}
#[test]
fn test_extract_tcp_probe() {
let container = Container {
name: "db".to_string(),
startup_probe: Some(Probe {
tcp_socket: Some(TCPSocketAction {
port: IntOrString::Int(5432),
host: None,
}),
period_seconds: Some(2),
failure_threshold: Some(30),
..Default::default()
}),
..Default::default()
};
let probes = extract_probes(&container);
assert_eq!(probes.len(), 1);
assert_eq!(probes[0].kind, ProbeKind::Startup);
assert_eq!(
probes[0].action,
ProbeAction::TcpSocket {
port: 5432,
host: "localhost".to_string(),
}
);
assert_eq!(probes[0].period_seconds, 2);
assert_eq!(probes[0].failure_threshold, 30);
}
#[test]
fn test_extract_no_probes() {
let container = Container {
name: "bare".to_string(),
..Default::default()
};
let probes = extract_probes(&container);
assert!(probes.is_empty());
}
#[test]
fn test_extract_defaults() {
let container = Container {
name: "defaults".to_string(),
liveness_probe: Some(Probe {
exec: Some(ExecAction {
command: Some(vec!["true".to_string()]),
}),
// All timing fields left as None → should get k8s defaults
..Default::default()
}),
..Default::default()
};
let probes = extract_probes(&container);
assert_eq!(probes.len(), 1);
assert_eq!(probes[0].initial_delay_seconds, 0);
assert_eq!(probes[0].period_seconds, 10);
assert_eq!(probes[0].timeout_seconds, 1);
assert_eq!(probes[0].failure_threshold, 3);
assert_eq!(probes[0].success_threshold, 1);
}
#[test]
fn test_extract_empty_exec_command_skipped() {
let container = Container {
name: "empty-exec".to_string(),
liveness_probe: Some(Probe {
exec: Some(ExecAction {
command: Some(vec![]),
}),
..Default::default()
}),
..Default::default()
};
let probes = extract_probes(&container);
assert!(probes.is_empty());
}
}
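resolve_port() above normalizes Kubernetes's int-or-string port field. A dependency-free analogue, with a stand-in `PortRef` enum in place of the real `IntOrString` (illustrative only): integers in range pass through, numeric strings parse, and everything else, including named ports, collapses to 0 so extract_probes can skip the probe.

```rust
/// Stand-in for k8s_openapi's IntOrString, for illustration only.
enum PortRef {
    Int(i32),
    Name(String),
}

/// Analogue of resolve_port(): 0 means "unresolvable, skip this probe".
fn resolve(port: &PortRef) -> u16 {
    match port {
        // Checked conversion: out-of-range integers also collapse to 0.
        PortRef::Int(n) => u16::try_from(*n).unwrap_or(0),
        PortRef::Name(s) => s.parse::<u16>().unwrap_or(0),
    }
}

fn main() {
    assert_eq!(resolve(&PortRef::Int(8080)), 8080);
    assert_eq!(resolve(&PortRef::Name("5432".to_string())), 5432);
    // Named ports would need a pod-spec lookup; v1 skips them.
    assert_eq!(resolve(&PortRef::Name("metrics".to_string())), 0);
    assert_eq!(resolve(&PortRef::Int(-1)), 0);
    println!("ok");
}
```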


@@ -46,6 +46,19 @@ pub trait ZoneRuntime: Send + Sync {
/// List all managed zones
async fn list_zones(&self) -> Result<Vec<ZoneInfo>>;
// --- Exec ---
/// Execute a command inside a running zone
///
/// Returns the command output including exit code. A non-zero exit code
/// is NOT treated as an error — callers (e.g. probe executors) interpret
/// the exit code themselves.
async fn exec_in_zone(
&self,
zone_name: &str,
command: &[String],
) -> Result<crate::command::CommandOutput>;
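Because exec_in_zone() reports a non-zero exit as ordinary output rather than an error, the probe layer owns the health decision. A minimal sketch of that split, with a local `CommandOutput` mirroring the fields this diff's tests construct; the `interpret` helper is illustrative, not part of the trait:

```rust
/// Mirrors the fields of crate::command::CommandOutput used in this diff.
struct CommandOutput {
    stdout: String,
    stderr: String,
    exit_code: i32,
}

/// Exit 0 is a probe success; any other exit is a probe failure,
/// not a runtime error. This is the distinction the exec_in_zone()
/// contract above draws for callers such as a probe executor.
fn interpret(out: &CommandOutput) -> Result<(), String> {
    if out.exit_code == 0 {
        Ok(())
    } else {
        Err(format!("exit {}: {}", out.exit_code, out.stderr.trim()))
    }
}

fn main() {
    let ok = CommandOutput {
        stdout: "healthy".into(),
        stderr: String::new(),
        exit_code: 0,
    };
    let bad = CommandOutput {
        stdout: String::new(),
        stderr: "unhealthy\n".into(),
        exit_code: 1,
    };
    assert!(interpret(&ok).is_ok());
    assert_eq!(interpret(&bad).unwrap_err(), "exit 1: unhealthy");
    println!("ok");
}
```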
// --- Networking ---
/// Set up network for a zone