Add image management catalog and resource-aware scheduling to the scheduler

Image management:
- ImageCatalog backed by KVStore with register/resolve/list/delete
- CLI `image import`, `image list`, `image delete` subcommands
- PodController resolves container image field to local path
  (tarball → lx_image_path, ZFS snapshot → clone_from)

Scheduler runtime metrics:
- compute_allocated_resources() sums requests of all scheduled pods per node
- PodFitsResources filter subtracts used resources from allocatable
- LeastAllocated/BalancedAllocation scorers account for existing load
- Pod count limits enforced against node max-pods
- Allocated resources updated within scheduling cycle for multi-pod batches

https://claude.ai/code/session_016QLFjAyYGzMPbBjEGMe75j
This commit is contained in:
Claude 2026-03-19 21:35:48 +00:00
parent d8425ad85d
commit 710d353924
No known key found for this signature in database
13 changed files with 683 additions and 42 deletions

View file

@ -16,7 +16,7 @@
| Memory limits to capped-memory | DONE | Aggregates across containers, illumos G/M/K suffixes |
| Network to Crossbow VNIC | DONE | `dladm create-etherstub`, `create-vnic`, per-pod VNIC+IP |
| Volumes to ZFS datasets | DONE | Create, destroy, clone, quota, snapshot support |
| Image pull / clone | PARTIAL | ZFS clone works; LX tarball `-s` works. Missing: no image pull/registry, no `.zar` archive, no golden image bootstrap |
| Image pull / clone | DONE | ZFS clone works; LX tarball `-s` works. `ImageCatalog` backed by KVStore with register/resolve/list/delete. CLI `image import`/`list`/`delete` subcommands. Controller resolves container `image` field to local path (tarball → `lx_image_path`, ZFS snapshot → `clone_from`). Missing: remote registry pull, `.zar` archive format |
| Health probes (zlogin) | DONE | exec-in-zone via `zlogin`, liveness/readiness/startup probes with exec/HTTP/TCP actions, probe tracker state machine integrated into reconcile loop. v1 limitation: probes run at reconcile cadence, not per-probe `periodSeconds` |
## 2. Reconciliation / Controller Loop
@ -72,7 +72,7 @@
|---|---|---|
| Versioned bind_pod() | DONE | Fixed in `c50ecb2` — creates versioned commits |
| Zone brand constraints | DONE | `ZoneBrandMatch` filter checks `reddwarf.io/zone-brand` annotation vs `reddwarf.io/zone-brands` node label. Done in `4c7f50a` |
| Actual resource usage | NOT DONE | Only compares requests vs static allocatable — no runtime metrics |
| Actual resource usage | DONE | Scheduler computes per-node allocated resources by summing requests of all scheduled pods. Filters and scorers subtract used from allocatable. Pod count limits enforced. Allocated resources updated within each scheduling cycle for multi-pod batches |
---
@ -90,7 +90,7 @@
### Medium (limits functionality)
- [x] Service networking — ClusterIP IPAM, ServiceController endpoint tracking, ipnat NAT rules, embedded DNS server
- [x] Health probes — exec/HTTP/TCP liveness/readiness/startup probes via zlogin
- [ ] Image management — no pull/registry, no `.zar` support, no golden image bootstrap
- [x] Image management — local catalog with register/resolve/list/delete, CLI `image import/list/delete`, controller image resolution
- [x] Dynamic node resources — done in `d3eb0b2`
### Low (nice to have)

1
Cargo.lock generated
View file

@ -1468,6 +1468,7 @@ dependencies = [
name = "reddwarf"
version = "0.1.0"
dependencies = [
"chrono",
"clap",
"miette",
"reddwarf-apiserver",

View file

@ -1,5 +1,6 @@
use crate::api_client::ApiClient;
use crate::error::{Result, RuntimeError};
use crate::image::{ImageCatalog, ImageType};
use crate::network::{vnic_name_for_pod, Ipam};
use crate::probes::executor::ProbeExecutor;
use crate::probes::tracker::ProbeTracker;
@ -42,6 +43,7 @@ pub struct PodController {
config: PodControllerConfig,
ipam: Ipam,
probe_tracker: Mutex<ProbeTracker>,
image_catalog: Option<Arc<ImageCatalog>>,
}
impl PodController {
@ -61,6 +63,29 @@ impl PodController {
config,
ipam,
probe_tracker,
image_catalog: None,
}
}
/// Create a pod controller with an image catalog for resolving container images
pub fn with_image_catalog(
runtime: Arc<dyn ZoneRuntime>,
api_client: Arc<ApiClient>,
event_tx: broadcast::Sender<ResourceEvent>,
config: PodControllerConfig,
ipam: Ipam,
image_catalog: Arc<ImageCatalog>,
) -> Self {
let probe_executor = ProbeExecutor::new(Arc::clone(&runtime));
let probe_tracker = Mutex::new(ProbeTracker::new(probe_executor));
Self {
runtime,
api_client,
event_tx,
config,
ipam,
probe_tracker,
image_catalog: Some(image_catalog),
}
}
@ -765,13 +790,42 @@ impl PodController {
})
.unwrap_or_else(|| self.config.default_brand.clone());
// Resolve container image to local path via image catalog
let (mut lx_image_path, mut storage_opts) = (None, ZoneStorageOpts::default());
if let Some(ref catalog) = self.image_catalog {
if let Some(first_container) = spec.containers.first() {
let image_ref = first_container.image.as_deref().unwrap_or("");
if !image_ref.is_empty() {
match catalog.resolve(image_ref) {
Ok(Some(entry)) => {
debug!("Resolved image '{}' to local path '{}'", image_ref, entry.path);
match entry.image_type {
ImageType::Tarball => {
lx_image_path = Some(entry.path);
}
ImageType::ZfsSnapshot => {
storage_opts.clone_from = Some(entry.path);
}
}
}
Ok(None) => {
warn!("Image '{}' not found in local catalog", image_ref);
}
Err(e) => {
warn!("Failed to resolve image '{}': {}", image_ref, e);
}
}
}
}
}
Ok(ZoneConfig {
zone_name,
brand,
zonepath,
network,
storage: ZoneStorageOpts::default(),
lx_image_path: None,
storage: storage_opts,
lx_image_path,
bhyve_disk_image: None,
processes,
cpu_cap,

View file

@ -0,0 +1,264 @@
use crate::error::{Result, RuntimeError};
use reddwarf_storage::KVStore;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::debug;
/// Type of image stored locally.
///
/// Determines how the pod controller consumes the catalog entry: a
/// `Tarball` path is passed through as `lx_image_path`, while a
/// `ZfsSnapshot` name is used as the zone's `clone_from` source.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ImageType {
    /// A tarball archive (e.g., .tar.gz for LX brand zones)
    Tarball,
    /// A ZFS snapshot that can be cloned for fast provisioning
    ZfsSnapshot,
}
/// A locally registered image entry.
///
/// Serialized to JSON and stored in the KVStore under
/// `images/{name}:{tag}` by [`ImageCatalog`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImageEntry {
    /// Image name (e.g., "ubuntu", "alpine")
    pub name: String,
    /// Image tag (e.g., "latest", "22.04")
    pub tag: String,
    /// Type of the image (tarball vs. ZFS snapshot)
    pub image_type: ImageType,
    /// Local path to the image (tarball path or ZFS snapshot name)
    pub path: String,
    /// When the image was registered (RFC 3339)
    pub created_at: String,
}
impl ImageEntry {
    /// The canonical reference string for this image (e.g., "ubuntu:latest").
    pub fn reference(&self) -> String {
        // Pre-size the buffer: name + ':' + tag.
        let mut reference = String::with_capacity(self.name.len() + self.tag.len() + 1);
        reference.push_str(&self.name);
        reference.push(':');
        reference.push_str(&self.tag);
        reference
    }
}
/// Image catalog backed by a KVStore
///
/// Storage keys:
/// - `images/{name}:{tag}` → JSON-serialized `ImageEntry`
pub struct ImageCatalog {
    /// Underlying key-value store shared with the rest of the node agent.
    storage: Arc<dyn KVStore>,
    /// Key prefix (`images/`) used when scanning for all entries.
    prefix: Vec<u8>,
}
impl ImageCatalog {
    /// Create a new image catalog over the given key-value store.
    pub fn new(storage: Arc<dyn KVStore>) -> Self {
        Self {
            storage,
            prefix: b"images/".to_vec(),
        }
    }

    /// Register an image in the catalog. Overwrites if already exists.
    pub fn register(&self, entry: ImageEntry) -> Result<()> {
        let key = self.entry_key(&entry.name, &entry.tag);
        let serialized = serde_json::to_vec(&entry).map_err(|e| {
            RuntimeError::internal_error(format!("Failed to serialize image entry: {}", e))
        })?;
        self.storage.put(key.as_bytes(), &serialized)?;
        debug!("Registered image {}:{} at {}", entry.name, entry.tag, entry.path);
        Ok(())
    }

    /// Look up an image by name and tag.
    ///
    /// Returns `Ok(None)` when no entry exists; an error only on
    /// storage or deserialization failure.
    pub fn get(&self, name: &str, tag: &str) -> Result<Option<ImageEntry>> {
        let key = self.entry_key(name, tag);
        // map + transpose: absent key stays Ok(None); a present-but-corrupt
        // value surfaces as Err.
        self.storage
            .get(key.as_bytes())?
            .map(|raw| {
                serde_json::from_slice::<ImageEntry>(&raw).map_err(|e| {
                    RuntimeError::internal_error(format!(
                        "Failed to deserialize image entry: {}",
                        e
                    ))
                })
            })
            .transpose()
    }

    /// Resolve an image reference string (e.g., "ubuntu:22.04" or "ubuntu") to an entry.
    /// If no tag is specified, defaults to "latest".
    pub fn resolve(&self, image_ref: &str) -> Result<Option<ImageEntry>> {
        let (name, tag) = parse_image_ref(image_ref);
        self.get(name, tag)
    }

    /// List all registered images, sorted by their "name:tag" reference.
    pub fn list(&self) -> Result<Vec<ImageEntry>> {
        let mut images: Vec<ImageEntry> = self
            .storage
            .scan(&self.prefix)?
            .iter()
            .filter_map(|(_key, raw)| match serde_json::from_slice::<ImageEntry>(raw) {
                Ok(entry) => Some(entry),
                Err(e) => {
                    // Malformed entries are skipped, not fatal: one corrupt
                    // record should not hide the rest of the catalog.
                    debug!("Skipping malformed image entry: {}", e);
                    None
                }
            })
            .collect();
        images.sort_by_key(|entry| entry.reference());
        Ok(images)
    }

    /// Remove an image from the catalog.
    ///
    /// Returns `Ok(true)` when an entry was deleted, `Ok(false)` when
    /// no such entry existed.
    pub fn delete(&self, name: &str, tag: &str) -> Result<bool> {
        let key = self.entry_key(name, tag);
        // Existence check first so callers can distinguish "deleted"
        // from "was never there".
        if self.storage.get(key.as_bytes())?.is_none() {
            return Ok(false);
        }
        self.storage.delete(key.as_bytes())?;
        debug!("Deleted image {}:{}", name, tag);
        Ok(true)
    }

    /// Build the storage key for a name/tag pair (`images/{name}:{tag}`).
    fn entry_key(&self, name: &str, tag: &str) -> String {
        format!("images/{}:{}", name, tag)
    }
}
/// Parse an image reference like "ubuntu:22.04" or "ubuntu" into (name, tag).
/// Defaults tag to "latest" if not specified.
///
/// A candidate tag containing `/` is not a tag at all — it means the last
/// `:` belongs to a registry host:port (e.g. "registry.io:5000/ubuntu"),
/// matching Docker's reference grammar where tags cannot contain `/`.
fn parse_image_ref(image_ref: &str) -> (&str, &str) {
    match image_ref.rsplit_once(':') {
        // Accept the split only when both halves are non-empty and the
        // tag half is a plausible tag (no path separator).
        Some((name, tag)) if !name.is_empty() && !tag.is_empty() && !tag.contains('/') => {
            (name, tag)
        }
        _ => (image_ref, "latest"),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use reddwarf_storage::RedbBackend;
    use tempfile::tempdir;

    // Builds a catalog backed by a throwaway redb file. The TempDir is
    // returned alongside so it outlives the catalog — dropping it removes
    // the directory (and the database file) from disk.
    fn make_catalog() -> (ImageCatalog, tempfile::TempDir) {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test-images.redb");
        let storage = Arc::new(RedbBackend::new(&db_path).unwrap());
        (ImageCatalog::new(storage), dir)
    }

    // Convenience constructor: an ImageEntry with a fixed timestamp so
    // tests stay deterministic.
    fn make_entry(name: &str, tag: &str, image_type: ImageType, path: &str) -> ImageEntry {
        ImageEntry {
            name: name.to_string(),
            tag: tag.to_string(),
            image_type,
            path: path.to_string(),
            created_at: "2026-03-19T00:00:00Z".to_string(),
        }
    }

    #[test]
    fn test_register_and_get() {
        let (catalog, _dir) = make_catalog();
        let entry = make_entry("ubuntu", "22.04", ImageType::Tarball, "/images/ubuntu.tar.gz");
        catalog.register(entry.clone()).unwrap();
        let found = catalog.get("ubuntu", "22.04").unwrap().unwrap();
        assert_eq!(found.name, "ubuntu");
        assert_eq!(found.tag, "22.04");
        assert_eq!(found.image_type, ImageType::Tarball);
        assert_eq!(found.path, "/images/ubuntu.tar.gz");
    }

    #[test]
    fn test_get_nonexistent() {
        let (catalog, _dir) = make_catalog();
        let found = catalog.get("nonexistent", "latest").unwrap();
        assert!(found.is_none());
    }

    #[test]
    fn test_resolve_with_tag() {
        let (catalog, _dir) = make_catalog();
        let entry = make_entry("alpine", "3.18", ImageType::ZfsSnapshot, "rpool/images/alpine@base");
        catalog.register(entry).unwrap();
        let found = catalog.resolve("alpine:3.18").unwrap().unwrap();
        assert_eq!(found.name, "alpine");
        assert_eq!(found.tag, "3.18");
    }

    #[test]
    fn test_resolve_without_tag_defaults_to_latest() {
        let (catalog, _dir) = make_catalog();
        let entry = make_entry("alpine", "latest", ImageType::Tarball, "/images/alpine.tar.gz");
        catalog.register(entry).unwrap();
        let found = catalog.resolve("alpine").unwrap().unwrap();
        assert_eq!(found.tag, "latest");
    }

    #[test]
    fn test_list() {
        let (catalog, _dir) = make_catalog();
        catalog
            .register(make_entry("ubuntu", "22.04", ImageType::Tarball, "/a"))
            .unwrap();
        catalog
            .register(make_entry("alpine", "3.18", ImageType::ZfsSnapshot, "/b"))
            .unwrap();
        catalog
            .register(make_entry("alpine", "latest", ImageType::Tarball, "/c"))
            .unwrap();
        let images = catalog.list().unwrap();
        assert_eq!(images.len(), 3);
        // Sorted by reference
        assert_eq!(images[0].reference(), "alpine:3.18");
        assert_eq!(images[1].reference(), "alpine:latest");
        assert_eq!(images[2].reference(), "ubuntu:22.04");
    }

    #[test]
    fn test_delete() {
        let (catalog, _dir) = make_catalog();
        catalog
            .register(make_entry("ubuntu", "22.04", ImageType::Tarball, "/a"))
            .unwrap();
        assert!(catalog.delete("ubuntu", "22.04").unwrap());
        assert!(catalog.get("ubuntu", "22.04").unwrap().is_none());
    }

    #[test]
    fn test_delete_nonexistent() {
        // Deleting a missing image is not an error — it reports `false`.
        let (catalog, _dir) = make_catalog();
        assert!(!catalog.delete("nonexistent", "latest").unwrap());
    }

    #[test]
    fn test_register_overwrites() {
        // Re-registering the same name:tag replaces the prior entry
        // (last write wins).
        let (catalog, _dir) = make_catalog();
        catalog
            .register(make_entry("ubuntu", "22.04", ImageType::Tarball, "/old"))
            .unwrap();
        catalog
            .register(make_entry("ubuntu", "22.04", ImageType::ZfsSnapshot, "/new"))
            .unwrap();
        let found = catalog.get("ubuntu", "22.04").unwrap().unwrap();
        assert_eq!(found.path, "/new");
        assert_eq!(found.image_type, ImageType::ZfsSnapshot);
    }

    #[test]
    fn test_parse_image_ref() {
        assert_eq!(parse_image_ref("ubuntu:22.04"), ("ubuntu", "22.04"));
        assert_eq!(parse_image_ref("ubuntu"), ("ubuntu", "latest"));
        assert_eq!(parse_image_ref("my-registry.io/ubuntu:v1"), ("my-registry.io/ubuntu", "v1"));
        assert_eq!(parse_image_ref(""), ("", "latest"));
    }

    #[test]
    fn test_image_entry_reference() {
        let entry = make_entry("ubuntu", "22.04", ImageType::Tarball, "/a");
        assert_eq!(entry.reference(), "ubuntu:22.04");
    }
}

View file

@ -0,0 +1,3 @@
pub mod catalog;
pub use catalog::{ImageCatalog, ImageEntry, ImageType};

View file

@ -7,6 +7,7 @@ pub mod command;
pub mod controller;
pub mod dns;
pub mod error;
pub mod image;
#[cfg(target_os = "illumos")]
pub mod illumos;
pub mod mock;
@ -38,6 +39,9 @@ pub use types::{
pub use storage::ZfsStorageEngine;
pub use storage::{MockStorageEngine, StorageEngine, VolumeInfo};
// Re-export image types
pub use image::{ImageCatalog, ImageEntry, ImageType};
// Re-export controller and agent types
pub use api_client::ApiClient;
pub use controller::{PodController, PodControllerConfig};

View file

@ -33,6 +33,11 @@ impl FilterPredicate for PodFitsResources {
let node_resources = ResourceQuantities::from_k8s_resource_map(&allocatable);
// Subtract resources already allocated to pods on this node
let already_used = context.node_allocated(&node_name);
let available_cpu = node_resources.cpu_millicores - already_used.cpu_millicores;
let available_memory = node_resources.memory_bytes - already_used.memory_bytes;
// Get pod requested resources
let pod_spec = match &context.pod.spec {
Some(spec) => spec,
@ -59,31 +64,54 @@ impl FilterPredicate for PodFitsResources {
}
debug!(
"Node {} has CPU: {} milli, Memory: {} bytes",
node_name, node_resources.cpu_millicores, node_resources.memory_bytes
"Node {} has CPU: {} milli (used: {}), Memory: {} bytes (used: {})",
node_name,
node_resources.cpu_millicores,
already_used.cpu_millicores,
node_resources.memory_bytes,
already_used.memory_bytes
);
debug!(
"Pod requests CPU: {} milli, Memory: {} bytes",
total_cpu, total_memory
);
// Check if node has enough resources
if total_cpu > node_resources.cpu_millicores {
// Check if node has enough remaining resources
if total_cpu > available_cpu {
return FilterResult::fail(
node_name,
format!(
"Insufficient CPU: requested {} milli, available {} milli",
total_cpu, node_resources.cpu_millicores
"Insufficient CPU: requested {} milli, available {} milli (allocatable {} - used {})",
total_cpu, available_cpu, node_resources.cpu_millicores, already_used.cpu_millicores
),
);
}
if total_memory > node_resources.memory_bytes {
if total_memory > available_memory {
return FilterResult::fail(
node_name,
format!(
"Insufficient memory: requested {} bytes, available {} bytes",
total_memory, node_resources.memory_bytes
"Insufficient memory: requested {} bytes, available {} bytes (allocatable {} - used {})",
total_memory, available_memory, node_resources.memory_bytes, already_used.memory_bytes
),
);
}
// Check pod count limit
let max_pods = node
.status
.as_ref()
.and_then(|s| s.allocatable.as_ref())
.and_then(|a| a.get("pods"))
.and_then(|q| q.0.parse::<u32>().ok())
.unwrap_or(110);
if already_used.pod_count >= max_pods {
return FilterResult::fail(
node_name,
format!(
"Maximum pod count reached: {} pods (limit {})",
already_used.pod_count, max_pods
),
);
}

View file

@ -15,4 +15,4 @@ pub mod types;
// Re-export commonly used types
pub use error::{Result, SchedulerError};
pub use scheduler::Scheduler;
pub use types::{FilterResult, SchedulingContext, ScoreResult};
pub use types::{FilterResult, NodeAllocatedResources, SchedulingContext, ScoreResult};

View file

@ -1,10 +1,11 @@
use crate::filter::{default_filters, FilterPredicate};
use crate::score::{calculate_weighted_score, default_scores, ScoreFunction};
use crate::types::SchedulingContext;
use crate::types::{NodeAllocatedResources, SchedulingContext};
use crate::{Result, SchedulerError};
use reddwarf_core::{Node, Pod, ResourceEvent};
use reddwarf_core::{Node, Pod, ResourceEvent, ResourceQuantities};
use reddwarf_storage::{KVStore, KeyEncoder, RedbBackend};
use reddwarf_versioning::{Change, CommitBuilder, VersionStore};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
@ -98,6 +99,9 @@ impl Scheduler {
info!("Found {} available nodes", nodes.len());
// Compute resources already allocated on each node from scheduled pods
let mut allocated = self.compute_allocated_resources().await?;
// Schedule each pod
for pod in unscheduled_pods {
let pod_name = pod
@ -107,9 +111,34 @@ impl Scheduler {
.unwrap_or(&"unknown".to_string())
.clone();
match self.schedule_pod(pod, &nodes).await {
match self
.schedule_pod(pod.clone(), &nodes, &allocated)
.await
{
Ok(node_name) => {
info!("Scheduled pod {} to node {}", pod_name, node_name);
// Update allocated resources so the next pod in this cycle
// sees an accurate picture
let entry = allocated.entry(node_name).or_default();
entry.pod_count += 1;
if let Some(spec) = &pod.spec {
for container in &spec.containers {
if let Some(resources) = &container.resources {
if let Some(requests) = &resources.requests {
entry.cpu_millicores += requests
.get("cpu")
.and_then(|s| ResourceQuantities::parse_cpu(&s.0).ok())
.unwrap_or(0);
entry.memory_bytes += requests
.get("memory")
.and_then(|s| {
ResourceQuantities::parse_memory(&s.0).ok()
})
.unwrap_or(0);
}
}
}
}
}
Err(e) => {
error!("Failed to schedule pod {}: {}", pod_name, e);
@ -160,8 +189,63 @@ impl Scheduler {
Ok(nodes)
}
/// Compute per-node allocated resources by summing requests of all scheduled pods
async fn compute_allocated_resources(
&self,
) -> Result<HashMap<String, NodeAllocatedResources>> {
let prefix = KeyEncoder::encode_prefix("v1", "Pod", None);
let results = self.storage.as_ref().scan(prefix.as_bytes())?;
let mut allocated: HashMap<String, NodeAllocatedResources> = HashMap::new();
for (_key, data) in results.iter() {
let pod: Pod = match serde_json::from_slice(data) {
Ok(p) => p,
Err(_) => continue,
};
// Only count pods that are already scheduled to a node
let node_name = match pod.spec.as_ref().and_then(|s| s.node_name.as_ref()) {
Some(n) => n.clone(),
None => continue,
};
let entry = allocated.entry(node_name).or_default();
entry.pod_count += 1;
if let Some(spec) = &pod.spec {
for container in &spec.containers {
if let Some(resources) = &container.resources {
if let Some(requests) = &resources.requests {
entry.cpu_millicores += requests
.get("cpu")
.and_then(|s| ResourceQuantities::parse_cpu(&s.0).ok())
.unwrap_or(0);
entry.memory_bytes += requests
.get("memory")
.and_then(|s| ResourceQuantities::parse_memory(&s.0).ok())
.unwrap_or(0);
}
}
}
}
}
debug!(
"Computed allocated resources for {} nodes",
allocated.len()
);
Ok(allocated)
}
/// Schedule a single pod
async fn schedule_pod(&self, mut pod: Pod, nodes: &[Node]) -> Result<String> {
async fn schedule_pod(
&self,
mut pod: Pod,
nodes: &[Node],
allocated: &HashMap<String, NodeAllocatedResources>,
) -> Result<String> {
let pod_name = pod
.metadata
.name
@ -169,7 +253,8 @@ impl Scheduler {
.ok_or_else(|| SchedulerError::internal_error("Pod has no name"))?
.clone();
let context = SchedulingContext::new(pod.clone(), nodes.to_vec());
let context =
SchedulingContext::with_allocated(pod.clone(), nodes.to_vec(), allocated.clone());
// Phase 1: Filter nodes
let mut feasible_nodes = Vec::new();
@ -354,6 +439,7 @@ impl Scheduler {
#[cfg(test)]
mod tests {
use super::*;
use crate::types::NodeAllocatedResources;
use reddwarf_core::WatchEventType;
use reddwarf_storage::RedbBackend;
use reddwarf_versioning::VersionStore;
@ -468,7 +554,8 @@ mod tests {
store_pod(&scheduler, &pod);
// Schedule pod
let result = scheduler.schedule_pod(pod, &nodes).await;
let allocated = HashMap::new();
let result = scheduler.schedule_pod(pod, &nodes, &allocated).await;
assert!(result.is_ok());
let node_name = result.unwrap();
@ -486,11 +573,43 @@ mod tests {
let pod = create_test_pod("test-pod", "default", "2", "2Gi");
// Schedule pod should fail
let result = scheduler.schedule_pod(pod, &nodes).await;
let allocated = HashMap::new();
let result = scheduler.schedule_pod(pod, &nodes, &allocated).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_schedule_pod_respects_allocated_resources() {
let (scheduler, _rx) = create_test_scheduler();
// Node with 4 CPU, 8Gi memory
let nodes = vec![
create_test_node("node1", "4", "8Gi"),
create_test_node("node2", "4", "8Gi"),
];
// node1 already has 3.5 CPU allocated, node2 is empty
let mut allocated = HashMap::new();
allocated.insert(
"node1".to_string(),
NodeAllocatedResources {
cpu_millicores: 3500,
memory_bytes: 4 * 1024 * 1024 * 1024,
pod_count: 3,
},
);
let pod = create_test_pod("test-pod", "default", "1", "1Gi");
store_pod(&scheduler, &pod);
// node1 only has 500m free, pod needs 1000m → node1 should be filtered out
// Pod should land on node2
let result = scheduler.schedule_pod(pod, &nodes, &allocated).await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), "node2");
}
#[tokio::test]
async fn test_bind_pod_publishes_modified_event() {
let (scheduler, mut rx) = create_test_scheduler();

View file

@ -38,6 +38,9 @@ impl ScoreFunction for LeastAllocated {
return ScoreResult::new(node_name, 0);
}
// Get already-allocated resources on this node
let already_used = context.node_allocated(&node_name);
// Get pod requested resources
let pod_spec = match &context.pod.spec {
Some(spec) => spec,
@ -63,23 +66,19 @@ impl ScoreFunction for LeastAllocated {
}
}
// Calculate remaining resources after scheduling this pod
let remaining_cpu = node_resources.cpu_millicores - total_cpu;
let remaining_memory = node_resources.memory_bytes - total_memory;
// Total usage after scheduling = already used + this pod's requests
let used_cpu = already_used.cpu_millicores + total_cpu;
let used_memory = already_used.memory_bytes + total_memory;
// Calculate utilization percentage for each resource
let cpu_utilization = if node_resources.cpu_millicores > 0 {
((node_resources.cpu_millicores - remaining_cpu) as f64
/ node_resources.cpu_millicores as f64)
* 100.0
(used_cpu as f64 / node_resources.cpu_millicores as f64) * 100.0
} else {
100.0
};
let memory_utilization = if node_resources.memory_bytes > 0 {
((node_resources.memory_bytes - remaining_memory) as f64
/ node_resources.memory_bytes as f64)
* 100.0
(used_memory as f64 / node_resources.memory_bytes as f64) * 100.0
} else {
100.0
};
@ -90,8 +89,9 @@ impl ScoreFunction for LeastAllocated {
let score = (100.0 - avg_utilization).clamp(0.0, 100.0) as i32;
debug!(
"Node {} score: {} (CPU util: {:.1}%, Memory util: {:.1}%)",
node_name, score, cpu_utilization, memory_utilization
"Node {} score: {} (CPU util: {:.1}%, Memory util: {:.1}%, already used: {}m CPU, {} mem)",
node_name, score, cpu_utilization, memory_utilization,
already_used.cpu_millicores, already_used.memory_bytes
);
ScoreResult::new(node_name, score)
@ -128,6 +128,9 @@ impl ScoreFunction for BalancedAllocation {
return ScoreResult::new(node_name, 0);
}
// Get already-allocated resources on this node
let already_used = context.node_allocated(&node_name);
// Get pod requested resources
let pod_spec = match &context.pod.spec {
Some(spec) => spec,
@ -153,9 +156,12 @@ impl ScoreFunction for BalancedAllocation {
}
}
// Calculate utilization after scheduling
let cpu_fraction = total_cpu as f64 / node_resources.cpu_millicores as f64;
let memory_fraction = total_memory as f64 / node_resources.memory_bytes as f64;
// Total utilization after scheduling = already used + this pod
let used_cpu = already_used.cpu_millicores + total_cpu;
let used_memory = already_used.memory_bytes + total_memory;
let cpu_fraction = used_cpu as f64 / node_resources.cpu_millicores as f64;
let memory_fraction = used_memory as f64 / node_resources.memory_bytes as f64;
// Prefer balanced resource usage (CPU and memory usage should be similar)
let variance = (cpu_fraction - memory_fraction).abs();

View file

@ -1,5 +1,17 @@
pub use reddwarf_core::ResourceQuantities;
use reddwarf_core::{Node, Pod};
use std::collections::HashMap;
/// Per-node resource usage from already-scheduled pods
///
/// Built by summing the container `requests` of every pod whose spec is
/// already bound to a node; the scheduler subtracts these totals from a
/// node's allocatable resources when filtering and scoring.
#[derive(Debug, Clone, Default)]
pub struct NodeAllocatedResources {
    /// Total CPU millicores requested by pods on this node
    pub cpu_millicores: i64,
    /// Total memory bytes requested by pods on this node
    pub memory_bytes: i64,
    /// Number of pods currently scheduled on this node
    pub pod_count: u32,
}
/// Scheduling context containing pod and available nodes
#[derive(Debug, Clone)]
@ -8,12 +20,41 @@ pub struct SchedulingContext {
pub pod: Pod,
/// Available nodes
pub nodes: Vec<Node>,
/// Resources already allocated on each node (by node name)
pub allocated: HashMap<String, NodeAllocatedResources>,
}
impl SchedulingContext {
    /// Create a new scheduling context with no pre-computed per-node usage.
    pub fn new(pod: Pod, nodes: Vec<Node>) -> Self {
        // Delegate to the general constructor with an empty allocation map.
        Self::with_allocated(pod, nodes, HashMap::new())
    }

    /// Create a scheduling context with pre-computed allocated resources.
    pub fn with_allocated(
        pod: Pod,
        nodes: Vec<Node>,
        allocated: HashMap<String, NodeAllocatedResources>,
    ) -> Self {
        Self {
            pod,
            nodes,
            allocated,
        }
    }

    /// Get the allocated resources for a node, defaulting to zero.
    pub fn node_allocated(&self, node_name: &str) -> &NodeAllocatedResources {
        // A static all-zero record lets us hand back a reference even for
        // nodes with no recorded allocations.
        static ZERO: NodeAllocatedResources = NodeAllocatedResources {
            cpu_millicores: 0,
            memory_bytes: 0,
            pod_count: 0,
        };
        self.allocated.get(node_name).unwrap_or(&ZERO)
    }
}

View file

@ -24,3 +24,4 @@ clap = { workspace = true }
miette = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
chrono = { workspace = true }

View file

@ -2,10 +2,10 @@ use clap::{Parser, Subcommand};
use reddwarf_apiserver::{ApiError, ApiServer, AppState, Config as ApiConfig, TlsMode};
use reddwarf_core::{Namespace, ResourceQuantities};
use reddwarf_runtime::{
ApiClient, DnsServer, DnsServerConfig, Ipam, MockRuntime, MockStorageEngine, NatManager,
NodeAgent, NodeAgentConfig, NodeHealthChecker, NodeHealthCheckerConfig, PodController,
PodControllerConfig, ServiceController, ServiceControllerConfig, StorageEngine,
StoragePoolConfig, ZoneBrand,
ApiClient, DnsServer, DnsServerConfig, ImageCatalog, ImageEntry, ImageType, Ipam, MockRuntime,
MockStorageEngine, NatManager, NodeAgent, NodeAgentConfig, NodeHealthChecker,
NodeHealthCheckerConfig, PodController, PodControllerConfig, ServiceController,
ServiceControllerConfig, StorageEngine, StoragePoolConfig, ZoneBrand,
};
use reddwarf_scheduler::scheduler::SchedulerConfig;
use reddwarf_scheduler::Scheduler;
@ -40,6 +40,35 @@ struct TlsArgs {
tls_key: Option<String>,
}
// NOTE: the `///` doc comments on this enum and its fields double as the
// clap-generated `--help` text, so they are user-facing strings.
#[derive(Subcommand)]
enum ImageCommands {
    /// Import a local tarball or ZFS snapshot as an image
    Import {
        /// Image name (e.g., "ubuntu")
        #[arg(long)]
        name: String,
        /// Image tag (e.g., "22.04")
        #[arg(long, default_value = "latest")]
        tag: String,
        /// Image type: "tarball" or "zfs-snapshot"
        #[arg(long, default_value = "tarball")]
        image_type: String,
        /// Path to the tarball file or ZFS snapshot name
        path: String,
    },
    /// List all registered images
    List,
    /// Remove an image from the catalog
    Delete {
        /// Image name
        #[arg(long)]
        name: String,
        /// Image tag
        #[arg(long, default_value = "latest")]
        tag: String,
    },
}
#[derive(Subcommand)]
#[allow(clippy::large_enum_variant)]
enum Commands {
@ -107,6 +136,14 @@ enum Commands {
#[command(flatten)]
tls_args: TlsArgs,
},
/// Manage local container images
Image {
/// Path to the redb database file
#[arg(long, default_value = "./reddwarf.redb")]
data_dir: String,
#[command(subcommand)]
action: ImageCommands,
},
}
#[tokio::main]
@ -127,6 +164,7 @@ async fn main() -> miette::Result<()> {
data_dir,
tls_args,
} => run_serve(&bind, &data_dir, &tls_args).await,
Commands::Image { data_dir, action } => run_image_command(&data_dir, action),
Commands::Agent {
node_name,
bind,
@ -235,6 +273,84 @@ fn tls_mode_from_args(args: &TlsArgs, data_dir: &str) -> miette::Result<TlsMode>
}
}
/// Handle image management subcommands
fn run_image_command(data_dir: &str, action: ImageCommands) -> miette::Result<()> {
let storage = Arc::new(
RedbBackend::new(std::path::Path::new(data_dir))
.map_err(|e| miette::miette!("Failed to open storage at '{}': {}", data_dir, e))?,
);
let catalog = ImageCatalog::new(storage);
match action {
ImageCommands::Import {
name,
tag,
image_type,
path,
} => {
let image_type = match image_type.as_str() {
"tarball" => ImageType::Tarball,
"zfs-snapshot" => ImageType::ZfsSnapshot,
other => {
return Err(miette::miette!(
help = "Use 'tarball' or 'zfs-snapshot'",
"Unknown image type: '{}'",
other
))
}
};
let entry = ImageEntry {
name: name.clone(),
tag: tag.clone(),
image_type,
path: path.clone(),
created_at: chrono::Utc::now().to_rfc3339(),
};
catalog
.register(entry)
.map_err(|e| miette::miette!("Failed to register image: {}", e))?;
info!("Registered image {}:{} from {}", name, tag, path);
}
ImageCommands::List => {
let images = catalog
.list()
.map_err(|e| miette::miette!("Failed to list images: {}", e))?;
if images.is_empty() {
println!("No images registered");
} else {
println!("{:<30} {:<10} {:<15} {}", "NAME:TAG", "TYPE", "CREATED", "PATH");
for img in &images {
let type_str = match img.image_type {
ImageType::Tarball => "tarball",
ImageType::ZfsSnapshot => "zfs-snapshot",
};
println!(
"{:<30} {:<10} {:<15} {}",
img.reference(),
type_str,
&img.created_at[..10.min(img.created_at.len())],
img.path
);
}
}
}
ImageCommands::Delete { name, tag } => {
let deleted = catalog
.delete(&name, &tag)
.map_err(|e| miette::miette!("Failed to delete image: {}", e))?;
if deleted {
info!("Deleted image {}:{}", name, tag);
} else {
return Err(miette::miette!("Image {}:{} not found", name, tag));
}
}
}
Ok(())
}
/// Run only the API server
async fn run_serve(bind: &str, data_dir: &str, tls_args: &TlsArgs) -> miette::Result<()> {
info!("Starting reddwarf API server");
@ -399,12 +515,16 @@ async fn run_agent(
reconcile_interval: std::time::Duration::from_secs(30),
};
let controller = PodController::new(
// 5b. Create image catalog for resolving container images
let image_catalog = Arc::new(ImageCatalog::new(state.storage.clone()));
let controller = PodController::with_image_catalog(
runtime.clone(),
api_client.clone(),
state.event_tx.clone(),
controller_config,
ipam,
image_catalog,
);
let controller_token = token.clone();
let controller_handle = tokio::spawn(async move {