use crate::api_client::ApiClient;
use crate::error::{Result, RuntimeError};
use crate::network::{vnic_name_for_pod, Ipam};
use crate::traits::ZoneRuntime;
use crate::types::*;
use chrono::Utc;
use k8s_openapi::api::core::v1::{Pod, PodCondition, PodStatus};
use reddwarf_core::{ResourceEvent, ResourceQuantities, WatchEventType};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

/// Configuration for the pod controller
#[derive(Debug, Clone)]
pub struct PodControllerConfig {
    /// Only reconcile pods assigned to this node
    pub node_name: String,
    /// API server URL (e.g., "http://127.0.0.1:6443")
    pub api_url: String,
    /// Prefix for zone root paths (e.g., "/zones")
    pub zonepath_prefix: String,
    /// Default zone brand
    pub default_brand: ZoneBrand,
    /// Name of the etherstub for pod networking
    pub etherstub_name: String,
    /// Pod CIDR (e.g., "10.88.0.0/16")
    pub pod_cidr: String,
    /// Interval between periodic full reconciliation cycles
    pub reconcile_interval: Duration,
}

/// Pod controller that watches for Pod events and drives zone lifecycle
pub struct PodController {
    runtime: Arc<dyn ZoneRuntime>,
    api_client: Arc<ApiClient>,
    event_tx: broadcast::Sender<ResourceEvent>,
    config: PodControllerConfig,
    ipam: Ipam,
}

impl PodController {
    pub fn new(
        runtime: Arc<dyn ZoneRuntime>,
        api_client: Arc<ApiClient>,
        event_tx: broadcast::Sender<ResourceEvent>,
        config: PodControllerConfig,
        ipam: Ipam,
    ) -> Self {
        Self {
            runtime,
            api_client,
            event_tx,
            config,
            ipam,
        }
    }

    /// Run the controller — reacts to pod events from the in-process event bus.
    ///
    /// On startup, performs a full reconcile to catch up on any pods that were
    /// scheduled while the controller was down. Then switches to event-driven mode.
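    ///
    /// A minimal launch sketch (hypothetical wiring; assumes the runtime, API
    /// client, event bus, and IPAM handles have already been constructed):
    ///
    /// ```ignore
    /// let controller = PodController::new(runtime, api_client, event_tx, config, ipam);
    /// let token = CancellationToken::new();
    /// let run_token = token.clone();
    /// tokio::spawn(async move {
    ///     if let Err(e) = controller.run(run_token).await {
    ///         tracing::error!("pod controller exited with error: {}", e);
    ///     }
    /// });
    /// // Later, call token.cancel() to shut the controller down.
    /// ```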
    pub async fn run(&self, token: CancellationToken) -> Result<()> {
        info!(
            "Starting pod controller for node '{}'",
            self.config.node_name
        );

        // Initial full sync
        if let Err(e) = self.reconcile_all().await {
            error!("Initial reconcile failed: {}", e);
        }

        let mut rx = self.event_tx.subscribe();
        let mut reconcile_tick = tokio::time::interval(self.config.reconcile_interval);
        // Consume the first tick — we just did reconcile_all() above
        reconcile_tick.tick().await;

        loop {
            tokio::select! {
                _ = token.cancelled() => {
                    info!("Pod controller shutting down");
                    return Ok(());
                }
                _ = reconcile_tick.tick() => {
                    debug!("Periodic reconcile tick");
                    if let Err(e) = self.reconcile_all().await {
                        error!("Periodic reconcile failed: {}", e);
                    }
                }
                result = rx.recv() => {
                    match result {
                        Ok(event) => {
                            if event.gvk.kind != "Pod" {
                                continue;
                            }
                            match event.event_type {
                                WatchEventType::Added | WatchEventType::Modified => {
                                    match serde_json::from_value::<Pod>(event.object) {
                                        Ok(pod) => {
                                            if let Err(e) = self.reconcile(&pod).await {
                                                let name = pod.metadata.name.as_deref().unwrap_or("");
                                                error!("Failed to reconcile pod {}: {}", name, e);
                                            }
                                        }
                                        Err(e) => {
                                            warn!("Failed to parse pod from event: {}", e);
                                        }
                                    }
                                }
                                WatchEventType::Deleted => {
                                    match serde_json::from_value::<Pod>(event.object) {
                                        Ok(pod) => {
                                            if let Err(e) = self.handle_delete(&pod).await {
                                                let name = pod.metadata.name.as_deref().unwrap_or("");
                                                error!("Failed to handle pod deletion {}: {}", name, e);
                                            }
                                        }
                                        Err(e) => {
                                            warn!("Failed to parse pod from delete event: {}", e);
                                        }
                                    }
                                }
                                _ => {}
                            }
                        }
                        Err(broadcast::error::RecvError::Lagged(n)) => {
                            warn!("Missed {} events, doing full resync", n);
                            if let Err(e) = self.reconcile_all().await {
                                error!("Resync after lag failed: {}", e);
                            }
                        }
                        Err(broadcast::error::RecvError::Closed) => {
                            info!("Event bus closed, stopping pod controller");
                            return Ok(());
                        }
                    }
                }
            }
        }
    }

    /// Reconcile all pods assigned to this node
    async fn reconcile_all(&self) -> Result<()> {
        debug!("Running pod controller reconcile cycle");

        // List all pods via the API client (respects TLS configuration)
        let body = self.api_client.get_json("/api/v1/pods").await?;
        let items = body["items"].as_array().cloned().unwrap_or_default();

        for item in items {
            let pod: Pod = match serde_json::from_value(item) {
                Ok(p) => p,
                Err(e) => {
                    warn!("Failed to parse pod from list: {}", e);
                    continue;
                }
            };
            if let Err(e) = self.reconcile(&pod).await {
                let pod_name = pod.metadata.name.as_deref().unwrap_or("");
                error!("Failed to reconcile pod {}: {}", pod_name, e);
            }
        }

        Ok(())
    }

    /// Reconcile a single Pod event
    pub async fn reconcile(&self, pod: &Pod) -> Result<()> {
        let pod_name = pod
            .metadata
            .name
            .as_deref()
            .ok_or_else(|| RuntimeError::internal_error("Pod has no name"))?;
        let namespace = pod.metadata.namespace.as_deref().unwrap_or("default");

        let spec = match &pod.spec {
            Some(s) => s,
            None => {
                debug!("Skipping pod {} — no spec", pod_name);
                return Ok(());
            }
        };

        // Only reconcile pods assigned to this node
        let node_name = match &spec.node_name {
            Some(n) => n.as_str(),
            None => {
                debug!("Skipping pod {} — not yet scheduled", pod_name);
                return Ok(());
            }
        };
        if node_name != self.config.node_name {
            return Ok(());
        }

        // If the pod has a deletion_timestamp, drive the termination state machine
        if pod.metadata.deletion_timestamp.is_some() {
            return self.handle_termination(pod).await;
        }

        // Check current phase
        let phase = pod
            .status
            .as_ref()
            .and_then(|s| s.phase.as_deref())
            .unwrap_or("");

        let zone_name = pod_zone_name(namespace, pod_name);

        match phase {
            "" | "Pending" => {
                // Pod is assigned to us but has no phase — provision it
                info!("Provisioning zone for pod {}/{}", namespace, pod_name);
                let zone_config = self.pod_to_zone_config(pod)?;

                match self.runtime.provision(&zone_config).await {
                    Ok(()) => {
                        info!("Zone {} provisioned successfully", zone_name);
                        // Update pod status to Running
                        let status = PodStatus {
                            phase: Some("Running".to_string()),
                            conditions: Some(vec![PodCondition {
                                type_: "Ready".to_string(),
                                status: "True".to_string(),
                                ..Default::default()
                            }]),
                            pod_ip: Some(self.zone_ip(&zone_config)),
                            ..Default::default()
                        };
                        if let Err(e) = self
                            .api_client
                            .set_pod_status(namespace, pod_name, status)
                            .await
                        {
                            error!("Failed to update pod status to Running: {}", e);
                        }
                    }
                    Err(e) => {
                        // Check if it's already provisioned (zone already exists)
                        if matches!(e, RuntimeError::ZoneAlreadyExists { .. }) {
                            debug!("Zone {} already exists, checking state", zone_name);
                            return Ok(());
                        }
                        error!("Failed to provision zone {}: {}", zone_name, e);
                        let status = PodStatus {
                            phase: Some("Failed".to_string()),
                            conditions: Some(vec![PodCondition {
                                type_: "Ready".to_string(),
                                status: "False".to_string(),
                                message: Some(format!("Zone provisioning failed: {}", e)),
                                ..Default::default()
                            }]),
                            ..Default::default()
                        };
                        if let Err(e2) = self
                            .api_client
                            .set_pod_status(namespace, pod_name, status)
                            .await
                        {
                            error!("Failed to update pod status to Failed: {}", e2);
                        }
                    }
                }
            }
            "Running" => {
                // Check zone health
                match self.runtime.get_zone_state(&zone_name).await {
                    Ok(ZoneState::Running) => {
                        // All good
                    }
                    Ok(state) => {
                        warn!(
                            "Zone {} is in unexpected state: {} (expected Running)",
                            zone_name, state
                        );
                        let status = PodStatus {
                            phase: Some("Failed".to_string()),
                            conditions: Some(vec![PodCondition {
                                type_: "Ready".to_string(),
                                status: "False".to_string(),
                                message: Some(format!("Zone is in unexpected state: {}", state)),
                                ..Default::default()
                            }]),
                            ..Default::default()
                        };
                        if let Err(e) = self
                            .api_client
                            .set_pod_status(namespace, pod_name, status)
                            .await
                        {
                            error!("Failed to update pod status to Failed: {}", e);
                        }
                    }
                    Err(RuntimeError::ZoneNotFound { .. }) => {
                        warn!(
                            "Zone {} not found but pod is Running — marking Failed",
                            zone_name
                        );
                        let status = PodStatus {
                            phase: Some("Failed".to_string()),
                            conditions: Some(vec![PodCondition {
                                type_: "Ready".to_string(),
                                status: "False".to_string(),
                                message: Some("Zone not found".to_string()),
                                ..Default::default()
                            }]),
                            ..Default::default()
                        };
                        if let Err(e) = self
                            .api_client
                            .set_pod_status(namespace, pod_name, status)
                            .await
                        {
                            error!("Failed to update pod status to Failed: {}", e);
                        }
                    }
                    Err(e) => {
                        debug!("Could not check zone state for {}: {}", zone_name, e);
                    }
                }
            }
            _ => {
                debug!(
                    "Pod {}/{} in phase {} — no action needed",
                    namespace, pod_name, phase
                );
            }
        }

        Ok(())
    }

    /// Handle pod deletion — deprovision the zone and release IP.
    ///
    /// If the pod has a `deletion_timestamp`, the graceful termination state
    /// machine (`handle_termination`) is responsible for cleanup, so this method
    /// becomes a no-op. Otherwise (e.g. a direct storage delete that bypasses
    /// the graceful path), fall back to the original immediate cleanup.
    pub async fn handle_delete(&self, pod: &Pod) -> Result<()> {
        let pod_name = pod
            .metadata
            .name
            .as_deref()
            .ok_or_else(|| RuntimeError::internal_error("Pod has no name"))?;
        let namespace = pod.metadata.namespace.as_deref().unwrap_or("default");

        // If deletion_timestamp is set, handle_termination is driving cleanup
        if pod.metadata.deletion_timestamp.is_some() {
            debug!(
                "Pod {}/{} has deletion_timestamp, skipping handle_delete (handled by termination state machine)",
                namespace, pod_name
            );
            return Ok(());
        }

        // Only deprovision pods assigned to this node
        if let Some(spec) = &pod.spec {
            if let Some(node_name) = &spec.node_name {
                if node_name != &self.config.node_name {
                    return Ok(());
                }
            } else {
                return Ok(());
            }
        }

        let zone_config = self.pod_to_zone_config(pod)?;
        info!(
            "Deprovisioning zone for deleted pod {}/{}",
            namespace, pod_name
        );
        if let Err(e) = self.runtime.deprovision(&zone_config).await {
            warn!(
                "Failed to deprovision zone for pod {}/{}: {}",
                namespace, pod_name, e
            );
        }

        // Release the IP allocation
        if let Err(e) = self.ipam.release(namespace, pod_name) {
            warn!(
                "Failed to release IP for pod {}/{}: {}",
                namespace, pod_name, e
            );
        }

        Ok(())
    }

    /// Drive the graceful termination state machine for a pod with
    /// `deletion_timestamp` set.
    ///
    /// | Zone State      | Grace Expired? | Action                                     |
    /// |-----------------|----------------|--------------------------------------------|
    /// | Running         | No             | shutdown_zone() (graceful)                 |
    /// | Running         | Yes            | halt_zone() (force kill)                   |
    /// | ShuttingDown    | No             | Wait (next reconcile will re-check)        |
    /// | ShuttingDown    | Yes            | halt_zone() (force kill)                   |
    /// | Stopped/Absent  | —              | deprovision(), release IP, finalize_pod()  |
    async fn handle_termination(&self, pod: &Pod) -> Result<()> {
        let pod_name = pod
            .metadata
            .name
            .as_deref()
            .ok_or_else(|| RuntimeError::internal_error("Pod has no name"))?;
        let namespace = pod.metadata.namespace.as_deref().unwrap_or("default");
        let zone_name = pod_zone_name(namespace, pod_name);
        let grace_expired = self.is_grace_period_expired(pod);

        // Query actual zone state
        let zone_state = match self.runtime.get_zone_state(&zone_name).await {
            Ok(state) => state,
            Err(RuntimeError::ZoneNotFound { .. }) => ZoneState::Absent,
            Err(e) => {
                warn!(
                    "Could not check zone state for {}: {} — treating as absent",
                    zone_name, e
                );
                ZoneState::Absent
            }
        };

        debug!(
            "Termination state machine for pod {}/{}: zone={}, grace_expired={}",
            namespace, pod_name, zone_state, grace_expired
        );

        match zone_state {
            ZoneState::Running => {
                if grace_expired {
                    warn!(
                        "Grace period expired for pod {}/{}, force halting zone {}",
                        namespace, pod_name, zone_name
                    );
                    if let Err(e) = self.runtime.halt_zone(&zone_name).await {
                        warn!("Failed to halt zone {}: {}", zone_name, e);
                    }
                    // Deprovision will happen on next reconcile when zone is stopped
                } else {
                    info!(
                        "Initiating graceful shutdown for zone {} (pod {}/{})",
                        zone_name, namespace, pod_name
                    );
                    if let Err(e) = self.runtime.shutdown_zone(&zone_name).await {
                        warn!("Failed to shut down zone {}: {}", zone_name, e);
                    }
                    // Next reconcile will re-check the zone state
                }
            }
            ZoneState::ShuttingDown => {
                if grace_expired {
                    warn!(
                        "Grace period expired while zone {} was shutting down, force halting",
                        zone_name
                    );
                    if let Err(e) = self.runtime.halt_zone(&zone_name).await {
                        warn!("Failed to halt zone {}: {}", zone_name, e);
                    }
                } else {
                    debug!(
                        "Zone {} is gracefully shutting down, waiting for next reconcile",
                        zone_name
                    );
                }
            }
            // Zone is stopped or absent — full cleanup
            _ => {
                info!(
                    "Zone {} is stopped/absent, cleaning up pod {}/{}",
                    zone_name, namespace, pod_name
                );

                // Try to deprovision (cleans up datasets, network, etc.)
                // Build a minimal zone config for deprovision — use existing IP if allocated
                let zone_config_result = self.pod_to_zone_config(pod);
                if let Ok(zone_config) = zone_config_result {
                    if let Err(e) = self.runtime.deprovision(&zone_config).await {
                        // Zone may already be fully cleaned up — that's fine
                        debug!(
                            "Deprovision for zone {} returned error (may be expected): {}",
                            zone_name, e
                        );
                    }
                }

                // Release the IP allocation
                if let Err(e) = self.ipam.release(namespace, pod_name) {
                    debug!(
                        "Failed to release IP for pod {}/{}: {} (may already be released)",
                        namespace, pod_name, e
                    );
                }

                // Finalize — remove the pod from API server storage
                if let Err(e) = self.api_client.finalize_pod(namespace, pod_name).await {
                    error!(
                        "Failed to finalize pod {}/{}: {}",
                        namespace, pod_name, e
                    );
                } else {
                    info!("Pod {}/{} finalized and removed", namespace, pod_name);
                }
            }
        }

        Ok(())
    }

    /// Check whether the pod's grace period has expired
    fn is_grace_period_expired(&self, pod: &Pod) -> bool {
        let deletion_ts = match &pod.metadata.deletion_timestamp {
            Some(t) => t.0,
            None => return false,
        };
        let grace_secs = pod
            .metadata
            .deletion_grace_period_seconds
            .unwrap_or(30);
        let deadline = deletion_ts + chrono::Duration::seconds(grace_secs);
        Utc::now() >= deadline
    }

    /// Convert a Pod spec to a ZoneConfig with per-pod VNIC and IP
    fn pod_to_zone_config(&self, pod: &Pod) -> Result<ZoneConfig> {
        let pod_name = pod
            .metadata
            .name
            .as_deref()
            .ok_or_else(|| RuntimeError::internal_error("Pod has no name"))?;
        let namespace = pod.metadata.namespace.as_deref().unwrap_or("default");
        let spec = pod
            .spec
            .as_ref()
            .ok_or_else(|| RuntimeError::internal_error("Pod has no spec"))?;

        let zone_name = pod_zone_name(namespace, pod_name);
        let zonepath = format!("{}/{}", self.config.zonepath_prefix, zone_name);

        // Allocate a unique VNIC name and IP for this pod
        let vnic_name = vnic_name_for_pod(namespace, pod_name);
        let allocation = self.ipam.allocate(namespace, pod_name)?;
        let network = NetworkMode::Etherstub(EtherstubConfig {
            etherstub_name: self.config.etherstub_name.clone(),
            vnic_name,
            ip_address: allocation.ip_address.to_string(),
            gateway: allocation.gateway.to_string(),
            prefix_len: allocation.prefix_len,
        });

        // Map containers to ContainerProcess entries
        let processes: Vec<ContainerProcess> = spec
            .containers
            .iter()
            .map(|c| {
                let command = c
                    .command
                    .clone()
                    .unwrap_or_default()
                    .into_iter()
                    .chain(c.args.clone().unwrap_or_default())
                    .collect::<Vec<_>>();
                let env = c
                    .env
                    .as_ref()
                    .map(|envs| {
                        envs.iter()
                            .filter_map(|e| e.value.as_ref().map(|v| (e.name.clone(), v.clone())))
                            .collect::<Vec<_>>()
                    })
                    .unwrap_or_default();
                ContainerProcess {
                    name: c.name.clone(),
                    command,
                    working_dir: c.working_dir.clone(),
                    env,
                }
            })
            .collect();

        // Aggregate resource limits across all containers in the pod.
        // Prefer limits (hard cap) over requests (soft guarantee).
        let (total_cpu_millicores, total_memory_bytes) =
            spec.containers.iter().fold((0i64, 0i64), |(cpu, mem), c| {
                let resources = c.resources.as_ref();
                let res_map = resources
                    .and_then(|r| r.limits.as_ref())
                    .or_else(|| resources.and_then(|r| r.requests.as_ref()));
                let (c_cpu, c_mem) = match res_map {
                    Some(map) => {
                        let rq = ResourceQuantities::from_k8s_resource_map(map);
                        (rq.cpu_millicores, rq.memory_bytes)
                    }
                    None => (0, 0),
                };
                (cpu + c_cpu, mem + c_mem)
            });

        let cpu_cap = if total_cpu_millicores > 0 {
            Some(ResourceQuantities::cpu_as_zone_cap(total_cpu_millicores))
        } else {
            None
        };
        let memory_cap = if total_memory_bytes > 0 {
            Some(ResourceQuantities::memory_as_zone_cap(total_memory_bytes))
        } else {
            None
        };

        Ok(ZoneConfig {
            zone_name,
            brand: self.config.default_brand.clone(),
            zonepath,
            network,
            storage: ZoneStorageOpts::default(),
            lx_image_path: None,
            processes,
            cpu_cap,
            memory_cap,
            fs_mounts: vec![],
        })
    }

    /// Extract IP address from zone config network
    fn zone_ip(&self, config: &ZoneConfig) -> String {
        match &config.network {
            NetworkMode::Etherstub(e) => e.ip_address.clone(),
            NetworkMode::Direct(d) => d.ip_address.clone(),
        }
    }
}

/// Generate a zone name from namespace and pod name
///
/// Zone names must be valid illumos zone names (alphanumeric, hyphens, max 64 chars).
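///
/// A quick sketch of the expected mapping (these mirror the unit tests below):
///
/// ```ignore
/// assert_eq!(pod_zone_name("default", "nginx"), "reddwarf-default-nginx");
/// assert_eq!(pod_zone_name("my.namespace", "my.pod"), "reddwarf-my-namespace-my-pod");
/// ```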
pub fn pod_zone_name(namespace: &str, pod_name: &str) -> String {
    let raw = format!("reddwarf-{}-{}", namespace, pod_name);
    // Sanitize: only keep alphanumeric and hyphens, truncate to 64 chars
    let sanitized: String = raw
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '-' {
                c
            } else {
                '-'
            }
        })
        .collect();
    if sanitized.len() > 64 {
        sanitized[..64].to_string()
    } else {
        sanitized
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::network::Ipam;
    use k8s_openapi::api::core::v1::{Container, PodSpec};
    use reddwarf_storage::RedbBackend;
    use std::net::Ipv4Addr;
    use std::time::Duration;
    use tempfile::tempdir;

    fn make_test_controller() -> (PodController, tempfile::TempDir) {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test-controller.redb");
        let storage = Arc::new(RedbBackend::new(&db_path).unwrap());
        let ipam = Ipam::new(storage, "10.88.0.0/16").unwrap();
        let mock_storage = Arc::new(crate::storage::MockStorageEngine::new(
            crate::types::StoragePoolConfig::from_pool("rpool"),
        ));
        let runtime = Arc::new(crate::mock::MockRuntime::new(mock_storage));
        let api_client = Arc::new(ApiClient::new("http://127.0.0.1:6443"));
        let (event_tx, _) = broadcast::channel(16);
        let config = PodControllerConfig {
            node_name: "node1".to_string(),
            api_url: "http://127.0.0.1:6443".to_string(),
            zonepath_prefix: "/zones".to_string(),
            default_brand: ZoneBrand::Reddwarf,
            etherstub_name: "reddwarf0".to_string(),
            pod_cidr: "10.88.0.0/16".to_string(),
            reconcile_interval: Duration::from_secs(30),
        };
        let controller = PodController::new(runtime, api_client, event_tx, config, ipam);
        (controller, dir)
    }

    #[test]
    fn test_pod_zone_name_basic() {
        assert_eq!(pod_zone_name("default", "nginx"), "reddwarf-default-nginx");
    }

    #[test]
    fn test_pod_zone_name_sanitization() {
        // Dots get replaced with hyphens
        assert_eq!(
            pod_zone_name("my.namespace", "my.pod"),
            "reddwarf-my-namespace-my-pod"
        );
    }

    #[test]
    fn test_pod_zone_name_truncation() {
        let long_name = "a".repeat(60);
        let name = pod_zone_name("ns", &long_name);
        assert!(name.len() <= 64);
    }

    #[test]
    fn test_pod_to_zone_config_maps_containers() {
        let (controller, _dir) = make_test_controller();

        let mut pod = Pod::default();
        pod.metadata.name = Some("test-pod".to_string());
        pod.metadata.namespace = Some("default".to_string());
        pod.spec = Some(PodSpec {
            containers: vec![
                Container {
                    name: "web".to_string(),
                    command: Some(vec!["nginx".to_string()]),
                    args: Some(vec!["-g".to_string(), "daemon off;".to_string()]),
                    ..Default::default()
                },
                Container {
                    name: "sidecar".to_string(),
                    command: Some(vec!["/bin/sh".to_string(), "-c".to_string()]),
                    ..Default::default()
                },
            ],
            ..Default::default()
        });

        let zone_config = controller.pod_to_zone_config(&pod).unwrap();
        assert_eq!(zone_config.zone_name, "reddwarf-default-test-pod");
        assert_eq!(zone_config.zonepath, "/zones/reddwarf-default-test-pod");
        assert_eq!(zone_config.processes.len(), 2);
        assert_eq!(zone_config.processes[0].name, "web");
        assert_eq!(
            zone_config.processes[0].command,
            vec!["nginx", "-g", "daemon off;"]
        );
        assert_eq!(zone_config.processes[1].name, "sidecar");
        assert_eq!(zone_config.processes[1].command, vec!["/bin/sh", "-c"]);
        assert_eq!(zone_config.brand, ZoneBrand::Reddwarf);

        // Verify per-pod networking
        match &zone_config.network {
            NetworkMode::Etherstub(cfg) => {
                assert_eq!(cfg.etherstub_name, "reddwarf0");
                assert_eq!(cfg.vnic_name, "vnic_default_test_pod");
                assert_eq!(cfg.ip_address, Ipv4Addr::new(10, 88, 0, 2).to_string());
                assert_eq!(cfg.gateway, Ipv4Addr::new(10, 88, 0, 1).to_string());
                assert_eq!(cfg.prefix_len, 16);
            }
            _ => panic!("Expected Etherstub network mode"),
        }
    }

    #[test]
    fn test_pod_to_zone_config_unique_ips() {
        let (controller, _dir) = make_test_controller();

        let mut pod_a = Pod::default();
        pod_a.metadata.name = Some("pod-a".to_string());
        pod_a.metadata.namespace = Some("default".to_string());
        pod_a.spec = Some(PodSpec {
            containers: vec![Container {
                name: "web".to_string(),
                command: Some(vec!["/bin/sh".to_string()]),
                ..Default::default()
            }],
            ..Default::default()
        });

        let mut pod_b = Pod::default();
        pod_b.metadata.name = Some("pod-b".to_string());
        pod_b.metadata.namespace = Some("default".to_string());
        pod_b.spec = Some(PodSpec {
            containers: vec![Container {
                name: "web".to_string(),
                command: Some(vec!["/bin/sh".to_string()]),
                ..Default::default()
            }],
            ..Default::default()
        });

        let config_a = controller.pod_to_zone_config(&pod_a).unwrap();
        let config_b = controller.pod_to_zone_config(&pod_b).unwrap();

        let ip_a = match &config_a.network {
            NetworkMode::Etherstub(cfg) => cfg.ip_address.clone(),
            _ => panic!("Expected Etherstub"),
        };
        let ip_b = match &config_b.network {
            NetworkMode::Etherstub(cfg) => cfg.ip_address.clone(),
            _ => panic!("Expected Etherstub"),
        };

        assert_ne!(ip_a, ip_b, "Each pod should get a unique IP");
        assert_eq!(ip_a, "10.88.0.2");
        assert_eq!(ip_b, "10.88.0.3");
    }

    #[test]
    fn test_pod_to_zone_config_no_spec_returns_error() {
        let (controller, _dir) = make_test_controller();

        let mut pod = Pod::default();
        pod.metadata.name = Some("test-pod".to_string());
        // No spec set

        let result = controller.pod_to_zone_config(&pod);
        assert!(result.is_err());
    }

    #[test]
    fn test_pod_to_zone_config_with_cpu_and_memory_limits() {
        use k8s_openapi::api::core::v1::ResourceRequirements;
        use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
        use std::collections::BTreeMap;

        let (controller, _dir) = make_test_controller();

        let mut limits = BTreeMap::new();
        limits.insert("cpu".to_string(), Quantity("1".to_string()));
        limits.insert("memory".to_string(), Quantity("512Mi".to_string()));

        let mut pod = Pod::default();
        pod.metadata.name = Some("capped-pod".to_string());
        pod.metadata.namespace = Some("default".to_string());
        pod.spec = Some(PodSpec {
            containers: vec![Container {
                name: "web".to_string(),
                command: Some(vec!["/bin/sh".to_string()]),
                resources: Some(ResourceRequirements {
                    limits: Some(limits),
                    ..Default::default()
                }),
                ..Default::default()
            }],
            ..Default::default()
        });

        let zone_config = controller.pod_to_zone_config(&pod).unwrap();
        assert_eq!(zone_config.cpu_cap, Some("1.00".to_string()));
        assert_eq!(zone_config.memory_cap, Some("512M".to_string()));
    }

    #[test]
    fn test_pod_to_zone_config_with_requests_fallback() {
        use k8s_openapi::api::core::v1::ResourceRequirements;
        use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
        use std::collections::BTreeMap;

        let (controller, _dir) = make_test_controller();

        let mut requests = BTreeMap::new();
        requests.insert("cpu".to_string(), Quantity("500m".to_string()));
        requests.insert("memory".to_string(), Quantity("256Mi".to_string()));

        let mut pod = Pod::default();
        pod.metadata.name = Some("req-pod".to_string());
        pod.metadata.namespace = Some("default".to_string());
        pod.spec = Some(PodSpec {
            containers: vec![Container {
                name: "web".to_string(),
                command: Some(vec!["/bin/sh".to_string()]),
                resources: Some(ResourceRequirements {
                    requests: Some(requests),
                    limits: None,
                    ..Default::default()
                }),
                ..Default::default()
            }],
            ..Default::default()
        });

        let zone_config = controller.pod_to_zone_config(&pod).unwrap();
        assert_eq!(zone_config.cpu_cap, Some("0.50".to_string()));
        assert_eq!(zone_config.memory_cap, Some("256M".to_string()));
    }

    #[test]
    fn test_pod_to_zone_config_aggregates_multiple_containers() {
        use k8s_openapi::api::core::v1::ResourceRequirements;
        use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
        use std::collections::BTreeMap;

        let (controller, _dir) = make_test_controller();

        let make_limits = |cpu: &str, mem: &str| {
            let mut limits = BTreeMap::new();
            limits.insert("cpu".to_string(), Quantity(cpu.to_string()));
            limits.insert("memory".to_string(), Quantity(mem.to_string()));
            limits
        };

        let mut pod = Pod::default();
        pod.metadata.name = Some("multi-pod".to_string());
        pod.metadata.namespace = Some("default".to_string());
        pod.spec = Some(PodSpec {
            containers: vec![
                Container {
                    name: "web".to_string(),
                    command: Some(vec!["/bin/sh".to_string()]),
                    resources: Some(ResourceRequirements {
                        limits: Some(make_limits("500m", "256Mi")),
                        ..Default::default()
                    }),
                    ..Default::default()
                },
                Container {
                    name: "sidecar".to_string(),
                    command: Some(vec!["/bin/sh".to_string()]),
                    resources: Some(ResourceRequirements {
                        limits: Some(make_limits("500m", "256Mi")),
                        ..Default::default()
                    }),
                    ..Default::default()
                },
            ],
            ..Default::default()
        });

        let zone_config = controller.pod_to_zone_config(&pod).unwrap();
        // 500m + 500m = 1000m = 1.00
        assert_eq!(zone_config.cpu_cap, Some("1.00".to_string()));
        // 256Mi + 256Mi = 512Mi
        assert_eq!(zone_config.memory_cap, Some("512M".to_string()));
    }

    #[test]
    fn test_pod_to_zone_config_no_resources() {
        let (controller, _dir) = make_test_controller();

        let mut pod = Pod::default();
        pod.metadata.name = Some("bare-pod".to_string());
        pod.metadata.namespace = Some("default".to_string());
        pod.spec = Some(PodSpec {
            containers: vec![Container {
                name: "web".to_string(),
                command: Some(vec!["/bin/sh".to_string()]),
                ..Default::default()
            }],
            ..Default::default()
        });

        let zone_config = controller.pod_to_zone_config(&pod).unwrap();
        assert_eq!(zone_config.cpu_cap, None);
        assert_eq!(zone_config.memory_cap, None);
    }

    #[test]
    fn test_grace_period_not_expired() {
        let (controller, _dir) = make_test_controller();

        let mut pod = Pod::default();
        pod.metadata.name = Some("grace-pod".to_string());
        pod.metadata.deletion_timestamp = Some(
            k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(Utc::now()),
        );
        pod.metadata.deletion_grace_period_seconds = Some(30);

        assert!(!controller.is_grace_period_expired(&pod));
    }

    #[test]
    fn test_grace_period_expired() {
        let (controller, _dir) = make_test_controller();

        let mut pod = Pod::default();
        pod.metadata.name = Some("expired-pod".to_string());
        // Set deletion_timestamp 60 seconds in the past
        pod.metadata.deletion_timestamp = Some(
            k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
                Utc::now() - chrono::Duration::seconds(60),
            ),
        );
        pod.metadata.deletion_grace_period_seconds = Some(30);

        assert!(controller.is_grace_period_expired(&pod));
    }

    #[test]
    fn test_grace_period_no_deletion_timestamp() {
        let (controller, _dir) = make_test_controller();
        let pod = Pod::default();
        assert!(!controller.is_grace_period_expired(&pod));
    }

    #[tokio::test]
    async fn test_handle_termination_absent_zone_calls_finalize() {
        let (controller, _dir) = make_test_controller();

        // Build a pod with deletion_timestamp that is assigned to our node
        let mut pod = Pod::default();
        pod.metadata.name = Some("term-pod".to_string());
        pod.metadata.namespace = Some("default".to_string());
        pod.metadata.deletion_timestamp = Some(
            k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(Utc::now()),
        );
        pod.metadata.deletion_grace_period_seconds = Some(30);
        pod.spec = Some(PodSpec {
            node_name: Some("node1".to_string()),
            containers: vec![Container {
                name: "web".to_string(),
                command: Some(vec!["/bin/sh".to_string()]),
                ..Default::default()
            }],
            ..Default::default()
        });

        // Zone doesn't exist in the mock runtime → should be treated as Absent
        // handle_termination will try to deprovision (fail, that's ok) and then
        // finalize (will fail since no API server, but the path is exercised)
        let result = controller.handle_termination(&pod).await;

        // The function should complete Ok (finalize failure is logged, not returned)
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_handle_termination_running_zone_graceful_shutdown() {
        let (controller, _dir) = make_test_controller();

        // First, provision a zone so it's Running
        let mut pod = Pod::default();
        pod.metadata.name = Some("running-term".to_string());
        pod.metadata.namespace = Some("default".to_string());
        pod.spec = Some(PodSpec {
            node_name: Some("node1".to_string()),
            containers: vec![Container {
                name: "web".to_string(),
                command: Some(vec!["/bin/sh".to_string()]),
                ..Default::default()
            }],
            ..Default::default()
        });

        // Provision the zone through the runtime
        let zone_config = controller.pod_to_zone_config(&pod).unwrap();
        controller.runtime.provision(&zone_config).await.unwrap();

        // Verify it's running
        let zone_name = pod_zone_name("default", "running-term");
        let state = controller.runtime.get_zone_state(&zone_name).await.unwrap();
        assert_eq!(state, ZoneState::Running);

        // Set deletion_timestamp (grace period NOT expired)
        pod.metadata.deletion_timestamp = Some(
            k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(Utc::now()),
        );
        pod.metadata.deletion_grace_period_seconds = Some(30);

        // Handle termination — should call shutdown_zone
        let result = controller.handle_termination(&pod).await;
        assert!(result.is_ok());

        // After shutdown, MockRuntime transitions zone to Installed
        let state = controller.runtime.get_zone_state(&zone_name).await.unwrap();
        assert_eq!(state, ZoneState::Installed);
    }

    #[tokio::test]
    async fn test_handle_delete_skips_when_deletion_timestamp_set() {
        let (controller, _dir) = make_test_controller();

        let mut pod = Pod::default();
        pod.metadata.name = Some("skip-pod".to_string());
        pod.metadata.namespace = Some("default".to_string());
        pod.metadata.deletion_timestamp = Some(
            k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(Utc::now()),
        );
        pod.spec = Some(PodSpec {
            node_name: Some("node1".to_string()),
            containers: vec![Container {
                name: "web".to_string(),
                command: Some(vec!["/bin/sh".to_string()]),
                ..Default::default()
            }],
            ..Default::default()
        });

        // handle_delete should return Ok immediately — skipping deprovision
        let result = controller.handle_delete(&pod).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_reconcile_with_deletion_timestamp_uses_termination() {
        let (controller, _dir) = make_test_controller();

        // Build a pod with deletion_timestamp, assigned to our node, phase Terminating
        let mut pod = Pod::default();
        pod.metadata.name = Some("recon-term".to_string());
        pod.metadata.namespace = Some("default".to_string());
        pod.metadata.deletion_timestamp = Some(
            k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(Utc::now()),
        );
        pod.metadata.deletion_grace_period_seconds = Some(30);
        pod.spec = Some(PodSpec {
            node_name: Some("node1".to_string()),
            containers: vec![Container {
                name: "web".to_string(),
                command: Some(vec!["/bin/sh".to_string()]),
                ..Default::default()
            }],
            ..Default::default()
        });
        pod.status = Some(PodStatus {
            phase: Some("Terminating".to_string()),
            ..Default::default()
        });

        // reconcile() should detect deletion_timestamp and call handle_termination
        // Zone is absent → will try to finalize (will fail, but logged)
        let result = controller.reconcile(&pod).await;
        assert!(result.is_ok());
    }
}