diff --git a/AUDIT.md b/AUDIT.md index 9c179fa..a280250 100644 --- a/AUDIT.md +++ b/AUDIT.md @@ -16,7 +16,7 @@ | Memory limits to capped-memory | DONE | Aggregates across containers, illumos G/M/K suffixes | | Network to Crossbow VNIC | DONE | `dladm create-etherstub`, `create-vnic`, per-pod VNIC+IP | | Volumes to ZFS datasets | DONE | Create, destroy, clone, quota, snapshot support | -| Image pull / clone | PARTIAL | ZFS clone works; LX tarball `-s` works. Missing: no image pull/registry, no `.zar` archive, no golden image bootstrap | +| Image pull / clone | DONE | ZFS clone works; LX tarball `-s` works. `ImageCatalog` backed by KVStore with register/resolve/list/delete. CLI `image import`/`list`/`delete` subcommands. Controller resolves container `image` field to local path (tarball → `lx_image_path`, ZFS snapshot → `clone_from`). Missing: remote registry pull, `.zar` archive format | | Health probes (zlogin) | DONE | exec-in-zone via `zlogin`, liveness/readiness/startup probes with exec/HTTP/TCP actions, probe tracker state machine integrated into reconcile loop. v1 limitation: probes run at reconcile cadence, not per-probe `periodSeconds` | ## 2. Reconciliation / Controller Loop @@ -72,7 +72,7 @@ |---|---|---| | Versioned bind_pod() | DONE | Fixed in `c50ecb2` — creates versioned commits | | Zone brand constraints | DONE | `ZoneBrandMatch` filter checks `reddwarf.io/zone-brand` annotation vs `reddwarf.io/zone-brands` node label. Done in `4c7f50a` | -| Actual resource usage | NOT DONE | Only compares requests vs static allocatable — no runtime metrics | +| Actual resource usage | DONE | Scheduler computes per-node allocated resources by summing requests of all scheduled pods. Filters and scorers subtract used from allocatable. Pod count limits enforced. Allocated resources updated within each scheduling cycle for multi-pod batches | --- @@ -90,7 +90,7 @@ ### Medium (limits functionality) - [x] Service networking — ClusterIP IPAM, ServiceController endpoint tracking, ipnat NAT rules, embedded DNS server - [x] Health probes — exec/HTTP/TCP liveness/readiness/startup probes via zlogin -- [ ] Image management — no pull/registry, no `.zar` support, no golden image bootstrap +- [x] Image management — local catalog with register/resolve/list/delete, CLI `image import/list/delete`, controller image resolution - [x] Dynamic node resources — done in `d3eb0b2` ### Low (nice to have) diff --git a/Cargo.lock b/Cargo.lock index 7e47eb4..3659446 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1468,6 +1468,7 @@ dependencies = [ name = "reddwarf" version = "0.1.0" dependencies = [ + "chrono", "clap", "miette", "reddwarf-apiserver", diff --git a/crates/reddwarf-runtime/src/controller.rs b/crates/reddwarf-runtime/src/controller.rs index d72ae47..8a24ac3 100644 --- a/crates/reddwarf-runtime/src/controller.rs +++ b/crates/reddwarf-runtime/src/controller.rs @@ -1,5 +1,6 @@ use crate::api_client::ApiClient; use crate::error::{Result, RuntimeError}; +use crate::image::{ImageCatalog, ImageType}; use crate::network::{vnic_name_for_pod, Ipam}; use crate::probes::executor::ProbeExecutor; use crate::probes::tracker::ProbeTracker; @@ -42,6 +43,7 @@ pub struct PodController { config: PodControllerConfig, ipam: Ipam, probe_tracker: Mutex, + image_catalog: Option>, } impl PodController { @@ -61,6 +63,29 @@ impl PodController { config, ipam, probe_tracker, + image_catalog: None, + } + } + + /// Create a pod controller with an image catalog for resolving container images + pub fn with_image_catalog( + runtime: Arc, + api_client: Arc, + event_tx: broadcast::Sender, + config: PodControllerConfig, + ipam: Ipam, + image_catalog: Arc, + ) -> Self { + let probe_executor = ProbeExecutor::new(Arc::clone(&runtime)); + let probe_tracker = Mutex::new(ProbeTracker::new(probe_executor)); + Self { + runtime, + api_client, + event_tx, + config, + ipam, + probe_tracker, + image_catalog: Some(image_catalog), } } @@ -765,13 +790,42 @@ impl PodController { }) .unwrap_or_else(|| self.config.default_brand.clone()); + // Resolve container image to local path via image catalog + let (mut lx_image_path, mut storage_opts) = (None, ZoneStorageOpts::default()); + if let Some(ref catalog) = self.image_catalog { + if let Some(first_container) = spec.containers.first() { + let image_ref = first_container.image.as_deref().unwrap_or(""); + if !image_ref.is_empty() { + match catalog.resolve(image_ref) { + Ok(Some(entry)) => { + debug!("Resolved image '{}' to local path '{}'", image_ref, entry.path); + match entry.image_type { + ImageType::Tarball => { + lx_image_path = Some(entry.path); + } + ImageType::ZfsSnapshot => { + storage_opts.clone_from = Some(entry.path); + } + } + } + Ok(None) => { + warn!("Image '{}' not found in local catalog", image_ref); + } + Err(e) => { + warn!("Failed to resolve image '{}': {}", image_ref, e); + } + } + } + } + } + Ok(ZoneConfig { zone_name, brand, zonepath, network, - storage: ZoneStorageOpts::default(), - lx_image_path: None, + storage: storage_opts, + lx_image_path, bhyve_disk_image: None, processes, cpu_cap, diff --git a/crates/reddwarf-runtime/src/image/catalog.rs b/crates/reddwarf-runtime/src/image/catalog.rs new file mode 100644 index 0000000..3efd826 --- /dev/null +++ b/crates/reddwarf-runtime/src/image/catalog.rs @@ -0,0 +1,264 @@ +use crate::error::{Result, RuntimeError}; +use reddwarf_storage::KVStore; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tracing::debug; + +/// Type of image stored locally +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum ImageType { + /// A tarball archive (e.g., .tar.gz for LX brand zones) + Tarball, + /// A ZFS snapshot that can be cloned for fast provisioning + ZfsSnapshot, +} + +/// A locally registered image entry +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ImageEntry { + /// Image name (e.g., "ubuntu", "alpine") + pub name: String, + /// Image tag (e.g., "latest", "22.04") + pub tag: String, + /// Type of the image + pub image_type: ImageType, + /// Local path to the image (tarball path or ZFS snapshot name) + pub path: String, + /// When the image was registered (RFC 3339) + pub created_at: String, +} + +impl ImageEntry { + /// The canonical reference string for this image (e.g., "ubuntu:latest") + pub fn reference(&self) -> String { + format!("{}:{}", self.name, self.tag) + } +} + +/// Image catalog backed by a KVStore +/// +/// Storage keys: +/// - `images/{name}:{tag}` → JSON-serialized `ImageEntry` +pub struct ImageCatalog { + storage: Arc, + prefix: Vec, +} + +impl ImageCatalog { + /// Create a new image catalog + pub fn new(storage: Arc) -> Self { + Self { + storage, + prefix: b"images/".to_vec(), + } + } + + /// Register an image in the catalog. Overwrites if already exists. + pub fn register(&self, entry: ImageEntry) -> Result<()> { + let key = self.entry_key(&entry.name, &entry.tag); + let data = serde_json::to_vec(&entry).map_err(|e| { + RuntimeError::internal_error(format!("Failed to serialize image entry: {}", e)) + })?; + self.storage.put(key.as_bytes(), &data)?; + debug!("Registered image {}:{} at {}", entry.name, entry.tag, entry.path); + Ok(()) + } + + /// Look up an image by name and tag + pub fn get(&self, name: &str, tag: &str) -> Result> { + let key = self.entry_key(name, tag); + match self.storage.get(key.as_bytes())? { + Some(data) => { + let entry: ImageEntry = serde_json::from_slice(&data).map_err(|e| { + RuntimeError::internal_error(format!( + "Failed to deserialize image entry: {}", + e + )) + })?; + Ok(Some(entry)) + } + None => Ok(None), + } + } + + /// Resolve an image reference string (e.g., "ubuntu:22.04" or "ubuntu") to an entry. + /// If no tag is specified, defaults to "latest". + pub fn resolve(&self, image_ref: &str) -> Result> { + let (name, tag) = parse_image_ref(image_ref); + self.get(name, tag) + } + + /// List all registered images + pub fn list(&self) -> Result> { + let entries = self.storage.scan(&self.prefix)?; + let mut images = Vec::new(); + for (_key, data) in &entries { + match serde_json::from_slice::(data) { + Ok(entry) => images.push(entry), + Err(e) => { + debug!("Skipping malformed image entry: {}", e); + } + } + } + images.sort_by(|a, b| a.reference().cmp(&b.reference())); + Ok(images) + } + + /// Remove an image from the catalog + pub fn delete(&self, name: &str, tag: &str) -> Result { + let key = self.entry_key(name, tag); + match self.storage.get(key.as_bytes())? { + Some(_) => { + self.storage.delete(key.as_bytes())?; + debug!("Deleted image {}:{}", name, tag); + Ok(true) + } + None => Ok(false), + } + } + + fn entry_key(&self, name: &str, tag: &str) -> String { + format!("images/{}:{}", name, tag) + } +} + +/// Parse an image reference like "ubuntu:22.04" or "ubuntu" into (name, tag). +/// Defaults tag to "latest" if not specified. +fn parse_image_ref(image_ref: &str) -> (&str, &str) { + match image_ref.rsplit_once(':') { + Some((name, tag)) if !name.is_empty() && !tag.is_empty() => (name, tag), + _ => (image_ref, "latest"), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use reddwarf_storage::RedbBackend; + use tempfile::tempdir; + + fn make_catalog() -> (ImageCatalog, tempfile::TempDir) { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("test-images.redb"); + let storage = Arc::new(RedbBackend::new(&db_path).unwrap()); + (ImageCatalog::new(storage), dir) + } + + fn make_entry(name: &str, tag: &str, image_type: ImageType, path: &str) -> ImageEntry { + ImageEntry { + name: name.to_string(), + tag: tag.to_string(), + image_type, + path: path.to_string(), + created_at: "2026-03-19T00:00:00Z".to_string(), + } + } + + #[test] + fn test_register_and_get() { + let (catalog, _dir) = make_catalog(); + let entry = make_entry("ubuntu", "22.04", ImageType::Tarball, "/images/ubuntu.tar.gz"); + catalog.register(entry.clone()).unwrap(); + + let found = catalog.get("ubuntu", "22.04").unwrap().unwrap(); + assert_eq!(found.name, "ubuntu"); + assert_eq!(found.tag, "22.04"); + assert_eq!(found.image_type, ImageType::Tarball); + assert_eq!(found.path, "/images/ubuntu.tar.gz"); + } + + #[test] + fn test_get_nonexistent() { + let (catalog, _dir) = make_catalog(); + let found = catalog.get("nonexistent", "latest").unwrap(); + assert!(found.is_none()); + } + + #[test] + fn test_resolve_with_tag() { + let (catalog, _dir) = make_catalog(); + let entry = make_entry("alpine", "3.18", ImageType::ZfsSnapshot, "rpool/images/alpine@base"); + catalog.register(entry).unwrap(); + + let found = catalog.resolve("alpine:3.18").unwrap().unwrap(); + assert_eq!(found.name, "alpine"); + assert_eq!(found.tag, "3.18"); + } + + #[test] + fn test_resolve_without_tag_defaults_to_latest() { + let (catalog, _dir) = make_catalog(); + let entry = make_entry("alpine", "latest", ImageType::Tarball, "/images/alpine.tar.gz"); + catalog.register(entry).unwrap(); + + let found = catalog.resolve("alpine").unwrap().unwrap(); + assert_eq!(found.tag, "latest"); + } + + #[test] + fn test_list() { + let (catalog, _dir) = make_catalog(); + catalog + .register(make_entry("ubuntu", "22.04", ImageType::Tarball, "/a")) + .unwrap(); + catalog + .register(make_entry("alpine", "3.18", ImageType::ZfsSnapshot, "/b")) + .unwrap(); + catalog + .register(make_entry("alpine", "latest", ImageType::Tarball, "/c")) + .unwrap(); + + let images = catalog.list().unwrap(); + assert_eq!(images.len(), 3); + // Sorted by reference + assert_eq!(images[0].reference(), "alpine:3.18"); + assert_eq!(images[1].reference(), "alpine:latest"); + assert_eq!(images[2].reference(), "ubuntu:22.04"); + } + + #[test] + fn test_delete() { + let (catalog, _dir) = make_catalog(); + catalog + .register(make_entry("ubuntu", "22.04", ImageType::Tarball, "/a")) + .unwrap(); + + assert!(catalog.delete("ubuntu", "22.04").unwrap()); + assert!(catalog.get("ubuntu", "22.04").unwrap().is_none()); + } + + #[test] + fn test_delete_nonexistent() { + let (catalog, _dir) = make_catalog(); + assert!(!catalog.delete("nonexistent", "latest").unwrap()); + } + + #[test] + fn test_register_overwrites() { + let (catalog, _dir) = make_catalog(); + catalog + .register(make_entry("ubuntu", "22.04", ImageType::Tarball, "/old")) + .unwrap(); + catalog + .register(make_entry("ubuntu", "22.04", ImageType::ZfsSnapshot, "/new")) + .unwrap(); + + let found = catalog.get("ubuntu", "22.04").unwrap().unwrap(); + assert_eq!(found.path, "/new"); + assert_eq!(found.image_type, ImageType::ZfsSnapshot); + } + + #[test] + fn test_parse_image_ref() { + assert_eq!(parse_image_ref("ubuntu:22.04"), ("ubuntu", "22.04")); + assert_eq!(parse_image_ref("ubuntu"), ("ubuntu", "latest")); + assert_eq!(parse_image_ref("my-registry.io/ubuntu:v1"), ("my-registry.io/ubuntu", "v1")); + assert_eq!(parse_image_ref(""), ("", "latest")); + } + + #[test] + fn test_image_entry_reference() { + let entry = make_entry("ubuntu", "22.04", ImageType::Tarball, "/a"); + assert_eq!(entry.reference(), "ubuntu:22.04"); + } +} diff --git a/crates/reddwarf-runtime/src/image/mod.rs b/crates/reddwarf-runtime/src/image/mod.rs new file mode 100644 index 0000000..a364bc4 --- /dev/null +++ b/crates/reddwarf-runtime/src/image/mod.rs @@ -0,0 +1,3 @@ +pub mod catalog; + +pub use catalog::{ImageCatalog, ImageEntry, ImageType}; diff --git a/crates/reddwarf-runtime/src/lib.rs b/crates/reddwarf-runtime/src/lib.rs index 582736c..8d7bd3d 100644 --- a/crates/reddwarf-runtime/src/lib.rs +++ b/crates/reddwarf-runtime/src/lib.rs @@ -7,6 +7,7 @@ pub mod command; pub mod controller; pub mod dns; pub mod error; +pub mod image; #[cfg(target_os = "illumos")] pub mod illumos; pub mod mock; @@ -38,6 +39,9 @@ pub use types::{ pub use storage::ZfsStorageEngine; pub use storage::{MockStorageEngine, StorageEngine, VolumeInfo}; +// Re-export image types +pub use image::{ImageCatalog, ImageEntry, ImageType}; + // Re-export controller and agent types pub use api_client::ApiClient; pub use controller::{PodController, PodControllerConfig}; diff --git a/crates/reddwarf-scheduler/src/filter.rs b/crates/reddwarf-scheduler/src/filter.rs index 70eebae..31f2f1a 100644 --- a/crates/reddwarf-scheduler/src/filter.rs +++ b/crates/reddwarf-scheduler/src/filter.rs @@ -33,6 +33,11 @@ impl FilterPredicate for PodFitsResources { let node_resources = ResourceQuantities::from_k8s_resource_map(&allocatable); + // Subtract resources already allocated to pods on this node + let already_used = context.node_allocated(&node_name); + let available_cpu = node_resources.cpu_millicores - already_used.cpu_millicores; + let available_memory = node_resources.memory_bytes - already_used.memory_bytes; + // Get pod requested resources let pod_spec = match &context.pod.spec { Some(spec) => spec, @@ -59,31 +64,54 @@ impl FilterPredicate for PodFitsResources { } debug!( - "Node {} has CPU: {} milli, Memory: {} bytes", - node_name, node_resources.cpu_millicores, node_resources.memory_bytes + "Node {} has CPU: {} milli (used: {}), Memory: {} bytes (used: {})", + node_name, + node_resources.cpu_millicores, + already_used.cpu_millicores, + node_resources.memory_bytes, + already_used.memory_bytes ); debug!( "Pod requests CPU: {} milli, Memory: {} bytes", total_cpu, total_memory ); - // Check if node has enough resources - if total_cpu > node_resources.cpu_millicores { + // Check if node has enough remaining resources + if total_cpu > available_cpu { return FilterResult::fail( node_name, format!( - "Insufficient CPU: requested {} milli, available {} milli", - total_cpu, node_resources.cpu_millicores + "Insufficient CPU: requested {} milli, available {} milli (allocatable {} - used {})", + total_cpu, available_cpu, node_resources.cpu_millicores, already_used.cpu_millicores ), ); } - if total_memory > node_resources.memory_bytes { + if total_memory > available_memory { return FilterResult::fail( node_name, format!( - "Insufficient memory: requested {} bytes, available {} bytes", - total_memory, node_resources.memory_bytes + "Insufficient memory: requested {} bytes, available {} bytes (allocatable {} - used {})", + total_memory, available_memory, node_resources.memory_bytes, already_used.memory_bytes + ), + ); + } + + // Check pod count limit + let max_pods = node + .status + .as_ref() + .and_then(|s| s.allocatable.as_ref()) + .and_then(|a| a.get("pods")) + .and_then(|q| q.0.parse::().ok()) + .unwrap_or(110); + + if already_used.pod_count >= max_pods { + return FilterResult::fail( + node_name, + format!( + "Maximum pod count reached: {} pods (limit {})", + already_used.pod_count, max_pods ), ); } diff --git a/crates/reddwarf-scheduler/src/lib.rs b/crates/reddwarf-scheduler/src/lib.rs index 51ae750..2bc4208 100644 --- a/crates/reddwarf-scheduler/src/lib.rs +++ b/crates/reddwarf-scheduler/src/lib.rs @@ -15,4 +15,4 @@ pub mod types; // Re-export commonly used types pub use error::{Result, SchedulerError}; pub use scheduler::Scheduler; -pub use types::{FilterResult, SchedulingContext, ScoreResult}; +pub use types::{FilterResult, NodeAllocatedResources, SchedulingContext, ScoreResult}; diff --git a/crates/reddwarf-scheduler/src/scheduler.rs b/crates/reddwarf-scheduler/src/scheduler.rs index cc44ab6..b7284f3 100644 --- a/crates/reddwarf-scheduler/src/scheduler.rs +++ b/crates/reddwarf-scheduler/src/scheduler.rs @@ -1,10 +1,11 @@ use crate::filter::{default_filters, FilterPredicate}; use crate::score::{calculate_weighted_score, default_scores, ScoreFunction}; -use crate::types::SchedulingContext; +use crate::types::{NodeAllocatedResources, SchedulingContext}; use crate::{Result, SchedulerError}; -use reddwarf_core::{Node, Pod, ResourceEvent}; +use reddwarf_core::{Node, Pod, ResourceEvent, ResourceQuantities}; use reddwarf_storage::{KVStore, KeyEncoder, RedbBackend}; use reddwarf_versioning::{Change, CommitBuilder, VersionStore}; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::broadcast; @@ -98,6 +99,9 @@ impl Scheduler { info!("Found {} available nodes", nodes.len()); + // Compute resources already allocated on each node from scheduled pods + let mut allocated = self.compute_allocated_resources().await?; + // Schedule each pod for pod in unscheduled_pods { let pod_name = pod @@ -107,9 +111,34 @@ impl Scheduler { .unwrap_or(&"unknown".to_string()) .clone(); - match self.schedule_pod(pod, &nodes).await { + match self + .schedule_pod(pod.clone(), &nodes, &allocated) + .await + { Ok(node_name) => { info!("Scheduled pod {} to node {}", pod_name, node_name); + // Update allocated resources so the next pod in this cycle + // sees an accurate picture + let entry = allocated.entry(node_name).or_default(); + entry.pod_count += 1; + if let Some(spec) = &pod.spec { + for container in &spec.containers { + if let Some(resources) = &container.resources { + if let Some(requests) = &resources.requests { + entry.cpu_millicores += requests + .get("cpu") + .and_then(|s| ResourceQuantities::parse_cpu(&s.0).ok()) + .unwrap_or(0); + entry.memory_bytes += requests + .get("memory") + .and_then(|s| { + ResourceQuantities::parse_memory(&s.0).ok() + }) + .unwrap_or(0); + } + } + } + } } Err(e) => { error!("Failed to schedule pod {}: {}", pod_name, e); @@ -160,8 +189,63 @@ impl Scheduler { Ok(nodes) } + /// Compute per-node allocated resources by summing requests of all scheduled pods + async fn compute_allocated_resources( + &self, + ) -> Result> { + let prefix = KeyEncoder::encode_prefix("v1", "Pod", None); + let results = self.storage.as_ref().scan(prefix.as_bytes())?; + + let mut allocated: HashMap = HashMap::new(); + + for (_key, data) in results.iter() { + let pod: Pod = match serde_json::from_slice(data) { + Ok(p) => p, + Err(_) => continue, + }; + + // Only count pods that are already scheduled to a node + let node_name = match pod.spec.as_ref().and_then(|s| s.node_name.as_ref()) { + Some(n) => n.clone(), + None => continue, + }; + + let entry = allocated.entry(node_name).or_default(); + entry.pod_count += 1; + + if let Some(spec) = &pod.spec { + for container in &spec.containers { + if let Some(resources) = &container.resources { + if let Some(requests) = &resources.requests { + entry.cpu_millicores += requests + .get("cpu") + .and_then(|s| ResourceQuantities::parse_cpu(&s.0).ok()) + .unwrap_or(0); + entry.memory_bytes += requests + .get("memory") + .and_then(|s| ResourceQuantities::parse_memory(&s.0).ok()) + .unwrap_or(0); + } + } + } + } + } + + debug!( + "Computed allocated resources for {} nodes", + allocated.len() + ); + + Ok(allocated) + } + /// Schedule a single pod - async fn schedule_pod(&self, mut pod: Pod, nodes: &[Node]) -> Result { + async fn schedule_pod( + &self, + mut pod: Pod, + nodes: &[Node], + allocated: &HashMap, + ) -> Result { let pod_name = pod .metadata .name @@ -169,7 +253,8 @@ impl Scheduler { .ok_or_else(|| SchedulerError::internal_error("Pod has no name"))? .clone(); - let context = SchedulingContext::new(pod.clone(), nodes.to_vec()); + let context = + SchedulingContext::with_allocated(pod.clone(), nodes.to_vec(), allocated.clone()); // Phase 1: Filter nodes let mut feasible_nodes = Vec::new(); @@ -354,6 +439,7 @@ impl Scheduler { #[cfg(test)] mod tests { use super::*; + use crate::types::NodeAllocatedResources; use reddwarf_core::WatchEventType; use reddwarf_storage::RedbBackend; use reddwarf_versioning::VersionStore; @@ -468,7 +554,8 @@ mod tests { store_pod(&scheduler, &pod); // Schedule pod - let result = scheduler.schedule_pod(pod, &nodes).await; + let allocated = HashMap::new(); + let result = scheduler.schedule_pod(pod, &nodes, &allocated).await; assert!(result.is_ok()); let node_name = result.unwrap(); @@ -486,11 +573,43 @@ mod tests { let pod = create_test_pod("test-pod", "default", "2", "2Gi"); // Schedule pod should fail - let result = scheduler.schedule_pod(pod, &nodes).await; + let allocated = HashMap::new(); + let result = scheduler.schedule_pod(pod, &nodes, &allocated).await; assert!(result.is_err()); } + #[tokio::test] + async fn test_schedule_pod_respects_allocated_resources() { + let (scheduler, _rx) = create_test_scheduler(); + + // Node with 4 CPU, 8Gi memory + let nodes = vec![ + create_test_node("node1", "4", "8Gi"), + create_test_node("node2", "4", "8Gi"), + ]; + + // node1 already has 3.5 CPU allocated, node2 is empty + let mut allocated = HashMap::new(); + allocated.insert( + "node1".to_string(), + NodeAllocatedResources { + cpu_millicores: 3500, + memory_bytes: 4 * 1024 * 1024 * 1024, + pod_count: 3, + }, + ); + + let pod = create_test_pod("test-pod", "default", "1", "1Gi"); + store_pod(&scheduler, &pod); + + // node1 only has 500m free, pod needs 1000m → node1 should be filtered out + // Pod should land on node2 + let result = scheduler.schedule_pod(pod, &nodes, &allocated).await; + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "node2"); + } + #[tokio::test] async fn test_bind_pod_publishes_modified_event() { let (scheduler, mut rx) = create_test_scheduler(); diff --git a/crates/reddwarf-scheduler/src/score.rs b/crates/reddwarf-scheduler/src/score.rs index aa7f3fd..a9d544b 100644 --- a/crates/reddwarf-scheduler/src/score.rs +++ b/crates/reddwarf-scheduler/src/score.rs @@ -38,6 +38,9 @@ impl ScoreFunction for LeastAllocated { return ScoreResult::new(node_name, 0); } + // Get already-allocated resources on this node + let already_used = context.node_allocated(&node_name); + // Get pod requested resources let pod_spec = match &context.pod.spec { Some(spec) => spec, @@ -63,23 +66,19 @@ impl ScoreFunction for LeastAllocated { } } - // Calculate remaining resources after scheduling this pod - let remaining_cpu = node_resources.cpu_millicores - total_cpu; - let remaining_memory = node_resources.memory_bytes - total_memory; + // Total usage after scheduling = already used + this pod's requests + let used_cpu = already_used.cpu_millicores + total_cpu; + let used_memory = already_used.memory_bytes + total_memory; // Calculate utilization percentage for each resource let cpu_utilization = if node_resources.cpu_millicores > 0 { - ((node_resources.cpu_millicores - remaining_cpu) as f64 - / node_resources.cpu_millicores as f64) - * 100.0 + (used_cpu as f64 / node_resources.cpu_millicores as f64) * 100.0 } else { 100.0 }; let memory_utilization = if node_resources.memory_bytes > 0 { - ((node_resources.memory_bytes - remaining_memory) as f64 - / node_resources.memory_bytes as f64) - * 100.0 + (used_memory as f64 / node_resources.memory_bytes as f64) * 100.0 } else { 100.0 }; @@ -90,8 +89,9 @@ impl ScoreFunction for LeastAllocated { let score = (100.0 - avg_utilization).clamp(0.0, 100.0) as i32; debug!( - "Node {} score: {} (CPU util: {:.1}%, Memory util: {:.1}%)", - node_name, score, cpu_utilization, memory_utilization + "Node {} score: {} (CPU util: {:.1}%, Memory util: {:.1}%, already used: {}m CPU, {} mem)", + node_name, score, cpu_utilization, memory_utilization, + already_used.cpu_millicores, already_used.memory_bytes ); ScoreResult::new(node_name, score) @@ -128,6 +128,9 @@ impl ScoreFunction for BalancedAllocation { return ScoreResult::new(node_name, 0); } + // Get already-allocated resources on this node + let already_used = context.node_allocated(&node_name); + // Get pod requested resources let pod_spec = match &context.pod.spec { Some(spec) => spec, @@ -153,9 +156,12 @@ impl ScoreFunction for BalancedAllocation { } } - // Calculate utilization after scheduling - let cpu_fraction = total_cpu as f64 / node_resources.cpu_millicores as f64; - let memory_fraction = total_memory as f64 / node_resources.memory_bytes as f64; + // Total utilization after scheduling = already used + this pod + let used_cpu = already_used.cpu_millicores + total_cpu; + let used_memory = already_used.memory_bytes + total_memory; + + let cpu_fraction = used_cpu as f64 / node_resources.cpu_millicores as f64; + let memory_fraction = used_memory as f64 / node_resources.memory_bytes as f64; // Prefer balanced resource usage (CPU and memory usage should be similar) let variance = (cpu_fraction - memory_fraction).abs(); diff --git a/crates/reddwarf-scheduler/src/types.rs b/crates/reddwarf-scheduler/src/types.rs index 1c1c6eb..80fa1e0 100644 --- a/crates/reddwarf-scheduler/src/types.rs +++ b/crates/reddwarf-scheduler/src/types.rs @@ -1,5 +1,17 @@ pub use reddwarf_core::ResourceQuantities; use reddwarf_core::{Node, Pod}; +use std::collections::HashMap; + +/// Per-node resource usage from already-scheduled pods +#[derive(Debug, Clone, Default)] +pub struct NodeAllocatedResources { + /// Total CPU millicores requested by pods on this node + pub cpu_millicores: i64, + /// Total memory bytes requested by pods on this node + pub memory_bytes: i64, + /// Number of pods currently scheduled on this node + pub pod_count: u32, +} /// Scheduling context containing pod and available nodes #[derive(Debug, Clone)] @@ -8,12 +20,41 @@ pub struct SchedulingContext { pub pod: Pod, /// Available nodes pub nodes: Vec, + /// Resources already allocated on each node (by node name) + pub allocated: HashMap, } impl SchedulingContext { /// Create a new scheduling context pub fn new(pod: Pod, nodes: Vec) -> Self { - Self { pod, nodes } + Self { + pod, + nodes, + allocated: HashMap::new(), + } + } + + /// Create a scheduling context with pre-computed allocated resources + pub fn with_allocated( + pod: Pod, + nodes: Vec, + allocated: HashMap, + ) -> Self { + Self { + pod, + nodes, + allocated, + } + } + + /// Get the allocated resources for a node, defaulting to zero + pub fn node_allocated(&self, node_name: &str) -> &NodeAllocatedResources { + static DEFAULT: NodeAllocatedResources = NodeAllocatedResources { + cpu_millicores: 0, + memory_bytes: 0, + pod_count: 0, + }; + self.allocated.get(node_name).unwrap_or(&DEFAULT) } } diff --git a/crates/reddwarf/Cargo.toml b/crates/reddwarf/Cargo.toml index 248eaab..6c15405 100644 --- a/crates/reddwarf/Cargo.toml +++ b/crates/reddwarf/Cargo.toml @@ -24,3 +24,4 @@ clap = { workspace = true } miette = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } +chrono = { workspace = true } diff --git a/crates/reddwarf/src/main.rs b/crates/reddwarf/src/main.rs index c046d2b..b6c24a4 100644 --- a/crates/reddwarf/src/main.rs +++ b/crates/reddwarf/src/main.rs @@ -2,10 +2,10 @@ use clap::{Parser, Subcommand}; use reddwarf_apiserver::{ApiError, ApiServer, AppState, Config as ApiConfig, TlsMode}; use reddwarf_core::{Namespace, ResourceQuantities}; use reddwarf_runtime::{ - ApiClient, DnsServer, DnsServerConfig, Ipam, MockRuntime, MockStorageEngine, NatManager, - NodeAgent, NodeAgentConfig, NodeHealthChecker, NodeHealthCheckerConfig, PodController, - PodControllerConfig, ServiceController, ServiceControllerConfig, StorageEngine, - StoragePoolConfig, ZoneBrand, + ApiClient, DnsServer, DnsServerConfig, ImageCatalog, ImageEntry, ImageType, Ipam, MockRuntime, + MockStorageEngine, NatManager, NodeAgent, NodeAgentConfig, NodeHealthChecker, + NodeHealthCheckerConfig, PodController, PodControllerConfig, ServiceController, + ServiceControllerConfig, StorageEngine, StoragePoolConfig, ZoneBrand, }; use reddwarf_scheduler::scheduler::SchedulerConfig; use reddwarf_scheduler::Scheduler; @@ -40,6 +40,35 @@ struct TlsArgs { tls_key: Option, } +#[derive(Subcommand)] +enum ImageCommands { + /// Import a local tarball or ZFS snapshot as an image + Import { + /// Image name (e.g., "ubuntu") + #[arg(long)] + name: String, + /// Image tag (e.g., "22.04") + #[arg(long, default_value = "latest")] + tag: String, + /// Image type: "tarball" or "zfs-snapshot" + #[arg(long, default_value = "tarball")] + image_type: String, + /// Path to the tarball file or ZFS snapshot name + path: String, + }, + /// List all registered images + List, + /// Remove an image from the catalog + Delete { + /// Image name + #[arg(long)] + name: String, + /// Image tag + #[arg(long, default_value = "latest")] + tag: String, + }, +} + #[derive(Subcommand)] #[allow(clippy::large_enum_variant)] enum Commands { @@ -107,6 +136,14 @@ enum Commands { #[command(flatten)] tls_args: TlsArgs, }, + /// Manage local container images + Image { + /// Path to the redb database file + #[arg(long, default_value = "./reddwarf.redb")] + data_dir: String, + #[command(subcommand)] + action: ImageCommands, + }, } #[tokio::main] @@ -127,6 +164,7 @@ async fn main() -> miette::Result<()> { data_dir, tls_args, } => run_serve(&bind, &data_dir, &tls_args).await, + Commands::Image { data_dir, action } => run_image_command(&data_dir, action), Commands::Agent { node_name, bind, @@ -235,6 +273,84 @@ fn tls_mode_from_args(args: &TlsArgs, data_dir: &str) -> miette::Result } } +/// Handle image management subcommands +fn run_image_command(data_dir: &str, action: ImageCommands) -> miette::Result<()> { + let storage = Arc::new( + RedbBackend::new(std::path::Path::new(data_dir)) + .map_err(|e| miette::miette!("Failed to open storage at '{}': {}", data_dir, e))?, + ); + let catalog = ImageCatalog::new(storage); + + match action { + ImageCommands::Import { + name, + tag, + image_type, + path, + } => { + let image_type = match image_type.as_str() { + "tarball" => ImageType::Tarball, + "zfs-snapshot" => ImageType::ZfsSnapshot, + other => { + return Err(miette::miette!( + help = "Use 'tarball' or 'zfs-snapshot'", + "Unknown image type: '{}'", + other + )) + } + }; + + let entry = ImageEntry { + name: name.clone(), + tag: tag.clone(), + image_type, + path: path.clone(), + created_at: chrono::Utc::now().to_rfc3339(), + }; + + catalog + .register(entry) + .map_err(|e| miette::miette!("Failed to register image: {}", e))?; + info!("Registered image {}:{} from {}", name, tag, path); + } + ImageCommands::List => { + let images = catalog + .list() + .map_err(|e| miette::miette!("Failed to list images: {}", e))?; + if images.is_empty() { + println!("No images registered"); + } else { + println!("{:<30} {:<10} {:<15} {}", "NAME:TAG", "TYPE", "CREATED", "PATH"); + for img in &images { + let type_str = match img.image_type { + ImageType::Tarball => "tarball", + ImageType::ZfsSnapshot => "zfs-snapshot", + }; + println!( + "{:<30} {:<10} {:<15} {}", + img.reference(), + type_str, + &img.created_at[..10.min(img.created_at.len())], + img.path + ); + } + } + } + ImageCommands::Delete { name, tag } => { + let deleted = catalog + .delete(&name, &tag) + .map_err(|e| miette::miette!("Failed to delete image: {}", e))?; + if deleted { + info!("Deleted image {}:{}", name, tag); + } else { + return Err(miette::miette!("Image {}:{} not found", name, tag)); + } + } + } + + Ok(()) +} + /// Run only the API server async fn run_serve(bind: &str, data_dir: &str, tls_args: &TlsArgs) -> miette::Result<()> { info!("Starting reddwarf API server"); @@ -399,12 +515,16 @@ async fn run_agent( reconcile_interval: std::time::Duration::from_secs(30), }; - let controller = PodController::new( + // 5b. Create image catalog for resolving container images + let image_catalog = Arc::new(ImageCatalog::new(state.storage.clone())); + + let controller = PodController::with_image_catalog( runtime.clone(), api_client.clone(), state.event_tx.clone(), controller_config, ipam, + image_catalog, ); let controller_token = token.clone(); let controller_handle = tokio::spawn(async move {