From d8425ad85dbb5605032c674c7551c37d3eb24328 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 19 Mar 2026 20:28:40 +0000 Subject: [PATCH] Add service networking, bhyve brand, ipadm IP config, and zone state reporting Service networking: - ClusterIP IPAM allocation on service create/delete via reusable Ipam with_prefix() - ServiceController watches Pod/Service events + periodic reconcile to track endpoints - NatManager generates ipnat rdr rules for ClusterIP -> pod IP forwarding - Embedded DNS server resolves {svc}.{ns}.svc.cluster.local to ClusterIP - New CLI flags: --service-cidr (default 10.96.0.0/12), --cluster-dns (default 0.0.0.0:10053) Quick wins: - ipadm IP assignment: configure_zone_ip() runs ipadm/route inside zone via zlogin after boot - Node heartbeat zone state reporting: reddwarf.io/zone-count and zone-summary annotations - bhyve brand support: ZoneBrand::Bhyve, install args, zonecfg device generation, controller integration 189 tests passing, clippy clean. https://claude.ai/code/session_016QLFjAyYGzMPbBjEGMe75j --- AUDIT.md | 12 +- Cargo.lock | 1 + crates/reddwarf-apiserver/Cargo.toml | 1 + .../reddwarf-apiserver/src/handlers/pods.rs | 20 +- .../src/handlers/services.rs | 48 +- crates/reddwarf-apiserver/src/server.rs | 12 +- crates/reddwarf-apiserver/src/state.rs | 5 + crates/reddwarf-apiserver/src/tls.rs | 21 +- .../reddwarf-core/src/resources/quantities.rs | 5 +- crates/reddwarf-runtime/src/brand/bhyve.rs | 91 ++++ crates/reddwarf-runtime/src/brand/lx.rs | 1 + crates/reddwarf-runtime/src/brand/mod.rs | 1 + crates/reddwarf-runtime/src/controller.rs | 68 ++- crates/reddwarf-runtime/src/dns.rs | 461 ++++++++++++++++++ crates/reddwarf-runtime/src/error.rs | 4 +- crates/reddwarf-runtime/src/illumos.rs | 103 +++- crates/reddwarf-runtime/src/lib.rs | 8 +- crates/reddwarf-runtime/src/mock.rs | 16 +- crates/reddwarf-runtime/src/network/ipam.rs | 108 ++-- crates/reddwarf-runtime/src/network/mod.rs | 2 + crates/reddwarf-runtime/src/network/nat.rs | 218 +++++++++ crates/reddwarf-runtime/src/node_agent.rs | 171 ++++++- crates/reddwarf-runtime/src/node_health.rs | 14 +- .../reddwarf-runtime/src/probes/executor.rs | 24 +- crates/reddwarf-runtime/src/probes/mod.rs | 2 +- crates/reddwarf-runtime/src/probes/tracker.rs | 5 +- crates/reddwarf-runtime/src/probes/types.rs | 32 +- .../src/service_controller.rs | 236 +++++++++ crates/reddwarf-runtime/src/sysinfo.rs | 14 +- crates/reddwarf-runtime/src/traits.rs | 6 + crates/reddwarf-runtime/src/types.rs | 6 + crates/reddwarf-runtime/src/zone/config.rs | 12 +- crates/reddwarf-scheduler/src/filter.rs | 5 +- crates/reddwarf/src/main.rs | 95 +++- 34 files changed, 1608 insertions(+), 220 deletions(-) create mode 100644 crates/reddwarf-runtime/src/brand/bhyve.rs create mode 100644 crates/reddwarf-runtime/src/dns.rs create mode 100644 crates/reddwarf-runtime/src/network/nat.rs create mode 100644 crates/reddwarf-runtime/src/service_controller.rs diff --git a/AUDIT.md b/AUDIT.md index 88c916f..9c179fa 100644 --- a/AUDIT.md +++ b/AUDIT.md @@ -1,6 +1,6 @@ # Reddwarf Production Readiness Audit -**Last updated:** 2026-02-14 +**Last updated:** 2026-03-19 **Baseline commit:** `58171c7` (Add periodic reconciliation, node health checker, and graceful pod termination) --- @@ -43,7 +43,7 @@ |---|---|---| | Self-registration | DONE | Creates Node resource with allocatable CPU/memory | | Periodic heartbeat | DONE | 10-second interval, Ready condition | -| Report zone states | NOT DONE | Heartbeat doesn't query actual zone states | +| Report zone states | DONE | Heartbeat queries `list_zones()`, reports `reddwarf.io/zone-count` and `reddwarf.io/zone-summary` annotations | | Dynamic resource reporting | DONE | `sysinfo.rs` — detects CPU/memory via `sys-info`, capacity vs allocatable split with configurable reservations (`--system-reserved-cpu`, `--system-reserved-memory`, `--max-pods`). Done in `d3eb0b2` | ## 5. Main Binary @@ -62,9 +62,9 @@ |---|---|---| | Etherstub creation | DONE | `dladm create-etherstub` | | VNIC per zone | DONE | `dladm create-vnic -l etherstub` | -| ipadm IP assignment | PARTIAL | IP set in zonecfg `allowed-address` but no explicit `ipadm create-addr` call | +| ipadm IP assignment | DONE | `configure_zone_ip()` runs `ipadm create-if`, `ipadm create-addr`, `route add default` inside zone via `zlogin` after boot | | IPAM | DONE | Sequential alloc, idempotent, persistent, pool exhaustion handling | -| Service ClusterIP / NAT | NOT DONE | Services stored at API level but no backend controller, no ipnat rules, no proxy, no DNS | +| Service ClusterIP / NAT | DONE | ClusterIP IPAM allocation on service create/delete, `ServiceController` watches events + periodic reconcile, `NatManager` generates ipnat rdr rules, embedded `DnsServer` resolves `{svc}.{ns}.svc.cluster.local` → ClusterIP | ## 7. Scheduler @@ -88,7 +88,7 @@ - [x] Graceful pod termination — done in `58171c7` ### Medium (limits functionality) -- [ ] Service networking — no ClusterIP, no NAT/proxy, no DNS +- [x] Service networking — ClusterIP IPAM, ServiceController endpoint tracking, ipnat NAT rules, embedded DNS server - [x] Health probes — exec/HTTP/TCP liveness/readiness/startup probes via zlogin - [ ] Image management — no pull/registry, no `.zar` support, no golden image bootstrap - [x] Dynamic node resources — done in `d3eb0b2` @@ -96,4 +96,4 @@ ### Low (nice to have) - [x] Zone brand scheduling filter — done in `4c7f50a` - [x] ShuttingDown to Terminating mapping fix — done in `58171c7` -- [ ] bhyve brand — type exists but no implementation +- [x] bhyve brand — `ZoneBrand::Bhyve`, install args, zonecfg device generation, controller brand selection diff --git a/Cargo.lock b/Cargo.lock index 95c8c3e..7e47eb4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1495,6 +1495,7 @@ dependencies = [ "miette", "rcgen", "reddwarf-core", + "reddwarf-runtime", "reddwarf-storage", "reddwarf-versioning", "rustls", diff --git a/crates/reddwarf-apiserver/Cargo.toml b/crates/reddwarf-apiserver/Cargo.toml index 7b45baa..9fb0124 100644 --- a/crates/reddwarf-apiserver/Cargo.toml +++ b/crates/reddwarf-apiserver/Cargo.toml @@ -9,6 +9,7 @@ rust-version.workspace = true [dependencies] reddwarf-core = { workspace = true } +reddwarf-runtime = { workspace = true } reddwarf-storage = { workspace = true } reddwarf-versioning = { workspace = true } axum = { workspace = true } diff --git a/crates/reddwarf-apiserver/src/handlers/pods.rs b/crates/reddwarf-apiserver/src/handlers/pods.rs index 7b970a3..da6aaf2 100644 --- a/crates/reddwarf-apiserver/src/handlers/pods.rs +++ b/crates/reddwarf-apiserver/src/handlers/pods.rs @@ -374,9 +374,7 @@ mod tests { // Set deletion metadata pod.metadata.deletion_timestamp = Some( - reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time( - chrono::Utc::now(), - ), + reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(chrono::Utc::now()), ); pod.metadata.deletion_grace_period_seconds = Some(30); let status = pod.status.get_or_insert_with(Default::default); @@ -410,14 +408,10 @@ mod tests { // First delete: set deletion_timestamp let mut pod: Pod = get_resource(&*state, &key).await.unwrap(); pod.metadata.deletion_timestamp = Some( - reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time( - chrono::Utc::now(), - ), + reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(chrono::Utc::now()), ); pod.metadata.deletion_grace_period_seconds = Some(30); - pod.status - .get_or_insert_with(Default::default) - .phase = Some("Terminating".to_string()); + pod.status.get_or_insert_with(Default::default).phase = Some("Terminating".to_string()); update_resource(&*state, pod).await.unwrap(); // Second "delete" attempt should see deletion_timestamp is already set @@ -467,13 +461,9 @@ mod tests { let mut pod: Pod = get_resource(&*state, &key).await.unwrap(); pod.metadata.deletion_timestamp = Some( - reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time( - chrono::Utc::now(), - ), + reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(chrono::Utc::now()), ); - pod.status - .get_or_insert_with(Default::default) - .phase = Some("Terminating".to_string()); + pod.status.get_or_insert_with(Default::default).phase = Some("Terminating".to_string()); update_resource(&*state, pod).await.unwrap(); // Should get a MODIFIED event diff --git a/crates/reddwarf-apiserver/src/handlers/services.rs b/crates/reddwarf-apiserver/src/handlers/services.rs index 933c625..68359aa 100644 --- a/crates/reddwarf-apiserver/src/handlers/services.rs +++ b/crates/reddwarf-apiserver/src/handlers/services.rs @@ -53,9 +53,41 @@ pub async fn create_service( ) -> Result { info!("Creating service in namespace: {}", namespace); - service.metadata.namespace = Some(namespace); + service.metadata.namespace = Some(namespace.clone()); validate_resource(&service)?; + // Allocate a ClusterIP if service IPAM is configured and no ClusterIP is set + if let Some(ref ipam) = state.service_ipam { + let name = service.metadata.name.as_deref().unwrap_or("unknown"); + let has_cluster_ip = service + .spec + .as_ref() + .and_then(|s| s.cluster_ip.as_ref()) + .is_some_and(|ip| !ip.is_empty() && ip != "None"); + + if !has_cluster_ip { + match ipam.allocate(&namespace, name) { + Ok(alloc) => { + let spec = service.spec.get_or_insert_with(Default::default); + spec.cluster_ip = Some(alloc.ip_address.to_string()); + spec.cluster_ips = Some(vec![alloc.ip_address.to_string()]); + info!( + "Allocated ClusterIP {} for service {}/{}", + alloc.ip_address, namespace, name + ); + } + Err(e) => { + tracing::warn!( + "Failed to allocate ClusterIP for {}/{}: {}", + namespace, + name, + e + ); + } + } + } + } + let created = create_resource(&state, service).await?; Ok(ApiResponse::created(created).into_response()) @@ -86,9 +118,21 @@ pub async fn delete_service( info!("Deleting service: {}/{}", namespace, name); let gvk = GroupVersionKind::from_api_version_kind("v1", "Service"); - let key = ResourceKey::new(gvk, namespace, name.clone()); + let key = ResourceKey::new(gvk, namespace.clone(), name.clone()); delete_resource(&state, &key).await?; + // Release ClusterIP allocation + if let Some(ref ipam) = state.service_ipam { + if let Err(e) = ipam.release(&namespace, &name) { + tracing::warn!( + "Failed to release ClusterIP for {}/{}: {}", + namespace, + name, + e + ); + } + } + Ok(status_deleted(&name, "Service")) } diff --git a/crates/reddwarf-apiserver/src/server.rs b/crates/reddwarf-apiserver/src/server.rs index 18c76a1..f5326d3 100644 --- a/crates/reddwarf-apiserver/src/server.rs +++ b/crates/reddwarf-apiserver/src/server.rs @@ -134,26 +134,20 @@ impl ApiServer { .await } Some(material) => { - info!( - "Starting API server on {} (HTTPS)", - self.config.listen_addr - ); + info!("Starting API server on {} (HTTPS)", self.config.listen_addr); let rustls_config = axum_server::tls_rustls::RustlsConfig::from_pem( material.cert_pem, material.key_pem, ) .await - .map_err(|e| { - std::io::Error::other(format!("failed to build RustlsConfig: {e}")) - })?; + .map_err(|e| std::io::Error::other(format!("failed to build RustlsConfig: {e}")))?; let handle = axum_server::Handle::new(); let shutdown_handle = handle.clone(); tokio::spawn(async move { token.cancelled().await; - shutdown_handle - .graceful_shutdown(Some(std::time::Duration::from_secs(10))); + shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(10))); }); axum_server::bind_rustls(self.config.listen_addr, rustls_config) diff --git a/crates/reddwarf-apiserver/src/state.rs b/crates/reddwarf-apiserver/src/state.rs index 60f05e7..a6101d1 100644 --- a/crates/reddwarf-apiserver/src/state.rs +++ b/crates/reddwarf-apiserver/src/state.rs @@ -1,4 +1,5 @@ use crate::event_bus::{EventBusConfig, ResourceEvent}; +use reddwarf_runtime::Ipam; use reddwarf_storage::RedbBackend; use reddwarf_versioning::VersionStore; use std::sync::Arc; @@ -15,6 +16,9 @@ pub struct AppState { /// Event bus sender — broadcast channel for resource mutation events pub event_tx: broadcast::Sender, + + /// Service CIDR IPAM for ClusterIP allocation (None if not configured) + pub service_ipam: Option>, } impl AppState { @@ -34,6 +38,7 @@ impl AppState { storage, version_store, event_tx, + service_ipam: None, } } diff --git a/crates/reddwarf-apiserver/src/tls.rs b/crates/reddwarf-apiserver/src/tls.rs index 03b1b6e..f75ec40 100644 --- a/crates/reddwarf-apiserver/src/tls.rs +++ b/crates/reddwarf-apiserver/src/tls.rs @@ -46,7 +46,10 @@ pub fn resolve_tls(mode: &TlsMode) -> miette::Result> { let key_path = data_dir.join("server-key.pem"); if ca_path.exists() && cert_path.exists() && key_path.exists() { - info!("Loading existing TLS certificates from {}", data_dir.display()); + info!( + "Loading existing TLS certificates from {}", + data_dir.display() + ); let ca_pem = std::fs::read(&ca_path) .into_diagnostic() .wrap_err_with(|| format!("failed to read CA cert at {}", ca_path.display()))?; @@ -80,9 +83,7 @@ pub fn resolve_tls(mode: &TlsMode) -> miette::Result> { } => { let cert_pem = std::fs::read(cert_path) .into_diagnostic() - .wrap_err_with(|| { - format!("failed to read TLS cert at {}", cert_path.display()) - })?; + .wrap_err_with(|| format!("failed to read TLS cert at {}", cert_path.display()))?; let key_pem = std::fs::read(key_path) .into_diagnostic() .wrap_err_with(|| format!("failed to read TLS key at {}", key_path.display()))?; @@ -103,7 +104,9 @@ fn generate_self_signed(data_dir: &Path, san_entries: &[String]) -> miette::Resu .wrap_err_with(|| format!("failed to create TLS directory {}", data_dir.display()))?; // --- CA --- - let ca_key = KeyPair::generate().into_diagnostic().wrap_err("failed to generate CA key pair")?; + let ca_key = KeyPair::generate() + .into_diagnostic() + .wrap_err("failed to generate CA key pair")?; let mut ca_params = CertificateParams::new(vec!["Reddwarf CA".to_string()]) .into_diagnostic() @@ -177,7 +180,9 @@ mod tests { san_entries: vec!["localhost".to_string(), "127.0.0.1".to_string()], }; - let material = resolve_tls(&mode).unwrap().expect("should produce material"); + let material = resolve_tls(&mode) + .unwrap() + .expect("should produce material"); assert!(!material.cert_pem.is_empty()); assert!(!material.key_pem.is_empty()); @@ -227,7 +232,9 @@ mod tests { key_path: tls_dir.join("server-key.pem"), }; - let material = resolve_tls(&mode).unwrap().expect("should produce material"); + let material = resolve_tls(&mode) + .unwrap() + .expect("should produce material"); assert!(!material.cert_pem.is_empty()); assert!(!material.key_pem.is_empty()); assert!(material.ca_pem.is_none()); diff --git a/crates/reddwarf-core/src/resources/quantities.rs b/crates/reddwarf-core/src/resources/quantities.rs index 45b4a40..b19a8e7 100644 --- a/crates/reddwarf-core/src/resources/quantities.rs +++ b/crates/reddwarf-core/src/resources/quantities.rs @@ -154,10 +154,7 @@ mod tests { "512M" ); // Exact KiB - assert_eq!( - ResourceQuantities::memory_as_zone_cap(256 * 1024), - "256K" - ); + assert_eq!(ResourceQuantities::memory_as_zone_cap(256 * 1024), "256K"); // 1500 MiB = not a clean GiB, falls to MiB assert_eq!( ResourceQuantities::memory_as_zone_cap(1500 * 1024 * 1024), diff --git a/crates/reddwarf-runtime/src/brand/bhyve.rs b/crates/reddwarf-runtime/src/brand/bhyve.rs new file mode 100644 index 0000000..adbf4e1 --- /dev/null +++ b/crates/reddwarf-runtime/src/brand/bhyve.rs @@ -0,0 +1,91 @@ +use crate::error::Result; +use crate::types::ZoneConfig; + +/// Get the install arguments for a bhyve brand zone +/// +/// Bhyve zones use hardware virtualization. If a disk image is specified, +/// it is passed as a boot disk device. +pub fn bhyve_install_args(config: &ZoneConfig) -> Result> { + // bhyve brand zones typically install without extra arguments. + // If a disk image is provided, it can be used as the boot disk. + if let Some(ref _disk_image) = config.bhyve_disk_image { + // The disk device is configured in zonecfg, not install args. + // Standard install is sufficient. + Ok(vec![]) + } else { + Ok(vec![]) + } +} + +/// Generate bhyve-specific zonecfg lines for device passthrough +pub fn bhyve_zonecfg_lines(config: &ZoneConfig) -> Vec { + let mut lines = Vec::new(); + + // Add disk device if specified (typically a ZFS zvol) + if let Some(ref disk_image) = config.bhyve_disk_image { + lines.push("add device".to_string()); + lines.push(format!("set match={}", disk_image)); + lines.push("end".to_string()); + } + + lines +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::*; + + fn make_bhyve_config(disk_image: Option) -> ZoneConfig { + ZoneConfig { + zone_name: "bhyve-test".to_string(), + brand: ZoneBrand::Bhyve, + zonepath: "/zones/bhyve-test".to_string(), + network: NetworkMode::Etherstub(EtherstubConfig { + etherstub_name: "stub0".to_string(), + vnic_name: "vnic0".to_string(), + ip_address: "10.0.0.2".to_string(), + gateway: "10.0.0.1".to_string(), + prefix_len: 16, + }), + storage: ZoneStorageOpts::default(), + lx_image_path: None, + bhyve_disk_image: disk_image, + processes: vec![], + cpu_cap: None, + memory_cap: None, + fs_mounts: vec![], + } + } + + #[test] + fn test_bhyve_install_args_no_disk() { + let config = make_bhyve_config(None); + let args = bhyve_install_args(&config).unwrap(); + assert!(args.is_empty()); + } + + #[test] + fn test_bhyve_install_args_with_disk() { + let config = make_bhyve_config(Some("/dev/zvol/rpool/bhyve-disk".to_string())); + let args = bhyve_install_args(&config).unwrap(); + assert!(args.is_empty()); // disk is in zonecfg, not install args + } + + #[test] + fn test_bhyve_zonecfg_lines_no_disk() { + let config = make_bhyve_config(None); + let lines = bhyve_zonecfg_lines(&config); + assert!(lines.is_empty()); + } + + #[test] + fn test_bhyve_zonecfg_lines_with_disk() { + let config = make_bhyve_config(Some("/dev/zvol/rpool/bhyve-disk".to_string())); + let lines = bhyve_zonecfg_lines(&config); + assert_eq!(lines.len(), 3); + assert!(lines[0].contains("add device")); + assert!(lines[1].contains("/dev/zvol/rpool/bhyve-disk")); + assert!(lines[2].contains("end")); + } +} diff --git a/crates/reddwarf-runtime/src/brand/lx.rs b/crates/reddwarf-runtime/src/brand/lx.rs index e0457a5..b40a4b8 100644 --- a/crates/reddwarf-runtime/src/brand/lx.rs +++ b/crates/reddwarf-runtime/src/brand/lx.rs @@ -32,6 +32,7 @@ mod tests { }), storage: ZoneStorageOpts::default(), lx_image_path: image_path, + bhyve_disk_image: None, processes: vec![], cpu_cap: None, memory_cap: None, diff --git a/crates/reddwarf-runtime/src/brand/mod.rs b/crates/reddwarf-runtime/src/brand/mod.rs index 41fc298..d550e24 100644 --- a/crates/reddwarf-runtime/src/brand/mod.rs +++ b/crates/reddwarf-runtime/src/brand/mod.rs @@ -1,2 +1,3 @@ +pub mod bhyve; pub mod custom; pub mod lx; diff --git a/crates/reddwarf-runtime/src/controller.rs b/crates/reddwarf-runtime/src/controller.rs index da3b5b2..d72ae47 100644 --- a/crates/reddwarf-runtime/src/controller.rs +++ b/crates/reddwarf-runtime/src/controller.rs @@ -291,15 +291,13 @@ impl PodController { let mut tracker = self.probe_tracker.lock().await; tracker.register_pod(&pod_key, probes, started_at); - let status = tracker - .check_pod(&pod_key, &zone_name, &zone_ip) - .await; + let status = tracker.check_pod(&pod_key, &zone_name, &zone_ip).await; drop(tracker); if status.liveness_failed { - let message = status.failure_message.unwrap_or_else(|| { - "Liveness probe failed".to_string() - }); + let message = status + .failure_message + .unwrap_or_else(|| "Liveness probe failed".to_string()); warn!( "Liveness probe failed for pod {}/{}: {}", namespace, pod_name, message @@ -328,9 +326,9 @@ impl PodController { let mut tracker = self.probe_tracker.lock().await; tracker.unregister_pod(&pod_key); } else if !status.ready { - let message = status.failure_message.unwrap_or_else(|| { - "Readiness probe failed".to_string() - }); + let message = status + .failure_message + .unwrap_or_else(|| "Readiness probe failed".to_string()); debug!( "Readiness probe not passing for pod {}/{}: {}", namespace, pod_name, message @@ -639,10 +637,7 @@ impl PodController { // Finalize — remove the pod from API server storage if let Err(e) = self.api_client.finalize_pod(namespace, pod_name).await { - error!( - "Failed to finalize pod {}/{}: {}", - namespace, pod_name, e - ); + error!("Failed to finalize pod {}/{}: {}", namespace, pod_name, e); } else { info!("Pod {}/{} finalized and removed", namespace, pod_name); } @@ -659,10 +654,7 @@ impl PodController { None => return false, }; - let grace_secs = pod - .metadata - .deletion_grace_period_seconds - .unwrap_or(30); + let grace_secs = pod.metadata.deletion_grace_period_seconds.unwrap_or(30); let deadline = deletion_ts + chrono::Duration::seconds(grace_secs); Utc::now() >= deadline @@ -768,6 +760,7 @@ impl PodController { .and_then(|v| match v.as_str() { "lx" => Some(ZoneBrand::Lx), "reddwarf" => Some(ZoneBrand::Reddwarf), + "bhyve" => Some(ZoneBrand::Bhyve), _ => None, }) .unwrap_or_else(|| self.config.default_brand.clone()); @@ -779,6 +772,7 @@ impl PodController { network, storage: ZoneStorageOpts::default(), lx_image_path: None, + bhyve_disk_image: None, processes, cpu_cap, memory_cap, @@ -801,10 +795,7 @@ impl PodController { None => return vec![], }; - spec.containers - .iter() - .flat_map(|c| extract_probes(c)) - .collect() + spec.containers.iter().flat_map(extract_probes).collect() } /// Get the pod's IP from its status, falling back to empty string @@ -820,16 +811,14 @@ impl PodController { fn pod_start_time(&self, pod: &Pod) -> Instant { // We can't convert k8s Time to std Instant directly. Instead, compute // the elapsed duration since start_time and subtract from Instant::now(). - if let Some(start_time) = pod - .status - .as_ref() - .and_then(|s| s.start_time.as_ref()) - { + if let Some(start_time) = pod.status.as_ref().and_then(|s| s.start_time.as_ref()) { let now_utc = Utc::now(); let started_utc = start_time.0; let elapsed = now_utc.signed_duration_since(started_utc); if let Ok(elapsed_std) = elapsed.to_std() { - return Instant::now().checked_sub(elapsed_std).unwrap_or_else(Instant::now); + return Instant::now() + .checked_sub(elapsed_std) + .unwrap_or_else(Instant::now); } } Instant::now() @@ -1183,11 +1172,10 @@ mod tests { let mut pod = Pod::default(); pod.metadata.name = Some("expired-pod".to_string()); // Set deletion_timestamp 60 seconds in the past - pod.metadata.deletion_timestamp = Some( - k8s_openapi::apimachinery::pkg::apis::meta::v1::Time( + pod.metadata.deletion_timestamp = + Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time( Utc::now() - chrono::Duration::seconds(60), - ), - ); + )); pod.metadata.deletion_grace_period_seconds = Some(30); assert!(controller.is_grace_period_expired(&pod)); @@ -1344,7 +1332,11 @@ mod tests { assert_eq!(zone_config.brand, ZoneBrand::Reddwarf); } - fn make_test_controller_with_runtime() -> (PodController, Arc, tempfile::TempDir) { + fn make_test_controller_with_runtime() -> ( + PodController, + Arc, + tempfile::TempDir, + ) { let dir = tempdir().unwrap(); let db_path = dir.path().join("test-controller-rt.redb"); let storage = Arc::new(RedbBackend::new(&db_path).unwrap()); @@ -1367,7 +1359,13 @@ mod tests { reconcile_interval: Duration::from_secs(30), }; - let controller = PodController::new(runtime.clone() as Arc, api_client, event_tx, config, ipam); + let controller = PodController::new( + runtime.clone() as Arc, + api_client, + event_tx, + config, + ipam, + ); (controller, runtime, dir) } @@ -1473,9 +1471,7 @@ mod tests { // Verify the liveness failure path was taken: probes should be unregistered let pod_key = "default/liveness-pod"; let mut tracker = controller.probe_tracker.lock().await; - let status = tracker - .check_pod(pod_key, &zone_name, "10.88.0.2") - .await; + let status = tracker.check_pod(pod_key, &zone_name, "10.88.0.2").await; // No probes registered → default status (ready=true, liveness_failed=false) // This confirms the unregister happened, which only occurs on liveness failure assert!(status.ready); diff --git a/crates/reddwarf-runtime/src/dns.rs b/crates/reddwarf-runtime/src/dns.rs new file mode 100644 index 0000000..1166cef --- /dev/null +++ b/crates/reddwarf-runtime/src/dns.rs @@ -0,0 +1,461 @@ +use reddwarf_storage::{KVStore, KeyEncoder, RedbBackend}; +use std::collections::HashMap; +use std::net::Ipv4Addr; +use std::sync::{Arc, RwLock}; +use tokio::net::UdpSocket; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, info, warn}; + +/// Configuration for the embedded DNS server +#[derive(Debug, Clone)] +pub struct DnsServerConfig { + /// Address to listen on (e.g. "10.96.0.10:53" or "0.0.0.0:10053") + pub listen_addr: String, + /// Cluster domain suffix + pub cluster_domain: String, + /// How often to refresh the service map from storage (seconds) + pub refresh_interval_secs: u64, +} + +impl Default for DnsServerConfig { + fn default() -> Self { + Self { + listen_addr: "0.0.0.0:10053".to_string(), + cluster_domain: "cluster.local".to_string(), + refresh_interval_secs: 5, + } + } +} + +/// A minimal DNS server that resolves `{service}.{namespace}.svc.cluster.local` +/// to the corresponding ClusterIP by reading services from storage. +pub struct DnsServer { + storage: Arc, + config: DnsServerConfig, + /// Cache of service name -> ClusterIP, keyed by "{name}.{namespace}" + records: Arc>>, +} + +impl DnsServer { + pub fn new(storage: Arc, config: DnsServerConfig) -> Self { + Self { + storage, + config, + records: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Run the DNS server + pub async fn run(&self, token: CancellationToken) -> crate::error::Result<()> { + // Refresh records before starting + self.refresh_records(); + + let socket = UdpSocket::bind(&self.config.listen_addr) + .await + .map_err(|e| { + crate::error::RuntimeError::network_error(format!( + "Failed to bind DNS server on {}: {}", + self.config.listen_addr, e + )) + })?; + + info!("DNS server listening on {}", self.config.listen_addr); + + let mut buf = [0u8; 512]; + let mut refresh_interval = tokio::time::interval(std::time::Duration::from_secs( + self.config.refresh_interval_secs, + )); + refresh_interval.tick().await; // consume first tick + + loop { + tokio::select! { + _ = token.cancelled() => { + info!("DNS server shutting down"); + return Ok(()); + } + _ = refresh_interval.tick() => { + self.refresh_records(); + } + result = socket.recv_from(&mut buf) => { + match result { + Ok((len, src)) => { + let query = &buf[..len]; + match self.handle_query(query) { + Some(response) => { + if let Err(e) = socket.send_to(&response, src).await { + warn!("Failed to send DNS response to {}: {}", src, e); + } + } + None => { + // Send NXDOMAIN / SERVFAIL + if let Some(response) = build_nxdomain(query) { + let _ = socket.send_to(&response, src).await; + } + } + } + } + Err(e) => { + warn!("DNS recv error: {}", e); + } + } + } + } + } + } + + /// Refresh the service -> ClusterIP map from storage + fn refresh_records(&self) { + let prefix = KeyEncoder::encode_prefix("v1", "Service", None); + let entries = match self.storage.scan(prefix.as_bytes()) { + Ok(entries) => entries, + Err(e) => { + error!("DNS: failed to scan services: {}", e); + return; + } + }; + + let mut new_records = HashMap::new(); + + for (_key, value) in entries { + if let Ok(svc) = serde_json::from_slice::(&value) { + let name = match svc.metadata.name.as_deref() { + Some(n) => n, + None => continue, + }; + let namespace = svc.metadata.namespace.as_deref().unwrap_or("default"); + let cluster_ip = match svc.spec.as_ref().and_then(|s| s.cluster_ip.as_ref()) { + Some(ip) if !ip.is_empty() && ip != "None" => ip, + _ => continue, + }; + + if let Ok(ip) = cluster_ip.parse::() { + let key = format!("{}.{}", name, namespace); + new_records.insert(key, ip); + } + } + } + + debug!("DNS: refreshed {} service records", new_records.len()); + + if let Ok(mut records) = self.records.write() { + *records = new_records; + } + } + + /// Handle a DNS query, returning a response if we can answer it + fn handle_query(&self, query: &[u8]) -> Option> { + // Minimum DNS header is 12 bytes + if query.len() < 12 { + return None; + } + + // Parse the question name + let (qname, qname_end) = parse_dns_name(query, 12)?; + + // Verify we have enough bytes for qtype + qclass (4 bytes after name) + if qname_end + 4 > query.len() { + return None; + } + + let qtype = u16::from_be_bytes([query[qname_end], query[qname_end + 1]]); + // Only handle A records (type 1) + if qtype != 1 { + return None; + } + + // Try to resolve: expect format "{service}.{namespace}.svc.{cluster_domain}" + let suffix = format!(".svc.{}", self.config.cluster_domain); + let name_lower = qname.to_lowercase(); + let without_suffix = name_lower.strip_suffix(&suffix)?; + + // Split into service.namespace + let parts: Vec<&str> = without_suffix.splitn(2, '.').collect(); + if parts.len() != 2 { + return None; + } + let (svc_name, ns_name) = (parts[0], parts[1]); + let lookup_key = format!("{}.{}", svc_name, ns_name); + + let ip = { + let records = self.records.read().ok()?; + records.get(&lookup_key).copied()? + }; + + debug!("DNS: resolved {} -> {}", qname, ip); + + // Build response + Some(build_a_response(query, qname_end + 4, ip)) + } +} + +/// Parse a DNS wire-format name starting at `offset` in `buf`. +/// Returns (dotted-name, offset-after-name). +fn parse_dns_name(buf: &[u8], mut offset: usize) -> Option<(String, usize)> { + let mut parts = Vec::new(); + let mut total_len = 0usize; + + loop { + if offset >= buf.len() { + return None; + } + + let len = buf[offset] as usize; + if len == 0 { + offset += 1; + break; + } + + // Check for compression pointer (top 2 bits set) + if len & 0xC0 == 0xC0 { + // We don't follow compression pointers for simplicity + // (queries usually don't use them) + return None; + } + + offset += 1; + if offset + len > buf.len() { + return None; + } + + total_len += len + 1; + if total_len > 253 { + return None; // Name too long + } + + let label = std::str::from_utf8(&buf[offset..offset + len]).ok()?; + parts.push(label.to_string()); + offset += len; + } + + if parts.is_empty() { + return None; + } + + Some((parts.join("."), offset)) +} + +/// Build a DNS A record response +fn build_a_response(query: &[u8], question_end: usize, ip: Ipv4Addr) -> Vec { + let mut resp = Vec::with_capacity(question_end + 16); + + // Copy header + resp.extend_from_slice(&query[..2]); // Transaction ID + + // Flags: QR=1 (response), AA=1 (authoritative), RA=1 (recursion available) + resp.push(0x84); // QR=1, Opcode=0, AA=1, TC=0, RD=0 + resp.push(0x00); // RA=0, Z=0, RCODE=0 (no error) + + // QDCOUNT = 1 + resp.push(0x00); + resp.push(0x01); + // ANCOUNT = 1 + resp.push(0x00); + resp.push(0x01); + // NSCOUNT = 0 + resp.push(0x00); + resp.push(0x00); + // ARCOUNT = 0 + resp.push(0x00); + resp.push(0x00); + + // Copy question section + resp.extend_from_slice(&query[12..question_end]); + + // Answer section: pointer to name in question + resp.push(0xC0); // Compression pointer + resp.push(0x0C); // Offset 12 (start of question name) + + // Type A + resp.push(0x00); + resp.push(0x01); + // Class IN + resp.push(0x00); + resp.push(0x01); + // TTL = 30 seconds + resp.push(0x00); + resp.push(0x00); + resp.push(0x00); + resp.push(0x1E); + // RDLENGTH = 4 + resp.push(0x00); + resp.push(0x04); + // RDATA: IPv4 address + let octets = ip.octets(); + resp.extend_from_slice(&octets); + + resp +} + +/// Build an NXDOMAIN response for unresolvable queries +fn build_nxdomain(query: &[u8]) -> Option> { + if query.len() < 12 { + return None; + } + + let mut resp = Vec::with_capacity(query.len()); + + // Copy transaction ID + resp.extend_from_slice(&query[..2]); + + // Flags: QR=1, AA=1, RCODE=3 (NXDOMAIN) + resp.push(0x84); + resp.push(0x03); + + // QDCOUNT = 1, ANCOUNT = 0, NSCOUNT = 0, ARCOUNT = 0 + resp.push(0x00); + resp.push(0x01); + resp.push(0x00); + resp.push(0x00); + resp.push(0x00); + resp.push(0x00); + resp.push(0x00); + resp.push(0x00); + + // Copy question section from original query + resp.extend_from_slice(&query[12..]); + + Some(resp) +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Build a DNS query for a given name (A record, IN class) + fn build_dns_query(name: &str) -> Vec { + let mut buf = Vec::new(); + + // Header + buf.extend_from_slice(&[0xAB, 0xCD]); // Transaction ID + buf.extend_from_slice(&[0x01, 0x00]); // Flags: RD=1 + buf.extend_from_slice(&[0x00, 0x01]); // QDCOUNT=1 + buf.extend_from_slice(&[0x00, 0x00]); // ANCOUNT=0 + buf.extend_from_slice(&[0x00, 0x00]); // NSCOUNT=0 + buf.extend_from_slice(&[0x00, 0x00]); // ARCOUNT=0 + + // Question: encode name + for label in name.split('.') { + buf.push(label.len() as u8); + buf.extend_from_slice(label.as_bytes()); + } + buf.push(0x00); // End of name + + // Type A (1) + buf.extend_from_slice(&[0x00, 0x01]); + // Class IN (1) + buf.extend_from_slice(&[0x00, 0x01]); + + buf + } + + #[test] + fn test_parse_dns_name() { + let query = build_dns_query("my-svc.default.svc.cluster.local"); + let (name, end) = parse_dns_name(&query, 12).unwrap(); + assert_eq!(name, "my-svc.default.svc.cluster.local"); + // After name: 1 + 6 + 1 + 7 + 1 + 3 + 1 + 7 + 1 + 5 + 1 = label data + null + assert!(end > 12); + } + + #[test] + fn test_build_a_response() { + let query = build_dns_query("my-svc.default.svc.cluster.local"); + let (_, qname_end) = parse_dns_name(&query, 12).unwrap(); + let question_end = qname_end + 4; + let ip = Ipv4Addr::new(10, 96, 0, 1); + + let resp = build_a_response(&query, question_end, ip); + + // Check transaction ID preserved + assert_eq!(resp[0], 0xAB); + assert_eq!(resp[1], 0xCD); + // Check QR=1 in flags + assert!(resp[2] & 0x80 != 0); + // Check ANCOUNT=1 + assert_eq!(resp[6], 0x00); + assert_eq!(resp[7], 0x01); + // Check IP is in last 4 bytes + let len = resp.len(); + assert_eq!(resp[len - 4], 10); + assert_eq!(resp[len - 3], 96); + assert_eq!(resp[len - 2], 0); + assert_eq!(resp[len - 1], 1); + } + + #[test] + fn test_build_nxdomain() { + let query = build_dns_query("nonexistent.default.svc.cluster.local"); + let resp = build_nxdomain(&query).unwrap(); + + // Check transaction ID preserved + assert_eq!(resp[0], 0xAB); + assert_eq!(resp[1], 0xCD); + // Check RCODE=3 (NXDOMAIN) + assert_eq!(resp[3] & 0x0F, 3); + } + + #[test] + fn test_handle_query_resolves() { + let storage = Arc::new( + reddwarf_storage::RedbBackend::new( + &tempfile::tempdir().unwrap().path().join("dns-test.redb"), + ) + .unwrap(), + ); + let dns = DnsServer::new(storage, DnsServerConfig::default()); + + // Manually insert a record + { + let mut records = dns.records.write().unwrap(); + records.insert("my-svc.default".to_string(), Ipv4Addr::new(10, 96, 0, 1)); + } + + let query = build_dns_query("my-svc.default.svc.cluster.local"); + let resp = dns.handle_query(&query); + assert!(resp.is_some()); + + let resp = resp.unwrap(); + let len = resp.len(); + // Check the IP in the response + assert_eq!(resp[len - 4], 10); + assert_eq!(resp[len - 3], 96); + assert_eq!(resp[len - 2], 0); + assert_eq!(resp[len - 1], 1); + } + + #[test] + fn test_handle_query_nxdomain() { + let storage = Arc::new( + reddwarf_storage::RedbBackend::new( + &tempfile::tempdir().unwrap().path().join("dns-test2.redb"), + ) + .unwrap(), + ); + let dns = DnsServer::new(storage, DnsServerConfig::default()); + + let query = build_dns_query("nonexistent.default.svc.cluster.local"); + let resp = dns.handle_query(&query); + assert!(resp.is_none()); // No record found + } + + #[test] + fn test_handle_query_wrong_domain() { + let storage = Arc::new( + reddwarf_storage::RedbBackend::new( + &tempfile::tempdir().unwrap().path().join("dns-test3.redb"), + ) + .unwrap(), + ); + let dns = DnsServer::new(storage, DnsServerConfig::default()); + + let query = build_dns_query("google.com"); + let resp = dns.handle_query(&query); + assert!(resp.is_none()); + } + + #[test] + fn test_parse_short_query() { + // Too short for DNS header + assert!(parse_dns_name(&[0; 5], 12).is_none()); + } +} diff --git a/crates/reddwarf-runtime/src/error.rs b/crates/reddwarf-runtime/src/error.rs index d29a478..91ca3d8 100644 --- a/crates/reddwarf-runtime/src/error.rs +++ b/crates/reddwarf-runtime/src/error.rs @@ -156,7 +156,9 @@ pub enum RuntimeError { }, /// Health probe failed - #[error("Health probe failed for container '{container_name}' in zone '{zone_name}': {message}")] + #[error( + "Health probe failed for container '{container_name}' in zone '{zone_name}': {message}" + )] #[diagnostic( code(reddwarf::runtime::probe_failed), help("Check that the probe target is reachable inside the zone. Failure count: {failure_count}/{failure_threshold}. Verify the application is running and the probe command/port/path is correct") diff --git a/crates/reddwarf-runtime/src/illumos.rs b/crates/reddwarf-runtime/src/illumos.rs index 87e2b2a..e05d34b 100644 --- a/crates/reddwarf-runtime/src/illumos.rs +++ b/crates/reddwarf-runtime/src/illumos.rs @@ -1,3 +1,4 @@ +use crate::brand::bhyve::bhyve_install_args; use crate::brand::lx::lx_install_args; use crate::command::{exec, CommandOutput}; use crate::error::Result; @@ -159,6 +160,74 @@ impl ZoneRuntime for IllumosRuntime { Ok(()) } + async fn configure_zone_ip(&self, zone_name: &str, network: &NetworkMode) -> Result<()> { + let (vnic_name, ip_address, prefix_len, gateway) = match network { + NetworkMode::Etherstub(cfg) => ( + &cfg.vnic_name, + &cfg.ip_address, + cfg.prefix_len, + &cfg.gateway, + ), + NetworkMode::Direct(cfg) => ( + &cfg.vnic_name, + &cfg.ip_address, + cfg.prefix_len, + &cfg.gateway, + ), + }; + + info!( + "Configuring IP {} on {} in zone {}", + ip_address, vnic_name, zone_name + ); + + // Create the IP interface + self.exec_in_zone( + zone_name, + &[ + "ipadm".to_string(), + "create-if".to_string(), + "-t".to_string(), + vnic_name.clone(), + ], + ) + .await + .map_err(|e| RuntimeError::network_error(format!("ipadm create-if failed: {}", e)))?; + + // Assign a static IP address + self.exec_in_zone( + zone_name, + &[ + "ipadm".to_string(), + "create-addr".to_string(), + "-T".to_string(), + "static".to_string(), + "-a".to_string(), + format!("{}/{}", ip_address, prefix_len), + format!("{}/v4", vnic_name), + ], + ) + .await + .map_err(|e| RuntimeError::network_error(format!("ipadm create-addr failed: {}", e)))?; + + // Add default route + self.exec_in_zone( + zone_name, + &[ + "route".to_string(), + "-p".to_string(), + "add".to_string(), + "default".to_string(), + gateway.clone(), + ], + ) + .await + .map_err(|e| RuntimeError::network_error(format!("route add default failed: {}", e)))?; + + info!("IP configuration complete for zone: {}", zone_name); + Ok(()) + } + async fn teardown_network(&self, zone_name: &str, network: &NetworkMode) -> Result<()> { info!("Tearing down network for zone: {}", zone_name); @@ -183,19 +252,35 @@ impl ZoneRuntime for IllumosRuntime { .await?; self.create_zone(config).await?; - // LX brand needs image path for install - if config.brand == ZoneBrand::Lx { - let args = lx_install_args(config)?; - let mut cmd_args = vec!["-z", &config.zone_name, "install"]; - let str_args: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - cmd_args.extend(str_args); - exec("zoneadm", &cmd_args).await?; - } else { - self.install_zone(&config.zone_name).await?; + // Brand-specific install + match config.brand { + ZoneBrand::Lx => { + let args = lx_install_args(config)?; + let mut cmd_args = vec!["-z", &config.zone_name, "install"]; + let str_args: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + cmd_args.extend(str_args); + exec("zoneadm", &cmd_args).await?; + } + ZoneBrand::Bhyve => { + let args = bhyve_install_args(config)?; + let mut cmd_args = vec!["-z", &config.zone_name, "install"]; + let str_args: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + cmd_args.extend(str_args); + exec("zoneadm", &cmd_args).await?; + } + ZoneBrand::Reddwarf => { + self.install_zone(&config.zone_name).await?; + } } self.boot_zone(&config.zone_name).await?; + // Brief pause to let the zone finish booting before configuring IP + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + self.configure_zone_ip(&config.zone_name, &config.network) + .await?; + info!("Zone provisioned: {}", config.zone_name); Ok(()) } diff --git a/crates/reddwarf-runtime/src/lib.rs b/crates/reddwarf-runtime/src/lib.rs index dfee7bf..582736c 100644 --- a/crates/reddwarf-runtime/src/lib.rs +++ b/crates/reddwarf-runtime/src/lib.rs @@ -5,14 +5,16 @@ pub mod api_client; pub mod brand; pub mod command; pub mod controller; +pub mod dns; pub mod error; #[cfg(target_os = "illumos")] pub mod illumos; pub mod mock; pub mod network; pub mod node_agent; -pub mod probes; pub mod node_health; +pub mod probes; +pub mod service_controller; pub mod storage; pub mod sysinfo; pub mod traits; @@ -20,9 +22,11 @@ pub mod types; pub mod zone; // Re-export primary types +pub use dns::{DnsServer, DnsServerConfig}; pub use error::{Result, RuntimeError}; pub use mock::MockRuntime; -pub use network::{CidrConfig, IpAllocation, Ipam}; +pub use network::{CidrConfig, IpAllocation, Ipam, NatManager}; +pub use service_controller::{ServiceController, ServiceControllerConfig}; pub use traits::ZoneRuntime; pub use types::{ ContainerProcess, DirectNicConfig, EtherstubConfig, FsMount, NetworkMode, StoragePoolConfig, diff --git a/crates/reddwarf-runtime/src/mock.rs b/crates/reddwarf-runtime/src/mock.rs index 52fe03b..743e7eb 100644 --- a/crates/reddwarf-runtime/src/mock.rs +++ b/crates/reddwarf-runtime/src/mock.rs @@ -275,6 +275,11 @@ impl ZoneRuntime for MockRuntime { Ok(()) } + async fn configure_zone_ip(&self, zone_name: &str, _network: &NetworkMode) -> Result<()> { + debug!("Mock: IP configured for zone: {}", zone_name); + Ok(()) + } + async fn teardown_network(&self, zone_name: &str, _network: &NetworkMode) -> Result<()> { debug!("Mock: network teardown for zone: {}", zone_name); Ok(()) @@ -289,6 +294,8 @@ impl ZoneRuntime for MockRuntime { self.create_zone(config).await?; self.install_zone(&config.zone_name).await?; self.boot_zone(&config.zone_name).await?; + self.configure_zone_ip(&config.zone_name, &config.network) + .await?; Ok(()) } @@ -356,6 +363,7 @@ mod tests { }), storage: ZoneStorageOpts::default(), lx_image_path: None, + bhyve_disk_image: None, processes: vec![], cpu_cap: None, memory_cap: None, @@ -384,9 +392,7 @@ mod tests { rt.install_zone("stopped-zone").await.unwrap(); // Zone is Installed, not Running - let result = rt - .exec_in_zone("stopped-zone", &["echo".to_string()]) - .await; + let result = rt.exec_in_zone("stopped-zone", &["echo".to_string()]).await; assert!(result.is_err()); } @@ -394,9 +400,7 @@ mod tests { async fn test_exec_in_zone_not_found_errors() { let rt = MockRuntime::new(make_test_storage()); - let result = rt - .exec_in_zone("nonexistent", &["echo".to_string()]) - .await; + let result = rt.exec_in_zone("nonexistent", &["echo".to_string()]).await; assert!(matches!( result.unwrap_err(), RuntimeError::ZoneNotFound { .. } diff --git a/crates/reddwarf-runtime/src/network/ipam.rs b/crates/reddwarf-runtime/src/network/ipam.rs index 18eaac9..6141ae4 100644 --- a/crates/reddwarf-runtime/src/network/ipam.rs +++ b/crates/reddwarf-runtime/src/network/ipam.rs @@ -30,47 +30,65 @@ pub struct IpAllocation { /// IPAM (IP Address Management) backed by a KVStore /// -/// Storage keys: -/// - `ipam/_cidr` → the CIDR string (e.g. "10.88.0.0/16") -/// - `ipam/alloc/{ip}` → `"{namespace}/{pod_name}"` +/// Storage keys (with default "ipam" prefix): +/// - `{prefix}/_cidr` → the CIDR string (e.g. "10.88.0.0/16") +/// - `{prefix}/alloc/{ip}` → `"{namespace}/{name}"` pub struct Ipam { storage: Arc, cidr: CidrConfig, + /// Storage key for CIDR config + #[allow(dead_code)] + cidr_key: Vec, + /// Storage key prefix for allocations + alloc_prefix: Vec, } -const IPAM_CIDR_KEY: &[u8] = b"ipam/_cidr"; -const IPAM_ALLOC_PREFIX: &[u8] = b"ipam/alloc/"; - impl Ipam { - /// Create a new IPAM instance, persisting the CIDR config + /// Create a new IPAM instance with default "ipam" prefix, persisting the CIDR config pub fn new(storage: Arc, cidr_str: &str) -> Result { - let cidr = parse_cidr(cidr_str)?; - - // Persist the CIDR configuration - storage.put(IPAM_CIDR_KEY, cidr_str.as_bytes())?; - - debug!( - "IPAM initialized: network={}, gateway={}, first_host={}, broadcast={}, prefix_len={}", - cidr.network, cidr.gateway, cidr.first_host, cidr.broadcast, cidr.prefix_len - ); - - Ok(Self { storage, cidr }) + Self::with_prefix(storage, cidr_str, "ipam") } - /// Allocate an IP for a pod. Idempotent: returns existing allocation if one exists. - pub fn allocate(&self, namespace: &str, pod_name: &str) -> Result { - let pod_key = format!("{}/{}", namespace, pod_name); + /// Create a new IPAM instance with a custom storage key prefix + pub fn with_prefix(storage: Arc, cidr_str: &str, prefix: &str) -> Result { + let cidr = parse_cidr(cidr_str)?; - // Check if this pod already has an allocation - let allocations = self.storage.scan(IPAM_ALLOC_PREFIX)?; + let cidr_key = format!("{}/_cidr", prefix).into_bytes(); + let alloc_prefix = format!("{}/alloc/", prefix).into_bytes(); + + // Persist the CIDR configuration + storage.put(&cidr_key, cidr_str.as_bytes())?; + + debug!( + "IPAM ({}) initialized: network={}, gateway={}, first_host={}, broadcast={}, prefix_len={}", + prefix, cidr.network, cidr.gateway, cidr.first_host, cidr.broadcast, cidr.prefix_len + ); + + Ok(Self { + storage, + cidr, + cidr_key, + alloc_prefix, + }) + } + + /// Allocate an IP for a resource. Idempotent: returns existing allocation if one exists. + pub fn allocate(&self, namespace: &str, name: &str) -> Result { + let resource_key = format!("{}/{}", namespace, name); + let prefix_len = self.alloc_prefix.len(); + + // Check if this resource already has an allocation + let allocations = self.storage.scan(&self.alloc_prefix)?; for (key, value) in &allocations { - let existing_pod = String::from_utf8_lossy(value); - if existing_pod == pod_key { - // Parse the IP from the key: "ipam/alloc/{ip}" + let existing = String::from_utf8_lossy(value); + if existing == resource_key { let key_str = String::from_utf8_lossy(key); - let ip_str = &key_str[IPAM_ALLOC_PREFIX.len()..]; + let ip_str = &key_str[prefix_len..]; if let Ok(ip) = ip_str.parse::() { - debug!("IPAM: returning existing allocation {} for {}", ip, pod_key); + debug!( + "IPAM: returning existing allocation {} for {}", + ip, resource_key + ); return Ok(IpAllocation { ip_address: ip, gateway: self.cidr.gateway, @@ -85,7 +103,7 @@ impl Ipam { .iter() .filter_map(|(key, _)| { let key_str = String::from_utf8_lossy(key); - let ip_str = &key_str[IPAM_ALLOC_PREFIX.len()..]; + let ip_str = &key_str[prefix_len..]; ip_str.parse::().ok() }) .collect(); @@ -101,10 +119,12 @@ impl Ipam { if !allocated.contains(&candidate) { // Allocate this IP - let alloc_key = format!("ipam/alloc/{}", candidate); - self.storage.put(alloc_key.as_bytes(), pod_key.as_bytes())?; + let alloc_key_str = String::from_utf8_lossy(&self.alloc_prefix); + let alloc_key = format!("{}{}", alloc_key_str, candidate); + self.storage + .put(alloc_key.as_bytes(), resource_key.as_bytes())?; - debug!("IPAM: allocated {} for {}", candidate, pod_key); + debug!("IPAM: allocated {} for {}", candidate, resource_key); return Ok(IpAllocation { ip_address: candidate, gateway: self.cidr.gateway, @@ -116,36 +136,38 @@ impl Ipam { } } - /// Release the IP allocated to a pod - pub fn release(&self, namespace: &str, pod_name: &str) -> Result> { - let pod_key = format!("{}/{}", namespace, pod_name); + /// Release the IP allocated to a resource + pub fn release(&self, namespace: &str, name: &str) -> Result> { + let resource_key = format!("{}/{}", namespace, name); + let prefix_len = self.alloc_prefix.len(); - let allocations = self.storage.scan(IPAM_ALLOC_PREFIX)?; + let allocations = self.storage.scan(&self.alloc_prefix)?; for (key, value) in &allocations { - let existing_pod = String::from_utf8_lossy(value); - if existing_pod == pod_key { + let existing = String::from_utf8_lossy(value); + if existing == resource_key { let key_str = String::from_utf8_lossy(key); - let ip_str = &key_str[IPAM_ALLOC_PREFIX.len()..]; + let ip_str = &key_str[prefix_len..]; let ip = ip_str.parse::().ok(); self.storage.delete(key)?; - debug!("IPAM: released {:?} for {}", ip, pod_key); + debug!("IPAM: released {:?} for {}", ip, resource_key); return Ok(ip); } } - debug!("IPAM: no allocation found for {}", pod_key); + debug!("IPAM: no allocation found for {}", resource_key); Ok(None) } /// Get all current allocations pub fn get_all_allocations(&self) -> Result> { - let allocations = self.storage.scan(IPAM_ALLOC_PREFIX)?; + let prefix_len = self.alloc_prefix.len(); + let allocations = self.storage.scan(&self.alloc_prefix)?; let mut result = BTreeMap::new(); for (key, value) in &allocations { let key_str = String::from_utf8_lossy(key); - let ip_str = &key_str[IPAM_ALLOC_PREFIX.len()..]; + let ip_str = &key_str[prefix_len..]; if let Ok(ip) = ip_str.parse::() { result.insert(ip, String::from_utf8_lossy(value).into_owned()); } diff --git a/crates/reddwarf-runtime/src/network/mod.rs b/crates/reddwarf-runtime/src/network/mod.rs index baf26f5..356b066 100644 --- a/crates/reddwarf-runtime/src/network/mod.rs +++ b/crates/reddwarf-runtime/src/network/mod.rs @@ -1,8 +1,10 @@ pub mod ipam; +pub mod nat; pub mod types; pub use crate::types::{DirectNicConfig, EtherstubConfig, NetworkMode}; pub use ipam::{CidrConfig, IpAllocation, Ipam}; +pub use nat::NatManager; /// Generate a VNIC name from pod namespace and name pub fn vnic_name_for_pod(namespace: &str, pod_name: &str) -> String { diff --git a/crates/reddwarf-runtime/src/network/nat.rs b/crates/reddwarf-runtime/src/network/nat.rs new file mode 100644 index 0000000..d918d66 --- /dev/null +++ b/crates/reddwarf-runtime/src/network/nat.rs @@ -0,0 +1,218 @@ +use crate::service_controller::ServiceEndpoints; +use tracing::debug; + +/// Manages NAT (Network Address Translation) rules for service traffic routing. +/// +/// On illumos, this generates ipnat rdr (redirect) rules so that traffic destined +/// for a service ClusterIP:port is forwarded to one of the backing pod IPs. +/// On non-illumos platforms, this is a no-op that only logs the desired state. +pub struct NatManager { + /// Name of the network interface to apply rules on (e.g. "reddwarf0") + interface: String, +} + +impl NatManager { + pub fn new(interface: &str) -> Self { + Self { + interface: interface.to_string(), + } + } + + /// Synchronize all NAT rules to match the desired service endpoints. + /// + /// This is a full reconcile: it replaces all existing rules with the new set. + pub fn sync_rules(&self, endpoints: &[ServiceEndpoints]) { + let rules = self.generate_rules(endpoints); + + if rules.is_empty() { + debug!("NAT sync: no rules to apply (no services with endpoints)"); + return; + } + + debug!( + "NAT sync: {} rules for {} services", + rules.len(), + endpoints.len() + ); + + #[cfg(target_os = "illumos")] + { + if let Err(e) = self.apply_rules_illumos(&rules) { + tracing::error!("Failed to apply NAT rules: {}", e); + } + } + + #[cfg(not(target_os = "illumos"))] + { + for rule in &rules { + debug!("NAT rule (mock): {}", rule); + } + } + } + + /// Generate ipnat rdr rules for all service endpoints. + /// + /// For each service port, generates one rdr rule per backing pod IP. + /// Multiple rdr rules for the same destination provide round-robin load balancing + /// in ipnat. + fn generate_rules(&self, endpoints: &[ServiceEndpoints]) -> Vec { + let mut rules = Vec::new(); + + for ep in endpoints { + if ep.pod_ips.is_empty() { + continue; + } + + for &(port, target_port) in &ep.ports { + for pod_ip in &ep.pod_ips { + // ipnat rdr rule format: + // rdr /32 port -> port tcp + let rule = format!( + "rdr {} {}/32 port {} -> {} port {} tcp", + self.interface, ep.cluster_ip, port, pod_ip, target_port + ); + rules.push(rule); + } + } + } + + rules + } + + /// Apply rules on illumos by writing to ipnat config and reloading. + #[cfg(target_os = "illumos")] + fn apply_rules_illumos(&self, rules: &[String]) -> std::io::Result<()> { + use std::fmt::Write; + + let conf_path = "/etc/ipf/reddwarf-ipnat.conf"; + + // Write rules to config file + let mut content = String::new(); + let _ = writeln!(content, "# Auto-generated by reddwarf service controller"); + let _ = writeln!(content, "# Do not edit manually"); + for rule in rules { + let _ = writeln!(content, "{}", rule); + } + + std::fs::write(conf_path, &content)?; + info!("Wrote {} NAT rules to {}", rules.len(), conf_path); + + // Reload ipnat rules + let output = std::process::Command::new("ipnat") + .args(["-CF", "-f", conf_path]) + .output()?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::error!("ipnat reload failed: {}", stderr); + } else { + info!("NAT rules reloaded successfully"); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_endpoints( + name: &str, + cluster_ip: &str, + ports: Vec<(i32, i32)>, + pod_ips: Vec<&str>, + ) -> ServiceEndpoints { + ServiceEndpoints { + name: name.to_string(), + namespace: "default".to_string(), + cluster_ip: cluster_ip.to_string(), + ports, + pod_ips: pod_ips.into_iter().map(String::from).collect(), + } + } + + #[test] + fn test_generate_rules_single_pod() { + let mgr = NatManager::new("reddwarf0"); + let eps = vec![make_endpoints( + "my-svc", + "10.96.0.1", + vec![(80, 8080)], + vec!["10.88.0.2"], + )]; + + let rules = mgr.generate_rules(&eps); + assert_eq!(rules.len(), 1); + assert_eq!( + rules[0], + "rdr reddwarf0 10.96.0.1/32 port 80 -> 10.88.0.2 port 8080 tcp" + ); + } + + #[test] + fn test_generate_rules_multiple_pods() { + let mgr = NatManager::new("reddwarf0"); + let eps = vec![make_endpoints( + "my-svc", + "10.96.0.1", + vec![(80, 8080)], + vec!["10.88.0.2", "10.88.0.3"], + )]; + + let rules = mgr.generate_rules(&eps); + assert_eq!(rules.len(), 2); + assert!(rules[0].contains("10.88.0.2")); + assert!(rules[1].contains("10.88.0.3")); + } + + #[test] + fn test_generate_rules_multiple_ports() { + let mgr = NatManager::new("reddwarf0"); + let eps = vec![make_endpoints( + "my-svc", + "10.96.0.1", + vec![(80, 8080), (443, 8443)], + vec!["10.88.0.2"], + )]; + + let rules = mgr.generate_rules(&eps); + assert_eq!(rules.len(), 2); + assert!(rules[0].contains("port 80")); + assert!(rules[1].contains("port 443")); + } + + #[test] + fn test_generate_rules_no_pods() { + let mgr = NatManager::new("reddwarf0"); + let eps = vec![make_endpoints( + "my-svc", + "10.96.0.1", + vec![(80, 8080)], + vec![], + )]; + + let rules = mgr.generate_rules(&eps); + assert!(rules.is_empty()); + } + + #[test] + fn test_generate_rules_empty_endpoints() { + let mgr = NatManager::new("reddwarf0"); + let rules = mgr.generate_rules(&[]); + assert!(rules.is_empty()); + } + + #[test] + fn test_sync_rules_no_panic() { + let mgr = NatManager::new("reddwarf0"); + let eps = vec![make_endpoints( + "my-svc", + "10.96.0.1", + vec![(80, 8080)], + vec!["10.88.0.2"], + )]; + // Should not panic + mgr.sync_rules(&eps); + } +} diff --git a/crates/reddwarf-runtime/src/node_agent.rs b/crates/reddwarf-runtime/src/node_agent.rs index aae27f9..30b91b3 100644 --- a/crates/reddwarf-runtime/src/node_agent.rs +++ b/crates/reddwarf-runtime/src/node_agent.rs @@ -3,12 +3,15 @@ use crate::error::{Result, RuntimeError}; use crate::sysinfo::{ compute_node_resources, format_memory_quantity, NodeResources, ResourceReservation, }; +use crate::traits::ZoneRuntime; +use crate::types::ZoneInfo; use k8s_openapi::api::core::v1::{Node, NodeAddress, NodeCondition, NodeStatus}; use k8s_openapi::apimachinery::pkg::api::resource::Quantity; use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; use std::collections::BTreeMap; use std::sync::Arc; use std::time::Duration; +use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; use tracing::{info, warn}; @@ -51,10 +54,23 @@ pub struct NodeAgent { config: NodeAgentConfig, /// Detected system resources (None if detection failed at startup). detected: Option, + /// Zone runtime for querying zone states during heartbeat + runtime: Option>, + /// Cached zone info from last heartbeat + cached_zones: Mutex>, } impl NodeAgent { pub fn new(api_client: Arc, config: NodeAgentConfig) -> Self { + Self::with_runtime(api_client, config, None) + } + + /// Create a node agent with a zone runtime for zone state reporting + pub fn with_runtime( + api_client: Arc, + config: NodeAgentConfig, + runtime: Option>, + ) -> Self { let reservation = ResourceReservation { cpu_millicores: config.system_reserved_cpu_millicores, memory_bytes: config.system_reserved_memory_bytes, @@ -85,6 +101,8 @@ impl NodeAgent { api_client, config, detected, + runtime, + cached_zones: Mutex::new(Vec::new()), } } @@ -98,6 +116,24 @@ impl NodeAgent { api_client, config, detected, + runtime: None, + cached_zones: Mutex::new(Vec::new()), + } + } + + #[cfg(test)] + fn new_with_runtime_and_detected( + api_client: Arc, + config: NodeAgentConfig, + detected: Option, + runtime: Option>, + ) -> Self { + Self { + api_client, + config, + detected, + runtime, + cached_zones: Mutex::new(Vec::new()), } } @@ -105,7 +141,9 @@ impl NodeAgent { pub async fn register(&self) -> Result<()> { info!("Registering node '{}'", self.config.node_name); - let node = self.build_node(); + let zones = self.cached_zones.lock().await; + let node = self.build_node(&zones); + drop(zones); match self.api_client.create_node(&node).await { Ok(_) => { @@ -154,7 +192,23 @@ impl NodeAgent { /// Send a heartbeat by updating node status async fn heartbeat(&self) -> Result<()> { - let node = self.build_node(); + // Query zone states if runtime is available + if let Some(ref runtime) = self.runtime { + match runtime.list_zones().await { + Ok(zones) => { + let mut cached = self.cached_zones.lock().await; + *cached = zones; + } + Err(e) => { + warn!("Failed to list zones for heartbeat: {}", e); + // Keep previous cached state + } + } + } + + let zones = self.cached_zones.lock().await; + let node = self.build_node(&zones); + drop(zones); self.api_client .update_node_status(&self.config.node_name, &node) @@ -165,7 +219,7 @@ impl NodeAgent { } /// Build the Node resource with current status - fn build_node(&self) -> Node { + fn build_node(&self, zones: &[ZoneInfo]) -> Node { let hostname = self.config.node_name.clone(); let (capacity, allocatable) = if let Some(ref nr) = self.detected { @@ -210,6 +264,28 @@ impl NodeAgent { (capacity, allocatable) }; + // Build zone summary annotations + let annotations = if !zones.is_empty() { + let mut state_counts: BTreeMap = BTreeMap::new(); + for zone in zones { + *state_counts.entry(zone.state.to_string()).or_insert(0) += 1; + } + let summary = serde_json::to_string(&state_counts).unwrap_or_default(); + Some( + [ + ( + "reddwarf.io/zone-count".to_string(), + zones.len().to_string(), + ), + ("reddwarf.io/zone-summary".to_string(), summary), + ] + .into_iter() + .collect(), + ) + } else { + None + }; + Node { metadata: ObjectMeta { name: Some(self.config.node_name.clone()), @@ -228,6 +304,7 @@ impl NodeAgent { .into_iter() .collect(), ), + annotations, ..Default::default() }, status: Some(NodeStatus { @@ -279,7 +356,7 @@ mod tests { NodeAgentConfig::new("test-node".to_string(), "http://127.0.0.1:6443".to_string()); let agent = NodeAgent::new(api_client, config); - let node = agent.build_node(); + let node = agent.build_node(&[]); assert_eq!(node.metadata.name, Some("test-node".to_string())); let status = node.status.unwrap(); @@ -297,7 +374,7 @@ mod tests { NodeAgentConfig::new("test-node".to_string(), "http://127.0.0.1:6443".to_string()); let agent = NodeAgent::new(api_client, config); - let node = agent.build_node(); + let node = agent.build_node(&[]); let status = node.status.unwrap(); // Check allocatable has all keys @@ -325,9 +402,12 @@ mod tests { let agent = NodeAgent::new(api_client, config); // Agent should have detected resources (we're on a real host) - assert!(agent.detected.is_some(), "detection should succeed in tests"); + assert!( + agent.detected.is_some(), + "detection should succeed in tests" + ); - let node = agent.build_node(); + let node = agent.build_node(&[]); let status = node.status.unwrap(); let cap = status.capacity.unwrap(); let alloc = status.allocatable.unwrap(); @@ -335,9 +415,8 @@ mod tests { // Allocatable CPU (millicores) should be less than capacity CPU (whole cores) let cap_cpu_m = reddwarf_core::resources::ResourceQuantities::parse_cpu(&cap["cpu"].0) .expect("valid cpu"); - let alloc_cpu_m = - reddwarf_core::resources::ResourceQuantities::parse_cpu(&alloc["cpu"].0) - .expect("valid cpu"); + let alloc_cpu_m = reddwarf_core::resources::ResourceQuantities::parse_cpu(&alloc["cpu"].0) + .expect("valid cpu"); assert!( alloc_cpu_m < cap_cpu_m, "allocatable CPU {}m should be less than capacity {}m", @@ -346,9 +425,8 @@ mod tests { ); // Allocatable memory should be less than capacity memory - let cap_mem = - reddwarf_core::resources::ResourceQuantities::parse_memory(&cap["memory"].0) - .expect("valid mem"); + let cap_mem = reddwarf_core::resources::ResourceQuantities::parse_memory(&cap["memory"].0) + .expect("valid mem"); let alloc_mem = reddwarf_core::resources::ResourceQuantities::parse_memory(&alloc["memory"].0) .expect("valid mem"); @@ -368,10 +446,13 @@ mod tests { config.supported_brands = vec!["reddwarf".into(), "lx".into()]; let agent = NodeAgent::new(api_client, config); - let node = agent.build_node(); + let node = agent.build_node(&[]); let labels = node.metadata.labels.unwrap(); - assert_eq!(labels.get("reddwarf.io/zone-brands").unwrap(), "reddwarf,lx"); + assert_eq!( + labels.get("reddwarf.io/zone-brands").unwrap(), + "reddwarf,lx" + ); } #[test] @@ -381,7 +462,7 @@ mod tests { NodeAgentConfig::new("test-node".to_string(), "http://127.0.0.1:6443".to_string()); let agent = NodeAgent::new(api_client, config); - let node = agent.build_node(); + let node = agent.build_node(&[]); let labels = node.metadata.labels.unwrap(); assert_eq!(labels.get("reddwarf.io/zone-brands").unwrap(), "reddwarf"); @@ -395,7 +476,7 @@ mod tests { // Simulate detection failure let agent = NodeAgent::new_with_detected(api_client, config, None); - let node = agent.build_node(); + let node = agent.build_node(&[]); let status = node.status.unwrap(); let alloc = status.allocatable.unwrap(); let cap = status.capacity.unwrap(); @@ -413,4 +494,60 @@ mod tests { assert_eq!(alloc["cpu"].0, expected_cpu); assert_eq!(cap["cpu"].0, expected_cpu); } + + #[test] + fn test_build_node_with_zone_info() { + let api_client = Arc::new(ApiClient::new("http://127.0.0.1:6443")); + let config = + NodeAgentConfig::new("test-node".to_string(), "http://127.0.0.1:6443".to_string()); + let agent = NodeAgent::new(api_client, config); + + let zones = vec![ + ZoneInfo { + zone_name: "pod-a".to_string(), + zone_id: Some(1), + state: crate::types::ZoneState::Running, + zonepath: "/zones/pod-a".to_string(), + brand: "reddwarf".to_string(), + uuid: String::new(), + }, + ZoneInfo { + zone_name: "pod-b".to_string(), + zone_id: Some(2), + state: crate::types::ZoneState::Running, + zonepath: "/zones/pod-b".to_string(), + brand: "reddwarf".to_string(), + uuid: String::new(), + }, + ZoneInfo { + zone_name: "pod-c".to_string(), + zone_id: None, + state: crate::types::ZoneState::Installed, + zonepath: "/zones/pod-c".to_string(), + brand: "lx".to_string(), + uuid: String::new(), + }, + ]; + + let node = agent.build_node(&zones); + + let annotations = node.metadata.annotations.unwrap(); + assert_eq!(annotations.get("reddwarf.io/zone-count").unwrap(), "3"); + let summary: BTreeMap = + serde_json::from_str(annotations.get("reddwarf.io/zone-summary").unwrap()).unwrap(); + assert_eq!(summary.get("running"), Some(&2)); + assert_eq!(summary.get("installed"), Some(&1)); + } + + #[test] + fn test_build_node_no_zones_no_annotations() { + let api_client = Arc::new(ApiClient::new("http://127.0.0.1:6443")); + let config = + NodeAgentConfig::new("test-node".to_string(), "http://127.0.0.1:6443".to_string()); + let agent = NodeAgent::new(api_client, config); + + let node = agent.build_node(&[]); + + assert!(node.metadata.annotations.is_none()); + } } diff --git a/crates/reddwarf-runtime/src/node_health.rs b/crates/reddwarf-runtime/src/node_health.rs index 18bc1cc..f89bfa0 100644 --- a/crates/reddwarf-runtime/src/node_health.rs +++ b/crates/reddwarf-runtime/src/node_health.rs @@ -94,11 +94,7 @@ impl NodeHealthChecker { /// Check a single node's heartbeat and mark it NotReady if stale async fn check_node(&self, node_name: &str, node: &Node) -> Result<()> { - let conditions = match node - .status - .as_ref() - .and_then(|s| s.conditions.as_ref()) - { + let conditions = match node.status.as_ref().and_then(|s| s.conditions.as_ref()) { Some(c) => c, None => { debug!("Node {} has no conditions, skipping", node_name); @@ -218,9 +214,7 @@ mod tests { reason: Some("KubeletReady".to_string()), message: Some("node agent is healthy".to_string()), last_heartbeat_time: Some(Time(heartbeat_time)), - last_transition_time: Some(Time( - Utc::now() - chrono::Duration::seconds(3600), - )), + last_transition_time: Some(Time(Utc::now() - chrono::Duration::seconds(3600))), }]), ..Default::default() }), @@ -242,9 +236,7 @@ mod tests { reason: Some("NodeStatusUnknown".to_string()), message: Some("Node heartbeat not received".to_string()), last_heartbeat_time: Some(Time(heartbeat_time)), - last_transition_time: Some(Time( - Utc::now() - chrono::Duration::seconds(100), - )), + last_transition_time: Some(Time(Utc::now() - chrono::Duration::seconds(100))), }]), ..Default::default() }), diff --git a/crates/reddwarf-runtime/src/probes/executor.rs b/crates/reddwarf-runtime/src/probes/executor.rs index 1e919de..ef2aa0f 100644 --- a/crates/reddwarf-runtime/src/probes/executor.rs +++ b/crates/reddwarf-runtime/src/probes/executor.rs @@ -31,10 +31,9 @@ impl ProbeExecutor { .await { Ok(outcome) => outcome, - Err(_) => ProbeOutcome::Failure(format!( - "probe timed out after {}s", - timeout.as_secs() - )), + Err(_) => { + ProbeOutcome::Failure(format!("probe timed out after {}s", timeout.as_secs())) + } }; ProbeResult { @@ -108,10 +107,7 @@ impl ProbeExecutor { let mut stream = match TcpStream::connect(&addr).await { Ok(s) => s, Err(e) => { - return ProbeOutcome::Failure(format!( - "HTTP connection to {} failed: {}", - addr, e - )) + return ProbeOutcome::Failure(format!("HTTP connection to {} failed: {}", addr, e)) } }; @@ -159,11 +155,15 @@ mod tests { use crate::mock::MockRuntime; use crate::storage::MockStorageEngine; use crate::traits::ZoneRuntime; - use crate::types::{StoragePoolConfig, ZoneBrand, ZoneConfig, ZoneStorageOpts, NetworkMode, EtherstubConfig}; + use crate::types::{ + EtherstubConfig, NetworkMode, StoragePoolConfig, ZoneBrand, ZoneConfig, ZoneStorageOpts, + }; use tokio::net::TcpListener; fn make_test_runtime() -> Arc { - let storage = Arc::new(MockStorageEngine::new(StoragePoolConfig::from_pool("rpool"))); + let storage = Arc::new(MockStorageEngine::new(StoragePoolConfig::from_pool( + "rpool", + ))); Arc::new(MockRuntime::new(storage)) } @@ -181,6 +181,7 @@ mod tests { }), storage: ZoneStorageOpts::default(), lx_image_path: None, + bhyve_disk_image: None, processes: vec![], cpu_cap: None, memory_cap: None, @@ -333,8 +334,7 @@ mod tests { if let Ok((mut stream, _)) = listener.accept().await { let mut buf = [0u8; 1024]; let _ = stream.read(&mut buf).await; - let response = - "HTTP/1.1 503 Service Unavailable\r\nContent-Length: 5\r\n\r\nError"; + let response = "HTTP/1.1 503 Service Unavailable\r\nContent-Length: 5\r\n\r\nError"; let _ = stream.write_all(response.as_bytes()).await; } }); diff --git a/crates/reddwarf-runtime/src/probes/mod.rs b/crates/reddwarf-runtime/src/probes/mod.rs index 2e3e4cd..694dc50 100644 --- a/crates/reddwarf-runtime/src/probes/mod.rs +++ b/crates/reddwarf-runtime/src/probes/mod.rs @@ -5,5 +5,5 @@ pub mod types; pub use executor::ProbeExecutor; pub use tracker::{PodProbeStatus, ProbeTracker}; pub use types::{ - ContainerProbeConfig, ProbeAction, ProbeKind, ProbeOutcome, ProbeResult, extract_probes, + extract_probes, ContainerProbeConfig, ProbeAction, ProbeKind, ProbeOutcome, ProbeResult, }; diff --git a/crates/reddwarf-runtime/src/probes/tracker.rs b/crates/reddwarf-runtime/src/probes/tracker.rs index 8dd54ea..548b89a 100644 --- a/crates/reddwarf-runtime/src/probes/tracker.rs +++ b/crates/reddwarf-runtime/src/probes/tracker.rs @@ -268,7 +268,9 @@ mod tests { use std::sync::Arc; fn make_test_runtime() -> Arc { - let storage = Arc::new(MockStorageEngine::new(StoragePoolConfig::from_pool("rpool"))); + let storage = Arc::new(MockStorageEngine::new(StoragePoolConfig::from_pool( + "rpool", + ))); Arc::new(MockRuntime::new(storage)) } @@ -286,6 +288,7 @@ mod tests { }), storage: ZoneStorageOpts::default(), lx_image_path: None, + bhyve_disk_image: None, processes: vec![], cpu_cap: None, memory_cap: None, diff --git a/crates/reddwarf-runtime/src/probes/types.rs b/crates/reddwarf-runtime/src/probes/types.rs index 8d28e97..bf3a52e 100644 --- a/crates/reddwarf-runtime/src/probes/types.rs +++ b/crates/reddwarf-runtime/src/probes/types.rs @@ -23,9 +23,19 @@ impl std::fmt::Display for ProbeKind { /// The action a probe performs #[derive(Debug, Clone, PartialEq, Eq)] pub enum ProbeAction { - Exec { command: Vec }, - HttpGet { path: String, port: u16, host: String, scheme: String }, - TcpSocket { port: u16, host: String }, + Exec { + command: Vec, + }, + HttpGet { + path: String, + port: u16, + host: String, + scheme: String, + }, + TcpSocket { + port: u16, + host: String, + }, } /// Extracted probe configuration for a single container + probe kind @@ -132,9 +142,7 @@ pub fn extract_probes(container: &Container) -> Vec { #[cfg(test)] mod tests { use super::*; - use k8s_openapi::api::core::v1::{ - ExecAction, HTTPGetAction, Probe, TCPSocketAction, - }; + use k8s_openapi::api::core::v1::{ExecAction, HTTPGetAction, Probe, TCPSocketAction}; #[test] fn test_extract_exec_probe() { @@ -142,7 +150,11 @@ mod tests { name: "web".to_string(), liveness_probe: Some(Probe { exec: Some(ExecAction { - command: Some(vec!["/bin/sh".to_string(), "-c".to_string(), "exit 0".to_string()]), + command: Some(vec![ + "/bin/sh".to_string(), + "-c".to_string(), + "exit 0".to_string(), + ]), }), period_seconds: Some(5), failure_threshold: Some(2), @@ -157,7 +169,11 @@ mod tests { assert_eq!( probes[0].action, ProbeAction::Exec { - command: vec!["/bin/sh".to_string(), "-c".to_string(), "exit 0".to_string()] + command: vec![ + "/bin/sh".to_string(), + "-c".to_string(), + "exit 0".to_string() + ] } ); assert_eq!(probes[0].period_seconds, 5); diff --git a/crates/reddwarf-runtime/src/service_controller.rs b/crates/reddwarf-runtime/src/service_controller.rs new file mode 100644 index 0000000..eaae597 --- /dev/null +++ b/crates/reddwarf-runtime/src/service_controller.rs @@ -0,0 +1,236 @@ +use crate::error::Result; +use crate::network::nat::NatManager; +use k8s_openapi::api::core::v1::{Pod, Service}; +use reddwarf_core::ResourceEvent; +use reddwarf_storage::{KVStore, KeyEncoder, RedbBackend}; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::broadcast; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, info, warn}; + +/// Configuration for the service controller +#[derive(Debug, Clone)] +pub struct ServiceControllerConfig { + /// Interval between full reconciliation cycles + pub reconcile_interval: Duration, +} + +impl Default for ServiceControllerConfig { + fn default() -> Self { + Self { + reconcile_interval: Duration::from_secs(30), + } + } +} + +/// Endpoint information for a service +#[derive(Debug, Clone)] +pub struct ServiceEndpoints { + /// Service name + pub name: String, + /// Service namespace + pub namespace: String, + /// ClusterIP assigned to this service + pub cluster_ip: String, + /// Service ports (port -> target_port) + pub ports: Vec<(i32, i32)>, + /// Pod IPs that match the service selector + pub pod_ips: Vec, +} + +/// Service controller that watches for Service and Pod events, +/// resolves service selectors to pod endpoints, and configures NAT rules. +pub struct ServiceController { + storage: Arc, + event_tx: broadcast::Sender, + nat_manager: Arc, + config: ServiceControllerConfig, +} + +impl ServiceController { + pub fn new( + storage: Arc, + event_tx: broadcast::Sender, + nat_manager: Arc, + config: ServiceControllerConfig, + ) -> Self { + Self { + storage, + event_tx, + nat_manager, + config, + } + } + + /// Run the controller — reacts to Service and Pod events from the event bus. + pub async fn run(&self, token: CancellationToken) -> Result<()> { + info!("Starting service controller"); + + // Initial full reconcile + if let Err(e) = self.reconcile_all().await { + error!("Initial service reconcile failed: {}", e); + } + + let mut rx = self.event_tx.subscribe(); + let mut reconcile_tick = tokio::time::interval(self.config.reconcile_interval); + reconcile_tick.tick().await; // consume first tick + + loop { + tokio::select! { + _ = token.cancelled() => { + info!("Service controller shutting down"); + return Ok(()); + } + _ = reconcile_tick.tick() => { + debug!("Service controller periodic reconcile"); + if let Err(e) = self.reconcile_all().await { + error!("Service periodic reconcile failed: {}", e); + } + } + result = rx.recv() => { + match result { + Ok(event) => { + let kind = event.gvk.kind.as_str(); + if kind == "Service" || kind == "Pod" { + debug!("Service controller handling {} event for {}", kind, event.resource_key.name); + if let Err(e) = self.reconcile_all().await { + error!("Service reconcile after event failed: {}", e); + } + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("Service controller lagged by {} events, full reconcile", n); + if let Err(e) = self.reconcile_all().await { + error!("Service reconcile after lag failed: {}", e); + } + } + Err(broadcast::error::RecvError::Closed) => { + info!("Event bus closed, service controller stopping"); + return Ok(()); + } + } + } + } + } + } + + /// Full reconcile: for each service, resolve endpoints and sync NAT rules. + async fn reconcile_all(&self) -> Result<()> { + let services = self.list_services()?; + let pods = self.list_pods()?; + + let mut all_endpoints = Vec::new(); + + for service in &services { + let namespace = service.metadata.namespace.as_deref().unwrap_or("default"); + let name = service.metadata.name.as_deref().unwrap_or("unknown"); + + let selector = match service.spec.as_ref().and_then(|s| s.selector.as_ref()) { + Some(s) if !s.is_empty() => s, + _ => continue, // No selector means no endpoints to track + }; + + let cluster_ip = match service.spec.as_ref().and_then(|s| s.cluster_ip.as_ref()) { + Some(ip) if !ip.is_empty() && ip != "None" => ip.clone(), + _ => continue, // No ClusterIP + }; + + // Collect ports + let ports: Vec<(i32, i32)> = service + .spec + .as_ref() + .and_then(|s| s.ports.as_ref()) + .map(|ports| { + ports + .iter() + .map(|p| { + let port = p.port; + let target_port = p + .target_port + .as_ref() + .and_then(|tp| match tp { + k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(i) => Some(*i), + k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::String(_) => None, + }) + .unwrap_or(port); + (port, target_port) + }) + .collect() + }) + .unwrap_or_default(); + + // Find pods matching the selector + let matching_ips: Vec = pods + .iter() + .filter(|pod| { + let pod_ns = pod.metadata.namespace.as_deref().unwrap_or("default"); + if pod_ns != namespace { + return false; + } + let pod_labels = pod.metadata.labels.as_ref(); + match pod_labels { + Some(labels) => selector.iter().all(|(k, v)| labels.get(k) == Some(v)), + None => false, + } + }) + .filter_map(|pod| pod.status.as_ref().and_then(|s| s.pod_ip.as_ref()).cloned()) + .collect(); + + debug!( + "Service {}/{}: ClusterIP={}, endpoints={:?}, ports={:?}", + namespace, name, cluster_ip, matching_ips, ports + ); + + all_endpoints.push(ServiceEndpoints { + name: name.to_string(), + namespace: namespace.to_string(), + cluster_ip, + ports, + pod_ips: matching_ips, + }); + } + + // Sync NAT rules + self.nat_manager.sync_rules(&all_endpoints); + + Ok(()) + } + + /// List all services from storage + fn list_services(&self) -> Result> { + let prefix = KeyEncoder::encode_prefix("v1", "Service", None); + let entries = self.storage.scan(prefix.as_bytes())?; + let mut services = Vec::new(); + for (_key, value) in entries { + if let Ok(svc) = serde_json::from_slice::(&value) { + services.push(svc); + } + } + Ok(services) + } + + /// List all pods from storage + fn list_pods(&self) -> Result> { + let prefix = KeyEncoder::encode_prefix("v1", "Pod", None); + let entries = self.storage.scan(prefix.as_bytes())?; + let mut pods = Vec::new(); + for (_key, value) in entries { + if let Ok(pod) = serde_json::from_slice::(&value) { + pods.push(pod); + } + } + Ok(pods) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_service_controller_config_defaults() { + let config = ServiceControllerConfig::default(); + assert_eq!(config.reconcile_interval, Duration::from_secs(30)); + } +} diff --git a/crates/reddwarf-runtime/src/sysinfo.rs b/crates/reddwarf-runtime/src/sysinfo.rs index b7db695..f7d8542 100644 --- a/crates/reddwarf-runtime/src/sysinfo.rs +++ b/crates/reddwarf-runtime/src/sysinfo.rs @@ -62,8 +62,7 @@ pub fn compute_node_resources( let capacity = detect_system_resources()?; let capacity_cpu_millicores = capacity.cpu_count as i64 * 1000; - let allocatable_cpu_millicores = - (capacity_cpu_millicores - reservation.cpu_millicores).max(0); + let allocatable_cpu_millicores = (capacity_cpu_millicores - reservation.cpu_millicores).max(0); let allocatable_memory_bytes = (capacity.total_memory_bytes as i64 - reservation.memory_bytes).max(0) as u64; @@ -104,10 +103,7 @@ mod tests { fn test_detect_system_resources() { let res = detect_system_resources().expect("detection should succeed in test env"); assert!(res.cpu_count > 0, "should detect at least 1 CPU"); - assert!( - res.total_memory_bytes > 0, - "should detect nonzero memory" - ); + assert!(res.total_memory_bytes > 0, "should detect nonzero memory"); } #[test] @@ -139,8 +135,7 @@ mod tests { cpu_millicores: 100, memory_bytes: 256 * 1024 * 1024, }; - let nr = compute_node_resources(&reservation, 110) - .expect("should succeed in test env"); + let nr = compute_node_resources(&reservation, 110).expect("should succeed in test env"); let capacity_cpu_millis = nr.capacity.cpu_count as i64 * 1000; assert!( @@ -164,8 +159,7 @@ mod tests { cpu_millicores: i64::MAX, memory_bytes: i64::MAX, }; - let nr = compute_node_resources(&reservation, 110) - .expect("should succeed in test env"); + let nr = compute_node_resources(&reservation, 110).expect("should succeed in test env"); assert_eq!(nr.allocatable_cpu_millicores, 0); assert_eq!(nr.allocatable_memory_bytes, 0); diff --git a/crates/reddwarf-runtime/src/traits.rs b/crates/reddwarf-runtime/src/traits.rs index a86be2d..f2e6e0b 100644 --- a/crates/reddwarf-runtime/src/traits.rs +++ b/crates/reddwarf-runtime/src/traits.rs @@ -64,6 +64,12 @@ pub trait ZoneRuntime: Send + Sync { /// Set up network for a zone async fn setup_network(&self, zone_name: &str, network: &NetworkMode) -> Result<()>; + /// Configure IP address inside a running zone via ipadm + /// + /// Must be called after the zone is booted. Creates the IP interface, + /// assigns a static address, and configures the default route. + async fn configure_zone_ip(&self, zone_name: &str, network: &NetworkMode) -> Result<()>; + /// Tear down network for a zone async fn teardown_network(&self, zone_name: &str, network: &NetworkMode) -> Result<()>; diff --git a/crates/reddwarf-runtime/src/types.rs b/crates/reddwarf-runtime/src/types.rs index 02c2513..2166df8 100644 --- a/crates/reddwarf-runtime/src/types.rs +++ b/crates/reddwarf-runtime/src/types.rs @@ -7,6 +7,8 @@ pub enum ZoneBrand { Lx, /// Custom reddwarf brand (Pod = Zone, containers = supervised processes) Reddwarf, + /// Bhyve branded zone (hardware virtual machine) + Bhyve, } impl ZoneBrand { @@ -15,6 +17,7 @@ impl ZoneBrand { match self { ZoneBrand::Lx => "lx", ZoneBrand::Reddwarf => "reddwarf", + ZoneBrand::Bhyve => "bhyve", } } } @@ -230,6 +233,8 @@ pub struct ZoneConfig { pub storage: ZoneStorageOpts, /// LX brand image path (only for Lx brand) pub lx_image_path: Option, + /// Bhyve disk image path (only for Bhyve brand, e.g., a ZFS zvol device) + pub bhyve_disk_image: Option, /// Supervised processes (for reddwarf brand) pub processes: Vec, /// CPU cap (fraction, e.g., "1.0" = 1 CPU) @@ -277,6 +282,7 @@ mod tests { fn test_zone_brand_display() { assert_eq!(ZoneBrand::Lx.as_str(), "lx"); assert_eq!(ZoneBrand::Reddwarf.as_str(), "reddwarf"); + assert_eq!(ZoneBrand::Bhyve.as_str(), "bhyve"); } #[test] diff --git a/crates/reddwarf-runtime/src/zone/config.rs b/crates/reddwarf-runtime/src/zone/config.rs index aab8426..b1d02b0 100644 --- a/crates/reddwarf-runtime/src/zone/config.rs +++ b/crates/reddwarf-runtime/src/zone/config.rs @@ -1,5 +1,6 @@ +use crate::brand::bhyve::bhyve_zonecfg_lines; use crate::error::Result; -use crate::types::{NetworkMode, ZoneConfig}; +use crate::types::{NetworkMode, ZoneBrand, ZoneConfig}; /// Generate a zonecfg command file from a ZoneConfig pub fn generate_zonecfg(config: &ZoneConfig) -> Result { @@ -45,6 +46,13 @@ pub fn generate_zonecfg(config: &ZoneConfig) -> Result { lines.push("end".to_string()); } + // Bhyve-specific device configuration + if config.brand == ZoneBrand::Bhyve { + for line in bhyve_zonecfg_lines(config) { + lines.push(line); + } + } + // Filesystem mounts for mount in &config.fs_mounts { lines.push("add fs".to_string()); @@ -86,6 +94,7 @@ mod tests { quota: Some("10G".to_string()), }, lx_image_path: Some("/images/ubuntu-22.04.tar.gz".to_string()), + bhyve_disk_image: None, processes: vec![], cpu_cap: Some("2.0".to_string()), memory_cap: Some("1G".to_string()), @@ -123,6 +132,7 @@ mod tests { quota: None, }, lx_image_path: None, + bhyve_disk_image: None, processes: vec![ContainerProcess { name: "web".to_string(), command: vec!["/usr/bin/node".to_string(), "server.js".to_string()], diff --git a/crates/reddwarf-scheduler/src/filter.rs b/crates/reddwarf-scheduler/src/filter.rs index 686b1f8..70eebae 100644 --- a/crates/reddwarf-scheduler/src/filter.rs +++ b/crates/reddwarf-scheduler/src/filter.rs @@ -435,7 +435,10 @@ mod tests { let filter = ZoneBrandMatch; let result = filter.filter(&context, &node); assert!(!result.passed); - assert!(result.reason.unwrap().contains("does not support zone brand 'lx'")); + assert!(result + .reason + .unwrap() + .contains("does not support zone brand 'lx'")); } #[test] diff --git a/crates/reddwarf/src/main.rs b/crates/reddwarf/src/main.rs index 8e5ffc3..c046d2b 100644 --- a/crates/reddwarf/src/main.rs +++ b/crates/reddwarf/src/main.rs @@ -2,8 +2,9 @@ use clap::{Parser, Subcommand}; use reddwarf_apiserver::{ApiError, ApiServer, AppState, Config as ApiConfig, TlsMode}; use reddwarf_core::{Namespace, ResourceQuantities}; use reddwarf_runtime::{ - ApiClient, Ipam, MockRuntime, MockStorageEngine, NodeAgent, NodeAgentConfig, - NodeHealthChecker, NodeHealthCheckerConfig, PodController, PodControllerConfig, StorageEngine, + ApiClient, DnsServer, DnsServerConfig, Ipam, MockRuntime, MockStorageEngine, NatManager, + NodeAgent, NodeAgentConfig, NodeHealthChecker, NodeHealthCheckerConfig, PodController, + PodControllerConfig, ServiceController, ServiceControllerConfig, StorageEngine, StoragePoolConfig, ZoneBrand, }; use reddwarf_scheduler::scheduler::SchedulerConfig; @@ -40,6 +41,7 @@ struct TlsArgs { } #[derive(Subcommand)] +#[allow(clippy::large_enum_variant)] enum Commands { /// Run the API server only Serve { @@ -81,6 +83,9 @@ enum Commands { /// Pod network CIDR for IPAM allocation #[arg(long, default_value = "10.88.0.0/16")] pod_cidr: String, + /// Service ClusterIP CIDR for service IP allocation + #[arg(long, default_value = "10.96.0.0/12")] + service_cidr: String, /// Etherstub name for pod networking #[arg(long, default_value = "reddwarf0")] etherstub_name: String, @@ -96,6 +101,9 @@ enum Commands { /// Comma-separated list of zone brands this node supports #[arg(long, default_value = "reddwarf")] supported_brands: String, + /// DNS server listen address for service discovery + #[arg(long, default_value = "0.0.0.0:10053")] + cluster_dns: String, #[command(flatten)] tls_args: TlsArgs, }, @@ -129,15 +137,17 @@ async fn main() -> miette::Result<()> { volumes_dataset, zonepath_prefix, pod_cidr, + service_cidr, etherstub_name, system_reserved_cpu, system_reserved_memory, max_pods, supported_brands, + cluster_dns, tls_args, } => { - let reserved_cpu_millicores = - ResourceQuantities::parse_cpu(&system_reserved_cpu).map_err(|e| { + let reserved_cpu_millicores = ResourceQuantities::parse_cpu(&system_reserved_cpu) + .map_err(|e| { miette::miette!( help = "Use a value like '100m' or '0.1' for --system-reserved-cpu", "Invalid --system-reserved-cpu '{}': {}", @@ -145,15 +155,15 @@ async fn main() -> miette::Result<()> { e ) })?; - let reserved_memory_bytes = - ResourceQuantities::parse_memory(&system_reserved_memory).map_err(|e| { - miette::miette!( - help = "Use a value like '256Mi' or '1Gi' for --system-reserved-memory", - "Invalid --system-reserved-memory '{}': {}", - system_reserved_memory, - e - ) - })?; + let reserved_memory_bytes = ResourceQuantities::parse_memory(&system_reserved_memory) + .map_err(|e| { + miette::miette!( + help = "Use a value like '256Mi' or '1Gi' for --system-reserved-memory", + "Invalid --system-reserved-memory '{}': {}", + system_reserved_memory, + e + ) + })?; let supported_brands: Vec = supported_brands .split(',') @@ -171,11 +181,13 @@ async fn main() -> miette::Result<()> { volumes_dataset.as_deref(), zonepath_prefix.as_deref(), &pod_cidr, + &service_cidr, ðerstub_name, reserved_cpu_millicores, reserved_memory_bytes, max_pods, &supported_brands, + &cluster_dns, &tls_args, ) .await @@ -272,17 +284,33 @@ async fn run_agent( volumes_dataset: Option<&str>, zonepath_prefix: Option<&str>, pod_cidr: &str, + service_cidr: &str, etherstub_name: &str, system_reserved_cpu_millicores: i64, system_reserved_memory_bytes: i64, max_pods: u32, supported_brands: &[String], + cluster_dns: &str, tls_args: &TlsArgs, ) -> miette::Result<()> { info!("Starting reddwarf agent for node '{}'", node_name); let state = create_app_state(data_dir)?; + // Create service IPAM for ClusterIP allocation and attach to state + let service_ipam = + Ipam::with_prefix(state.storage.clone(), service_cidr, "svc-ipam").map_err(|e| { + miette::miette!( + "Failed to initialize service IPAM with CIDR '{}': {}", + service_cidr, + e + ) + })?; + let state = Arc::new(AppState { + service_ipam: Some(Arc::new(service_ipam)), + ..(*state).clone() + }); + bootstrap_default_namespace(&state).await?; let listen_addr: std::net::SocketAddr = bind @@ -372,7 +400,7 @@ async fn run_agent( }; let controller = PodController::new( - runtime, + runtime.clone(), api_client.clone(), state.event_tx.clone(), controller_config, @@ -391,7 +419,8 @@ async fn run_agent( node_agent_config.system_reserved_memory_bytes = system_reserved_memory_bytes; node_agent_config.max_pods = max_pods; node_agent_config.supported_brands = supported_brands.to_vec(); - let node_agent = NodeAgent::new(api_client.clone(), node_agent_config); + let node_agent = + NodeAgent::with_runtime(api_client.clone(), node_agent_config, Some(runtime.clone())); let agent_token = token.clone(); let node_agent_handle = tokio::spawn(async move { if let Err(e) = node_agent.run(agent_token).await { @@ -399,7 +428,35 @@ async fn run_agent( } }); - // 7. Spawn node health checker + // 7. Spawn service controller + let nat_manager = Arc::new(NatManager::new(etherstub_name)); + let service_controller = ServiceController::new( + state.storage.clone(), + state.event_tx.clone(), + nat_manager, + ServiceControllerConfig::default(), + ); + let service_token = token.clone(); + let service_handle = tokio::spawn(async move { + if let Err(e) = service_controller.run(service_token).await { + error!("Service controller error: {}", e); + } + }); + + // 8. Spawn DNS server for service discovery + let dns_config = DnsServerConfig { + listen_addr: cluster_dns.to_string(), + ..DnsServerConfig::default() + }; + let dns_server = DnsServer::new(state.storage.clone(), dns_config); + let dns_token = token.clone(); + let dns_handle = tokio::spawn(async move { + if let Err(e) = dns_server.run(dns_token).await { + error!("DNS server error: {}", e); + } + }); + + // 9. Spawn node health checker let health_checker = NodeHealthChecker::new(api_client, NodeHealthCheckerConfig::default()); let health_token = token.clone(); let health_handle = tokio::spawn(async move { @@ -409,8 +466,8 @@ async fn run_agent( }); info!( - "All components started. API server on {}, node name: {}, pod CIDR: {}", - bind, node_name, pod_cidr + "All components started. API server on {}, node name: {}, pod CIDR: {}, DNS on {}", + bind, node_name, pod_cidr, cluster_dns ); // Wait for shutdown signal (SIGINT or SIGTERM) @@ -426,6 +483,8 @@ async fn run_agent( scheduler_handle, controller_handle, node_agent_handle, + service_handle, + dns_handle, health_handle, ); })