Add service networking, bhyve brand, ipadm IP config, and zone state reporting

Service networking:
- ClusterIP IPAM allocation on service create/delete via reusable Ipam with_prefix()
- ServiceController watches Pod/Service events + periodic reconcile to track endpoints
- NatManager generates ipnat rdr rules for ClusterIP -> pod IP forwarding
- Embedded DNS server resolves {svc}.{ns}.svc.cluster.local to ClusterIP
- New CLI flags: --service-cidr (default 10.96.0.0/12), --cluster-dns (default 0.0.0.0:10053)

Quick wins:
- ipadm IP assignment: configure_zone_ip() runs ipadm/route inside zone via zlogin after boot
- Node heartbeat zone state reporting: reddwarf.io/zone-count and zone-summary annotations
- bhyve brand support: ZoneBrand::Bhyve, install args, zonecfg device generation, controller integration

189 tests passing, clippy clean.

https://claude.ai/code/session_016QLFjAyYGzMPbBjEGMe75j
This commit is contained in:
Claude 2026-03-19 20:28:40 +00:00
parent d79f8ce011
commit d8425ad85d
No known key found for this signature in database
34 changed files with 1608 additions and 220 deletions

View file

@ -1,6 +1,6 @@
# Reddwarf Production Readiness Audit # Reddwarf Production Readiness Audit
**Last updated:** 2026-02-14 **Last updated:** 2026-03-19
**Baseline commit:** `58171c7` (Add periodic reconciliation, node health checker, and graceful pod termination) **Baseline commit:** `58171c7` (Add periodic reconciliation, node health checker, and graceful pod termination)
--- ---
@ -43,7 +43,7 @@
|---|---|---| |---|---|---|
| Self-registration | DONE | Creates Node resource with allocatable CPU/memory | | Self-registration | DONE | Creates Node resource with allocatable CPU/memory |
| Periodic heartbeat | DONE | 10-second interval, Ready condition | | Periodic heartbeat | DONE | 10-second interval, Ready condition |
| Report zone states | NOT DONE | Heartbeat doesn't query actual zone states | | Report zone states | DONE | Heartbeat queries `list_zones()`, reports `reddwarf.io/zone-count` and `reddwarf.io/zone-summary` annotations |
| Dynamic resource reporting | DONE | `sysinfo.rs` — detects CPU/memory via `sys-info`, capacity vs allocatable split with configurable reservations (`--system-reserved-cpu`, `--system-reserved-memory`, `--max-pods`). Done in `d3eb0b2` | | Dynamic resource reporting | DONE | `sysinfo.rs` — detects CPU/memory via `sys-info`, capacity vs allocatable split with configurable reservations (`--system-reserved-cpu`, `--system-reserved-memory`, `--max-pods`). Done in `d3eb0b2` |
## 5. Main Binary ## 5. Main Binary
@ -62,9 +62,9 @@
|---|---|---| |---|---|---|
| Etherstub creation | DONE | `dladm create-etherstub` | | Etherstub creation | DONE | `dladm create-etherstub` |
| VNIC per zone | DONE | `dladm create-vnic -l etherstub` | | VNIC per zone | DONE | `dladm create-vnic -l etherstub` |
| ipadm IP assignment | PARTIAL | IP set in zonecfg `allowed-address` but no explicit `ipadm create-addr` call | | ipadm IP assignment | DONE | `configure_zone_ip()` runs `ipadm create-if`, `ipadm create-addr`, `route add default` inside zone via `zlogin` after boot |
| IPAM | DONE | Sequential alloc, idempotent, persistent, pool exhaustion handling | | IPAM | DONE | Sequential alloc, idempotent, persistent, pool exhaustion handling |
| Service ClusterIP / NAT | NOT DONE | Services stored at API level but no backend controller, no ipnat rules, no proxy, no DNS | | Service ClusterIP / NAT | DONE | ClusterIP IPAM allocation on service create/delete, `ServiceController` watches events + periodic reconcile, `NatManager` generates ipnat rdr rules, embedded `DnsServer` resolves `{svc}.{ns}.svc.cluster.local` → ClusterIP |
## 7. Scheduler ## 7. Scheduler
@ -88,7 +88,7 @@
- [x] Graceful pod termination — done in `58171c7` - [x] Graceful pod termination — done in `58171c7`
### Medium (limits functionality) ### Medium (limits functionality)
- [ ] Service networking — no ClusterIP, no NAT/proxy, no DNS - [x] Service networking — ClusterIP IPAM, ServiceController endpoint tracking, ipnat NAT rules, embedded DNS server
- [x] Health probes — exec/HTTP/TCP liveness/readiness/startup probes via zlogin - [x] Health probes — exec/HTTP/TCP liveness/readiness/startup probes via zlogin
- [ ] Image management — no pull/registry, no `.zar` support, no golden image bootstrap - [ ] Image management — no pull/registry, no `.zar` support, no golden image bootstrap
- [x] Dynamic node resources — done in `d3eb0b2` - [x] Dynamic node resources — done in `d3eb0b2`
@ -96,4 +96,4 @@
### Low (nice to have) ### Low (nice to have)
- [x] Zone brand scheduling filter — done in `4c7f50a` - [x] Zone brand scheduling filter — done in `4c7f50a`
- [x] ShuttingDown to Terminating mapping fix — done in `58171c7` - [x] ShuttingDown to Terminating mapping fix — done in `58171c7`
- [ ] bhyve brand — type exists but no implementation - [x] bhyve brand — `ZoneBrand::Bhyve`, install args, zonecfg device generation, controller brand selection

1
Cargo.lock generated
View file

@ -1495,6 +1495,7 @@ dependencies = [
"miette", "miette",
"rcgen", "rcgen",
"reddwarf-core", "reddwarf-core",
"reddwarf-runtime",
"reddwarf-storage", "reddwarf-storage",
"reddwarf-versioning", "reddwarf-versioning",
"rustls", "rustls",

View file

@ -9,6 +9,7 @@ rust-version.workspace = true
[dependencies] [dependencies]
reddwarf-core = { workspace = true } reddwarf-core = { workspace = true }
reddwarf-runtime = { workspace = true }
reddwarf-storage = { workspace = true } reddwarf-storage = { workspace = true }
reddwarf-versioning = { workspace = true } reddwarf-versioning = { workspace = true }
axum = { workspace = true } axum = { workspace = true }

View file

@ -374,9 +374,7 @@ mod tests {
// Set deletion metadata // Set deletion metadata
pod.metadata.deletion_timestamp = Some( pod.metadata.deletion_timestamp = Some(
reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time( reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(chrono::Utc::now()),
chrono::Utc::now(),
),
); );
pod.metadata.deletion_grace_period_seconds = Some(30); pod.metadata.deletion_grace_period_seconds = Some(30);
let status = pod.status.get_or_insert_with(Default::default); let status = pod.status.get_or_insert_with(Default::default);
@ -410,14 +408,10 @@ mod tests {
// First delete: set deletion_timestamp // First delete: set deletion_timestamp
let mut pod: Pod = get_resource(&*state, &key).await.unwrap(); let mut pod: Pod = get_resource(&*state, &key).await.unwrap();
pod.metadata.deletion_timestamp = Some( pod.metadata.deletion_timestamp = Some(
reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time( reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(chrono::Utc::now()),
chrono::Utc::now(),
),
); );
pod.metadata.deletion_grace_period_seconds = Some(30); pod.metadata.deletion_grace_period_seconds = Some(30);
pod.status pod.status.get_or_insert_with(Default::default).phase = Some("Terminating".to_string());
.get_or_insert_with(Default::default)
.phase = Some("Terminating".to_string());
update_resource(&*state, pod).await.unwrap(); update_resource(&*state, pod).await.unwrap();
// Second "delete" attempt should see deletion_timestamp is already set // Second "delete" attempt should see deletion_timestamp is already set
@ -467,13 +461,9 @@ mod tests {
let mut pod: Pod = get_resource(&*state, &key).await.unwrap(); let mut pod: Pod = get_resource(&*state, &key).await.unwrap();
pod.metadata.deletion_timestamp = Some( pod.metadata.deletion_timestamp = Some(
reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time( reddwarf_core::k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(chrono::Utc::now()),
chrono::Utc::now(),
),
); );
pod.status pod.status.get_or_insert_with(Default::default).phase = Some("Terminating".to_string());
.get_or_insert_with(Default::default)
.phase = Some("Terminating".to_string());
update_resource(&*state, pod).await.unwrap(); update_resource(&*state, pod).await.unwrap();
// Should get a MODIFIED event // Should get a MODIFIED event

View file

@ -53,9 +53,41 @@ pub async fn create_service(
) -> Result<Response> { ) -> Result<Response> {
info!("Creating service in namespace: {}", namespace); info!("Creating service in namespace: {}", namespace);
service.metadata.namespace = Some(namespace); service.metadata.namespace = Some(namespace.clone());
validate_resource(&service)?; validate_resource(&service)?;
// Allocate a ClusterIP if service IPAM is configured and no ClusterIP is set
if let Some(ref ipam) = state.service_ipam {
let name = service.metadata.name.as_deref().unwrap_or("unknown");
let has_cluster_ip = service
.spec
.as_ref()
.and_then(|s| s.cluster_ip.as_ref())
.is_some_and(|ip| !ip.is_empty() && ip != "None");
if !has_cluster_ip {
match ipam.allocate(&namespace, name) {
Ok(alloc) => {
let spec = service.spec.get_or_insert_with(Default::default);
spec.cluster_ip = Some(alloc.ip_address.to_string());
spec.cluster_ips = Some(vec![alloc.ip_address.to_string()]);
info!(
"Allocated ClusterIP {} for service {}/{}",
alloc.ip_address, namespace, name
);
}
Err(e) => {
tracing::warn!(
"Failed to allocate ClusterIP for {}/{}: {}",
namespace,
name,
e
);
}
}
}
}
let created = create_resource(&state, service).await?; let created = create_resource(&state, service).await?;
Ok(ApiResponse::created(created).into_response()) Ok(ApiResponse::created(created).into_response())
@ -86,9 +118,21 @@ pub async fn delete_service(
info!("Deleting service: {}/{}", namespace, name); info!("Deleting service: {}/{}", namespace, name);
let gvk = GroupVersionKind::from_api_version_kind("v1", "Service"); let gvk = GroupVersionKind::from_api_version_kind("v1", "Service");
let key = ResourceKey::new(gvk, namespace, name.clone()); let key = ResourceKey::new(gvk, namespace.clone(), name.clone());
delete_resource(&state, &key).await?; delete_resource(&state, &key).await?;
// Release ClusterIP allocation
if let Some(ref ipam) = state.service_ipam {
if let Err(e) = ipam.release(&namespace, &name) {
tracing::warn!(
"Failed to release ClusterIP for {}/{}: {}",
namespace,
name,
e
);
}
}
Ok(status_deleted(&name, "Service")) Ok(status_deleted(&name, "Service"))
} }

View file

@ -134,26 +134,20 @@ impl ApiServer {
.await .await
} }
Some(material) => { Some(material) => {
info!( info!("Starting API server on {} (HTTPS)", self.config.listen_addr);
"Starting API server on {} (HTTPS)",
self.config.listen_addr
);
let rustls_config = axum_server::tls_rustls::RustlsConfig::from_pem( let rustls_config = axum_server::tls_rustls::RustlsConfig::from_pem(
material.cert_pem, material.cert_pem,
material.key_pem, material.key_pem,
) )
.await .await
.map_err(|e| { .map_err(|e| std::io::Error::other(format!("failed to build RustlsConfig: {e}")))?;
std::io::Error::other(format!("failed to build RustlsConfig: {e}"))
})?;
let handle = axum_server::Handle::new(); let handle = axum_server::Handle::new();
let shutdown_handle = handle.clone(); let shutdown_handle = handle.clone();
tokio::spawn(async move { tokio::spawn(async move {
token.cancelled().await; token.cancelled().await;
shutdown_handle shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(10)));
.graceful_shutdown(Some(std::time::Duration::from_secs(10)));
}); });
axum_server::bind_rustls(self.config.listen_addr, rustls_config) axum_server::bind_rustls(self.config.listen_addr, rustls_config)

View file

@ -1,4 +1,5 @@
use crate::event_bus::{EventBusConfig, ResourceEvent}; use crate::event_bus::{EventBusConfig, ResourceEvent};
use reddwarf_runtime::Ipam;
use reddwarf_storage::RedbBackend; use reddwarf_storage::RedbBackend;
use reddwarf_versioning::VersionStore; use reddwarf_versioning::VersionStore;
use std::sync::Arc; use std::sync::Arc;
@ -15,6 +16,9 @@ pub struct AppState {
/// Event bus sender — broadcast channel for resource mutation events /// Event bus sender — broadcast channel for resource mutation events
pub event_tx: broadcast::Sender<ResourceEvent>, pub event_tx: broadcast::Sender<ResourceEvent>,
/// Service CIDR IPAM for ClusterIP allocation (None if not configured)
pub service_ipam: Option<Arc<Ipam>>,
} }
impl AppState { impl AppState {
@ -34,6 +38,7 @@ impl AppState {
storage, storage,
version_store, version_store,
event_tx, event_tx,
service_ipam: None,
} }
} }

View file

@ -46,7 +46,10 @@ pub fn resolve_tls(mode: &TlsMode) -> miette::Result<Option<TlsMaterial>> {
let key_path = data_dir.join("server-key.pem"); let key_path = data_dir.join("server-key.pem");
if ca_path.exists() && cert_path.exists() && key_path.exists() { if ca_path.exists() && cert_path.exists() && key_path.exists() {
info!("Loading existing TLS certificates from {}", data_dir.display()); info!(
"Loading existing TLS certificates from {}",
data_dir.display()
);
let ca_pem = std::fs::read(&ca_path) let ca_pem = std::fs::read(&ca_path)
.into_diagnostic() .into_diagnostic()
.wrap_err_with(|| format!("failed to read CA cert at {}", ca_path.display()))?; .wrap_err_with(|| format!("failed to read CA cert at {}", ca_path.display()))?;
@ -80,9 +83,7 @@ pub fn resolve_tls(mode: &TlsMode) -> miette::Result<Option<TlsMaterial>> {
} => { } => {
let cert_pem = std::fs::read(cert_path) let cert_pem = std::fs::read(cert_path)
.into_diagnostic() .into_diagnostic()
.wrap_err_with(|| { .wrap_err_with(|| format!("failed to read TLS cert at {}", cert_path.display()))?;
format!("failed to read TLS cert at {}", cert_path.display())
})?;
let key_pem = std::fs::read(key_path) let key_pem = std::fs::read(key_path)
.into_diagnostic() .into_diagnostic()
.wrap_err_with(|| format!("failed to read TLS key at {}", key_path.display()))?; .wrap_err_with(|| format!("failed to read TLS key at {}", key_path.display()))?;
@ -103,7 +104,9 @@ fn generate_self_signed(data_dir: &Path, san_entries: &[String]) -> miette::Resu
.wrap_err_with(|| format!("failed to create TLS directory {}", data_dir.display()))?; .wrap_err_with(|| format!("failed to create TLS directory {}", data_dir.display()))?;
// --- CA --- // --- CA ---
let ca_key = KeyPair::generate().into_diagnostic().wrap_err("failed to generate CA key pair")?; let ca_key = KeyPair::generate()
.into_diagnostic()
.wrap_err("failed to generate CA key pair")?;
let mut ca_params = CertificateParams::new(vec!["Reddwarf CA".to_string()]) let mut ca_params = CertificateParams::new(vec!["Reddwarf CA".to_string()])
.into_diagnostic() .into_diagnostic()
@ -177,7 +180,9 @@ mod tests {
san_entries: vec!["localhost".to_string(), "127.0.0.1".to_string()], san_entries: vec!["localhost".to_string(), "127.0.0.1".to_string()],
}; };
let material = resolve_tls(&mode).unwrap().expect("should produce material"); let material = resolve_tls(&mode)
.unwrap()
.expect("should produce material");
assert!(!material.cert_pem.is_empty()); assert!(!material.cert_pem.is_empty());
assert!(!material.key_pem.is_empty()); assert!(!material.key_pem.is_empty());
@ -227,7 +232,9 @@ mod tests {
key_path: tls_dir.join("server-key.pem"), key_path: tls_dir.join("server-key.pem"),
}; };
let material = resolve_tls(&mode).unwrap().expect("should produce material"); let material = resolve_tls(&mode)
.unwrap()
.expect("should produce material");
assert!(!material.cert_pem.is_empty()); assert!(!material.cert_pem.is_empty());
assert!(!material.key_pem.is_empty()); assert!(!material.key_pem.is_empty());
assert!(material.ca_pem.is_none()); assert!(material.ca_pem.is_none());

View file

@ -154,10 +154,7 @@ mod tests {
"512M" "512M"
); );
// Exact KiB // Exact KiB
assert_eq!( assert_eq!(ResourceQuantities::memory_as_zone_cap(256 * 1024), "256K");
ResourceQuantities::memory_as_zone_cap(256 * 1024),
"256K"
);
// 1500 MiB = not a clean GiB, falls to MiB // 1500 MiB = not a clean GiB, falls to MiB
assert_eq!( assert_eq!(
ResourceQuantities::memory_as_zone_cap(1500 * 1024 * 1024), ResourceQuantities::memory_as_zone_cap(1500 * 1024 * 1024),

View file

@ -0,0 +1,91 @@
use crate::error::Result;
use crate::types::ZoneConfig;
/// Get the install arguments for a bhyve brand zone
///
/// Bhyve zones use hardware virtualization and install without extra
/// arguments. A boot disk image, when configured, is attached as a
/// device in zonecfg (see `bhyve_zonecfg_lines`) rather than being
/// passed to `zoneadm install`, so the result is always empty today.
///
/// Returns `Ok` with an empty argument list; the `Result` wrapper is
/// kept so future brands/validations can fail without an API break.
pub fn bhyve_install_args(_config: &ZoneConfig) -> Result<Vec<String>> {
    Ok(vec![])
}
/// Generate bhyve-specific zonecfg lines for device passthrough
///
/// When a disk image (typically a ZFS zvol) is configured, emits the
/// `add device` / `set match=...` / `end` stanza; otherwise nothing.
pub fn bhyve_zonecfg_lines(config: &ZoneConfig) -> Vec<String> {
    match config.bhyve_disk_image {
        Some(ref disk) => vec![
            "add device".to_string(),
            format!("set match={}", disk),
            "end".to_string(),
        ],
        None => Vec::new(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::*;

    /// Build a minimal bhyve-brand ZoneConfig with an optional boot disk.
    fn make_bhyve_config(disk_image: Option<String>) -> ZoneConfig {
        ZoneConfig {
            zone_name: "bhyve-test".to_string(),
            brand: ZoneBrand::Bhyve,
            zonepath: "/zones/bhyve-test".to_string(),
            network: NetworkMode::Etherstub(EtherstubConfig {
                etherstub_name: "stub0".to_string(),
                vnic_name: "vnic0".to_string(),
                ip_address: "10.0.0.2".to_string(),
                gateway: "10.0.0.1".to_string(),
                prefix_len: 16,
            }),
            storage: ZoneStorageOpts::default(),
            lx_image_path: None,
            bhyve_disk_image: disk_image,
            processes: vec![],
            cpu_cap: None,
            memory_cap: None,
            fs_mounts: vec![],
        }
    }

    #[test]
    fn test_bhyve_install_args_no_disk() {
        let args = bhyve_install_args(&make_bhyve_config(None)).unwrap();
        assert!(args.is_empty());
    }

    #[test]
    fn test_bhyve_install_args_with_disk() {
        // Even with a disk image, install takes no extra args — the
        // disk is wired up through zonecfg instead.
        let disk = Some("/dev/zvol/rpool/bhyve-disk".to_string());
        let args = bhyve_install_args(&make_bhyve_config(disk)).unwrap();
        assert!(args.is_empty());
    }

    #[test]
    fn test_bhyve_zonecfg_lines_no_disk() {
        assert!(bhyve_zonecfg_lines(&make_bhyve_config(None)).is_empty());
    }

    #[test]
    fn test_bhyve_zonecfg_lines_with_disk() {
        let disk = "/dev/zvol/rpool/bhyve-disk";
        let lines = bhyve_zonecfg_lines(&make_bhyve_config(Some(disk.to_string())));
        // Expect the three-line add-device stanza referencing the zvol.
        assert_eq!(lines.len(), 3);
        assert!(lines[0].contains("add device"));
        assert!(lines[1].contains(disk));
        assert!(lines[2].contains("end"));
    }
}

View file

@ -32,6 +32,7 @@ mod tests {
}), }),
storage: ZoneStorageOpts::default(), storage: ZoneStorageOpts::default(),
lx_image_path: image_path, lx_image_path: image_path,
bhyve_disk_image: None,
processes: vec![], processes: vec![],
cpu_cap: None, cpu_cap: None,
memory_cap: None, memory_cap: None,

View file

@ -1,2 +1,3 @@
pub mod bhyve;
pub mod custom; pub mod custom;
pub mod lx; pub mod lx;

View file

@ -291,15 +291,13 @@ impl PodController {
let mut tracker = self.probe_tracker.lock().await; let mut tracker = self.probe_tracker.lock().await;
tracker.register_pod(&pod_key, probes, started_at); tracker.register_pod(&pod_key, probes, started_at);
let status = tracker let status = tracker.check_pod(&pod_key, &zone_name, &zone_ip).await;
.check_pod(&pod_key, &zone_name, &zone_ip)
.await;
drop(tracker); drop(tracker);
if status.liveness_failed { if status.liveness_failed {
let message = status.failure_message.unwrap_or_else(|| { let message = status
"Liveness probe failed".to_string() .failure_message
}); .unwrap_or_else(|| "Liveness probe failed".to_string());
warn!( warn!(
"Liveness probe failed for pod {}/{}: {}", "Liveness probe failed for pod {}/{}: {}",
namespace, pod_name, message namespace, pod_name, message
@ -328,9 +326,9 @@ impl PodController {
let mut tracker = self.probe_tracker.lock().await; let mut tracker = self.probe_tracker.lock().await;
tracker.unregister_pod(&pod_key); tracker.unregister_pod(&pod_key);
} else if !status.ready { } else if !status.ready {
let message = status.failure_message.unwrap_or_else(|| { let message = status
"Readiness probe failed".to_string() .failure_message
}); .unwrap_or_else(|| "Readiness probe failed".to_string());
debug!( debug!(
"Readiness probe not passing for pod {}/{}: {}", "Readiness probe not passing for pod {}/{}: {}",
namespace, pod_name, message namespace, pod_name, message
@ -639,10 +637,7 @@ impl PodController {
// Finalize — remove the pod from API server storage // Finalize — remove the pod from API server storage
if let Err(e) = self.api_client.finalize_pod(namespace, pod_name).await { if let Err(e) = self.api_client.finalize_pod(namespace, pod_name).await {
error!( error!("Failed to finalize pod {}/{}: {}", namespace, pod_name, e);
"Failed to finalize pod {}/{}: {}",
namespace, pod_name, e
);
} else { } else {
info!("Pod {}/{} finalized and removed", namespace, pod_name); info!("Pod {}/{} finalized and removed", namespace, pod_name);
} }
@ -659,10 +654,7 @@ impl PodController {
None => return false, None => return false,
}; };
let grace_secs = pod let grace_secs = pod.metadata.deletion_grace_period_seconds.unwrap_or(30);
.metadata
.deletion_grace_period_seconds
.unwrap_or(30);
let deadline = deletion_ts + chrono::Duration::seconds(grace_secs); let deadline = deletion_ts + chrono::Duration::seconds(grace_secs);
Utc::now() >= deadline Utc::now() >= deadline
@ -768,6 +760,7 @@ impl PodController {
.and_then(|v| match v.as_str() { .and_then(|v| match v.as_str() {
"lx" => Some(ZoneBrand::Lx), "lx" => Some(ZoneBrand::Lx),
"reddwarf" => Some(ZoneBrand::Reddwarf), "reddwarf" => Some(ZoneBrand::Reddwarf),
"bhyve" => Some(ZoneBrand::Bhyve),
_ => None, _ => None,
}) })
.unwrap_or_else(|| self.config.default_brand.clone()); .unwrap_or_else(|| self.config.default_brand.clone());
@ -779,6 +772,7 @@ impl PodController {
network, network,
storage: ZoneStorageOpts::default(), storage: ZoneStorageOpts::default(),
lx_image_path: None, lx_image_path: None,
bhyve_disk_image: None,
processes, processes,
cpu_cap, cpu_cap,
memory_cap, memory_cap,
@ -801,10 +795,7 @@ impl PodController {
None => return vec![], None => return vec![],
}; };
spec.containers spec.containers.iter().flat_map(extract_probes).collect()
.iter()
.flat_map(|c| extract_probes(c))
.collect()
} }
/// Get the pod's IP from its status, falling back to empty string /// Get the pod's IP from its status, falling back to empty string
@ -820,16 +811,14 @@ impl PodController {
fn pod_start_time(&self, pod: &Pod) -> Instant { fn pod_start_time(&self, pod: &Pod) -> Instant {
// We can't convert k8s Time to std Instant directly. Instead, compute // We can't convert k8s Time to std Instant directly. Instead, compute
// the elapsed duration since start_time and subtract from Instant::now(). // the elapsed duration since start_time and subtract from Instant::now().
if let Some(start_time) = pod if let Some(start_time) = pod.status.as_ref().and_then(|s| s.start_time.as_ref()) {
.status
.as_ref()
.and_then(|s| s.start_time.as_ref())
{
let now_utc = Utc::now(); let now_utc = Utc::now();
let started_utc = start_time.0; let started_utc = start_time.0;
let elapsed = now_utc.signed_duration_since(started_utc); let elapsed = now_utc.signed_duration_since(started_utc);
if let Ok(elapsed_std) = elapsed.to_std() { if let Ok(elapsed_std) = elapsed.to_std() {
return Instant::now().checked_sub(elapsed_std).unwrap_or_else(Instant::now); return Instant::now()
.checked_sub(elapsed_std)
.unwrap_or_else(Instant::now);
} }
} }
Instant::now() Instant::now()
@ -1183,11 +1172,10 @@ mod tests {
let mut pod = Pod::default(); let mut pod = Pod::default();
pod.metadata.name = Some("expired-pod".to_string()); pod.metadata.name = Some("expired-pod".to_string());
// Set deletion_timestamp 60 seconds in the past // Set deletion_timestamp 60 seconds in the past
pod.metadata.deletion_timestamp = Some( pod.metadata.deletion_timestamp =
k8s_openapi::apimachinery::pkg::apis::meta::v1::Time( Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(
Utc::now() - chrono::Duration::seconds(60), Utc::now() - chrono::Duration::seconds(60),
), ));
);
pod.metadata.deletion_grace_period_seconds = Some(30); pod.metadata.deletion_grace_period_seconds = Some(30);
assert!(controller.is_grace_period_expired(&pod)); assert!(controller.is_grace_period_expired(&pod));
@ -1344,7 +1332,11 @@ mod tests {
assert_eq!(zone_config.brand, ZoneBrand::Reddwarf); assert_eq!(zone_config.brand, ZoneBrand::Reddwarf);
} }
fn make_test_controller_with_runtime() -> (PodController, Arc<crate::mock::MockRuntime>, tempfile::TempDir) { fn make_test_controller_with_runtime() -> (
PodController,
Arc<crate::mock::MockRuntime>,
tempfile::TempDir,
) {
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
let db_path = dir.path().join("test-controller-rt.redb"); let db_path = dir.path().join("test-controller-rt.redb");
let storage = Arc::new(RedbBackend::new(&db_path).unwrap()); let storage = Arc::new(RedbBackend::new(&db_path).unwrap());
@ -1367,7 +1359,13 @@ mod tests {
reconcile_interval: Duration::from_secs(30), reconcile_interval: Duration::from_secs(30),
}; };
let controller = PodController::new(runtime.clone() as Arc<dyn ZoneRuntime>, api_client, event_tx, config, ipam); let controller = PodController::new(
runtime.clone() as Arc<dyn ZoneRuntime>,
api_client,
event_tx,
config,
ipam,
);
(controller, runtime, dir) (controller, runtime, dir)
} }
@ -1473,9 +1471,7 @@ mod tests {
// Verify the liveness failure path was taken: probes should be unregistered // Verify the liveness failure path was taken: probes should be unregistered
let pod_key = "default/liveness-pod"; let pod_key = "default/liveness-pod";
let mut tracker = controller.probe_tracker.lock().await; let mut tracker = controller.probe_tracker.lock().await;
let status = tracker let status = tracker.check_pod(pod_key, &zone_name, "10.88.0.2").await;
.check_pod(pod_key, &zone_name, "10.88.0.2")
.await;
// No probes registered → default status (ready=true, liveness_failed=false) // No probes registered → default status (ready=true, liveness_failed=false)
// This confirms the unregister happened, which only occurs on liveness failure // This confirms the unregister happened, which only occurs on liveness failure
assert!(status.ready); assert!(status.ready);

View file

@ -0,0 +1,461 @@
use reddwarf_storage::{KVStore, KeyEncoder, RedbBackend};
use std::collections::HashMap;
use std::net::Ipv4Addr;
use std::sync::{Arc, RwLock};
use tokio::net::UdpSocket;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
/// Configuration for the embedded DNS server
///
/// Consumed by [`DnsServer`]: where to listen, which cluster domain
/// suffix to serve, and how often to re-read services from storage.
#[derive(Debug, Clone)]
pub struct DnsServerConfig {
    /// Address to listen on (e.g. "10.96.0.10:53" or "0.0.0.0:10053")
    pub listen_addr: String,
    /// Cluster domain suffix; queries are matched against
    /// `{service}.{namespace}.svc.{cluster_domain}`
    pub cluster_domain: String,
    /// How often to refresh the service map from storage (seconds)
    pub refresh_interval_secs: u64,
}
impl Default for DnsServerConfig {
fn default() -> Self {
Self {
listen_addr: "0.0.0.0:10053".to_string(),
cluster_domain: "cluster.local".to_string(),
refresh_interval_secs: 5,
}
}
}
/// A minimal DNS server that resolves `{service}.{namespace}.svc.cluster.local`
/// to the corresponding ClusterIP by reading services from storage.
pub struct DnsServer {
    /// Storage backend scanned periodically for v1/Service resources.
    storage: Arc<RedbBackend>,
    /// Listen address, cluster domain, and refresh cadence.
    config: DnsServerConfig,
    /// Cache of service name -> ClusterIP, keyed by "{name}.{namespace}"
    records: Arc<RwLock<HashMap<String, Ipv4Addr>>>,
}
impl DnsServer {
    /// Create a new DNS server backed by `storage`.
    ///
    /// The record cache starts empty; it is populated at the start of
    /// `run()` via `refresh_records()`.
    pub fn new(storage: Arc<RedbBackend>, config: DnsServerConfig) -> Self {
        Self {
            storage,
            config,
            records: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Run the DNS server
    ///
    /// Binds a UDP socket on `config.listen_addr` and loops until
    /// `token` is cancelled, answering A-record queries from the cached
    /// service map and refreshing that map from storage every
    /// `config.refresh_interval_secs` seconds.
    ///
    /// # Errors
    /// Returns a network error if the UDP bind fails; all per-packet
    /// send/receive errors are logged and otherwise ignored.
    pub async fn run(&self, token: CancellationToken) -> crate::error::Result<()> {
        // Refresh records before starting so early queries can resolve.
        self.refresh_records();
        let socket = UdpSocket::bind(&self.config.listen_addr)
            .await
            .map_err(|e| {
                crate::error::RuntimeError::network_error(format!(
                    "Failed to bind DNS server on {}: {}",
                    self.config.listen_addr, e
                ))
            })?;
        info!("DNS server listening on {}", self.config.listen_addr);
        // 512 bytes is the classic (pre-EDNS) DNS-over-UDP message limit;
        // larger datagrams are truncated by recv_from.
        let mut buf = [0u8; 512];
        let mut refresh_interval = tokio::time::interval(std::time::Duration::from_secs(
            self.config.refresh_interval_secs,
        ));
        refresh_interval.tick().await; // consume first tick
        loop {
            tokio::select! {
                _ = token.cancelled() => {
                    info!("DNS server shutting down");
                    return Ok(());
                }
                _ = refresh_interval.tick() => {
                    self.refresh_records();
                }
                result = socket.recv_from(&mut buf) => {
                    match result {
                        Ok((len, src)) => {
                            let query = &buf[..len];
                            match self.handle_query(query) {
                                Some(response) => {
                                    if let Err(e) = socket.send_to(&response, src).await {
                                        warn!("Failed to send DNS response to {}: {}", src, e);
                                    }
                                }
                                None => {
                                    // Send NXDOMAIN / SERVFAIL
                                    // (best-effort; send errors are ignored)
                                    if let Some(response) = build_nxdomain(query) {
                                        let _ = socket.send_to(&response, src).await;
                                    }
                                }
                            }
                        }
                        Err(e) => {
                            warn!("DNS recv error: {}", e);
                        }
                    }
                }
            }
        }
    }

    /// Refresh the service -> ClusterIP map from storage
    ///
    /// Scans all v1/Service resources, keeps those with a usable
    /// ClusterIP, and atomically swaps the cache. Scan or lock failures
    /// are logged and leave the previous cache in place.
    fn refresh_records(&self) {
        let prefix = KeyEncoder::encode_prefix("v1", "Service", None);
        let entries = match self.storage.scan(prefix.as_bytes()) {
            Ok(entries) => entries,
            Err(e) => {
                error!("DNS: failed to scan services: {}", e);
                return;
            }
        };
        let mut new_records = HashMap::new();
        for (_key, value) in entries {
            // Skip entries that don't deserialize as a Service.
            if let Ok(svc) = serde_json::from_slice::<k8s_openapi::api::core::v1::Service>(&value) {
                let name = match svc.metadata.name.as_deref() {
                    Some(n) => n,
                    None => continue,
                };
                let namespace = svc.metadata.namespace.as_deref().unwrap_or("default");
                // "None" marks a headless service; it gets no A record.
                let cluster_ip = match svc.spec.as_ref().and_then(|s| s.cluster_ip.as_ref()) {
                    Some(ip) if !ip.is_empty() && ip != "None" => ip,
                    _ => continue,
                };
                // Only IPv4 ClusterIPs are supported; others are skipped.
                if let Ok(ip) = cluster_ip.parse::<Ipv4Addr>() {
                    let key = format!("{}.{}", name, namespace);
                    new_records.insert(key, ip);
                }
            }
        }
        debug!("DNS: refreshed {} service records", new_records.len());
        // Swap the whole map under the write lock; a poisoned lock
        // silently keeps the old records.
        if let Ok(mut records) = self.records.write() {
            *records = new_records;
        }
    }

    /// Handle a DNS query, returning a response if we can answer it
    ///
    /// Returns `None` (caller sends NXDOMAIN) for malformed packets,
    /// non-A queries, names outside `.svc.{cluster_domain}`, or unknown
    /// services. NOTE(review): qclass is not validated — assumes IN.
    fn handle_query(&self, query: &[u8]) -> Option<Vec<u8>> {
        // Minimum DNS header is 12 bytes
        if query.len() < 12 {
            return None;
        }
        // Parse the question name (question section starts at offset 12)
        let (qname, qname_end) = parse_dns_name(query, 12)?;
        // Verify we have enough bytes for qtype + qclass (4 bytes after name)
        if qname_end + 4 > query.len() {
            return None;
        }
        let qtype = u16::from_be_bytes([query[qname_end], query[qname_end + 1]]);
        // Only handle A records (type 1)
        if qtype != 1 {
            return None;
        }
        // Try to resolve: expect format "{service}.{namespace}.svc.{cluster_domain}"
        let suffix = format!(".svc.{}", self.config.cluster_domain);
        // DNS names are case-insensitive; normalize before matching.
        let name_lower = qname.to_lowercase();
        let without_suffix = name_lower.strip_suffix(&suffix)?;
        // Split into service.namespace
        let parts: Vec<&str> = without_suffix.splitn(2, '.').collect();
        if parts.len() != 2 {
            return None;
        }
        let (svc_name, ns_name) = (parts[0], parts[1]);
        let lookup_key = format!("{}.{}", svc_name, ns_name);
        // Scope the read lock so it's released before building the response.
        let ip = {
            let records = self.records.read().ok()?;
            records.get(&lookup_key).copied()?
        };
        debug!("DNS: resolved {} -> {}", qname, ip);
        // Build response; qname_end + 4 is the end of the question section.
        Some(build_a_response(query, qname_end + 4, ip))
    }
}
/// Parse a DNS wire-format name starting at `offset` in `buf`.
///
/// Returns `(dotted_name, offset_after_name)`, or `None` when the name
/// is empty (root only), truncated, longer than RFC 1035 allows, not
/// valid UTF-8, or uses compression pointers (not followed — queries
/// normally encode the question name inline).
fn parse_dns_name(buf: &[u8], mut offset: usize) -> Option<(String, usize)> {
    let mut parts = Vec::new();
    // Encoded length so far (each label costs len + 1 length byte),
    // used to enforce the overall name length limit.
    let mut total_len = 0usize;
    loop {
        let len = *buf.get(offset)? as usize;
        if len == 0 {
            // Root label terminates the name.
            offset += 1;
            break;
        }
        // Top bits 11 = compression pointer (we don't follow those);
        // top bits 01/10 are reserved per RFC 1035. Plain labels are
        // capped at 63 octets, so any set top bit is unparseable here.
        if len & 0xC0 != 0 {
            return None;
        }
        offset += 1;
        // Checked slice: None on truncated label.
        let label_bytes = buf.get(offset..offset + len)?;
        total_len += len + 1;
        if total_len > 253 {
            return None; // Name too long
        }
        let label = std::str::from_utf8(label_bytes).ok()?;
        parts.push(label.to_string());
        offset += len;
    }
    if parts.is_empty() {
        return None;
    }
    Some((parts.join("."), offset))
}
/// Build a DNS A record response
///
/// `question_end` is the byte offset just past the question section
/// (name + qtype + qclass). The answer references the question name via
/// a compression pointer to offset 12 and uses a fixed 30-second TTL.
fn build_a_response(query: &[u8], question_end: usize, ip: Ipv4Addr) -> Vec<u8> {
    let mut resp = Vec::with_capacity(question_end + 16);
    // Header: echo the transaction ID.
    resp.extend_from_slice(&query[..2]);
    // Flags byte 1: QR=1 (response), Opcode=0, AA=1 (authoritative),
    // TC=0, and echo the query's RD (recursion desired) bit as RFC 1035
    // requires (the previous 0x84 constant always dropped RD).
    let rd = query.get(2).map_or(0, |b| b & 0x01);
    resp.push(0x84 | rd);
    // Flags byte 2: RA=0 (no recursion offered), Z=0, RCODE=0 (no error).
    resp.push(0x00);
    // QDCOUNT=1, ANCOUNT=1, NSCOUNT=0, ARCOUNT=0
    resp.extend_from_slice(&[0x00, 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00]);
    // Echo the question section verbatim.
    resp.extend_from_slice(&query[12..question_end]);
    // Answer: compression pointer (0xC00C) to the name at offset 12.
    resp.extend_from_slice(&[0xC0, 0x0C]);
    // TYPE=A (1), CLASS=IN (1)
    resp.extend_from_slice(&[0x00, 0x01, 0x00, 0x01]);
    // TTL = 30 seconds
    resp.extend_from_slice(&[0x00, 0x00, 0x00, 0x1E]);
    // RDLENGTH = 4, then the IPv4 address itself
    resp.extend_from_slice(&[0x00, 0x04]);
    resp.extend_from_slice(&ip.octets());
    resp
}
/// Build an NXDOMAIN (RCODE=3) response echoing the original question.
///
/// Returns `None` when `query` is too short to contain a 12-byte DNS header.
fn build_nxdomain(query: &[u8]) -> Option<Vec<u8>> {
    if query.len() < 12 {
        return None;
    }
    let mut resp = Vec::with_capacity(query.len());
    // Echo the transaction ID.
    resp.extend_from_slice(&query[..2]);
    // Flags byte 1: QR=1, AA=1, RD copied from the query (RFC 1035 §4.1.1).
    resp.push(0x84 | (query[2] & 0x01));
    // Flags byte 2: RCODE=3 (NXDOMAIN).
    resp.push(0x03);
    // QDCOUNT=1, ANCOUNT=0, NSCOUNT=0, ARCOUNT=0
    resp.extend_from_slice(&[0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
    // Echo everything after the header (the question section).
    resp.extend_from_slice(&query[12..]);
    Some(resp)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Encode a wire-format DNS query (QTYPE=A, QCLASS=IN) for `name`.
    fn build_dns_query(name: &str) -> Vec<u8> {
        let mut buf = vec![
            0xAB, 0xCD, // Transaction ID
            0x01, 0x00, // Flags: RD=1
            0x00, 0x01, // QDCOUNT=1
            0x00, 0x00, // ANCOUNT=0
            0x00, 0x00, // NSCOUNT=0
            0x00, 0x00, // ARCOUNT=0
        ];
        // Question name: length-prefixed labels terminated by the root label.
        for label in name.split('.') {
            buf.push(label.len() as u8);
            buf.extend_from_slice(label.as_bytes());
        }
        buf.push(0x00);
        // QTYPE = A (1), QCLASS = IN (1)
        buf.extend_from_slice(&[0x00, 0x01, 0x00, 0x01]);
        buf
    }

    /// Spin up a DnsServer backed by a scratch redb database file.
    fn make_dns(db_file: &str) -> DnsServer {
        let storage = Arc::new(
            reddwarf_storage::RedbBackend::new(
                &tempfile::tempdir().unwrap().path().join(db_file),
            )
            .unwrap(),
        );
        DnsServer::new(storage, DnsServerConfig::default())
    }

    #[test]
    fn test_parse_dns_name() {
        let query = build_dns_query("my-svc.default.svc.cluster.local");
        let (name, end) = parse_dns_name(&query, 12).unwrap();
        assert_eq!(name, "my-svc.default.svc.cluster.local");
        // The parsed name must consume bytes beyond the 12-byte header.
        assert!(end > 12);
    }

    #[test]
    fn test_build_a_response() {
        let query = build_dns_query("my-svc.default.svc.cluster.local");
        let (_, qname_end) = parse_dns_name(&query, 12).unwrap();
        let ip = Ipv4Addr::new(10, 96, 0, 1);
        let resp = build_a_response(&query, qname_end + 4, ip);
        // Transaction ID is echoed back.
        assert_eq!(resp[..2], [0xAB, 0xCD]);
        // QR bit set: this is a response.
        assert_ne!(resp[2] & 0x80, 0);
        // Exactly one answer record.
        assert_eq!(resp[6..8], [0x00, 0x01]);
        // RDATA (the trailing four bytes) carries the resolved address.
        assert_eq!(resp[resp.len() - 4..], ip.octets());
    }

    #[test]
    fn test_build_nxdomain() {
        let query = build_dns_query("nonexistent.default.svc.cluster.local");
        let resp = build_nxdomain(&query).unwrap();
        // Transaction ID is echoed back.
        assert_eq!(resp[..2], [0xAB, 0xCD]);
        // Low nibble of the second flags byte is RCODE=3 (NXDOMAIN).
        assert_eq!(resp[3] & 0x0F, 3);
    }

    #[test]
    fn test_handle_query_resolves() {
        let dns = make_dns("dns-test.redb");
        // Seed a record directly into the in-memory table.
        dns.records
            .write()
            .unwrap()
            .insert("my-svc.default".to_string(), Ipv4Addr::new(10, 96, 0, 1));
        let query = build_dns_query("my-svc.default.svc.cluster.local");
        let resp = dns.handle_query(&query).expect("query should resolve");
        // The answer's RDATA is the seeded ClusterIP.
        assert_eq!(resp[resp.len() - 4..], [10, 96, 0, 1]);
    }

    #[test]
    fn test_handle_query_nxdomain() {
        let dns = make_dns("dns-test2.redb");
        let query = build_dns_query("nonexistent.default.svc.cluster.local");
        // No record exists, so the handler declines to answer.
        assert!(dns.handle_query(&query).is_none());
    }

    #[test]
    fn test_handle_query_wrong_domain() {
        let dns = make_dns("dns-test3.redb");
        // Names outside the cluster suffix are not ours to answer.
        assert!(dns.handle_query(&build_dns_query("google.com")).is_none());
    }

    #[test]
    fn test_parse_short_query() {
        // A 5-byte buffer cannot even hold the 12-byte DNS header.
        assert!(parse_dns_name(&[0; 5], 12).is_none());
    }
}

View file

@ -156,7 +156,9 @@ pub enum RuntimeError {
}, },
/// Health probe failed /// Health probe failed
#[error("Health probe failed for container '{container_name}' in zone '{zone_name}': {message}")] #[error(
"Health probe failed for container '{container_name}' in zone '{zone_name}': {message}"
)]
#[diagnostic( #[diagnostic(
code(reddwarf::runtime::probe_failed), code(reddwarf::runtime::probe_failed),
help("Check that the probe target is reachable inside the zone. Failure count: {failure_count}/{failure_threshold}. Verify the application is running and the probe command/port/path is correct") help("Check that the probe target is reachable inside the zone. Failure count: {failure_count}/{failure_threshold}. Verify the application is running and the probe command/port/path is correct")

View file

@ -1,3 +1,4 @@
use crate::brand::bhyve::bhyve_install_args;
use crate::brand::lx::lx_install_args; use crate::brand::lx::lx_install_args;
use crate::command::{exec, CommandOutput}; use crate::command::{exec, CommandOutput};
use crate::error::Result; use crate::error::Result;
@ -159,6 +160,74 @@ impl ZoneRuntime for IllumosRuntime {
Ok(()) Ok(())
} }
/// Configure IPv4 networking inside a booted zone by running `ipadm` and
/// `route` through `exec_in_zone` (zlogin): create the IP interface, assign
/// a static address, and install a default route.
///
/// Both `Etherstub` and `Direct` modes carry the same (vnic, ip, prefix,
/// gateway) tuple, so they share one code path.
///
/// Errors from any step are wrapped as `RuntimeError::network_error` with the
/// failing command named in the message.
///
/// NOTE(review): `create-if` uses `-t` (temporary config) while the route
/// uses `-p` (persistent) — confirm this asymmetry survives a zone reboot as
/// intended.
async fn configure_zone_ip(&self, zone_name: &str, network: &NetworkMode) -> Result<()> {
    // Both network modes expose the same addressing fields; destructure once.
    let (vnic_name, ip_address, prefix_len, gateway) = match network {
        NetworkMode::Etherstub(cfg) => (
            &cfg.vnic_name,
            &cfg.ip_address,
            cfg.prefix_len,
            &cfg.gateway,
        ),
        NetworkMode::Direct(cfg) => (
            &cfg.vnic_name,
            &cfg.ip_address,
            cfg.prefix_len,
            &cfg.gateway,
        ),
    };
    info!(
        "Configuring IP {} on {} in zone {}",
        ip_address, vnic_name, zone_name
    );
    // Create the IP interface (`-t`: temporary, not written to persistent config)
    self.exec_in_zone(
        zone_name,
        &[
            "ipadm".to_string(),
            "create-if".to_string(),
            "-t".to_string(),
            vnic_name.clone(),
        ],
    )
    .await
    .map_err(|e| RuntimeError::network_error(format!("ipadm create-if failed: {}", e)))?;
    // Assign a static IP address: "{addr}/{prefix}" on the "{vnic}/v4" address object
    self.exec_in_zone(
        zone_name,
        &[
            "ipadm".to_string(),
            "create-addr".to_string(),
            "-T".to_string(),
            "static".to_string(),
            "-a".to_string(),
            format!("{}/{}", ip_address, prefix_len),
            format!("{}/v4", vnic_name),
        ],
    )
    .await
    .map_err(|e| RuntimeError::network_error(format!("ipadm create-addr failed: {}", e)))?;
    // Add default route (`-p`: persist the route in the zone's routing config)
    self.exec_in_zone(
        zone_name,
        &[
            "route".to_string(),
            "-p".to_string(),
            "add".to_string(),
            "default".to_string(),
            gateway.clone(),
        ],
    )
    .await
    .map_err(|e| RuntimeError::network_error(format!("route add default failed: {}", e)))?;
    info!("IP configuration complete for zone: {}", zone_name);
    Ok(())
}
async fn teardown_network(&self, zone_name: &str, network: &NetworkMode) -> Result<()> { async fn teardown_network(&self, zone_name: &str, network: &NetworkMode) -> Result<()> {
info!("Tearing down network for zone: {}", zone_name); info!("Tearing down network for zone: {}", zone_name);
@ -183,19 +252,35 @@ impl ZoneRuntime for IllumosRuntime {
.await?; .await?;
self.create_zone(config).await?; self.create_zone(config).await?;
// LX brand needs image path for install // Brand-specific install
if config.brand == ZoneBrand::Lx { match config.brand {
ZoneBrand::Lx => {
let args = lx_install_args(config)?; let args = lx_install_args(config)?;
let mut cmd_args = vec!["-z", &config.zone_name, "install"]; let mut cmd_args = vec!["-z", &config.zone_name, "install"];
let str_args: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); let str_args: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
cmd_args.extend(str_args); cmd_args.extend(str_args);
exec("zoneadm", &cmd_args).await?; exec("zoneadm", &cmd_args).await?;
} else { }
ZoneBrand::Bhyve => {
let args = bhyve_install_args(config)?;
let mut cmd_args = vec!["-z", &config.zone_name, "install"];
let str_args: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
cmd_args.extend(str_args);
exec("zoneadm", &cmd_args).await?;
}
ZoneBrand::Reddwarf => {
self.install_zone(&config.zone_name).await?; self.install_zone(&config.zone_name).await?;
} }
}
self.boot_zone(&config.zone_name).await?; self.boot_zone(&config.zone_name).await?;
// Brief pause to let the zone finish booting before configuring IP
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
self.configure_zone_ip(&config.zone_name, &config.network)
.await?;
info!("Zone provisioned: {}", config.zone_name); info!("Zone provisioned: {}", config.zone_name);
Ok(()) Ok(())
} }

View file

@ -5,14 +5,16 @@ pub mod api_client;
pub mod brand; pub mod brand;
pub mod command; pub mod command;
pub mod controller; pub mod controller;
pub mod dns;
pub mod error; pub mod error;
#[cfg(target_os = "illumos")] #[cfg(target_os = "illumos")]
pub mod illumos; pub mod illumos;
pub mod mock; pub mod mock;
pub mod network; pub mod network;
pub mod node_agent; pub mod node_agent;
pub mod probes;
pub mod node_health; pub mod node_health;
pub mod probes;
pub mod service_controller;
pub mod storage; pub mod storage;
pub mod sysinfo; pub mod sysinfo;
pub mod traits; pub mod traits;
@ -20,9 +22,11 @@ pub mod types;
pub mod zone; pub mod zone;
// Re-export primary types // Re-export primary types
pub use dns::{DnsServer, DnsServerConfig};
pub use error::{Result, RuntimeError}; pub use error::{Result, RuntimeError};
pub use mock::MockRuntime; pub use mock::MockRuntime;
pub use network::{CidrConfig, IpAllocation, Ipam}; pub use network::{CidrConfig, IpAllocation, Ipam, NatManager};
pub use service_controller::{ServiceController, ServiceControllerConfig};
pub use traits::ZoneRuntime; pub use traits::ZoneRuntime;
pub use types::{ pub use types::{
ContainerProcess, DirectNicConfig, EtherstubConfig, FsMount, NetworkMode, StoragePoolConfig, ContainerProcess, DirectNicConfig, EtherstubConfig, FsMount, NetworkMode, StoragePoolConfig,

View file

@ -275,6 +275,11 @@ impl ZoneRuntime for MockRuntime {
Ok(()) Ok(())
} }
async fn configure_zone_ip(&self, zone_name: &str, _network: &NetworkMode) -> Result<()> {
debug!("Mock: IP configured for zone: {}", zone_name);
Ok(())
}
async fn teardown_network(&self, zone_name: &str, _network: &NetworkMode) -> Result<()> { async fn teardown_network(&self, zone_name: &str, _network: &NetworkMode) -> Result<()> {
debug!("Mock: network teardown for zone: {}", zone_name); debug!("Mock: network teardown for zone: {}", zone_name);
Ok(()) Ok(())
@ -289,6 +294,8 @@ impl ZoneRuntime for MockRuntime {
self.create_zone(config).await?; self.create_zone(config).await?;
self.install_zone(&config.zone_name).await?; self.install_zone(&config.zone_name).await?;
self.boot_zone(&config.zone_name).await?; self.boot_zone(&config.zone_name).await?;
self.configure_zone_ip(&config.zone_name, &config.network)
.await?;
Ok(()) Ok(())
} }
@ -356,6 +363,7 @@ mod tests {
}), }),
storage: ZoneStorageOpts::default(), storage: ZoneStorageOpts::default(),
lx_image_path: None, lx_image_path: None,
bhyve_disk_image: None,
processes: vec![], processes: vec![],
cpu_cap: None, cpu_cap: None,
memory_cap: None, memory_cap: None,
@ -384,9 +392,7 @@ mod tests {
rt.install_zone("stopped-zone").await.unwrap(); rt.install_zone("stopped-zone").await.unwrap();
// Zone is Installed, not Running // Zone is Installed, not Running
let result = rt let result = rt.exec_in_zone("stopped-zone", &["echo".to_string()]).await;
.exec_in_zone("stopped-zone", &["echo".to_string()])
.await;
assert!(result.is_err()); assert!(result.is_err());
} }
@ -394,9 +400,7 @@ mod tests {
async fn test_exec_in_zone_not_found_errors() { async fn test_exec_in_zone_not_found_errors() {
let rt = MockRuntime::new(make_test_storage()); let rt = MockRuntime::new(make_test_storage());
let result = rt let result = rt.exec_in_zone("nonexistent", &["echo".to_string()]).await;
.exec_in_zone("nonexistent", &["echo".to_string()])
.await;
assert!(matches!( assert!(matches!(
result.unwrap_err(), result.unwrap_err(),
RuntimeError::ZoneNotFound { .. } RuntimeError::ZoneNotFound { .. }

View file

@ -30,47 +30,65 @@ pub struct IpAllocation {
/// IPAM (IP Address Management) backed by a KVStore /// IPAM (IP Address Management) backed by a KVStore
/// ///
/// Storage keys: /// Storage keys (with default "ipam" prefix):
/// - `ipam/_cidr` → the CIDR string (e.g. "10.88.0.0/16") /// - `{prefix}/_cidr` → the CIDR string (e.g. "10.88.0.0/16")
/// - `ipam/alloc/{ip}` → `"{namespace}/{pod_name}"` /// - `{prefix}/alloc/{ip}` → `"{namespace}/{name}"`
pub struct Ipam { pub struct Ipam {
storage: Arc<dyn KVStore>, storage: Arc<dyn KVStore>,
cidr: CidrConfig, cidr: CidrConfig,
/// Storage key for CIDR config
#[allow(dead_code)]
cidr_key: Vec<u8>,
/// Storage key prefix for allocations
alloc_prefix: Vec<u8>,
} }
const IPAM_CIDR_KEY: &[u8] = b"ipam/_cidr";
const IPAM_ALLOC_PREFIX: &[u8] = b"ipam/alloc/";
impl Ipam { impl Ipam {
/// Create a new IPAM instance, persisting the CIDR config /// Create a new IPAM instance with default "ipam" prefix, persisting the CIDR config
pub fn new(storage: Arc<dyn KVStore>, cidr_str: &str) -> Result<Self> { pub fn new(storage: Arc<dyn KVStore>, cidr_str: &str) -> Result<Self> {
let cidr = parse_cidr(cidr_str)?; Self::with_prefix(storage, cidr_str, "ipam")
// Persist the CIDR configuration
storage.put(IPAM_CIDR_KEY, cidr_str.as_bytes())?;
debug!(
"IPAM initialized: network={}, gateway={}, first_host={}, broadcast={}, prefix_len={}",
cidr.network, cidr.gateway, cidr.first_host, cidr.broadcast, cidr.prefix_len
);
Ok(Self { storage, cidr })
} }
/// Allocate an IP for a pod. Idempotent: returns existing allocation if one exists. /// Create a new IPAM instance with a custom storage key prefix
pub fn allocate(&self, namespace: &str, pod_name: &str) -> Result<IpAllocation> { pub fn with_prefix(storage: Arc<dyn KVStore>, cidr_str: &str, prefix: &str) -> Result<Self> {
let pod_key = format!("{}/{}", namespace, pod_name); let cidr = parse_cidr(cidr_str)?;
// Check if this pod already has an allocation let cidr_key = format!("{}/_cidr", prefix).into_bytes();
let allocations = self.storage.scan(IPAM_ALLOC_PREFIX)?; let alloc_prefix = format!("{}/alloc/", prefix).into_bytes();
// Persist the CIDR configuration
storage.put(&cidr_key, cidr_str.as_bytes())?;
debug!(
"IPAM ({}) initialized: network={}, gateway={}, first_host={}, broadcast={}, prefix_len={}",
prefix, cidr.network, cidr.gateway, cidr.first_host, cidr.broadcast, cidr.prefix_len
);
Ok(Self {
storage,
cidr,
cidr_key,
alloc_prefix,
})
}
/// Allocate an IP for a resource. Idempotent: returns existing allocation if one exists.
pub fn allocate(&self, namespace: &str, name: &str) -> Result<IpAllocation> {
let resource_key = format!("{}/{}", namespace, name);
let prefix_len = self.alloc_prefix.len();
// Check if this resource already has an allocation
let allocations = self.storage.scan(&self.alloc_prefix)?;
for (key, value) in &allocations { for (key, value) in &allocations {
let existing_pod = String::from_utf8_lossy(value); let existing = String::from_utf8_lossy(value);
if existing_pod == pod_key { if existing == resource_key {
// Parse the IP from the key: "ipam/alloc/{ip}"
let key_str = String::from_utf8_lossy(key); let key_str = String::from_utf8_lossy(key);
let ip_str = &key_str[IPAM_ALLOC_PREFIX.len()..]; let ip_str = &key_str[prefix_len..];
if let Ok(ip) = ip_str.parse::<Ipv4Addr>() { if let Ok(ip) = ip_str.parse::<Ipv4Addr>() {
debug!("IPAM: returning existing allocation {} for {}", ip, pod_key); debug!(
"IPAM: returning existing allocation {} for {}",
ip, resource_key
);
return Ok(IpAllocation { return Ok(IpAllocation {
ip_address: ip, ip_address: ip,
gateway: self.cidr.gateway, gateway: self.cidr.gateway,
@ -85,7 +103,7 @@ impl Ipam {
.iter() .iter()
.filter_map(|(key, _)| { .filter_map(|(key, _)| {
let key_str = String::from_utf8_lossy(key); let key_str = String::from_utf8_lossy(key);
let ip_str = &key_str[IPAM_ALLOC_PREFIX.len()..]; let ip_str = &key_str[prefix_len..];
ip_str.parse::<Ipv4Addr>().ok() ip_str.parse::<Ipv4Addr>().ok()
}) })
.collect(); .collect();
@ -101,10 +119,12 @@ impl Ipam {
if !allocated.contains(&candidate) { if !allocated.contains(&candidate) {
// Allocate this IP // Allocate this IP
let alloc_key = format!("ipam/alloc/{}", candidate); let alloc_key_str = String::from_utf8_lossy(&self.alloc_prefix);
self.storage.put(alloc_key.as_bytes(), pod_key.as_bytes())?; let alloc_key = format!("{}{}", alloc_key_str, candidate);
self.storage
.put(alloc_key.as_bytes(), resource_key.as_bytes())?;
debug!("IPAM: allocated {} for {}", candidate, pod_key); debug!("IPAM: allocated {} for {}", candidate, resource_key);
return Ok(IpAllocation { return Ok(IpAllocation {
ip_address: candidate, ip_address: candidate,
gateway: self.cidr.gateway, gateway: self.cidr.gateway,
@ -116,36 +136,38 @@ impl Ipam {
} }
} }
/// Release the IP allocated to a pod /// Release the IP allocated to a resource
pub fn release(&self, namespace: &str, pod_name: &str) -> Result<Option<Ipv4Addr>> { pub fn release(&self, namespace: &str, name: &str) -> Result<Option<Ipv4Addr>> {
let pod_key = format!("{}/{}", namespace, pod_name); let resource_key = format!("{}/{}", namespace, name);
let prefix_len = self.alloc_prefix.len();
let allocations = self.storage.scan(IPAM_ALLOC_PREFIX)?; let allocations = self.storage.scan(&self.alloc_prefix)?;
for (key, value) in &allocations { for (key, value) in &allocations {
let existing_pod = String::from_utf8_lossy(value); let existing = String::from_utf8_lossy(value);
if existing_pod == pod_key { if existing == resource_key {
let key_str = String::from_utf8_lossy(key); let key_str = String::from_utf8_lossy(key);
let ip_str = &key_str[IPAM_ALLOC_PREFIX.len()..]; let ip_str = &key_str[prefix_len..];
let ip = ip_str.parse::<Ipv4Addr>().ok(); let ip = ip_str.parse::<Ipv4Addr>().ok();
self.storage.delete(key)?; self.storage.delete(key)?;
debug!("IPAM: released {:?} for {}", ip, pod_key); debug!("IPAM: released {:?} for {}", ip, resource_key);
return Ok(ip); return Ok(ip);
} }
} }
debug!("IPAM: no allocation found for {}", pod_key); debug!("IPAM: no allocation found for {}", resource_key);
Ok(None) Ok(None)
} }
/// Get all current allocations /// Get all current allocations
pub fn get_all_allocations(&self) -> Result<BTreeMap<Ipv4Addr, String>> { pub fn get_all_allocations(&self) -> Result<BTreeMap<Ipv4Addr, String>> {
let allocations = self.storage.scan(IPAM_ALLOC_PREFIX)?; let prefix_len = self.alloc_prefix.len();
let allocations = self.storage.scan(&self.alloc_prefix)?;
let mut result = BTreeMap::new(); let mut result = BTreeMap::new();
for (key, value) in &allocations { for (key, value) in &allocations {
let key_str = String::from_utf8_lossy(key); let key_str = String::from_utf8_lossy(key);
let ip_str = &key_str[IPAM_ALLOC_PREFIX.len()..]; let ip_str = &key_str[prefix_len..];
if let Ok(ip) = ip_str.parse::<Ipv4Addr>() { if let Ok(ip) = ip_str.parse::<Ipv4Addr>() {
result.insert(ip, String::from_utf8_lossy(value).into_owned()); result.insert(ip, String::from_utf8_lossy(value).into_owned());
} }

View file

@ -1,8 +1,10 @@
pub mod ipam; pub mod ipam;
pub mod nat;
pub mod types; pub mod types;
pub use crate::types::{DirectNicConfig, EtherstubConfig, NetworkMode}; pub use crate::types::{DirectNicConfig, EtherstubConfig, NetworkMode};
pub use ipam::{CidrConfig, IpAllocation, Ipam}; pub use ipam::{CidrConfig, IpAllocation, Ipam};
pub use nat::NatManager;
/// Generate a VNIC name from pod namespace and name /// Generate a VNIC name from pod namespace and name
pub fn vnic_name_for_pod(namespace: &str, pod_name: &str) -> String { pub fn vnic_name_for_pod(namespace: &str, pod_name: &str) -> String {

View file

@ -0,0 +1,218 @@
use crate::service_controller::ServiceEndpoints;
use tracing::debug;
/// Manages NAT (Network Address Translation) rules for service traffic routing.
///
/// On illumos, this generates ipnat rdr (redirect) rules so that traffic destined
/// for a service ClusterIP:port is forwarded to one of the backing pod IPs.
/// On non-illumos platforms, this is a no-op that only logs the desired state.
pub struct NatManager {
    /// Name of the network interface to apply rules on (e.g. "reddwarf0")
    interface: String,
}

impl NatManager {
    /// Create a NAT manager that applies rules on `interface`.
    pub fn new(interface: &str) -> Self {
        Self {
            interface: interface.to_string(),
        }
    }

    /// Synchronize all NAT rules to match the desired service endpoints.
    ///
    /// This is a full reconcile: it replaces all existing rules with the new set.
    pub fn sync_rules(&self, endpoints: &[ServiceEndpoints]) {
        let rules = self.generate_rules(endpoints);
        if rules.is_empty() {
            debug!("NAT sync: no rules to apply (no services with endpoints)");
            return;
        }
        debug!(
            "NAT sync: {} rules for {} services",
            rules.len(),
            endpoints.len()
        );
        #[cfg(target_os = "illumos")]
        {
            if let Err(e) = self.apply_rules_illumos(&rules) {
                tracing::error!("Failed to apply NAT rules: {}", e);
            }
        }
        #[cfg(not(target_os = "illumos"))]
        {
            for rule in &rules {
                debug!("NAT rule (mock): {}", rule);
            }
        }
    }

    /// Generate ipnat rdr rules for all service endpoints.
    ///
    /// For each service port, generates one rdr rule per backing pod IP.
    /// Multiple rdr rules for the same destination provide round-robin load
    /// balancing in ipnat.
    fn generate_rules(&self, endpoints: &[ServiceEndpoints]) -> Vec<String> {
        let mut rules = Vec::new();
        for ep in endpoints {
            if ep.pod_ips.is_empty() {
                // A service with no ready pods contributes no rules.
                continue;
            }
            for &(port, target_port) in &ep.ports {
                for pod_ip in &ep.pod_ips {
                    // ipnat rdr rule format:
                    // rdr <if> <from_ip>/32 port <from_port> -> <to_ip> port <to_port> tcp
                    rules.push(format!(
                        "rdr {} {}/32 port {} -> {} port {} tcp",
                        self.interface, ep.cluster_ip, port, pod_ip, target_port
                    ));
                }
            }
        }
        rules
    }

    /// Apply rules on illumos by writing the ipnat config file and reloading it.
    #[cfg(target_os = "illumos")]
    fn apply_rules_illumos(&self, rules: &[String]) -> std::io::Result<()> {
        use std::fmt::Write;
        let conf_path = "/etc/ipf/reddwarf-ipnat.conf";
        // Render the rules into the config file content.
        let mut content = String::new();
        let _ = writeln!(content, "# Auto-generated by reddwarf service controller");
        let _ = writeln!(content, "# Do not edit manually");
        for rule in rules {
            let _ = writeln!(content, "{}", rule);
        }
        std::fs::write(conf_path, &content)?;
        // Fully qualified: module scope imports only `tracing::debug`, so a
        // bare `info!` would not compile on illumos builds.
        tracing::info!("Wrote {} NAT rules to {}", rules.len(), conf_path);
        // Reload ipnat rules from the freshly written config.
        let output = std::process::Command::new("ipnat")
            .args(["-CF", "-f", conf_path])
            .output()?;
        if !output.status.success() {
            let stderr = String::from_utf8_lossy(&output.stderr);
            tracing::error!("ipnat reload failed: {}", stderr);
        } else {
            tracing::info!("NAT rules reloaded successfully");
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a ServiceEndpoints fixture in the "default" namespace.
    fn make_endpoints(
        name: &str,
        cluster_ip: &str,
        ports: Vec<(i32, i32)>,
        pod_ips: Vec<&str>,
    ) -> ServiceEndpoints {
        ServiceEndpoints {
            name: name.to_string(),
            namespace: "default".to_string(),
            cluster_ip: cluster_ip.to_string(),
            ports,
            pod_ips: pod_ips.iter().map(|ip| ip.to_string()).collect(),
        }
    }

    #[test]
    fn test_generate_rules_single_pod() {
        let nat = NatManager::new("reddwarf0");
        let endpoints = vec![make_endpoints(
            "my-svc",
            "10.96.0.1",
            vec![(80, 8080)],
            vec!["10.88.0.2"],
        )];
        // One pod and one port yield exactly one rdr rule.
        assert_eq!(
            nat.generate_rules(&endpoints),
            vec!["rdr reddwarf0 10.96.0.1/32 port 80 -> 10.88.0.2 port 8080 tcp"]
        );
    }

    #[test]
    fn test_generate_rules_multiple_pods() {
        let nat = NatManager::new("reddwarf0");
        let endpoints = vec![make_endpoints(
            "my-svc",
            "10.96.0.1",
            vec![(80, 8080)],
            vec!["10.88.0.2", "10.88.0.3"],
        )];
        let rules = nat.generate_rules(&endpoints);
        // One rule per backing pod, emitted in pod order.
        assert_eq!(rules.len(), 2);
        assert!(rules[0].contains("10.88.0.2"));
        assert!(rules[1].contains("10.88.0.3"));
    }

    #[test]
    fn test_generate_rules_multiple_ports() {
        let nat = NatManager::new("reddwarf0");
        let endpoints = vec![make_endpoints(
            "my-svc",
            "10.96.0.1",
            vec![(80, 8080), (443, 8443)],
            vec!["10.88.0.2"],
        )];
        let rules = nat.generate_rules(&endpoints);
        // One rule per service port, emitted in port order.
        assert_eq!(rules.len(), 2);
        assert!(rules[0].contains("port 80"));
        assert!(rules[1].contains("port 443"));
    }

    #[test]
    fn test_generate_rules_no_pods() {
        let nat = NatManager::new("reddwarf0");
        let endpoints = vec![make_endpoints("my-svc", "10.96.0.1", vec![(80, 8080)], vec![])];
        // A service without ready pods produces no rules.
        assert!(nat.generate_rules(&endpoints).is_empty());
    }

    #[test]
    fn test_generate_rules_empty_endpoints() {
        // No services at all: nothing to generate.
        assert!(NatManager::new("reddwarf0").generate_rules(&[]).is_empty());
    }

    #[test]
    fn test_sync_rules_no_panic() {
        let nat = NatManager::new("reddwarf0");
        let endpoints = vec![make_endpoints(
            "my-svc",
            "10.96.0.1",
            vec![(80, 8080)],
            vec!["10.88.0.2"],
        )];
        // sync_rules must be safe to call on non-illumos hosts (log-only path).
        nat.sync_rules(&endpoints);
    }
}

View file

@ -3,12 +3,15 @@ use crate::error::{Result, RuntimeError};
use crate::sysinfo::{ use crate::sysinfo::{
compute_node_resources, format_memory_quantity, NodeResources, ResourceReservation, compute_node_resources, format_memory_quantity, NodeResources, ResourceReservation,
}; };
use crate::traits::ZoneRuntime;
use crate::types::ZoneInfo;
use k8s_openapi::api::core::v1::{Node, NodeAddress, NodeCondition, NodeStatus}; use k8s_openapi::api::core::v1::{Node, NodeAddress, NodeCondition, NodeStatus};
use k8s_openapi::apimachinery::pkg::api::resource::Quantity; use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{info, warn}; use tracing::{info, warn};
@ -51,10 +54,23 @@ pub struct NodeAgent {
config: NodeAgentConfig, config: NodeAgentConfig,
/// Detected system resources (None if detection failed at startup). /// Detected system resources (None if detection failed at startup).
detected: Option<NodeResources>, detected: Option<NodeResources>,
/// Zone runtime for querying zone states during heartbeat
runtime: Option<Arc<dyn ZoneRuntime>>,
/// Cached zone info from last heartbeat
cached_zones: Mutex<Vec<ZoneInfo>>,
} }
impl NodeAgent { impl NodeAgent {
pub fn new(api_client: Arc<ApiClient>, config: NodeAgentConfig) -> Self { pub fn new(api_client: Arc<ApiClient>, config: NodeAgentConfig) -> Self {
Self::with_runtime(api_client, config, None)
}
/// Create a node agent with a zone runtime for zone state reporting
pub fn with_runtime(
api_client: Arc<ApiClient>,
config: NodeAgentConfig,
runtime: Option<Arc<dyn ZoneRuntime>>,
) -> Self {
let reservation = ResourceReservation { let reservation = ResourceReservation {
cpu_millicores: config.system_reserved_cpu_millicores, cpu_millicores: config.system_reserved_cpu_millicores,
memory_bytes: config.system_reserved_memory_bytes, memory_bytes: config.system_reserved_memory_bytes,
@ -85,6 +101,8 @@ impl NodeAgent {
api_client, api_client,
config, config,
detected, detected,
runtime,
cached_zones: Mutex::new(Vec::new()),
} }
} }
@ -98,6 +116,24 @@ impl NodeAgent {
api_client, api_client,
config, config,
detected, detected,
runtime: None,
cached_zones: Mutex::new(Vec::new()),
}
}
#[cfg(test)]
fn new_with_runtime_and_detected(
api_client: Arc<ApiClient>,
config: NodeAgentConfig,
detected: Option<NodeResources>,
runtime: Option<Arc<dyn ZoneRuntime>>,
) -> Self {
Self {
api_client,
config,
detected,
runtime,
cached_zones: Mutex::new(Vec::new()),
} }
} }
@ -105,7 +141,9 @@ impl NodeAgent {
pub async fn register(&self) -> Result<()> { pub async fn register(&self) -> Result<()> {
info!("Registering node '{}'", self.config.node_name); info!("Registering node '{}'", self.config.node_name);
let node = self.build_node(); let zones = self.cached_zones.lock().await;
let node = self.build_node(&zones);
drop(zones);
match self.api_client.create_node(&node).await { match self.api_client.create_node(&node).await {
Ok(_) => { Ok(_) => {
@ -154,7 +192,23 @@ impl NodeAgent {
/// Send a heartbeat by updating node status /// Send a heartbeat by updating node status
async fn heartbeat(&self) -> Result<()> { async fn heartbeat(&self) -> Result<()> {
let node = self.build_node(); // Query zone states if runtime is available
if let Some(ref runtime) = self.runtime {
match runtime.list_zones().await {
Ok(zones) => {
let mut cached = self.cached_zones.lock().await;
*cached = zones;
}
Err(e) => {
warn!("Failed to list zones for heartbeat: {}", e);
// Keep previous cached state
}
}
}
let zones = self.cached_zones.lock().await;
let node = self.build_node(&zones);
drop(zones);
self.api_client self.api_client
.update_node_status(&self.config.node_name, &node) .update_node_status(&self.config.node_name, &node)
@ -165,7 +219,7 @@ impl NodeAgent {
} }
/// Build the Node resource with current status /// Build the Node resource with current status
fn build_node(&self) -> Node { fn build_node(&self, zones: &[ZoneInfo]) -> Node {
let hostname = self.config.node_name.clone(); let hostname = self.config.node_name.clone();
let (capacity, allocatable) = if let Some(ref nr) = self.detected { let (capacity, allocatable) = if let Some(ref nr) = self.detected {
@ -210,6 +264,28 @@ impl NodeAgent {
(capacity, allocatable) (capacity, allocatable)
}; };
// Build zone summary annotations
let annotations = if !zones.is_empty() {
let mut state_counts: BTreeMap<String, u32> = BTreeMap::new();
for zone in zones {
*state_counts.entry(zone.state.to_string()).or_insert(0) += 1;
}
let summary = serde_json::to_string(&state_counts).unwrap_or_default();
Some(
[
(
"reddwarf.io/zone-count".to_string(),
zones.len().to_string(),
),
("reddwarf.io/zone-summary".to_string(), summary),
]
.into_iter()
.collect(),
)
} else {
None
};
Node { Node {
metadata: ObjectMeta { metadata: ObjectMeta {
name: Some(self.config.node_name.clone()), name: Some(self.config.node_name.clone()),
@ -228,6 +304,7 @@ impl NodeAgent {
.into_iter() .into_iter()
.collect(), .collect(),
), ),
annotations,
..Default::default() ..Default::default()
}, },
status: Some(NodeStatus { status: Some(NodeStatus {
@ -279,7 +356,7 @@ mod tests {
NodeAgentConfig::new("test-node".to_string(), "http://127.0.0.1:6443".to_string()); NodeAgentConfig::new("test-node".to_string(), "http://127.0.0.1:6443".to_string());
let agent = NodeAgent::new(api_client, config); let agent = NodeAgent::new(api_client, config);
let node = agent.build_node(); let node = agent.build_node(&[]);
assert_eq!(node.metadata.name, Some("test-node".to_string())); assert_eq!(node.metadata.name, Some("test-node".to_string()));
let status = node.status.unwrap(); let status = node.status.unwrap();
@ -297,7 +374,7 @@ mod tests {
NodeAgentConfig::new("test-node".to_string(), "http://127.0.0.1:6443".to_string()); NodeAgentConfig::new("test-node".to_string(), "http://127.0.0.1:6443".to_string());
let agent = NodeAgent::new(api_client, config); let agent = NodeAgent::new(api_client, config);
let node = agent.build_node(); let node = agent.build_node(&[]);
let status = node.status.unwrap(); let status = node.status.unwrap();
// Check allocatable has all keys // Check allocatable has all keys
@ -325,9 +402,12 @@ mod tests {
let agent = NodeAgent::new(api_client, config); let agent = NodeAgent::new(api_client, config);
// Agent should have detected resources (we're on a real host) // Agent should have detected resources (we're on a real host)
assert!(agent.detected.is_some(), "detection should succeed in tests"); assert!(
agent.detected.is_some(),
"detection should succeed in tests"
);
let node = agent.build_node(); let node = agent.build_node(&[]);
let status = node.status.unwrap(); let status = node.status.unwrap();
let cap = status.capacity.unwrap(); let cap = status.capacity.unwrap();
let alloc = status.allocatable.unwrap(); let alloc = status.allocatable.unwrap();
@ -335,8 +415,7 @@ mod tests {
// Allocatable CPU (millicores) should be less than capacity CPU (whole cores) // Allocatable CPU (millicores) should be less than capacity CPU (whole cores)
let cap_cpu_m = reddwarf_core::resources::ResourceQuantities::parse_cpu(&cap["cpu"].0) let cap_cpu_m = reddwarf_core::resources::ResourceQuantities::parse_cpu(&cap["cpu"].0)
.expect("valid cpu"); .expect("valid cpu");
let alloc_cpu_m = let alloc_cpu_m = reddwarf_core::resources::ResourceQuantities::parse_cpu(&alloc["cpu"].0)
reddwarf_core::resources::ResourceQuantities::parse_cpu(&alloc["cpu"].0)
.expect("valid cpu"); .expect("valid cpu");
assert!( assert!(
alloc_cpu_m < cap_cpu_m, alloc_cpu_m < cap_cpu_m,
@ -346,8 +425,7 @@ mod tests {
); );
// Allocatable memory should be less than capacity memory // Allocatable memory should be less than capacity memory
let cap_mem = let cap_mem = reddwarf_core::resources::ResourceQuantities::parse_memory(&cap["memory"].0)
reddwarf_core::resources::ResourceQuantities::parse_memory(&cap["memory"].0)
.expect("valid mem"); .expect("valid mem");
let alloc_mem = let alloc_mem =
reddwarf_core::resources::ResourceQuantities::parse_memory(&alloc["memory"].0) reddwarf_core::resources::ResourceQuantities::parse_memory(&alloc["memory"].0)
@ -368,10 +446,13 @@ mod tests {
config.supported_brands = vec!["reddwarf".into(), "lx".into()]; config.supported_brands = vec!["reddwarf".into(), "lx".into()];
let agent = NodeAgent::new(api_client, config); let agent = NodeAgent::new(api_client, config);
let node = agent.build_node(); let node = agent.build_node(&[]);
let labels = node.metadata.labels.unwrap(); let labels = node.metadata.labels.unwrap();
assert_eq!(labels.get("reddwarf.io/zone-brands").unwrap(), "reddwarf,lx"); assert_eq!(
labels.get("reddwarf.io/zone-brands").unwrap(),
"reddwarf,lx"
);
} }
#[test] #[test]
@ -381,7 +462,7 @@ mod tests {
NodeAgentConfig::new("test-node".to_string(), "http://127.0.0.1:6443".to_string()); NodeAgentConfig::new("test-node".to_string(), "http://127.0.0.1:6443".to_string());
let agent = NodeAgent::new(api_client, config); let agent = NodeAgent::new(api_client, config);
let node = agent.build_node(); let node = agent.build_node(&[]);
let labels = node.metadata.labels.unwrap(); let labels = node.metadata.labels.unwrap();
assert_eq!(labels.get("reddwarf.io/zone-brands").unwrap(), "reddwarf"); assert_eq!(labels.get("reddwarf.io/zone-brands").unwrap(), "reddwarf");
@ -395,7 +476,7 @@ mod tests {
// Simulate detection failure // Simulate detection failure
let agent = NodeAgent::new_with_detected(api_client, config, None); let agent = NodeAgent::new_with_detected(api_client, config, None);
let node = agent.build_node(); let node = agent.build_node(&[]);
let status = node.status.unwrap(); let status = node.status.unwrap();
let alloc = status.allocatable.unwrap(); let alloc = status.allocatable.unwrap();
let cap = status.capacity.unwrap(); let cap = status.capacity.unwrap();
@ -413,4 +494,60 @@ mod tests {
assert_eq!(alloc["cpu"].0, expected_cpu); assert_eq!(alloc["cpu"].0, expected_cpu);
assert_eq!(cap["cpu"].0, expected_cpu); assert_eq!(cap["cpu"].0, expected_cpu);
} }
#[test]
fn test_build_node_with_zone_info() {
let api_client = Arc::new(ApiClient::new("http://127.0.0.1:6443"));
let config =
NodeAgentConfig::new("test-node".to_string(), "http://127.0.0.1:6443".to_string());
let agent = NodeAgent::new(api_client, config);
let zones = vec![
ZoneInfo {
zone_name: "pod-a".to_string(),
zone_id: Some(1),
state: crate::types::ZoneState::Running,
zonepath: "/zones/pod-a".to_string(),
brand: "reddwarf".to_string(),
uuid: String::new(),
},
ZoneInfo {
zone_name: "pod-b".to_string(),
zone_id: Some(2),
state: crate::types::ZoneState::Running,
zonepath: "/zones/pod-b".to_string(),
brand: "reddwarf".to_string(),
uuid: String::new(),
},
ZoneInfo {
zone_name: "pod-c".to_string(),
zone_id: None,
state: crate::types::ZoneState::Installed,
zonepath: "/zones/pod-c".to_string(),
brand: "lx".to_string(),
uuid: String::new(),
},
];
let node = agent.build_node(&zones);
let annotations = node.metadata.annotations.unwrap();
assert_eq!(annotations.get("reddwarf.io/zone-count").unwrap(), "3");
let summary: BTreeMap<String, u32> =
serde_json::from_str(annotations.get("reddwarf.io/zone-summary").unwrap()).unwrap();
assert_eq!(summary.get("running"), Some(&2));
assert_eq!(summary.get("installed"), Some(&1));
}
#[test]
fn test_build_node_no_zones_no_annotations() {
let api_client = Arc::new(ApiClient::new("http://127.0.0.1:6443"));
let config =
NodeAgentConfig::new("test-node".to_string(), "http://127.0.0.1:6443".to_string());
let agent = NodeAgent::new(api_client, config);
let node = agent.build_node(&[]);
assert!(node.metadata.annotations.is_none());
}
} }

View file

@ -94,11 +94,7 @@ impl NodeHealthChecker {
/// Check a single node's heartbeat and mark it NotReady if stale /// Check a single node's heartbeat and mark it NotReady if stale
async fn check_node(&self, node_name: &str, node: &Node) -> Result<()> { async fn check_node(&self, node_name: &str, node: &Node) -> Result<()> {
let conditions = match node let conditions = match node.status.as_ref().and_then(|s| s.conditions.as_ref()) {
.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
{
Some(c) => c, Some(c) => c,
None => { None => {
debug!("Node {} has no conditions, skipping", node_name); debug!("Node {} has no conditions, skipping", node_name);
@ -218,9 +214,7 @@ mod tests {
reason: Some("KubeletReady".to_string()), reason: Some("KubeletReady".to_string()),
message: Some("node agent is healthy".to_string()), message: Some("node agent is healthy".to_string()),
last_heartbeat_time: Some(Time(heartbeat_time)), last_heartbeat_time: Some(Time(heartbeat_time)),
last_transition_time: Some(Time( last_transition_time: Some(Time(Utc::now() - chrono::Duration::seconds(3600))),
Utc::now() - chrono::Duration::seconds(3600),
)),
}]), }]),
..Default::default() ..Default::default()
}), }),
@ -242,9 +236,7 @@ mod tests {
reason: Some("NodeStatusUnknown".to_string()), reason: Some("NodeStatusUnknown".to_string()),
message: Some("Node heartbeat not received".to_string()), message: Some("Node heartbeat not received".to_string()),
last_heartbeat_time: Some(Time(heartbeat_time)), last_heartbeat_time: Some(Time(heartbeat_time)),
last_transition_time: Some(Time( last_transition_time: Some(Time(Utc::now() - chrono::Duration::seconds(100))),
Utc::now() - chrono::Duration::seconds(100),
)),
}]), }]),
..Default::default() ..Default::default()
}), }),

View file

@ -31,10 +31,9 @@ impl ProbeExecutor {
.await .await
{ {
Ok(outcome) => outcome, Ok(outcome) => outcome,
Err(_) => ProbeOutcome::Failure(format!( Err(_) => {
"probe timed out after {}s", ProbeOutcome::Failure(format!("probe timed out after {}s", timeout.as_secs()))
timeout.as_secs() }
)),
}; };
ProbeResult { ProbeResult {
@ -108,10 +107,7 @@ impl ProbeExecutor {
let mut stream = match TcpStream::connect(&addr).await { let mut stream = match TcpStream::connect(&addr).await {
Ok(s) => s, Ok(s) => s,
Err(e) => { Err(e) => {
return ProbeOutcome::Failure(format!( return ProbeOutcome::Failure(format!("HTTP connection to {} failed: {}", addr, e))
"HTTP connection to {} failed: {}",
addr, e
))
} }
}; };
@ -159,11 +155,15 @@ mod tests {
use crate::mock::MockRuntime; use crate::mock::MockRuntime;
use crate::storage::MockStorageEngine; use crate::storage::MockStorageEngine;
use crate::traits::ZoneRuntime; use crate::traits::ZoneRuntime;
use crate::types::{StoragePoolConfig, ZoneBrand, ZoneConfig, ZoneStorageOpts, NetworkMode, EtherstubConfig}; use crate::types::{
EtherstubConfig, NetworkMode, StoragePoolConfig, ZoneBrand, ZoneConfig, ZoneStorageOpts,
};
use tokio::net::TcpListener; use tokio::net::TcpListener;
fn make_test_runtime() -> Arc<MockRuntime> { fn make_test_runtime() -> Arc<MockRuntime> {
let storage = Arc::new(MockStorageEngine::new(StoragePoolConfig::from_pool("rpool"))); let storage = Arc::new(MockStorageEngine::new(StoragePoolConfig::from_pool(
"rpool",
)));
Arc::new(MockRuntime::new(storage)) Arc::new(MockRuntime::new(storage))
} }
@ -181,6 +181,7 @@ mod tests {
}), }),
storage: ZoneStorageOpts::default(), storage: ZoneStorageOpts::default(),
lx_image_path: None, lx_image_path: None,
bhyve_disk_image: None,
processes: vec![], processes: vec![],
cpu_cap: None, cpu_cap: None,
memory_cap: None, memory_cap: None,
@ -333,8 +334,7 @@ mod tests {
if let Ok((mut stream, _)) = listener.accept().await { if let Ok((mut stream, _)) = listener.accept().await {
let mut buf = [0u8; 1024]; let mut buf = [0u8; 1024];
let _ = stream.read(&mut buf).await; let _ = stream.read(&mut buf).await;
let response = let response = "HTTP/1.1 503 Service Unavailable\r\nContent-Length: 5\r\n\r\nError";
"HTTP/1.1 503 Service Unavailable\r\nContent-Length: 5\r\n\r\nError";
let _ = stream.write_all(response.as_bytes()).await; let _ = stream.write_all(response.as_bytes()).await;
} }
}); });

View file

@ -5,5 +5,5 @@ pub mod types;
pub use executor::ProbeExecutor; pub use executor::ProbeExecutor;
pub use tracker::{PodProbeStatus, ProbeTracker}; pub use tracker::{PodProbeStatus, ProbeTracker};
pub use types::{ pub use types::{
ContainerProbeConfig, ProbeAction, ProbeKind, ProbeOutcome, ProbeResult, extract_probes, extract_probes, ContainerProbeConfig, ProbeAction, ProbeKind, ProbeOutcome, ProbeResult,
}; };

View file

@ -268,7 +268,9 @@ mod tests {
use std::sync::Arc; use std::sync::Arc;
fn make_test_runtime() -> Arc<MockRuntime> { fn make_test_runtime() -> Arc<MockRuntime> {
let storage = Arc::new(MockStorageEngine::new(StoragePoolConfig::from_pool("rpool"))); let storage = Arc::new(MockStorageEngine::new(StoragePoolConfig::from_pool(
"rpool",
)));
Arc::new(MockRuntime::new(storage)) Arc::new(MockRuntime::new(storage))
} }
@ -286,6 +288,7 @@ mod tests {
}), }),
storage: ZoneStorageOpts::default(), storage: ZoneStorageOpts::default(),
lx_image_path: None, lx_image_path: None,
bhyve_disk_image: None,
processes: vec![], processes: vec![],
cpu_cap: None, cpu_cap: None,
memory_cap: None, memory_cap: None,

View file

@ -23,9 +23,19 @@ impl std::fmt::Display for ProbeKind {
/// The action a probe performs /// The action a probe performs
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProbeAction { pub enum ProbeAction {
Exec { command: Vec<String> }, Exec {
HttpGet { path: String, port: u16, host: String, scheme: String }, command: Vec<String>,
TcpSocket { port: u16, host: String }, },
HttpGet {
path: String,
port: u16,
host: String,
scheme: String,
},
TcpSocket {
port: u16,
host: String,
},
} }
/// Extracted probe configuration for a single container + probe kind /// Extracted probe configuration for a single container + probe kind
@ -132,9 +142,7 @@ pub fn extract_probes(container: &Container) -> Vec<ContainerProbeConfig> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use k8s_openapi::api::core::v1::{ use k8s_openapi::api::core::v1::{ExecAction, HTTPGetAction, Probe, TCPSocketAction};
ExecAction, HTTPGetAction, Probe, TCPSocketAction,
};
#[test] #[test]
fn test_extract_exec_probe() { fn test_extract_exec_probe() {
@ -142,7 +150,11 @@ mod tests {
name: "web".to_string(), name: "web".to_string(),
liveness_probe: Some(Probe { liveness_probe: Some(Probe {
exec: Some(ExecAction { exec: Some(ExecAction {
command: Some(vec!["/bin/sh".to_string(), "-c".to_string(), "exit 0".to_string()]), command: Some(vec![
"/bin/sh".to_string(),
"-c".to_string(),
"exit 0".to_string(),
]),
}), }),
period_seconds: Some(5), period_seconds: Some(5),
failure_threshold: Some(2), failure_threshold: Some(2),
@ -157,7 +169,11 @@ mod tests {
assert_eq!( assert_eq!(
probes[0].action, probes[0].action,
ProbeAction::Exec { ProbeAction::Exec {
command: vec!["/bin/sh".to_string(), "-c".to_string(), "exit 0".to_string()] command: vec![
"/bin/sh".to_string(),
"-c".to_string(),
"exit 0".to_string()
]
} }
); );
assert_eq!(probes[0].period_seconds, 5); assert_eq!(probes[0].period_seconds, 5);

View file

@ -0,0 +1,236 @@
use crate::error::Result;
use crate::network::nat::NatManager;
use k8s_openapi::api::core::v1::{Pod, Service};
use reddwarf_core::ResourceEvent;
use reddwarf_storage::{KVStore, KeyEncoder, RedbBackend};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
/// Configuration for the service controller
#[derive(Debug, Clone)]
pub struct ServiceControllerConfig {
    /// Interval between full reconciliation cycles.
    /// Acts as a safety net for missed/lagged events; defaults to 30 seconds.
    pub reconcile_interval: Duration,
}

/// Default configuration: reconcile every 30 seconds.
impl Default for ServiceControllerConfig {
    fn default() -> Self {
        Self {
            reconcile_interval: Duration::from_secs(30),
        }
    }
}
/// Endpoint information for a service
///
/// A snapshot of one service's ClusterIP, its port mapping, and the pod IPs
/// currently matching its selector. Produced by the reconcile loop and
/// handed to the NAT manager's `sync_rules`.
#[derive(Debug, Clone)]
pub struct ServiceEndpoints {
    /// Service name
    pub name: String,
    /// Service namespace
    pub namespace: String,
    /// ClusterIP assigned to this service
    pub cluster_ip: String,
    /// Service ports as (service port, target/container port) pairs.
    /// Named target ports fall back to the service port itself.
    pub ports: Vec<(i32, i32)>,
    /// Pod IPs that match the service selector
    pub pod_ips: Vec<String>,
}
/// Service controller that watches for Service and Pod events,
/// resolves service selectors to pod endpoints, and configures NAT rules.
pub struct ServiceController {
    /// Backing store scanned for Service and Pod resources.
    storage: Arc<RedbBackend>,
    /// Cluster event bus; `run()` subscribes to it for Service/Pod events.
    event_tx: broadcast::Sender<ResourceEvent>,
    /// Receives the full computed endpoint set on every reconcile.
    nat_manager: Arc<NatManager>,
    /// Timing knobs (periodic reconcile interval).
    config: ServiceControllerConfig,
}
impl ServiceController {
pub fn new(
storage: Arc<RedbBackend>,
event_tx: broadcast::Sender<ResourceEvent>,
nat_manager: Arc<NatManager>,
config: ServiceControllerConfig,
) -> Self {
Self {
storage,
event_tx,
nat_manager,
config,
}
}
/// Run the controller — reacts to Service and Pod events from the event bus.
pub async fn run(&self, token: CancellationToken) -> Result<()> {
info!("Starting service controller");
// Initial full reconcile
if let Err(e) = self.reconcile_all().await {
error!("Initial service reconcile failed: {}", e);
}
let mut rx = self.event_tx.subscribe();
let mut reconcile_tick = tokio::time::interval(self.config.reconcile_interval);
reconcile_tick.tick().await; // consume first tick
loop {
tokio::select! {
_ = token.cancelled() => {
info!("Service controller shutting down");
return Ok(());
}
_ = reconcile_tick.tick() => {
debug!("Service controller periodic reconcile");
if let Err(e) = self.reconcile_all().await {
error!("Service periodic reconcile failed: {}", e);
}
}
result = rx.recv() => {
match result {
Ok(event) => {
let kind = event.gvk.kind.as_str();
if kind == "Service" || kind == "Pod" {
debug!("Service controller handling {} event for {}", kind, event.resource_key.name);
if let Err(e) = self.reconcile_all().await {
error!("Service reconcile after event failed: {}", e);
}
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("Service controller lagged by {} events, full reconcile", n);
if let Err(e) = self.reconcile_all().await {
error!("Service reconcile after lag failed: {}", e);
}
}
Err(broadcast::error::RecvError::Closed) => {
info!("Event bus closed, service controller stopping");
return Ok(());
}
}
}
}
}
}
/// Full reconcile: for each service, resolve endpoints and sync NAT rules.
async fn reconcile_all(&self) -> Result<()> {
let services = self.list_services()?;
let pods = self.list_pods()?;
let mut all_endpoints = Vec::new();
for service in &services {
let namespace = service.metadata.namespace.as_deref().unwrap_or("default");
let name = service.metadata.name.as_deref().unwrap_or("unknown");
let selector = match service.spec.as_ref().and_then(|s| s.selector.as_ref()) {
Some(s) if !s.is_empty() => s,
_ => continue, // No selector means no endpoints to track
};
let cluster_ip = match service.spec.as_ref().and_then(|s| s.cluster_ip.as_ref()) {
Some(ip) if !ip.is_empty() && ip != "None" => ip.clone(),
_ => continue, // No ClusterIP
};
// Collect ports
let ports: Vec<(i32, i32)> = service
.spec
.as_ref()
.and_then(|s| s.ports.as_ref())
.map(|ports| {
ports
.iter()
.map(|p| {
let port = p.port;
let target_port = p
.target_port
.as_ref()
.and_then(|tp| match tp {
k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(i) => Some(*i),
k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::String(_) => None,
})
.unwrap_or(port);
(port, target_port)
})
.collect()
})
.unwrap_or_default();
// Find pods matching the selector
let matching_ips: Vec<String> = pods
.iter()
.filter(|pod| {
let pod_ns = pod.metadata.namespace.as_deref().unwrap_or("default");
if pod_ns != namespace {
return false;
}
let pod_labels = pod.metadata.labels.as_ref();
match pod_labels {
Some(labels) => selector.iter().all(|(k, v)| labels.get(k) == Some(v)),
None => false,
}
})
.filter_map(|pod| pod.status.as_ref().and_then(|s| s.pod_ip.as_ref()).cloned())
.collect();
debug!(
"Service {}/{}: ClusterIP={}, endpoints={:?}, ports={:?}",
namespace, name, cluster_ip, matching_ips, ports
);
all_endpoints.push(ServiceEndpoints {
name: name.to_string(),
namespace: namespace.to_string(),
cluster_ip,
ports,
pod_ips: matching_ips,
});
}
// Sync NAT rules
self.nat_manager.sync_rules(&all_endpoints);
Ok(())
}
/// List all services from storage
fn list_services(&self) -> Result<Vec<Service>> {
let prefix = KeyEncoder::encode_prefix("v1", "Service", None);
let entries = self.storage.scan(prefix.as_bytes())?;
let mut services = Vec::new();
for (_key, value) in entries {
if let Ok(svc) = serde_json::from_slice::<Service>(&value) {
services.push(svc);
}
}
Ok(services)
}
/// List all pods from storage
fn list_pods(&self) -> Result<Vec<Pod>> {
let prefix = KeyEncoder::encode_prefix("v1", "Pod", None);
let entries = self.storage.scan(prefix.as_bytes())?;
let mut pods = Vec::new();
for (_key, value) in entries {
if let Ok(pod) = serde_json::from_slice::<Pod>(&value) {
pods.push(pod);
}
}
Ok(pods)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration must reconcile every 30 seconds.
    #[test]
    fn test_service_controller_config_defaults() {
        let ServiceControllerConfig { reconcile_interval } = ServiceControllerConfig::default();
        assert_eq!(reconcile_interval, Duration::from_secs(30));
    }
}

View file

@ -62,8 +62,7 @@ pub fn compute_node_resources(
let capacity = detect_system_resources()?; let capacity = detect_system_resources()?;
let capacity_cpu_millicores = capacity.cpu_count as i64 * 1000; let capacity_cpu_millicores = capacity.cpu_count as i64 * 1000;
let allocatable_cpu_millicores = let allocatable_cpu_millicores = (capacity_cpu_millicores - reservation.cpu_millicores).max(0);
(capacity_cpu_millicores - reservation.cpu_millicores).max(0);
let allocatable_memory_bytes = let allocatable_memory_bytes =
(capacity.total_memory_bytes as i64 - reservation.memory_bytes).max(0) as u64; (capacity.total_memory_bytes as i64 - reservation.memory_bytes).max(0) as u64;
@ -104,10 +103,7 @@ mod tests {
fn test_detect_system_resources() { fn test_detect_system_resources() {
let res = detect_system_resources().expect("detection should succeed in test env"); let res = detect_system_resources().expect("detection should succeed in test env");
assert!(res.cpu_count > 0, "should detect at least 1 CPU"); assert!(res.cpu_count > 0, "should detect at least 1 CPU");
assert!( assert!(res.total_memory_bytes > 0, "should detect nonzero memory");
res.total_memory_bytes > 0,
"should detect nonzero memory"
);
} }
#[test] #[test]
@ -139,8 +135,7 @@ mod tests {
cpu_millicores: 100, cpu_millicores: 100,
memory_bytes: 256 * 1024 * 1024, memory_bytes: 256 * 1024 * 1024,
}; };
let nr = compute_node_resources(&reservation, 110) let nr = compute_node_resources(&reservation, 110).expect("should succeed in test env");
.expect("should succeed in test env");
let capacity_cpu_millis = nr.capacity.cpu_count as i64 * 1000; let capacity_cpu_millis = nr.capacity.cpu_count as i64 * 1000;
assert!( assert!(
@ -164,8 +159,7 @@ mod tests {
cpu_millicores: i64::MAX, cpu_millicores: i64::MAX,
memory_bytes: i64::MAX, memory_bytes: i64::MAX,
}; };
let nr = compute_node_resources(&reservation, 110) let nr = compute_node_resources(&reservation, 110).expect("should succeed in test env");
.expect("should succeed in test env");
assert_eq!(nr.allocatable_cpu_millicores, 0); assert_eq!(nr.allocatable_cpu_millicores, 0);
assert_eq!(nr.allocatable_memory_bytes, 0); assert_eq!(nr.allocatable_memory_bytes, 0);

View file

@ -64,6 +64,12 @@ pub trait ZoneRuntime: Send + Sync {
/// Set up network for a zone /// Set up network for a zone
async fn setup_network(&self, zone_name: &str, network: &NetworkMode) -> Result<()>; async fn setup_network(&self, zone_name: &str, network: &NetworkMode) -> Result<()>;
/// Configure IP address inside a running zone via ipadm
///
/// Must be called after the zone is booted. Creates the IP interface,
/// assigns a static address, and configures the default route.
async fn configure_zone_ip(&self, zone_name: &str, network: &NetworkMode) -> Result<()>;
/// Tear down network for a zone /// Tear down network for a zone
async fn teardown_network(&self, zone_name: &str, network: &NetworkMode) -> Result<()>; async fn teardown_network(&self, zone_name: &str, network: &NetworkMode) -> Result<()>;

View file

@ -7,6 +7,8 @@ pub enum ZoneBrand {
Lx, Lx,
/// Custom reddwarf brand (Pod = Zone, containers = supervised processes) /// Custom reddwarf brand (Pod = Zone, containers = supervised processes)
Reddwarf, Reddwarf,
/// Bhyve branded zone (hardware virtual machine)
Bhyve,
} }
impl ZoneBrand { impl ZoneBrand {
@ -15,6 +17,7 @@ impl ZoneBrand {
match self { match self {
ZoneBrand::Lx => "lx", ZoneBrand::Lx => "lx",
ZoneBrand::Reddwarf => "reddwarf", ZoneBrand::Reddwarf => "reddwarf",
ZoneBrand::Bhyve => "bhyve",
} }
} }
} }
@ -230,6 +233,8 @@ pub struct ZoneConfig {
pub storage: ZoneStorageOpts, pub storage: ZoneStorageOpts,
/// LX brand image path (only for Lx brand) /// LX brand image path (only for Lx brand)
pub lx_image_path: Option<String>, pub lx_image_path: Option<String>,
/// Bhyve disk image path (only for Bhyve brand, e.g., a ZFS zvol device)
pub bhyve_disk_image: Option<String>,
/// Supervised processes (for reddwarf brand) /// Supervised processes (for reddwarf brand)
pub processes: Vec<ContainerProcess>, pub processes: Vec<ContainerProcess>,
/// CPU cap (fraction, e.g., "1.0" = 1 CPU) /// CPU cap (fraction, e.g., "1.0" = 1 CPU)
@ -277,6 +282,7 @@ mod tests {
fn test_zone_brand_display() { fn test_zone_brand_display() {
assert_eq!(ZoneBrand::Lx.as_str(), "lx"); assert_eq!(ZoneBrand::Lx.as_str(), "lx");
assert_eq!(ZoneBrand::Reddwarf.as_str(), "reddwarf"); assert_eq!(ZoneBrand::Reddwarf.as_str(), "reddwarf");
assert_eq!(ZoneBrand::Bhyve.as_str(), "bhyve");
} }
#[test] #[test]

View file

@ -1,5 +1,6 @@
use crate::brand::bhyve::bhyve_zonecfg_lines;
use crate::error::Result; use crate::error::Result;
use crate::types::{NetworkMode, ZoneConfig}; use crate::types::{NetworkMode, ZoneBrand, ZoneConfig};
/// Generate a zonecfg command file from a ZoneConfig /// Generate a zonecfg command file from a ZoneConfig
pub fn generate_zonecfg(config: &ZoneConfig) -> Result<String> { pub fn generate_zonecfg(config: &ZoneConfig) -> Result<String> {
@ -45,6 +46,13 @@ pub fn generate_zonecfg(config: &ZoneConfig) -> Result<String> {
lines.push("end".to_string()); lines.push("end".to_string());
} }
// Bhyve-specific device configuration
if config.brand == ZoneBrand::Bhyve {
for line in bhyve_zonecfg_lines(config) {
lines.push(line);
}
}
// Filesystem mounts // Filesystem mounts
for mount in &config.fs_mounts { for mount in &config.fs_mounts {
lines.push("add fs".to_string()); lines.push("add fs".to_string());
@ -86,6 +94,7 @@ mod tests {
quota: Some("10G".to_string()), quota: Some("10G".to_string()),
}, },
lx_image_path: Some("/images/ubuntu-22.04.tar.gz".to_string()), lx_image_path: Some("/images/ubuntu-22.04.tar.gz".to_string()),
bhyve_disk_image: None,
processes: vec![], processes: vec![],
cpu_cap: Some("2.0".to_string()), cpu_cap: Some("2.0".to_string()),
memory_cap: Some("1G".to_string()), memory_cap: Some("1G".to_string()),
@ -123,6 +132,7 @@ mod tests {
quota: None, quota: None,
}, },
lx_image_path: None, lx_image_path: None,
bhyve_disk_image: None,
processes: vec![ContainerProcess { processes: vec![ContainerProcess {
name: "web".to_string(), name: "web".to_string(),
command: vec!["/usr/bin/node".to_string(), "server.js".to_string()], command: vec!["/usr/bin/node".to_string(), "server.js".to_string()],

View file

@ -435,7 +435,10 @@ mod tests {
let filter = ZoneBrandMatch; let filter = ZoneBrandMatch;
let result = filter.filter(&context, &node); let result = filter.filter(&context, &node);
assert!(!result.passed); assert!(!result.passed);
assert!(result.reason.unwrap().contains("does not support zone brand 'lx'")); assert!(result
.reason
.unwrap()
.contains("does not support zone brand 'lx'"));
} }
#[test] #[test]

View file

@ -2,8 +2,9 @@ use clap::{Parser, Subcommand};
use reddwarf_apiserver::{ApiError, ApiServer, AppState, Config as ApiConfig, TlsMode}; use reddwarf_apiserver::{ApiError, ApiServer, AppState, Config as ApiConfig, TlsMode};
use reddwarf_core::{Namespace, ResourceQuantities}; use reddwarf_core::{Namespace, ResourceQuantities};
use reddwarf_runtime::{ use reddwarf_runtime::{
ApiClient, Ipam, MockRuntime, MockStorageEngine, NodeAgent, NodeAgentConfig, ApiClient, DnsServer, DnsServerConfig, Ipam, MockRuntime, MockStorageEngine, NatManager,
NodeHealthChecker, NodeHealthCheckerConfig, PodController, PodControllerConfig, StorageEngine, NodeAgent, NodeAgentConfig, NodeHealthChecker, NodeHealthCheckerConfig, PodController,
PodControllerConfig, ServiceController, ServiceControllerConfig, StorageEngine,
StoragePoolConfig, ZoneBrand, StoragePoolConfig, ZoneBrand,
}; };
use reddwarf_scheduler::scheduler::SchedulerConfig; use reddwarf_scheduler::scheduler::SchedulerConfig;
@ -40,6 +41,7 @@ struct TlsArgs {
} }
#[derive(Subcommand)] #[derive(Subcommand)]
#[allow(clippy::large_enum_variant)]
enum Commands { enum Commands {
/// Run the API server only /// Run the API server only
Serve { Serve {
@ -81,6 +83,9 @@ enum Commands {
/// Pod network CIDR for IPAM allocation /// Pod network CIDR for IPAM allocation
#[arg(long, default_value = "10.88.0.0/16")] #[arg(long, default_value = "10.88.0.0/16")]
pod_cidr: String, pod_cidr: String,
/// Service ClusterIP CIDR for service IP allocation
#[arg(long, default_value = "10.96.0.0/12")]
service_cidr: String,
/// Etherstub name for pod networking /// Etherstub name for pod networking
#[arg(long, default_value = "reddwarf0")] #[arg(long, default_value = "reddwarf0")]
etherstub_name: String, etherstub_name: String,
@ -96,6 +101,9 @@ enum Commands {
/// Comma-separated list of zone brands this node supports /// Comma-separated list of zone brands this node supports
#[arg(long, default_value = "reddwarf")] #[arg(long, default_value = "reddwarf")]
supported_brands: String, supported_brands: String,
/// DNS server listen address for service discovery
#[arg(long, default_value = "0.0.0.0:10053")]
cluster_dns: String,
#[command(flatten)] #[command(flatten)]
tls_args: TlsArgs, tls_args: TlsArgs,
}, },
@ -129,15 +137,17 @@ async fn main() -> miette::Result<()> {
volumes_dataset, volumes_dataset,
zonepath_prefix, zonepath_prefix,
pod_cidr, pod_cidr,
service_cidr,
etherstub_name, etherstub_name,
system_reserved_cpu, system_reserved_cpu,
system_reserved_memory, system_reserved_memory,
max_pods, max_pods,
supported_brands, supported_brands,
cluster_dns,
tls_args, tls_args,
} => { } => {
let reserved_cpu_millicores = let reserved_cpu_millicores = ResourceQuantities::parse_cpu(&system_reserved_cpu)
ResourceQuantities::parse_cpu(&system_reserved_cpu).map_err(|e| { .map_err(|e| {
miette::miette!( miette::miette!(
help = "Use a value like '100m' or '0.1' for --system-reserved-cpu", help = "Use a value like '100m' or '0.1' for --system-reserved-cpu",
"Invalid --system-reserved-cpu '{}': {}", "Invalid --system-reserved-cpu '{}': {}",
@ -145,8 +155,8 @@ async fn main() -> miette::Result<()> {
e e
) )
})?; })?;
let reserved_memory_bytes = let reserved_memory_bytes = ResourceQuantities::parse_memory(&system_reserved_memory)
ResourceQuantities::parse_memory(&system_reserved_memory).map_err(|e| { .map_err(|e| {
miette::miette!( miette::miette!(
help = "Use a value like '256Mi' or '1Gi' for --system-reserved-memory", help = "Use a value like '256Mi' or '1Gi' for --system-reserved-memory",
"Invalid --system-reserved-memory '{}': {}", "Invalid --system-reserved-memory '{}': {}",
@ -171,11 +181,13 @@ async fn main() -> miette::Result<()> {
volumes_dataset.as_deref(), volumes_dataset.as_deref(),
zonepath_prefix.as_deref(), zonepath_prefix.as_deref(),
&pod_cidr, &pod_cidr,
&service_cidr,
&etherstub_name, &etherstub_name,
reserved_cpu_millicores, reserved_cpu_millicores,
reserved_memory_bytes, reserved_memory_bytes,
max_pods, max_pods,
&supported_brands, &supported_brands,
&cluster_dns,
&tls_args, &tls_args,
) )
.await .await
@ -272,17 +284,33 @@ async fn run_agent(
volumes_dataset: Option<&str>, volumes_dataset: Option<&str>,
zonepath_prefix: Option<&str>, zonepath_prefix: Option<&str>,
pod_cidr: &str, pod_cidr: &str,
service_cidr: &str,
etherstub_name: &str, etherstub_name: &str,
system_reserved_cpu_millicores: i64, system_reserved_cpu_millicores: i64,
system_reserved_memory_bytes: i64, system_reserved_memory_bytes: i64,
max_pods: u32, max_pods: u32,
supported_brands: &[String], supported_brands: &[String],
cluster_dns: &str,
tls_args: &TlsArgs, tls_args: &TlsArgs,
) -> miette::Result<()> { ) -> miette::Result<()> {
info!("Starting reddwarf agent for node '{}'", node_name); info!("Starting reddwarf agent for node '{}'", node_name);
let state = create_app_state(data_dir)?; let state = create_app_state(data_dir)?;
// Create service IPAM for ClusterIP allocation and attach to state
let service_ipam =
Ipam::with_prefix(state.storage.clone(), service_cidr, "svc-ipam").map_err(|e| {
miette::miette!(
"Failed to initialize service IPAM with CIDR '{}': {}",
service_cidr,
e
)
})?;
let state = Arc::new(AppState {
service_ipam: Some(Arc::new(service_ipam)),
..(*state).clone()
});
bootstrap_default_namespace(&state).await?; bootstrap_default_namespace(&state).await?;
let listen_addr: std::net::SocketAddr = bind let listen_addr: std::net::SocketAddr = bind
@ -372,7 +400,7 @@ async fn run_agent(
}; };
let controller = PodController::new( let controller = PodController::new(
runtime, runtime.clone(),
api_client.clone(), api_client.clone(),
state.event_tx.clone(), state.event_tx.clone(),
controller_config, controller_config,
@ -391,7 +419,8 @@ async fn run_agent(
node_agent_config.system_reserved_memory_bytes = system_reserved_memory_bytes; node_agent_config.system_reserved_memory_bytes = system_reserved_memory_bytes;
node_agent_config.max_pods = max_pods; node_agent_config.max_pods = max_pods;
node_agent_config.supported_brands = supported_brands.to_vec(); node_agent_config.supported_brands = supported_brands.to_vec();
let node_agent = NodeAgent::new(api_client.clone(), node_agent_config); let node_agent =
NodeAgent::with_runtime(api_client.clone(), node_agent_config, Some(runtime.clone()));
let agent_token = token.clone(); let agent_token = token.clone();
let node_agent_handle = tokio::spawn(async move { let node_agent_handle = tokio::spawn(async move {
if let Err(e) = node_agent.run(agent_token).await { if let Err(e) = node_agent.run(agent_token).await {
@ -399,7 +428,35 @@ async fn run_agent(
} }
}); });
// 7. Spawn node health checker // 7. Spawn service controller
let nat_manager = Arc::new(NatManager::new(etherstub_name));
let service_controller = ServiceController::new(
state.storage.clone(),
state.event_tx.clone(),
nat_manager,
ServiceControllerConfig::default(),
);
let service_token = token.clone();
let service_handle = tokio::spawn(async move {
if let Err(e) = service_controller.run(service_token).await {
error!("Service controller error: {}", e);
}
});
// 8. Spawn DNS server for service discovery
let dns_config = DnsServerConfig {
listen_addr: cluster_dns.to_string(),
..DnsServerConfig::default()
};
let dns_server = DnsServer::new(state.storage.clone(), dns_config);
let dns_token = token.clone();
let dns_handle = tokio::spawn(async move {
if let Err(e) = dns_server.run(dns_token).await {
error!("DNS server error: {}", e);
}
});
// 9. Spawn node health checker
let health_checker = NodeHealthChecker::new(api_client, NodeHealthCheckerConfig::default()); let health_checker = NodeHealthChecker::new(api_client, NodeHealthCheckerConfig::default());
let health_token = token.clone(); let health_token = token.clone();
let health_handle = tokio::spawn(async move { let health_handle = tokio::spawn(async move {
@ -409,8 +466,8 @@ async fn run_agent(
}); });
info!( info!(
"All components started. API server on {}, node name: {}, pod CIDR: {}", "All components started. API server on {}, node name: {}, pod CIDR: {}, DNS on {}",
bind, node_name, pod_cidr bind, node_name, pod_cidr, cluster_dns
); );
// Wait for shutdown signal (SIGINT or SIGTERM) // Wait for shutdown signal (SIGINT or SIGTERM)
@ -426,6 +483,8 @@ async fn run_agent(
scheduler_handle, scheduler_handle,
controller_handle, controller_handle,
node_agent_handle, node_agent_handle,
service_handle,
dns_handle,
health_handle, health_handle,
); );
}) })