From 4bfcc39a69ee3bad765066dfde39c78f44d94e6d Mon Sep 17 00:00:00 2001 From: Till Wegmueller Date: Sat, 14 Feb 2026 17:34:39 +0100 Subject: [PATCH] Add container resource limits to zone caps: extract, aggregate, and convert Move ResourceQuantities from reddwarf-scheduler to reddwarf-core so both scheduler and runtime share K8s CPU/memory parsing. Add cpu_as_zone_cap() and memory_as_zone_cap() conversions for illumos zonecfg format. Wire pod_to_zone_config() to aggregate container limits (with requests fallback) and pass capped-cpu/capped-memory to the zone, closing the resource pipeline. Co-Authored-By: Claude Opus 4.6 --- crates/reddwarf-core/src/lib.rs | 2 +- crates/reddwarf-core/src/resources/mod.rs | 4 + .../reddwarf-core/src/resources/quantities.rs | 169 +++++++++++++++++ crates/reddwarf-runtime/src/controller.rs | 174 +++++++++++++++++- crates/reddwarf-scheduler/src/types.rs | 104 +---------- 5 files changed, 346 insertions(+), 107 deletions(-) create mode 100644 crates/reddwarf-core/src/resources/quantities.rs diff --git a/crates/reddwarf-core/src/lib.rs b/crates/reddwarf-core/src/lib.rs index 98a4799..5699142 100644 --- a/crates/reddwarf-core/src/lib.rs +++ b/crates/reddwarf-core/src/lib.rs @@ -14,7 +14,7 @@ pub mod types; // Re-export commonly used types pub use error::{ReddwarfError, Result}; pub use events::{ResourceEvent, WatchEventType}; -pub use resources::{is_valid_name, Resource, ResourceError}; +pub use resources::{is_valid_name, Resource, ResourceError, ResourceQuantities}; pub use types::{GroupVersionKind, ResourceKey, ResourceVersion}; // Re-export k8s-openapi types for convenience diff --git a/crates/reddwarf-core/src/resources/mod.rs b/crates/reddwarf-core/src/resources/mod.rs index bd7a170..c5c2824 100644 --- a/crates/reddwarf-core/src/resources/mod.rs +++ b/crates/reddwarf-core/src/resources/mod.rs @@ -1,3 +1,7 @@ +pub mod quantities; + +pub use quantities::ResourceQuantities; + use crate::{GroupVersionKind, ResourceKey, ResourceVersion}; 
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; use serde::{Deserialize, Serialize}; diff --git a/crates/reddwarf-core/src/resources/quantities.rs b/crates/reddwarf-core/src/resources/quantities.rs new file mode 100644 index 0000000..45b4a40 --- /dev/null +++ b/crates/reddwarf-core/src/resources/quantities.rs @@ -0,0 +1,169 @@ +use std::collections::HashMap; + +/// Resource quantities for nodes and pods +#[derive(Debug, Clone, Default)] +pub struct ResourceQuantities { + /// CPU in millicores (1000 = 1 core) + pub cpu_millicores: i64, + /// Memory in bytes + pub memory_bytes: i64, +} + +impl ResourceQuantities { + /// Parse CPU string (e.g., "2", "1000m", "0.5") + pub fn parse_cpu(s: &str) -> Result<i64, String> { + if let Some(m) = s.strip_suffix('m') { + // Millicores + m.parse::<i64>() + .map_err(|e| format!("Invalid CPU millicore value: {}", e)) + } else if let Ok(cores) = s.parse::<f64>() { + // Cores as float + Ok((cores * 1000.0) as i64) + } else { + Err(format!("Invalid CPU format: {}", s)) + } + } + + /// Parse memory string (e.g., "128Mi", "1Gi", "1024") + pub fn parse_memory(s: &str) -> Result<i64, String> { + if let Some(num) = s.strip_suffix("Ki") { + Ok(num.parse::<i64>().map_err(|e| e.to_string())? * 1024) + } else if let Some(num) = s.strip_suffix("Mi") { + Ok(num.parse::<i64>().map_err(|e| e.to_string())? * 1024 * 1024) + } else if let Some(num) = s.strip_suffix("Gi") { + Ok(num.parse::<i64>().map_err(|e| e.to_string())? 
* 1024 * 1024 * 1024) + } else { + // Plain bytes + s.parse::<i64>().map_err(|e| e.to_string()) + } + } + + /// Get CPU and memory from a resource map (k8s-openapi format) + pub fn from_k8s_resource_map( + resources: &std::collections::BTreeMap< + String, + k8s_openapi::apimachinery::pkg::api::resource::Quantity, + >, + ) -> Self { + let cpu_millicores = resources + .get("cpu") + .and_then(|q| Self::parse_cpu(&q.0).ok()) + .unwrap_or(0); + + let memory_bytes = resources + .get("memory") + .and_then(|q| Self::parse_memory(&q.0).ok()) + .unwrap_or(0); + + Self { + cpu_millicores, + memory_bytes, + } + } + + /// Get CPU and memory from a resource map (test format) + pub fn from_resource_map(resources: &HashMap<String, String>) -> Self { + let cpu_millicores = resources + .get("cpu") + .and_then(|s| Self::parse_cpu(s).ok()) + .unwrap_or(0); + + let memory_bytes = resources + .get("memory") + .and_then(|s| Self::parse_memory(s).ok()) + .unwrap_or(0); + + Self { + cpu_millicores, + memory_bytes, + } + } + + /// Convert millicores to illumos zonecfg `capped-cpu` ncpus value. + /// + /// Returns a float string: 500m -> "0.50", 2000m -> "2.00". + pub fn cpu_as_zone_cap(millicores: i64) -> String { + format!("{:.2}", millicores as f64 / 1000.0) + } + + /// Convert bytes to illumos zonecfg `capped-memory` physical value. + /// + /// Picks the largest clean unit: G, M, K, or raw bytes. + /// Uses illumos zonecfg suffixes (G/M/K), NOT K8s (Gi/Mi/Ki). 
+ pub fn memory_as_zone_cap(bytes: i64) -> String { + const GIB: i64 = 1024 * 1024 * 1024; + const MIB: i64 = 1024 * 1024; + const KIB: i64 = 1024; + + if bytes > 0 && bytes % GIB == 0 { + format!("{}G", bytes / GIB) + } else if bytes > 0 && bytes % MIB == 0 { + format!("{}M", bytes / MIB) + } else if bytes > 0 && bytes % KIB == 0 { + format!("{}K", bytes / KIB) + } else { + format!("{}", bytes) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_cpu() { + assert_eq!(ResourceQuantities::parse_cpu("1").unwrap(), 1000); + assert_eq!(ResourceQuantities::parse_cpu("0.5").unwrap(), 500); + assert_eq!(ResourceQuantities::parse_cpu("100m").unwrap(), 100); + assert_eq!(ResourceQuantities::parse_cpu("2").unwrap(), 2000); + } + + #[test] + fn test_parse_memory() { + assert_eq!(ResourceQuantities::parse_memory("1024").unwrap(), 1024); + assert_eq!(ResourceQuantities::parse_memory("1Ki").unwrap(), 1024); + assert_eq!( + ResourceQuantities::parse_memory("128Mi").unwrap(), + 128 * 1024 * 1024 + ); + assert_eq!( + ResourceQuantities::parse_memory("1Gi").unwrap(), + 1024 * 1024 * 1024 + ); + } + + #[test] + fn test_cpu_as_zone_cap() { + assert_eq!(ResourceQuantities::cpu_as_zone_cap(500), "0.50"); + assert_eq!(ResourceQuantities::cpu_as_zone_cap(1000), "1.00"); + assert_eq!(ResourceQuantities::cpu_as_zone_cap(2500), "2.50"); + assert_eq!(ResourceQuantities::cpu_as_zone_cap(100), "0.10"); + } + + #[test] + fn test_memory_as_zone_cap() { + // Exact GiB + assert_eq!( + ResourceQuantities::memory_as_zone_cap(1024 * 1024 * 1024), + "1G" + ); + // Exact MiB + assert_eq!( + ResourceQuantities::memory_as_zone_cap(512 * 1024 * 1024), + "512M" + ); + // Exact KiB + assert_eq!( + ResourceQuantities::memory_as_zone_cap(256 * 1024), + "256K" + ); + // 1500 MiB = not a clean GiB, falls to MiB + assert_eq!( + ResourceQuantities::memory_as_zone_cap(1500 * 1024 * 1024), + "1500M" + ); + // Raw bytes (not aligned) + 
assert_eq!(ResourceQuantities::memory_as_zone_cap(1023), "1023"); + } +} diff --git a/crates/reddwarf-runtime/src/controller.rs b/crates/reddwarf-runtime/src/controller.rs index 2bda569..5f1261c 100644 --- a/crates/reddwarf-runtime/src/controller.rs +++ b/crates/reddwarf-runtime/src/controller.rs @@ -4,7 +4,7 @@ use crate::network::{vnic_name_for_pod, Ipam}; use crate::traits::ZoneRuntime; use crate::types::*; use k8s_openapi::api::core::v1::{Pod, PodCondition, PodStatus}; -use reddwarf_core::{ResourceEvent, WatchEventType}; +use reddwarf_core::{ResourceEvent, ResourceQuantities, WatchEventType}; use std::sync::Arc; use tokio::sync::broadcast; use tokio_util::sync::CancellationToken; @@ -437,6 +437,37 @@ impl PodController { }) .collect(); + // Aggregate resource limits across all containers in the pod. + // Prefer limits (hard cap) over requests (soft guarantee). + let (total_cpu_millicores, total_memory_bytes) = + spec.containers.iter().fold((0i64, 0i64), |(cpu, mem), c| { + let resources = c.resources.as_ref(); + let res_map = resources + .and_then(|r| r.limits.as_ref()) + .or_else(|| resources.and_then(|r| r.requests.as_ref())); + + let (c_cpu, c_mem) = match res_map { + Some(map) => { + let rq = ResourceQuantities::from_k8s_resource_map(map); + (rq.cpu_millicores, rq.memory_bytes) + } + None => (0, 0), + }; + (cpu + c_cpu, mem + c_mem) + }); + + let cpu_cap = if total_cpu_millicores > 0 { + Some(ResourceQuantities::cpu_as_zone_cap(total_cpu_millicores)) + } else { + None + }; + + let memory_cap = if total_memory_bytes > 0 { + Some(ResourceQuantities::memory_as_zone_cap(total_memory_bytes)) + } else { + None + }; + Ok(ZoneConfig { zone_name, brand: self.config.default_brand.clone(), @@ -445,8 +476,8 @@ impl PodController { storage: ZoneStorageOpts::default(), lx_image_path: None, processes, - cpu_cap: None, - memory_cap: None, + cpu_cap, + memory_cap, fs_mounts: vec![], }) } @@ -646,4 +677,141 @@ mod tests { let result = controller.pod_to_zone_config(&pod); 
assert!(result.is_err()); } + + #[test] + fn test_pod_to_zone_config_with_cpu_and_memory_limits() { + use k8s_openapi::api::core::v1::ResourceRequirements; + use k8s_openapi::apimachinery::pkg::api::resource::Quantity; + use std::collections::BTreeMap; + + let (controller, _dir) = make_test_controller(); + + let mut limits = BTreeMap::new(); + limits.insert("cpu".to_string(), Quantity("1".to_string())); + limits.insert("memory".to_string(), Quantity("512Mi".to_string())); + + let mut pod = Pod::default(); + pod.metadata.name = Some("capped-pod".to_string()); + pod.metadata.namespace = Some("default".to_string()); + pod.spec = Some(PodSpec { + containers: vec![Container { + name: "web".to_string(), + command: Some(vec!["/bin/sh".to_string()]), + resources: Some(ResourceRequirements { + limits: Some(limits), + ..Default::default() + }), + ..Default::default() + }], + ..Default::default() + }); + + let zone_config = controller.pod_to_zone_config(&pod).unwrap(); + assert_eq!(zone_config.cpu_cap, Some("1.00".to_string())); + assert_eq!(zone_config.memory_cap, Some("512M".to_string())); + } + + #[test] + fn test_pod_to_zone_config_with_requests_fallback() { + use k8s_openapi::api::core::v1::ResourceRequirements; + use k8s_openapi::apimachinery::pkg::api::resource::Quantity; + use std::collections::BTreeMap; + + let (controller, _dir) = make_test_controller(); + + let mut requests = BTreeMap::new(); + requests.insert("cpu".to_string(), Quantity("500m".to_string())); + requests.insert("memory".to_string(), Quantity("256Mi".to_string())); + + let mut pod = Pod::default(); + pod.metadata.name = Some("req-pod".to_string()); + pod.metadata.namespace = Some("default".to_string()); + pod.spec = Some(PodSpec { + containers: vec![Container { + name: "web".to_string(), + command: Some(vec!["/bin/sh".to_string()]), + resources: Some(ResourceRequirements { + requests: Some(requests), + limits: None, + ..Default::default() + }), + ..Default::default() + }], + ..Default::default() + 
}); + + let zone_config = controller.pod_to_zone_config(&pod).unwrap(); + assert_eq!(zone_config.cpu_cap, Some("0.50".to_string())); + assert_eq!(zone_config.memory_cap, Some("256M".to_string())); + } + + #[test] + fn test_pod_to_zone_config_aggregates_multiple_containers() { + use k8s_openapi::api::core::v1::ResourceRequirements; + use k8s_openapi::apimachinery::pkg::api::resource::Quantity; + use std::collections::BTreeMap; + + let (controller, _dir) = make_test_controller(); + + let make_limits = |cpu: &str, mem: &str| { + let mut limits = BTreeMap::new(); + limits.insert("cpu".to_string(), Quantity(cpu.to_string())); + limits.insert("memory".to_string(), Quantity(mem.to_string())); + limits + }; + + let mut pod = Pod::default(); + pod.metadata.name = Some("multi-pod".to_string()); + pod.metadata.namespace = Some("default".to_string()); + pod.spec = Some(PodSpec { + containers: vec![ + Container { + name: "web".to_string(), + command: Some(vec!["/bin/sh".to_string()]), + resources: Some(ResourceRequirements { + limits: Some(make_limits("500m", "256Mi")), + ..Default::default() + }), + ..Default::default() + }, + Container { + name: "sidecar".to_string(), + command: Some(vec!["/bin/sh".to_string()]), + resources: Some(ResourceRequirements { + limits: Some(make_limits("500m", "256Mi")), + ..Default::default() + }), + ..Default::default() + }, + ], + ..Default::default() + }); + + let zone_config = controller.pod_to_zone_config(&pod).unwrap(); + // 500m + 500m = 1000m = 1.00 + assert_eq!(zone_config.cpu_cap, Some("1.00".to_string())); + // 256Mi + 256Mi = 512Mi + assert_eq!(zone_config.memory_cap, Some("512M".to_string())); + } + + #[test] + fn test_pod_to_zone_config_no_resources() { + let (controller, _dir) = make_test_controller(); + + let mut pod = Pod::default(); + pod.metadata.name = Some("bare-pod".to_string()); + pod.metadata.namespace = Some("default".to_string()); + pod.spec = Some(PodSpec { + containers: vec![Container { + name: "web".to_string(), + 
command: Some(vec!["/bin/sh".to_string()]), + ..Default::default() + }], + ..Default::default() + }); + + let zone_config = controller.pod_to_zone_config(&pod).unwrap(); + assert_eq!(zone_config.cpu_cap, None); + assert_eq!(zone_config.memory_cap, None); + } } diff --git a/crates/reddwarf-scheduler/src/types.rs b/crates/reddwarf-scheduler/src/types.rs index be517c2..1c1c6eb 100644 --- a/crates/reddwarf-scheduler/src/types.rs +++ b/crates/reddwarf-scheduler/src/types.rs @@ -1,5 +1,5 @@ +pub use reddwarf_core::ResourceQuantities; use reddwarf_core::{Node, Pod}; -use std::collections::HashMap; /// Scheduling context containing pod and available nodes #[derive(Debug, Clone)] @@ -64,112 +64,10 @@ impl ScoreResult { } } -/// Resource quantities for nodes -#[derive(Debug, Clone, Default)] -pub struct ResourceQuantities { - /// CPU in millicores (1000 = 1 core) - pub cpu_millicores: i64, - /// Memory in bytes - pub memory_bytes: i64, -} - -impl ResourceQuantities { - /// Parse CPU string (e.g., "2", "1000m", "0.5") - pub fn parse_cpu(s: &str) -> Result<i64, String> { - if let Some(m) = s.strip_suffix('m') { - // Millicores - m.parse::<i64>() - .map_err(|e| format!("Invalid CPU millicore value: {}", e)) - } else if let Ok(cores) = s.parse::<f64>() { - // Cores as float - Ok((cores * 1000.0) as i64) - } else { - Err(format!("Invalid CPU format: {}", s)) - } - } - - /// Parse memory string (e.g., "128Mi", "1Gi", "1024") - pub fn parse_memory(s: &str) -> Result<i64, String> { - if let Some(num) = s.strip_suffix("Ki") { - Ok(num.parse::<i64>().map_err(|e| e.to_string())? * 1024) - } else if let Some(num) = s.strip_suffix("Mi") { - Ok(num.parse::<i64>().map_err(|e| e.to_string())? * 1024 * 1024) - } else if let Some(num) = s.strip_suffix("Gi") { - Ok(num.parse::<i64>().map_err(|e| e.to_string())? 
* 1024 * 1024 * 1024) - } else { - // Plain bytes - s.parse::<i64>().map_err(|e| e.to_string()) - } - } - - /// Get CPU and memory from a resource map (k8s-openapi format) - pub fn from_k8s_resource_map( - resources: &std::collections::BTreeMap< - String, - k8s_openapi::apimachinery::pkg::api::resource::Quantity, - >, - ) -> Self { - let cpu_millicores = resources - .get("cpu") - .and_then(|q| Self::parse_cpu(&q.0).ok()) - .unwrap_or(0); - - let memory_bytes = resources - .get("memory") - .and_then(|q| Self::parse_memory(&q.0).ok()) - .unwrap_or(0); - - Self { - cpu_millicores, - memory_bytes, - } - } - - /// Get CPU and memory from a resource map (test format) - pub fn from_resource_map(resources: &HashMap<String, String>) -> Self { - let cpu_millicores = resources - .get("cpu") - .and_then(|s| Self::parse_cpu(s).ok()) - .unwrap_or(0); - - let memory_bytes = resources - .get("memory") - .and_then(|s| Self::parse_memory(s).ok()) - .unwrap_or(0); - - Self { - cpu_millicores, - memory_bytes, - } - } -} - #[cfg(test)] mod tests { use super::*; - #[test] - fn test_parse_cpu() { - assert_eq!(ResourceQuantities::parse_cpu("1").unwrap(), 1000); - assert_eq!(ResourceQuantities::parse_cpu("0.5").unwrap(), 500); - assert_eq!(ResourceQuantities::parse_cpu("100m").unwrap(), 100); - assert_eq!(ResourceQuantities::parse_cpu("2").unwrap(), 2000); - } - - #[test] - fn test_parse_memory() { - assert_eq!(ResourceQuantities::parse_memory("1024").unwrap(), 1024); - assert_eq!(ResourceQuantities::parse_memory("1Ki").unwrap(), 1024); - assert_eq!( - ResourceQuantities::parse_memory("128Mi").unwrap(), - 128 * 1024 * 1024 - ); - assert_eq!( - ResourceQuantities::parse_memory("1Gi").unwrap(), - 1024 * 1024 * 1024 - ); - } - #[test] fn test_filter_result() { let pass = FilterResult::pass("node1".to_string());