Add container resource limits to zone caps: extract, aggregate, and convert

Move ResourceQuantities from reddwarf-scheduler to reddwarf-core so both
scheduler and runtime share K8s CPU/memory parsing. Add cpu_as_zone_cap()
and memory_as_zone_cap() conversions for illumos zonecfg format. Wire
pod_to_zone_config() to aggregate container limits (with requests fallback)
and pass capped-cpu/capped-memory to the zone, closing the resource pipeline.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Till Wegmueller 2026-02-14 17:34:39 +01:00
parent 0ac169e1bd
commit 4bfcc39a69
No known key found for this signature in database
5 changed files with 346 additions and 107 deletions

View file

@ -14,7 +14,7 @@ pub mod types;
// Re-export commonly used types // Re-export commonly used types
pub use error::{ReddwarfError, Result}; pub use error::{ReddwarfError, Result};
pub use events::{ResourceEvent, WatchEventType}; pub use events::{ResourceEvent, WatchEventType};
pub use resources::{is_valid_name, Resource, ResourceError}; pub use resources::{is_valid_name, Resource, ResourceError, ResourceQuantities};
pub use types::{GroupVersionKind, ResourceKey, ResourceVersion}; pub use types::{GroupVersionKind, ResourceKey, ResourceVersion};
// Re-export k8s-openapi types for convenience // Re-export k8s-openapi types for convenience

View file

@ -1,3 +1,7 @@
pub mod quantities;
pub use quantities::ResourceQuantities;
use crate::{GroupVersionKind, ResourceKey, ResourceVersion}; use crate::{GroupVersionKind, ResourceKey, ResourceVersion};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};

View file

@ -0,0 +1,169 @@
use std::collections::HashMap;
/// Resource quantities for nodes and pods
#[derive(Debug, Clone, Default)]
pub struct ResourceQuantities {
/// CPU in millicores (1000 = 1 core)
pub cpu_millicores: i64,
/// Memory in bytes
pub memory_bytes: i64,
}
impl ResourceQuantities {
/// Parse CPU string (e.g., "2", "1000m", "0.5")
pub fn parse_cpu(s: &str) -> Result<i64, String> {
if let Some(m) = s.strip_suffix('m') {
// Millicores
m.parse::<i64>()
.map_err(|e| format!("Invalid CPU millicore value: {}", e))
} else if let Ok(cores) = s.parse::<f64>() {
// Cores as float
Ok((cores * 1000.0) as i64)
} else {
Err(format!("Invalid CPU format: {}", s))
}
}
/// Parse memory string (e.g., "128Mi", "1Gi", "1024")
pub fn parse_memory(s: &str) -> Result<i64, String> {
if let Some(num) = s.strip_suffix("Ki") {
Ok(num.parse::<i64>().map_err(|e| e.to_string())? * 1024)
} else if let Some(num) = s.strip_suffix("Mi") {
Ok(num.parse::<i64>().map_err(|e| e.to_string())? * 1024 * 1024)
} else if let Some(num) = s.strip_suffix("Gi") {
Ok(num.parse::<i64>().map_err(|e| e.to_string())? * 1024 * 1024 * 1024)
} else {
// Plain bytes
s.parse::<i64>().map_err(|e| e.to_string())
}
}
/// Get CPU and memory from a resource map (k8s-openapi format)
pub fn from_k8s_resource_map(
resources: &std::collections::BTreeMap<
String,
k8s_openapi::apimachinery::pkg::api::resource::Quantity,
>,
) -> Self {
let cpu_millicores = resources
.get("cpu")
.and_then(|q| Self::parse_cpu(&q.0).ok())
.unwrap_or(0);
let memory_bytes = resources
.get("memory")
.and_then(|q| Self::parse_memory(&q.0).ok())
.unwrap_or(0);
Self {
cpu_millicores,
memory_bytes,
}
}
/// Get CPU and memory from a resource map (test format)
pub fn from_resource_map(resources: &HashMap<String, String>) -> Self {
let cpu_millicores = resources
.get("cpu")
.and_then(|s| Self::parse_cpu(s).ok())
.unwrap_or(0);
let memory_bytes = resources
.get("memory")
.and_then(|s| Self::parse_memory(s).ok())
.unwrap_or(0);
Self {
cpu_millicores,
memory_bytes,
}
}
/// Convert millicores to illumos zonecfg `capped-cpu` ncpus value.
///
/// Returns a float string: 500m -> "0.50", 2000m -> "2.00".
pub fn cpu_as_zone_cap(millicores: i64) -> String {
format!("{:.2}", millicores as f64 / 1000.0)
}
/// Convert bytes to illumos zonecfg `capped-memory` physical value.
///
/// Picks the largest clean unit: G, M, K, or raw bytes.
/// Uses illumos zonecfg suffixes (G/M/K), NOT K8s (Gi/Mi/Ki).
pub fn memory_as_zone_cap(bytes: i64) -> String {
const GIB: i64 = 1024 * 1024 * 1024;
const MIB: i64 = 1024 * 1024;
const KIB: i64 = 1024;
if bytes > 0 && bytes % GIB == 0 {
format!("{}G", bytes / GIB)
} else if bytes > 0 && bytes % MIB == 0 {
format!("{}M", bytes / MIB)
} else if bytes > 0 && bytes % KIB == 0 {
format!("{}K", bytes / KIB)
} else {
format!("{}", bytes)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_cpu() {
assert_eq!(ResourceQuantities::parse_cpu("1").unwrap(), 1000);
assert_eq!(ResourceQuantities::parse_cpu("0.5").unwrap(), 500);
assert_eq!(ResourceQuantities::parse_cpu("100m").unwrap(), 100);
assert_eq!(ResourceQuantities::parse_cpu("2").unwrap(), 2000);
}
#[test]
fn test_parse_memory() {
assert_eq!(ResourceQuantities::parse_memory("1024").unwrap(), 1024);
assert_eq!(ResourceQuantities::parse_memory("1Ki").unwrap(), 1024);
assert_eq!(
ResourceQuantities::parse_memory("128Mi").unwrap(),
128 * 1024 * 1024
);
assert_eq!(
ResourceQuantities::parse_memory("1Gi").unwrap(),
1024 * 1024 * 1024
);
}
#[test]
fn test_cpu_as_zone_cap() {
assert_eq!(ResourceQuantities::cpu_as_zone_cap(500), "0.50");
assert_eq!(ResourceQuantities::cpu_as_zone_cap(1000), "1.00");
assert_eq!(ResourceQuantities::cpu_as_zone_cap(2500), "2.50");
assert_eq!(ResourceQuantities::cpu_as_zone_cap(100), "0.10");
}
#[test]
fn test_memory_as_zone_cap() {
// Exact GiB
assert_eq!(
ResourceQuantities::memory_as_zone_cap(1024 * 1024 * 1024),
"1G"
);
// Exact MiB
assert_eq!(
ResourceQuantities::memory_as_zone_cap(512 * 1024 * 1024),
"512M"
);
// Exact KiB
assert_eq!(
ResourceQuantities::memory_as_zone_cap(256 * 1024),
"256K"
);
// 1500 MiB = not a clean GiB, falls to MiB
assert_eq!(
ResourceQuantities::memory_as_zone_cap(1500 * 1024 * 1024),
"1500M"
);
// Raw bytes (not aligned)
assert_eq!(ResourceQuantities::memory_as_zone_cap(1023), "1023");
}
}

View file

@ -4,7 +4,7 @@ use crate::network::{vnic_name_for_pod, Ipam};
use crate::traits::ZoneRuntime; use crate::traits::ZoneRuntime;
use crate::types::*; use crate::types::*;
use k8s_openapi::api::core::v1::{Pod, PodCondition, PodStatus}; use k8s_openapi::api::core::v1::{Pod, PodCondition, PodStatus};
use reddwarf_core::{ResourceEvent, WatchEventType}; use reddwarf_core::{ResourceEvent, ResourceQuantities, WatchEventType};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@ -437,6 +437,37 @@ impl PodController {
}) })
.collect(); .collect();
// Aggregate resource limits across all containers in the pod.
// Prefer limits (hard cap) over requests (soft guarantee).
let (total_cpu_millicores, total_memory_bytes) =
spec.containers.iter().fold((0i64, 0i64), |(cpu, mem), c| {
let resources = c.resources.as_ref();
let res_map = resources
.and_then(|r| r.limits.as_ref())
.or_else(|| resources.and_then(|r| r.requests.as_ref()));
let (c_cpu, c_mem) = match res_map {
Some(map) => {
let rq = ResourceQuantities::from_k8s_resource_map(map);
(rq.cpu_millicores, rq.memory_bytes)
}
None => (0, 0),
};
(cpu + c_cpu, mem + c_mem)
});
let cpu_cap = if total_cpu_millicores > 0 {
Some(ResourceQuantities::cpu_as_zone_cap(total_cpu_millicores))
} else {
None
};
let memory_cap = if total_memory_bytes > 0 {
Some(ResourceQuantities::memory_as_zone_cap(total_memory_bytes))
} else {
None
};
Ok(ZoneConfig { Ok(ZoneConfig {
zone_name, zone_name,
brand: self.config.default_brand.clone(), brand: self.config.default_brand.clone(),
@ -445,8 +476,8 @@ impl PodController {
storage: ZoneStorageOpts::default(), storage: ZoneStorageOpts::default(),
lx_image_path: None, lx_image_path: None,
processes, processes,
cpu_cap: None, cpu_cap,
memory_cap: None, memory_cap,
fs_mounts: vec![], fs_mounts: vec![],
}) })
} }
@ -646,4 +677,141 @@ mod tests {
let result = controller.pod_to_zone_config(&pod); let result = controller.pod_to_zone_config(&pod);
assert!(result.is_err()); assert!(result.is_err());
} }
#[test]
fn test_pod_to_zone_config_with_cpu_and_memory_limits() {
use k8s_openapi::api::core::v1::ResourceRequirements;
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use std::collections::BTreeMap;
let (controller, _dir) = make_test_controller();
let mut limits = BTreeMap::new();
limits.insert("cpu".to_string(), Quantity("1".to_string()));
limits.insert("memory".to_string(), Quantity("512Mi".to_string()));
let mut pod = Pod::default();
pod.metadata.name = Some("capped-pod".to_string());
pod.metadata.namespace = Some("default".to_string());
pod.spec = Some(PodSpec {
containers: vec![Container {
name: "web".to_string(),
command: Some(vec!["/bin/sh".to_string()]),
resources: Some(ResourceRequirements {
limits: Some(limits),
..Default::default()
}),
..Default::default()
}],
..Default::default()
});
let zone_config = controller.pod_to_zone_config(&pod).unwrap();
assert_eq!(zone_config.cpu_cap, Some("1.00".to_string()));
assert_eq!(zone_config.memory_cap, Some("512M".to_string()));
}
#[test]
fn test_pod_to_zone_config_with_requests_fallback() {
use k8s_openapi::api::core::v1::ResourceRequirements;
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use std::collections::BTreeMap;
let (controller, _dir) = make_test_controller();
let mut requests = BTreeMap::new();
requests.insert("cpu".to_string(), Quantity("500m".to_string()));
requests.insert("memory".to_string(), Quantity("256Mi".to_string()));
let mut pod = Pod::default();
pod.metadata.name = Some("req-pod".to_string());
pod.metadata.namespace = Some("default".to_string());
pod.spec = Some(PodSpec {
containers: vec![Container {
name: "web".to_string(),
command: Some(vec!["/bin/sh".to_string()]),
resources: Some(ResourceRequirements {
requests: Some(requests),
limits: None,
..Default::default()
}),
..Default::default()
}],
..Default::default()
});
let zone_config = controller.pod_to_zone_config(&pod).unwrap();
assert_eq!(zone_config.cpu_cap, Some("0.50".to_string()));
assert_eq!(zone_config.memory_cap, Some("256M".to_string()));
}
#[test]
fn test_pod_to_zone_config_aggregates_multiple_containers() {
use k8s_openapi::api::core::v1::ResourceRequirements;
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use std::collections::BTreeMap;
let (controller, _dir) = make_test_controller();
let make_limits = |cpu: &str, mem: &str| {
let mut limits = BTreeMap::new();
limits.insert("cpu".to_string(), Quantity(cpu.to_string()));
limits.insert("memory".to_string(), Quantity(mem.to_string()));
limits
};
let mut pod = Pod::default();
pod.metadata.name = Some("multi-pod".to_string());
pod.metadata.namespace = Some("default".to_string());
pod.spec = Some(PodSpec {
containers: vec![
Container {
name: "web".to_string(),
command: Some(vec!["/bin/sh".to_string()]),
resources: Some(ResourceRequirements {
limits: Some(make_limits("500m", "256Mi")),
..Default::default()
}),
..Default::default()
},
Container {
name: "sidecar".to_string(),
command: Some(vec!["/bin/sh".to_string()]),
resources: Some(ResourceRequirements {
limits: Some(make_limits("500m", "256Mi")),
..Default::default()
}),
..Default::default()
},
],
..Default::default()
});
let zone_config = controller.pod_to_zone_config(&pod).unwrap();
// 500m + 500m = 1000m = 1.00
assert_eq!(zone_config.cpu_cap, Some("1.00".to_string()));
// 256Mi + 256Mi = 512Mi
assert_eq!(zone_config.memory_cap, Some("512M".to_string()));
}
#[test]
fn test_pod_to_zone_config_no_resources() {
let (controller, _dir) = make_test_controller();
let mut pod = Pod::default();
pod.metadata.name = Some("bare-pod".to_string());
pod.metadata.namespace = Some("default".to_string());
pod.spec = Some(PodSpec {
containers: vec![Container {
name: "web".to_string(),
command: Some(vec!["/bin/sh".to_string()]),
..Default::default()
}],
..Default::default()
});
let zone_config = controller.pod_to_zone_config(&pod).unwrap();
assert_eq!(zone_config.cpu_cap, None);
assert_eq!(zone_config.memory_cap, None);
}
} }

View file

@ -1,5 +1,5 @@
pub use reddwarf_core::ResourceQuantities;
use reddwarf_core::{Node, Pod}; use reddwarf_core::{Node, Pod};
use std::collections::HashMap;
/// Scheduling context containing pod and available nodes /// Scheduling context containing pod and available nodes
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -64,112 +64,10 @@ impl ScoreResult {
} }
} }
/// Resource quantities for nodes
#[derive(Debug, Clone, Default)]
pub struct ResourceQuantities {
/// CPU in millicores (1000 = 1 core)
pub cpu_millicores: i64,
/// Memory in bytes
pub memory_bytes: i64,
}
impl ResourceQuantities {
/// Parse CPU string (e.g., "2", "1000m", "0.5")
pub fn parse_cpu(s: &str) -> Result<i64, String> {
if let Some(m) = s.strip_suffix('m') {
// Millicores
m.parse::<i64>()
.map_err(|e| format!("Invalid CPU millicore value: {}", e))
} else if let Ok(cores) = s.parse::<f64>() {
// Cores as float
Ok((cores * 1000.0) as i64)
} else {
Err(format!("Invalid CPU format: {}", s))
}
}
/// Parse memory string (e.g., "128Mi", "1Gi", "1024")
pub fn parse_memory(s: &str) -> Result<i64, String> {
if let Some(num) = s.strip_suffix("Ki") {
Ok(num.parse::<i64>().map_err(|e| e.to_string())? * 1024)
} else if let Some(num) = s.strip_suffix("Mi") {
Ok(num.parse::<i64>().map_err(|e| e.to_string())? * 1024 * 1024)
} else if let Some(num) = s.strip_suffix("Gi") {
Ok(num.parse::<i64>().map_err(|e| e.to_string())? * 1024 * 1024 * 1024)
} else {
// Plain bytes
s.parse::<i64>().map_err(|e| e.to_string())
}
}
/// Get CPU and memory from a resource map (k8s-openapi format)
pub fn from_k8s_resource_map(
resources: &std::collections::BTreeMap<
String,
k8s_openapi::apimachinery::pkg::api::resource::Quantity,
>,
) -> Self {
let cpu_millicores = resources
.get("cpu")
.and_then(|q| Self::parse_cpu(&q.0).ok())
.unwrap_or(0);
let memory_bytes = resources
.get("memory")
.and_then(|q| Self::parse_memory(&q.0).ok())
.unwrap_or(0);
Self {
cpu_millicores,
memory_bytes,
}
}
/// Get CPU and memory from a resource map (test format)
pub fn from_resource_map(resources: &HashMap<String, String>) -> Self {
let cpu_millicores = resources
.get("cpu")
.and_then(|s| Self::parse_cpu(s).ok())
.unwrap_or(0);
let memory_bytes = resources
.get("memory")
.and_then(|s| Self::parse_memory(s).ok())
.unwrap_or(0);
Self {
cpu_millicores,
memory_bytes,
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
#[test]
fn test_parse_cpu() {
assert_eq!(ResourceQuantities::parse_cpu("1").unwrap(), 1000);
assert_eq!(ResourceQuantities::parse_cpu("0.5").unwrap(), 500);
assert_eq!(ResourceQuantities::parse_cpu("100m").unwrap(), 100);
assert_eq!(ResourceQuantities::parse_cpu("2").unwrap(), 2000);
}
#[test]
fn test_parse_memory() {
assert_eq!(ResourceQuantities::parse_memory("1024").unwrap(), 1024);
assert_eq!(ResourceQuantities::parse_memory("1Ki").unwrap(), 1024);
assert_eq!(
ResourceQuantities::parse_memory("128Mi").unwrap(),
128 * 1024 * 1024
);
assert_eq!(
ResourceQuantities::parse_memory("1Gi").unwrap(),
1024 * 1024 * 1024
);
}
#[test] #[test]
fn test_filter_result() { fn test_filter_result() {
let pass = FilterResult::pass("node1".to_string()); let pass = FilterResult::pass("node1".to_string());