use crate::api_client::ApiClient; use crate::error::{Result, RuntimeError}; use crate::network::{vnic_name_for_pod, Ipam}; use crate::traits::ZoneRuntime; use crate::types::*; use k8s_openapi::api::core::v1::{Pod, PodCondition, PodStatus}; use reddwarf_core::{ResourceEvent, WatchEventType}; use std::sync::Arc; use tokio::sync::broadcast; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; /// Configuration for the pod controller #[derive(Debug, Clone)] pub struct PodControllerConfig { /// Only reconcile pods assigned to this node pub node_name: String, /// API server URL (e.g., "http://127.0.0.1:6443") pub api_url: String, /// Prefix for zone root paths (e.g., "/zones") pub zonepath_prefix: String, /// Parent ZFS dataset (e.g., "rpool/zones") pub zfs_parent_dataset: String, /// Default zone brand pub default_brand: ZoneBrand, /// Name of the etherstub for pod networking pub etherstub_name: String, /// Pod CIDR (e.g., "10.88.0.0/16") pub pod_cidr: String, } /// Pod controller that watches for Pod events and drives zone lifecycle pub struct PodController { runtime: Arc, api_client: Arc, event_tx: broadcast::Sender, config: PodControllerConfig, ipam: Ipam, } impl PodController { pub fn new( runtime: Arc, api_client: Arc, event_tx: broadcast::Sender, config: PodControllerConfig, ipam: Ipam, ) -> Self { Self { runtime, api_client, event_tx, config, ipam, } } /// Run the controller — reacts to pod events from the in-process event bus. /// /// On startup, performs a full reconcile to catch up on any pods that were /// scheduled while the controller was down. Then switches to event-driven mode. pub async fn run(&self, token: CancellationToken) -> Result<()> { info!( "Starting pod controller for node '{}'", self.config.node_name ); // Initial full sync if let Err(e) = self.reconcile_all().await { error!("Initial reconcile failed: {}", e); } let mut rx = self.event_tx.subscribe(); loop { tokio::select! { _ = token.cancelled() => { info!("Pod controller shutting down"); return Ok(()); } result = rx.recv() => { match result { Ok(event) => { if event.gvk.kind != "Pod" { continue; } match event.event_type { WatchEventType::Added | WatchEventType::Modified => { match serde_json::from_value::(event.object) { Ok(pod) => { if let Err(e) = self.reconcile(&pod).await { let name = pod.metadata.name.as_deref().unwrap_or(""); error!("Failed to reconcile pod {}: {}", name, e); } } Err(e) => { warn!("Failed to parse pod from event: {}", e); } } } WatchEventType::Deleted => { match serde_json::from_value::(event.object) { Ok(pod) => { if let Err(e) = self.handle_delete(&pod).await { let name = pod.metadata.name.as_deref().unwrap_or(""); error!("Failed to handle pod deletion {}: {}", name, e); } } Err(e) => { warn!("Failed to parse pod from delete event: {}", e); } } } _ => {} } } Err(broadcast::error::RecvError::Lagged(n)) => { warn!("Missed {} events, doing full resync", n); if let Err(e) = self.reconcile_all().await { error!("Resync after lag failed: {}", e); } } Err(broadcast::error::RecvError::Closed) => { info!("Event bus closed, stopping pod controller"); return Ok(()); } } } } } } /// Reconcile all pods assigned to this node async fn reconcile_all(&self) -> Result<()> { debug!("Running pod controller reconcile cycle"); // List all pods via the API let url = format!("{}/api/v1/pods", self.api_client.base_url()); let resp = reqwest::get(&url) .await .map_err(|e| RuntimeError::internal_error(format!("Failed to list pods: {}", e)))?; if !resp.status().is_success() { return Err(RuntimeError::internal_error("Failed to list pods")); } let body: serde_json::Value = resp.json().await.map_err(|e| { RuntimeError::internal_error(format!("Failed to parse pod list: {}", e)) })?; let items = body["items"].as_array().cloned().unwrap_or_default(); for item in items { let pod: Pod = match serde_json::from_value(item) { Ok(p) => p, Err(e) => { warn!("Failed to parse pod from list: {}", e); continue; } }; if let Err(e) = self.reconcile(&pod).await { let pod_name = pod.metadata.name.as_deref().unwrap_or(""); error!("Failed to reconcile pod {}: {}", pod_name, e); } } Ok(()) } /// Reconcile a single Pod event pub async fn reconcile(&self, pod: &Pod) -> Result<()> { let pod_name = pod .metadata .name .as_deref() .ok_or_else(|| RuntimeError::internal_error("Pod has no name"))?; let namespace = pod.metadata.namespace.as_deref().unwrap_or("default"); let spec = match &pod.spec { Some(s) => s, None => { debug!("Skipping pod {} — no spec", pod_name); return Ok(()); } }; // Only reconcile pods assigned to this node let node_name = match &spec.node_name { Some(n) => n.as_str(), None => { debug!("Skipping pod {} — not yet scheduled", pod_name); return Ok(()); } }; if node_name != self.config.node_name { return Ok(()); } // Check current phase let phase = pod .status .as_ref() .and_then(|s| s.phase.as_deref()) .unwrap_or(""); let zone_name = pod_zone_name(namespace, pod_name); match phase { "" | "Pending" => { // Pod is assigned to us but has no phase — provision it info!("Provisioning zone for pod {}/{}", namespace, pod_name); let zone_config = self.pod_to_zone_config(pod)?; match self.runtime.provision(&zone_config).await { Ok(()) => { info!("Zone {} provisioned successfully", zone_name); // Update pod status to Running let status = PodStatus { phase: Some("Running".to_string()), conditions: Some(vec![PodCondition { type_: "Ready".to_string(), status: "True".to_string(), ..Default::default() }]), pod_ip: Some(self.zone_ip(&zone_config)), ..Default::default() }; if let Err(e) = self .api_client .set_pod_status(namespace, pod_name, status) .await { error!("Failed to update pod status to Running: {}", e); } } Err(e) => { // Check if it's already provisioned (zone already exists) if matches!(e, RuntimeError::ZoneAlreadyExists { .. }) { debug!("Zone {} already exists, checking state", zone_name); return Ok(()); } error!("Failed to provision zone {}: {}", zone_name, e); let status = PodStatus { phase: Some("Failed".to_string()), conditions: Some(vec![PodCondition { type_: "Ready".to_string(), status: "False".to_string(), message: Some(format!("Zone provisioning failed: {}", e)), ..Default::default() }]), ..Default::default() }; if let Err(e2) = self .api_client .set_pod_status(namespace, pod_name, status) .await { error!("Failed to update pod status to Failed: {}", e2); } } } } "Running" => { // Check zone health match self.runtime.get_zone_state(&zone_name).await { Ok(ZoneState::Running) => { // All good } Ok(state) => { warn!( "Zone {} is in unexpected state: {} (expected Running)", zone_name, state ); let status = PodStatus { phase: Some("Failed".to_string()), conditions: Some(vec![PodCondition { type_: "Ready".to_string(), status: "False".to_string(), message: Some(format!("Zone is in unexpected state: {}", state)), ..Default::default() }]), ..Default::default() }; if let Err(e) = self .api_client .set_pod_status(namespace, pod_name, status) .await { error!("Failed to update pod status to Failed: {}", e); } } Err(RuntimeError::ZoneNotFound { .. }) => { warn!( "Zone {} not found but pod is Running — marking Failed", zone_name ); let status = PodStatus { phase: Some("Failed".to_string()), conditions: Some(vec![PodCondition { type_: "Ready".to_string(), status: "False".to_string(), message: Some("Zone not found".to_string()), ..Default::default() }]), ..Default::default() }; if let Err(e) = self .api_client .set_pod_status(namespace, pod_name, status) .await { error!("Failed to update pod status to Failed: {}", e); } } Err(e) => { debug!("Could not check zone state for {}: {}", zone_name, e); } } } _ => { debug!( "Pod {}/{} in phase {} — no action needed", namespace, pod_name, phase ); } } Ok(()) } /// Handle pod deletion — deprovision the zone and release IP pub async fn handle_delete(&self, pod: &Pod) -> Result<()> { let pod_name = pod .metadata .name .as_deref() .ok_or_else(|| RuntimeError::internal_error("Pod has no name"))?; let namespace = pod.metadata.namespace.as_deref().unwrap_or("default"); // Only deprovision pods assigned to this node if let Some(spec) = &pod.spec { if let Some(node_name) = &spec.node_name { if node_name != &self.config.node_name { return Ok(()); } } else { return Ok(()); } } let zone_config = self.pod_to_zone_config(pod)?; info!( "Deprovisioning zone for deleted pod {}/{}", namespace, pod_name ); if let Err(e) = self.runtime.deprovision(&zone_config).await { warn!( "Failed to deprovision zone for pod {}/{}: {}", namespace, pod_name, e ); } // Release the IP allocation if let Err(e) = self.ipam.release(namespace, pod_name) { warn!( "Failed to release IP for pod {}/{}: {}", namespace, pod_name, e ); } Ok(()) } /// Convert a Pod spec to a ZoneConfig with per-pod VNIC and IP fn pod_to_zone_config(&self, pod: &Pod) -> Result { let pod_name = pod .metadata .name .as_deref() .ok_or_else(|| RuntimeError::internal_error("Pod has no name"))?; let namespace = pod.metadata.namespace.as_deref().unwrap_or("default"); let spec = pod .spec .as_ref() .ok_or_else(|| RuntimeError::internal_error("Pod has no spec"))?; let zone_name = pod_zone_name(namespace, pod_name); let zonepath = format!("{}/{}", self.config.zonepath_prefix, zone_name); // Allocate a unique VNIC name and IP for this pod let vnic_name = vnic_name_for_pod(namespace, pod_name); let allocation = self.ipam.allocate(namespace, pod_name)?; let network = NetworkMode::Etherstub(EtherstubConfig { etherstub_name: self.config.etherstub_name.clone(), vnic_name, ip_address: allocation.ip_address.to_string(), gateway: allocation.gateway.to_string(), prefix_len: allocation.prefix_len, }); // Map containers to ContainerProcess entries let processes: Vec = spec .containers .iter() .map(|c| { let command = c .command .clone() .unwrap_or_default() .into_iter() .chain(c.args.clone().unwrap_or_default()) .collect::>(); let env = c .env .as_ref() .map(|envs| { envs.iter() .filter_map(|e| e.value.as_ref().map(|v| (e.name.clone(), v.clone()))) .collect::>() }) .unwrap_or_default(); ContainerProcess { name: c.name.clone(), command, working_dir: c.working_dir.clone(), env, } }) .collect(); Ok(ZoneConfig { zone_name, brand: self.config.default_brand.clone(), zonepath, network, zfs: ZfsConfig { parent_dataset: self.config.zfs_parent_dataset.clone(), clone_from: None, quota: None, }, lx_image_path: None, processes, cpu_cap: None, memory_cap: None, fs_mounts: vec![], }) } /// Extract IP address from zone config network fn zone_ip(&self, config: &ZoneConfig) -> String { match &config.network { NetworkMode::Etherstub(e) => e.ip_address.clone(), NetworkMode::Direct(d) => d.ip_address.clone(), } } } /// Generate a zone name from namespace and pod name /// /// Zone names must be valid illumos zone names (alphanumeric, hyphens, max 64 chars). pub fn pod_zone_name(namespace: &str, pod_name: &str) -> String { let raw = format!("reddwarf-{}-{}", namespace, pod_name); // Sanitize: only keep alphanumeric and hyphens, truncate to 64 chars let sanitized: String = raw .chars() .map(|c| { if c.is_ascii_alphanumeric() || c == '-' { c } else { '-' } }) .collect(); if sanitized.len() > 64 { sanitized[..64].to_string() } else { sanitized } } #[cfg(test)] mod tests { use super::*; use crate::network::Ipam; use k8s_openapi::api::core::v1::{Container, PodSpec}; use reddwarf_storage::RedbBackend; use std::net::Ipv4Addr; use tempfile::tempdir; fn make_test_controller() -> (PodController, tempfile::TempDir) { let dir = tempdir().unwrap(); let db_path = dir.path().join("test-controller.redb"); let storage = Arc::new(RedbBackend::new(&db_path).unwrap()); let ipam = Ipam::new(storage, "10.88.0.0/16").unwrap(); let runtime = Arc::new(crate::mock::MockRuntime::new()); let api_client = Arc::new(ApiClient::new("http://127.0.0.1:6443")); let (event_tx, _) = broadcast::channel(16); let config = PodControllerConfig { node_name: "node1".to_string(), api_url: "http://127.0.0.1:6443".to_string(), zonepath_prefix: "/zones".to_string(), zfs_parent_dataset: "rpool/zones".to_string(), default_brand: ZoneBrand::Reddwarf, etherstub_name: "reddwarf0".to_string(), pod_cidr: "10.88.0.0/16".to_string(), }; let controller = PodController::new(runtime, api_client, event_tx, config, ipam); (controller, dir) } #[test] fn test_pod_zone_name_basic() { assert_eq!(pod_zone_name("default", "nginx"), "reddwarf-default-nginx"); } #[test] fn test_pod_zone_name_sanitization() { // Dots get replaced with hyphens assert_eq!( pod_zone_name("my.namespace", "my.pod"), "reddwarf-my-namespace-my-pod" ); } #[test] fn test_pod_zone_name_truncation() { let long_name = "a".repeat(60); let name = pod_zone_name("ns", &long_name); assert!(name.len() <= 64); } #[test] fn test_pod_to_zone_config_maps_containers() { let (controller, _dir) = make_test_controller(); let mut pod = Pod::default(); pod.metadata.name = Some("test-pod".to_string()); pod.metadata.namespace = Some("default".to_string()); pod.spec = Some(PodSpec { containers: vec![ Container { name: "web".to_string(), command: Some(vec!["nginx".to_string()]), args: Some(vec!["-g".to_string(), "daemon off;".to_string()]), ..Default::default() }, Container { name: "sidecar".to_string(), command: Some(vec!["/bin/sh".to_string(), "-c".to_string()]), ..Default::default() }, ], ..Default::default() }); let zone_config = controller.pod_to_zone_config(&pod).unwrap(); assert_eq!(zone_config.zone_name, "reddwarf-default-test-pod"); assert_eq!(zone_config.zonepath, "/zones/reddwarf-default-test-pod"); assert_eq!(zone_config.processes.len(), 2); assert_eq!(zone_config.processes[0].name, "web"); assert_eq!( zone_config.processes[0].command, vec!["nginx", "-g", "daemon off;"] ); assert_eq!(zone_config.processes[1].name, "sidecar"); assert_eq!(zone_config.processes[1].command, vec!["/bin/sh", "-c"]); assert_eq!(zone_config.brand, ZoneBrand::Reddwarf); assert_eq!(zone_config.zfs.parent_dataset, "rpool/zones"); // Verify per-pod networking match &zone_config.network { NetworkMode::Etherstub(cfg) => { assert_eq!(cfg.etherstub_name, "reddwarf0"); assert_eq!(cfg.vnic_name, "vnic_default_test_pod"); assert_eq!(cfg.ip_address, Ipv4Addr::new(10, 88, 0, 2).to_string()); assert_eq!(cfg.gateway, Ipv4Addr::new(10, 88, 0, 1).to_string()); assert_eq!(cfg.prefix_len, 16); } _ => panic!("Expected Etherstub network mode"), } } #[test] fn test_pod_to_zone_config_unique_ips() { let (controller, _dir) = make_test_controller(); let mut pod_a = Pod::default(); pod_a.metadata.name = Some("pod-a".to_string()); pod_a.metadata.namespace = Some("default".to_string()); pod_a.spec = Some(PodSpec { containers: vec![Container { name: "web".to_string(), command: Some(vec!["/bin/sh".to_string()]), ..Default::default() }], ..Default::default() }); let mut pod_b = Pod::default(); pod_b.metadata.name = Some("pod-b".to_string()); pod_b.metadata.namespace = Some("default".to_string()); pod_b.spec = Some(PodSpec { containers: vec![Container { name: "web".to_string(), command: Some(vec!["/bin/sh".to_string()]), ..Default::default() }], ..Default::default() }); let config_a = controller.pod_to_zone_config(&pod_a).unwrap(); let config_b = controller.pod_to_zone_config(&pod_b).unwrap(); let ip_a = match &config_a.network { NetworkMode::Etherstub(cfg) => cfg.ip_address.clone(), _ => panic!("Expected Etherstub"), }; let ip_b = match &config_b.network { NetworkMode::Etherstub(cfg) => cfg.ip_address.clone(), _ => panic!("Expected Etherstub"), }; assert_ne!(ip_a, ip_b, "Each pod should get a unique IP"); assert_eq!(ip_a, "10.88.0.2"); assert_eq!(ip_b, "10.88.0.3"); } #[test] fn test_pod_to_zone_config_no_spec_returns_error() { let (controller, _dir) = make_test_controller(); let mut pod = Pod::default(); pod.metadata.name = Some("test-pod".to_string()); // No spec set let result = controller.pod_to_zone_config(&pod); assert!(result.is_err()); } }