From c50ecb26642cd59bf54fb3b8c1e8d30d0fb2db7f Mon Sep 17 00:00:00 2001
From: Till Wegmueller
Date: Sun, 8 Feb 2026 23:21:53 +0100
Subject: [PATCH] Close the control loop: versioned bind, event-driven
 controller, graceful shutdown

- Move WatchEventType and ResourceEvent to reddwarf-core so scheduler and
  runtime can use them without depending on the apiserver crate
- Fix scheduler bind_pod to create versioned commits and publish MODIFIED
  events so the pod controller learns about scheduled pods
- Replace polling loop in pod controller with event bus subscription, wire
  handle_delete for DELETED events, keep reconcile_all for startup sync and
  lag recovery
- Add allocatable/capacity resources (cpu, memory, pods) to node agent
  build_node so the scheduler's resource filter accepts nodes
- Bootstrap "default" namespace on startup to prevent pod creation failures
  in the default namespace
- Replace .abort() shutdown with CancellationToken-based graceful shutdown
  across scheduler, controller, and node agent

Co-Authored-By: Claude Opus 4.6
---
 Cargo.lock                                 |   5 +
 Cargo.toml                                 |   1 +
 crates/reddwarf-apiserver/src/event_bus.rs |  69 +-------
 crates/reddwarf-apiserver/src/watch.rs     |  11 +-
 crates/reddwarf-core/src/events.rs         |  74 ++++++++
 crates/reddwarf-core/src/lib.rs            |   2 +
 crates/reddwarf-runtime/Cargo.toml         |   1 +
 crates/reddwarf-runtime/src/controller.rs  |  80 +++++++--
 crates/reddwarf-runtime/src/node_agent.rs  |  69 +++++++-
 crates/reddwarf-scheduler/Cargo.toml       |   2 +
 crates/reddwarf-scheduler/src/scheduler.rs | 187 +++++++++++++++++----
 crates/reddwarf/Cargo.toml                 |   1 +
 crates/reddwarf/src/main.rs                |  88 ++++++++--
 13 files changed, 452 insertions(+), 138 deletions(-)
 create mode 100644 crates/reddwarf-core/src/events.rs

diff --git a/Cargo.lock b/Cargo.lock
index 897c4a1..d688f58 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1337,6 +1337,7 @@ dependencies = [
  "reddwarf-storage",
  "reddwarf-versioning",
  "tokio",
+ "tokio-util",
  "tracing",
  "tracing-subscriber",
 ]
@@ -1399,6 +1400,7 @@ dependencies = [
  "thiserror 2.0.18",
  "tokio",
  "tokio-stream",
+ "tokio-util",
  "tracing",
  "uuid",
 ]
@@ -1411,11 +1413,13 @@ dependencies = [
  "miette",
  "reddwarf-core",
  "reddwarf-storage",
+ "reddwarf-versioning",
  "serde",
  "serde_json",
  "tempfile",
  "thiserror 2.0.18",
  "tokio",
+ "tokio-util",
  "tracing",
 ]

@@ -2051,6 +2055,7 @@ dependencies = [
  "bytes",
  "futures-core",
  "futures-sink",
+ "futures-util",
  "pin-project-lite",
  "tokio",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index 60565a0..e75c10f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -43,6 +43,7 @@ anyhow = "1.0"

 # Async runtime
 tokio = { version = "1.40", features = ["full"] }
 tokio-stream = { version = "0.1", features = ["sync"] }
+tokio-util = { version = "0.7", features = ["rt"] }
 futures-util = "0.3"
 async-trait = "0.1"
diff --git a/crates/reddwarf-apiserver/src/event_bus.rs b/crates/reddwarf-apiserver/src/event_bus.rs
index d992cee..3a6122e 100644
--- a/crates/reddwarf-apiserver/src/event_bus.rs
+++ b/crates/reddwarf-apiserver/src/event_bus.rs
@@ -1,7 +1,4 @@
-use reddwarf_core::{GroupVersionKind, ResourceKey};
-use serde::{Deserialize, Serialize};
-
-use crate::watch::WatchEventType;
+pub use reddwarf_core::{ResourceEvent, WatchEventType};

 /// Configuration for the event bus
 #[derive(Debug, Clone)]
@@ -16,74 +13,12 @@ impl Default for EventBusConfig {
     }
 }

-/// A resource event emitted by the API server on mutations
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct ResourceEvent {
-    /// Type of watch event (ADDED, MODIFIED, DELETED)
-    pub event_type: WatchEventType,
-    /// GroupVersionKind of the resource
-    pub gvk: GroupVersionKind,
-    /// Full resource key (gvk + namespace + name)
-    pub resource_key: ResourceKey,
-    /// The serialized resource object
-    pub object: serde_json::Value,
-    /// Resource version at the time of the event
-    pub resource_version: String,
-}
-
-impl ResourceEvent {
-    /// Create an ADDED event
-    pub fn added(
-        resource_key: ResourceKey,
-        object: serde_json::Value,
-        resource_version: String,
-    ) -> Self {
-        Self {
-            event_type: WatchEventType::Added,
-            gvk: resource_key.gvk.clone(),
-            resource_key,
-            object,
-            resource_version,
-        }
-    }
-
-    /// Create a MODIFIED event
-    pub fn modified(
-        resource_key: ResourceKey,
-        object: serde_json::Value,
-        resource_version: String,
-    ) -> Self {
-        Self {
-            event_type: WatchEventType::Modified,
-            gvk: resource_key.gvk.clone(),
-            resource_key,
-            object,
-            resource_version,
-        }
-    }
-
-    /// Create a DELETED event
-    pub fn deleted(
-        resource_key: ResourceKey,
-        object: serde_json::Value,
-        resource_version: String,
-    ) -> Self {
-        Self {
-            event_type: WatchEventType::Deleted,
-            gvk: resource_key.gvk.clone(),
-            resource_key,
-            object,
-            resource_version,
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate::handlers::common::{create_resource, delete_resource, update_resource};
     use crate::AppState;
-    use reddwarf_core::{GroupVersionKind, Pod, Resource};
+    use reddwarf_core::{GroupVersionKind, Pod, Resource, ResourceKey, WatchEventType};
     use reddwarf_storage::RedbBackend;
     use reddwarf_versioning::VersionStore;
     use std::sync::Arc;
diff --git a/crates/reddwarf-apiserver/src/watch.rs b/crates/reddwarf-apiserver/src/watch.rs
index 7c773f8..89c6b40 100644
--- a/crates/reddwarf-apiserver/src/watch.rs
+++ b/crates/reddwarf-apiserver/src/watch.rs
@@ -3,21 +3,12 @@ use crate::AppState;
 use axum::response::sse::{Event, KeepAlive, Sse};
 use futures_util::StreamExt;
 use reddwarf_core::GroupVersionKind;
+pub use reddwarf_core::WatchEventType;
 use serde::{Deserialize, Serialize};
 use std::convert::Infallible;
 use std::sync::Arc;
 use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};

-/// Watch event type
-#[derive(Debug, Clone, Serialize, Deserialize)]
-#[serde(rename_all = "UPPERCASE")]
-pub enum WatchEventType {
-    Added,
-    Modified,
-    Deleted,
-    Error,
-}
-
 /// Watch event
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct WatchEvent {
diff --git a/crates/reddwarf-core/src/events.rs b/crates/reddwarf-core/src/events.rs
new file mode 100644
index 0000000..0157171
--- /dev/null
+++ b/crates/reddwarf-core/src/events.rs
@@ -0,0 +1,74 @@
+use crate::types::{GroupVersionKind, ResourceKey};
+use serde::{Deserialize, Serialize};
+
+/// Watch event type
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "UPPERCASE")]
+pub enum WatchEventType {
+    Added,
+    Modified,
+    Deleted,
+    Error,
+}
+
+/// A resource event emitted by the API server on mutations
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ResourceEvent {
+    /// Type of watch event (ADDED, MODIFIED, DELETED)
+    pub event_type: WatchEventType,
+    /// GroupVersionKind of the resource
+    pub gvk: GroupVersionKind,
+    /// Full resource key (gvk + namespace + name)
+    pub resource_key: ResourceKey,
+    /// The serialized resource object
+    pub object: serde_json::Value,
+    /// Resource version at the time of the event
+    pub resource_version: String,
+}
+
+impl ResourceEvent {
+    /// Create an ADDED event
+    pub fn added(
+        resource_key: ResourceKey,
+        object: serde_json::Value,
+        resource_version: String,
+    ) -> Self {
+        Self {
+            event_type: WatchEventType::Added,
+            gvk: resource_key.gvk.clone(),
+            resource_key,
+            object,
+            resource_version,
+        }
+    }
+
+    /// Create a MODIFIED event
+    pub fn modified(
+        resource_key: ResourceKey,
+        object: serde_json::Value,
+        resource_version: String,
+    ) -> Self {
+        Self {
+            event_type: WatchEventType::Modified,
+            gvk: resource_key.gvk.clone(),
+            resource_key,
+            object,
+            resource_version,
+        }
+    }
+
+    /// Create a DELETED event
+    pub fn deleted(
+        resource_key: ResourceKey,
+        object: serde_json::Value,
+        resource_version: String,
+    ) -> Self {
+        Self {
+            event_type: WatchEventType::Deleted,
+            gvk: resource_key.gvk.clone(),
+            resource_key,
+            object,
+            resource_version,
+        }
+    }
+}
diff --git a/crates/reddwarf-core/src/lib.rs b/crates/reddwarf-core/src/lib.rs
index 4e5d8e0..98a4799 100644
--- a/crates/reddwarf-core/src/lib.rs
+++ b/crates/reddwarf-core/src/lib.rs
@@ -7,11 +7,13 @@
 //! - Serialization helpers

 pub mod error;
+pub mod events;
 pub mod resources;
 pub mod types;

 // Re-export commonly used types
 pub use error::{ReddwarfError, Result};
+pub use events::{ResourceEvent, WatchEventType};
 pub use resources::{is_valid_name, Resource, ResourceError};
 pub use types::{GroupVersionKind, ResourceKey, ResourceVersion};

diff --git a/crates/reddwarf-runtime/Cargo.toml b/crates/reddwarf-runtime/Cargo.toml
index 4fab521..150fda4 100644
--- a/crates/reddwarf-runtime/Cargo.toml
+++ b/crates/reddwarf-runtime/Cargo.toml
@@ -12,6 +12,7 @@ reddwarf-core = { workspace = true }
 k8s-openapi = { workspace = true }
 tokio = { workspace = true }
 tokio-stream = { workspace = true }
+tokio-util = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 miette = { workspace = true }
diff --git a/crates/reddwarf-runtime/src/controller.rs b/crates/reddwarf-runtime/src/controller.rs
index caa137d..362fd7c 100644
--- a/crates/reddwarf-runtime/src/controller.rs
+++ b/crates/reddwarf-runtime/src/controller.rs
@@ -3,7 +3,10 @@ use crate::error::{Result, RuntimeError};
 use crate::traits::ZoneRuntime;
 use crate::types::*;
 use k8s_openapi::api::core::v1::{Pod, PodCondition, PodStatus};
+use reddwarf_core::{ResourceEvent, WatchEventType};
 use std::sync::Arc;
+use tokio::sync::broadcast;
+use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info, warn};

 /// Configuration for the pod controller
@@ -27,6 +30,7 @@ pub struct PodControllerConfig {
 pub struct PodController {
     runtime: Arc<dyn ZoneRuntime>,
     api_client: Arc<ApiClient>,
+    event_tx: broadcast::Sender<ResourceEvent>,
     config: PodControllerConfig,
 }

@@ -34,33 +38,89 @@ impl PodController {
     pub fn new(
         runtime: Arc<dyn ZoneRuntime>,
         api_client: Arc<ApiClient>,
+        event_tx: broadcast::Sender<ResourceEvent>,
         config: PodControllerConfig,
     ) -> Self {
         Self {
             runtime,
             api_client,
+            event_tx,
             config,
         }
     }

-    /// Run the controller — polls for unscheduled-to-this-node pods in a loop.
+    /// Run the controller — reacts to pod events from the in-process event bus.
     ///
-    /// In a real implementation, this would use SSE watch. For now, we receive
-    /// events via the in-process event bus by subscribing to the broadcast channel.
-    /// Since the controller runs in the same process as the API server, we use
-    /// a simpler polling approach over the HTTP API.
-    pub async fn run(&self) -> Result<()> {
+    /// On startup, performs a full reconcile to catch up on any pods that were
+    /// scheduled while the controller was down. Then switches to event-driven mode.
+    pub async fn run(&self, token: CancellationToken) -> Result<()> {
         info!(
             "Starting pod controller for node '{}'",
             self.config.node_name
         );

-        // Poll loop — watches for pods via HTTP list
+        // Initial full sync
+        if let Err(e) = self.reconcile_all().await {
+            error!("Initial reconcile failed: {}", e);
+        }
+
+        let mut rx = self.event_tx.subscribe();
+
         loop {
-            if let Err(e) = self.reconcile_all().await {
-                error!("Pod controller reconcile cycle failed: {}", e);
+            tokio::select! {
+                _ = token.cancelled() => {
+                    info!("Pod controller shutting down");
+                    return Ok(());
+                }
+                result = rx.recv() => {
+                    match result {
+                        Ok(event) => {
+                            if event.gvk.kind != "Pod" {
+                                continue;
+                            }
+                            match event.event_type {
+                                WatchEventType::Added | WatchEventType::Modified => {
+                                    match serde_json::from_value::<Pod>(event.object) {
+                                        Ok(pod) => {
+                                            if let Err(e) = self.reconcile(&pod).await {
+                                                let name = pod.metadata.name.as_deref().unwrap_or("<unknown>");
+                                                error!("Failed to reconcile pod {}: {}", name, e);
+                                            }
+                                        }
+                                        Err(e) => {
+                                            warn!("Failed to parse pod from event: {}", e);
+                                        }
+                                    }
+                                }
+                                WatchEventType::Deleted => {
+                                    match serde_json::from_value::<Pod>(event.object) {
+                                        Ok(pod) => {
+                                            if let Err(e) = self.handle_delete(&pod).await {
+                                                let name = pod.metadata.name.as_deref().unwrap_or("<unknown>");
+                                                error!("Failed to handle pod deletion {}: {}", name, e);
+                                            }
+                                        }
+                                        Err(e) => {
+                                            warn!("Failed to parse pod from delete event: {}", e);
+                                        }
+                                    }
+                                }
+                                _ => {}
+                            }
+                        }
+                        Err(broadcast::error::RecvError::Lagged(n)) => {
+                            warn!("Missed {} events, doing full resync", n);
+                            if let Err(e) = self.reconcile_all().await {
+                                error!("Resync after lag failed: {}", e);
+                            }
+                        }
+                        Err(broadcast::error::RecvError::Closed) => {
+                            info!("Event bus closed, stopping pod controller");
+                            return Ok(());
+                        }
+                    }
+                }
             }
-            tokio::time::sleep(std::time::Duration::from_secs(2)).await;
         }
     }

diff --git a/crates/reddwarf-runtime/src/node_agent.rs b/crates/reddwarf-runtime/src/node_agent.rs
index 16cf437..c2d49b6 100644
--- a/crates/reddwarf-runtime/src/node_agent.rs
+++ b/crates/reddwarf-runtime/src/node_agent.rs
@@ -1,9 +1,12 @@
 use crate::api_client::ApiClient;
 use crate::error::{Result, RuntimeError};
 use k8s_openapi::api::core::v1::{Node, NodeAddress, NodeCondition, NodeStatus};
+use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
 use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
+use std::collections::BTreeMap;
 use std::sync::Arc;
 use std::time::Duration;
+use tokio_util::sync::CancellationToken;
 use tracing::{info, warn};

 /// Configuration for the node agent
@@ -65,7 +68,7 @@ impl NodeAgent {
     }

     /// Run the heartbeat loop
-    pub async fn run(&self) -> Result<()> {
+    pub async fn run(&self, token: CancellationToken) -> Result<()> {
         // Register first
         self.register().await?;

@@ -75,10 +78,16 @@ impl NodeAgent {
         );

         loop {
-            tokio::time::sleep(self.config.heartbeat_interval).await;
-
-            if let Err(e) = self.heartbeat().await {
-                warn!("Heartbeat failed: {} — will retry", e);
+            tokio::select! {
+                _ = token.cancelled() => {
+                    info!("Node agent shutting down");
+                    return Ok(());
+                }
+                _ = tokio::time::sleep(self.config.heartbeat_interval) => {
+                    if let Err(e) = self.heartbeat().await {
+                        warn!("Heartbeat failed: {} — will retry", e);
+                    }
+                }
             }
         }
     }
@@ -99,6 +108,22 @@ impl NodeAgent {
     fn build_node(&self) -> Node {
         let hostname = self.config.node_name.clone();

+        let cpu_count = std::thread::available_parallelism()
+            .map(|n| n.get().to_string())
+            .unwrap_or_else(|_| "1".to_string());
+
+        let allocatable = BTreeMap::from([
+            ("cpu".to_string(), Quantity(cpu_count.clone())),
+            ("memory".to_string(), Quantity("8Gi".to_string())),
+            ("pods".to_string(), Quantity("110".to_string())),
+        ]);
+
+        let capacity = BTreeMap::from([
+            ("cpu".to_string(), Quantity(cpu_count)),
+            ("memory".to_string(), Quantity("8Gi".to_string())),
+            ("pods".to_string(), Quantity("110".to_string())),
+        ]);
+
         Node {
             metadata: ObjectMeta {
                 name: Some(self.config.node_name.clone()),
@@ -132,6 +157,8 @@ impl NodeAgent {
                     type_: "Hostname".to_string(),
                     address: hostname,
                 }]),
+                allocatable: Some(allocatable),
+                capacity: Some(capacity),
                 ..Default::default()
             }),
             ..Default::default()
@@ -168,4 +195,36 @@ mod tests {
         assert_eq!(conditions[0].status, "True");
         assert!(conditions[0].last_heartbeat_time.is_some());
     }
+
+    #[test]
+    fn test_build_node_has_allocatable_resources() {
+        let api_client = Arc::new(ApiClient::new("http://127.0.0.1:6443"));
+        let config =
+            NodeAgentConfig::new("test-node".to_string(), "http://127.0.0.1:6443".to_string());
+        let agent = NodeAgent::new(api_client, config);
+
+        let node = agent.build_node();
+        let status = node.status.unwrap();
+
+        // Check allocatable
+        let alloc = status.allocatable.unwrap();
+        assert!(alloc.contains_key("cpu"));
+        assert!(alloc.contains_key("memory"));
+        assert!(alloc.contains_key("pods"));
+        assert_eq!(alloc["memory"].0, "8Gi");
+        assert_eq!(alloc["pods"].0, "110");
+
+        // CPU should match available parallelism
+        let expected_cpu = std::thread::available_parallelism()
+            .map(|n| n.get().to_string())
+            .unwrap_or_else(|_| "1".to_string());
+        assert_eq!(alloc["cpu"].0, expected_cpu);
+
+        // Check capacity
+        let cap = status.capacity.unwrap();
+        assert!(cap.contains_key("cpu"));
+        assert!(cap.contains_key("memory"));
+        assert!(cap.contains_key("pods"));
+        assert_eq!(cap["cpu"].0, expected_cpu);
+    }
 }
diff --git a/crates/reddwarf-scheduler/Cargo.toml b/crates/reddwarf-scheduler/Cargo.toml
index 2ff0b59..dfbbd74 100644
--- a/crates/reddwarf-scheduler/Cargo.toml
+++ b/crates/reddwarf-scheduler/Cargo.toml
@@ -10,8 +10,10 @@ rust-version.workspace = true
 [dependencies]
 reddwarf-core = { workspace = true }
 reddwarf-storage = { workspace = true }
+reddwarf-versioning = { workspace = true }
 k8s-openapi = { workspace = true }
 tokio = { workspace = true }
+tokio-util = { workspace = true }
 miette = { workspace = true }
 thiserror = { workspace = true }
 tracing = { workspace = true }
diff --git a/crates/reddwarf-scheduler/src/scheduler.rs b/crates/reddwarf-scheduler/src/scheduler.rs
index 20086ef..cc44ab6 100644
--- a/crates/reddwarf-scheduler/src/scheduler.rs
+++ b/crates/reddwarf-scheduler/src/scheduler.rs
@@ -2,11 +2,14 @@ use crate::filter::{default_filters, FilterPredicate};
 use crate::score::{calculate_weighted_score, default_scores, ScoreFunction};
 use crate::types::SchedulingContext;
 use crate::{Result, SchedulerError};
-use reddwarf_core::{Node, Pod};
+use reddwarf_core::{Node, Pod, ResourceEvent};
 use reddwarf_storage::{KVStore, KeyEncoder, RedbBackend};
+use reddwarf_versioning::{Change, CommitBuilder, VersionStore};
 use std::sync::Arc;
 use std::time::Duration;
+use tokio::sync::broadcast;
 use tokio::time::sleep;
+use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info, warn};

 /// Configuration for the scheduler
@@ -27,6 +30,8 @@ impl Default for SchedulerConfig {
 /// Pod scheduler
 pub struct Scheduler {
     storage: Arc<RedbBackend>,
+    version_store: Arc<VersionStore>,
+    event_tx: broadcast::Sender<ResourceEvent>,
     config: SchedulerConfig,
     filters: Vec<Box<dyn FilterPredicate>>,
     scorers: Vec<Box<dyn ScoreFunction>>,
@@ -34,9 +39,16 @@ pub struct Scheduler {

 impl Scheduler {
     /// Create a new scheduler
-    pub fn new(storage: Arc<RedbBackend>, config: SchedulerConfig) -> Self {
+    pub fn new(
+        storage: Arc<RedbBackend>,
+        version_store: Arc<VersionStore>,
+        event_tx: broadcast::Sender<ResourceEvent>,
+        config: SchedulerConfig,
+    ) -> Self {
         Self {
             storage,
+            version_store,
+            event_tx,
             config,
             filters: default_filters(),
             scorers: default_scores(),
@@ -44,15 +56,21 @@ impl Scheduler {
     }

     /// Run the scheduler loop
-    pub async fn run(&self) -> Result<()> {
+    pub async fn run(&self, token: CancellationToken) -> Result<()> {
         info!("Starting scheduler");

         loop {
-            if let Err(e) = self.schedule_cycle().await {
-                error!("Scheduling cycle failed: {}", e);
+            tokio::select! {
+                _ = token.cancelled() => {
+                    info!("Scheduler shutting down");
+                    return Ok(());
+                }
+                _ = sleep(self.config.schedule_interval) => {
+                    if let Err(e) = self.schedule_cycle().await {
+                        error!("Scheduling cycle failed: {}", e);
+                    }
+                }
             }
-
-            sleep(self.config.schedule_interval).await;
         }
     }

@@ -240,21 +258,39 @@ impl Scheduler {
         Ok(best_node)
     }

-    /// Bind a pod to a node (update spec.nodeName)
+    /// Bind a pod to a node (update spec.nodeName) with versioning and event publishing
     async fn bind_pod(&self, pod: &mut Pod, node_name: &str) -> Result<()> {
         let pod_name = pod
             .metadata
             .name
             .as_ref()
-            .ok_or_else(|| SchedulerError::internal_error("Pod has no name"))?;
+            .ok_or_else(|| SchedulerError::internal_error("Pod has no name"))?
+            .clone();
         let namespace = pod
             .metadata
             .namespace
             .as_ref()
-            .ok_or_else(|| SchedulerError::internal_error("Pod has no namespace"))?;
+            .ok_or_else(|| SchedulerError::internal_error("Pod has no namespace"))?
+            .clone();

         info!("Binding pod {} to node {}", pod_name, node_name);

+        let key = reddwarf_core::ResourceKey::new(
+            reddwarf_core::GroupVersionKind::from_api_version_kind("v1", "Pod"),
+            &namespace,
+            &pod_name,
+        );
+        let storage_key = KeyEncoder::encode_resource_key(&key);
+
+        // Read the current pod bytes for version diff
+        let prev_data = self
+            .storage
+            .as_ref()
+            .get(storage_key.as_bytes())?
+            .ok_or_else(|| {
+                SchedulerError::internal_error(format!("Pod not found in storage: {}", pod_name))
+            })?;
+
         // Update pod spec
         if let Some(spec) = &mut pod.spec {
             spec.node_name = Some(node_name.to_string());
@@ -262,21 +298,54 @@ impl Scheduler {
             return Err(SchedulerError::internal_error("Pod has no spec"));
         }

-        // Save updated pod
-        let key = reddwarf_core::ResourceKey::new(
-            reddwarf_core::GroupVersionKind::from_api_version_kind("v1", "Pod"),
-            namespace,
-            pod_name,
-        );
-
-        let storage_key = KeyEncoder::encode_resource_key(&key);
-        let data = serde_json::to_vec(&pod).map_err(|e| {
+        // Serialize new pod
+        let new_data = serde_json::to_vec(&pod).map_err(|e| {
             SchedulerError::internal_error(format!("Failed to serialize pod: {}", e))
         })?;

-        self.storage.as_ref().put(storage_key.as_bytes(), &data)?;
+        // Create a versioned commit
+        let change = Change::update(
+            storage_key.clone(),
+            String::from_utf8_lossy(&new_data).to_string(),
+            String::from_utf8_lossy(&prev_data).to_string(),
+        );

-        info!("Successfully bound pod {} to node {}", pod_name, node_name);
+        let commit = self
+            .version_store
+            .create_commit(
+                CommitBuilder::new()
+                    .change(change)
+                    .message(format!("Bind pod {} to node {}", pod_name, node_name)),
+            )
+            .map_err(|e| {
+                SchedulerError::internal_error(format!("Failed to create commit: {}", e))
+            })?;
+
+        // Set resource version to commit ID
+        pod.metadata.resource_version = Some(commit.id().to_string());
+
+        // Re-serialize with updated resource version
+        let final_data = serde_json::to_vec(&pod).map_err(|e| {
+            SchedulerError::internal_error(format!("Failed to serialize pod: {}", e))
+        })?;
+
+        // Write to storage
+        self.storage
+            .as_ref()
+            .put(storage_key.as_bytes(), &final_data)?;
+
+        info!(
+            "Successfully bound pod {} to node {} at version {}",
+            pod_name,
+            node_name,
+            commit.id()
+        );
+
+        // Publish MODIFIED event (best-effort)
+        if let Ok(object) = serde_json::to_value(&*pod) {
+            let event = ResourceEvent::modified(key, object, commit.id().to_string());
+            let _ = self.event_tx.send(event);
+        }

         Ok(())
     }
@@ -285,10 +354,23 @@ impl Scheduler {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use reddwarf_core::WatchEventType;
     use reddwarf_storage::RedbBackend;
+    use reddwarf_versioning::VersionStore;
     use std::collections::BTreeMap;
     use tempfile::tempdir;

+    fn create_test_scheduler() -> (Scheduler, broadcast::Receiver<ResourceEvent>) {
+        let dir = tempdir().unwrap();
+        let db_path = dir.path().join("test.redb");
+        let storage = Arc::new(RedbBackend::new(&db_path).unwrap());
+        let version_store = Arc::new(VersionStore::new(storage.clone()).unwrap());
+        let (event_tx, event_rx) = broadcast::channel(64);
+        let scheduler =
+            Scheduler::new(storage, version_store, event_tx, SchedulerConfig::default());
+        (scheduler, event_rx)
+    }
+
     fn create_test_node(name: &str, cpu: &str, memory: &str) -> Node {
         let mut node = Node::default();
         node.metadata.name = Some(name.to_string());
@@ -355,13 +437,25 @@ mod tests {
         pod
     }

+    /// Helper: store a pod in storage so bind_pod can read prev version
+    fn store_pod(scheduler: &Scheduler, pod: &Pod) {
+        let key = reddwarf_core::ResourceKey::new(
+            reddwarf_core::GroupVersionKind::from_api_version_kind("v1", "Pod"),
+            pod.metadata.namespace.as_deref().unwrap(),
+            pod.metadata.name.as_deref().unwrap(),
+        );
+        let storage_key = KeyEncoder::encode_resource_key(&key);
+        let data = serde_json::to_vec(pod).unwrap();
+        scheduler
+            .storage
+            .as_ref()
+            .put(storage_key.as_bytes(), &data)
+            .unwrap();
+    }
+
     #[tokio::test]
     async fn test_schedule_pod_success() {
-        let dir = tempdir().unwrap();
-        let db_path = dir.path().join("test.redb");
-        let storage = Arc::new(RedbBackend::new(&db_path).unwrap());
-
-        let scheduler = Scheduler::new(storage.clone(), SchedulerConfig::default());
+        let (scheduler, _rx) = create_test_scheduler();

         // Create nodes
         let nodes = vec![
@@ -369,8 +463,9 @@ mod tests {
             create_test_node("node2", "2", "4Gi"),
         ];

-        // Create pod
+        // Create pod and store it so bind_pod can read the previous version
         let pod = create_test_pod("test-pod", "default", "1", "1Gi");
+        store_pod(&scheduler, &pod);

         // Schedule pod
         let result = scheduler.schedule_pod(pod, &nodes).await;
@@ -382,11 +477,7 @@ mod tests {

     #[tokio::test]
     async fn test_schedule_pod_no_suitable_nodes() {
-        let dir = tempdir().unwrap();
-        let db_path = dir.path().join("test.redb");
-        let storage = Arc::new(RedbBackend::new(&db_path).unwrap());
-
-        let scheduler = Scheduler::new(storage, SchedulerConfig::default());
+        let (scheduler, _rx) = create_test_scheduler();

         // Create node with insufficient resources
         let nodes = vec![create_test_node("node1", "1", "1Gi")];
@@ -399,4 +490,36 @@ mod tests {

         assert!(result.is_err());
     }
+
+    #[tokio::test]
+    async fn test_bind_pod_publishes_modified_event() {
+        let (scheduler, mut rx) = create_test_scheduler();
+
+        let mut pod = create_test_pod("event-pod", "default", "1", "1Gi");
+        store_pod(&scheduler, &pod);
+
+        scheduler.bind_pod(&mut pod, "node1").await.unwrap();
+
+        let event = rx.try_recv().unwrap();
+        assert!(matches!(event.event_type, WatchEventType::Modified));
+        assert_eq!(event.resource_key.name, "event-pod");
+        assert_eq!(event.gvk.kind, "Pod");
+
+        // Verify the event object has the updated node name
+        let bound_pod: Pod = serde_json::from_value(event.object).unwrap();
+        assert_eq!(bound_pod.spec.unwrap().node_name, Some("node1".to_string()));
+    }
+
+    #[tokio::test]
+    async fn test_bind_pod_sets_resource_version() {
+        let (scheduler, _rx) = create_test_scheduler();
+
+        let mut pod = create_test_pod("version-pod", "default", "1", "1Gi");
+        store_pod(&scheduler, &pod);
+
+        scheduler.bind_pod(&mut pod, "node1").await.unwrap();
+
+        assert!(pod.metadata.resource_version.is_some());
+        assert!(!pod.metadata.resource_version.as_ref().unwrap().is_empty());
+    }
 }
diff --git a/crates/reddwarf/Cargo.toml b/crates/reddwarf/Cargo.toml
index ccf991c..248eaab 100644
--- a/crates/reddwarf/Cargo.toml
+++ b/crates/reddwarf/Cargo.toml
@@ -19,6 +19,7 @@ reddwarf-apiserver = { workspace = true }
 reddwarf-scheduler = { workspace = true }
 reddwarf-runtime = { workspace = true }
 tokio = { workspace = true }
+tokio-util = { workspace = true }
 clap = { workspace = true }
 miette = { workspace = true }
 tracing = { workspace = true }
diff --git a/crates/reddwarf/src/main.rs b/crates/reddwarf/src/main.rs
index 5ac5945..5bf85f1 100644
--- a/crates/reddwarf/src/main.rs
+++ b/crates/reddwarf/src/main.rs
@@ -1,5 +1,6 @@
 use clap::{Parser, Subcommand};
-use reddwarf_apiserver::{ApiServer, AppState, Config as ApiConfig};
+use reddwarf_apiserver::{ApiError, ApiServer, AppState, Config as ApiConfig};
+use reddwarf_core::Namespace;
 use reddwarf_runtime::{
     ApiClient, EtherstubConfig, MockRuntime, NetworkMode, NodeAgent, NodeAgentConfig,
     PodController, PodControllerConfig, ZoneBrand,
 };
@@ -9,6 +10,7 @@ use reddwarf_scheduler::Scheduler;
 use reddwarf_storage::RedbBackend;
 use reddwarf_versioning::VersionStore;
 use std::sync::Arc;
+use tokio_util::sync::CancellationToken;
 use tracing::{error, info};

 #[derive(Parser)]
@@ -79,6 +81,8 @@
 async fn run_serve(bind: &str, data_dir: &str) -> miette::Result<()> {
     let state = create_app_state(data_dir)?;

+    bootstrap_default_namespace(&state).await?;
+
     let config = ApiConfig {
         listen_addr: bind
             .parse()
@@ -106,6 +110,8 @@ async fn run_agent(

     let state = create_app_state(data_dir)?;

+    bootstrap_default_namespace(&state).await?;
+
     let listen_addr: std::net::SocketAddr = bind
         .parse()
         .map_err(|e| miette::miette!("Invalid bind address '{}': {}", bind, e))?;
@@ -113,12 +119,22 @@ async fn run_agent(
     // Determine the API URL for internal components to connect to
     let api_url = format!("http://127.0.0.1:{}", listen_addr.port());

+    let token = CancellationToken::new();
+
     // 1. Spawn API server
     let api_config = ApiConfig { listen_addr };
     let api_server = ApiServer::new(api_config, state.clone());
+    let api_token = token.clone();
     let api_handle = tokio::spawn(async move {
-        if let Err(e) = api_server.run().await {
-            error!("API server error: {}", e);
+        tokio::select! {
+            result = api_server.run() => {
+                if let Err(e) = result {
+                    error!("API server error: {}", e);
+                }
+            }
+            _ = api_token.cancelled() => {
+                info!("API server shutting down");
+            }
         }
     });

@@ -126,9 +142,15 @@ async fn run_agent(
     tokio::time::sleep(std::time::Duration::from_millis(500)).await;

     // 2. Spawn scheduler
-    let scheduler = Scheduler::new(state.storage.clone(), SchedulerConfig::default());
+    let scheduler = Scheduler::new(
+        state.storage.clone(),
+        state.version_store.clone(),
+        state.event_tx.clone(),
+        SchedulerConfig::default(),
+    );
+    let scheduler_token = token.clone();
     let scheduler_handle = tokio::spawn(async move {
-        if let Err(e) = scheduler.run().await {
+        if let Err(e) = scheduler.run(scheduler_token).await {
             error!("Scheduler error: {}", e);
         }
     });
@@ -152,9 +174,15 @@ async fn run_agent(
         }),
     };

-    let controller = PodController::new(runtime, api_client.clone(), controller_config);
+    let controller = PodController::new(
+        runtime,
+        api_client.clone(),
+        state.event_tx.clone(),
+        controller_config,
+    );
+    let controller_token = token.clone();
     let controller_handle = tokio::spawn(async move {
-        if let Err(e) = controller.run().await {
+        if let Err(e) = controller.run(controller_token).await {
             error!("Pod controller error: {}", e);
         }
     });
@@ -162,8 +190,9 @@ async fn run_agent(
     // 5. Spawn node agent
     let node_agent_config = NodeAgentConfig::new(node_name.to_string(), api_url);
     let node_agent = NodeAgent::new(api_client, node_agent_config);
+    let agent_token = token.clone();
     let node_agent_handle = tokio::spawn(async move {
-        if let Err(e) = node_agent.run().await {
+        if let Err(e) = node_agent.run(agent_token).await {
             error!("Node agent error: {}", e);
         }
     });
@@ -178,14 +207,45 @@ async fn run_agent(
         .await
         .map_err(|e| miette::miette!("Failed to listen for ctrl-c: {}", e))?;

-    info!("Shutting down...");
+    info!("Shutting down gracefully...");
+    token.cancel();

-    // Abort all tasks
-    api_handle.abort();
-    scheduler_handle.abort();
-    controller_handle.abort();
-    node_agent_handle.abort();
+    // Wait for all tasks to finish with a timeout
+    let shutdown_timeout = std::time::Duration::from_secs(5);
+    let _ = tokio::time::timeout(shutdown_timeout, async {
+        let _ = tokio::join!(
+            api_handle,
+            scheduler_handle,
+            controller_handle,
+            node_agent_handle,
+        );
+    })
+    .await;

+    info!("Shutdown complete");
+
+    Ok(())
+}
+
+/// Bootstrap the "default" namespace if it doesn't already exist
+async fn bootstrap_default_namespace(state: &AppState) -> miette::Result<()> {
+    use reddwarf_apiserver::handlers::common::create_resource;
+
+    let mut ns = Namespace::default();
+    ns.metadata.name = Some("default".to_string());
+
+    match create_resource(state, ns).await {
+        Ok(_) => info!("Created default namespace"),
+        Err(ApiError::AlreadyExists(_)) => {
+            // Already exists — fine
+        }
+        Err(e) => {
+            return Err(miette::miette!(
+                "Failed to bootstrap default namespace: {:?}",
+                e
+            ))
+        }
+    }
     Ok(())
 }
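
Note: the shutdown wiring above follows the usual tokio-util pattern, which is one
root CancellationToken, one clone per spawned task, and a bounded join once the
root is cancelled. Reduced to a standalone sketch with only standard tokio /
tokio-util APIs (the worker here is illustrative, not a reddwarf component):

    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    #[tokio::main]
    async fn main() {
        let root = CancellationToken::new();

        // Each task owns a clone; cancelling the root wakes all of them.
        let worker_token = root.clone();
        let worker = tokio::spawn(async move {
            loop {
                tokio::select! {
                    // cancelled() completes once cancel() is called on any clone.
                    _ = worker_token.cancelled() => break,
                    _ = tokio::time::sleep(Duration::from_secs(2)) => {
                        // periodic work (heartbeat, schedule cycle, ...)
                    }
                }
            }
        });

        root.cancel();

        // Bound the wait so a wedged task cannot stall shutdown forever.
        let _ = tokio::time::timeout(Duration::from_secs(5), worker).await;
    }

Because the token arm sits in the same select! as the work arm, a task can only
exit at an await point, never mid-reconcile, which is what .abort() could not
guarantee.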
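Note: the controller's Lagged branch leans on tokio::sync::broadcast semantics.
When a receiver falls more than the channel capacity behind, recv() returns
RecvError::Lagged(n) exactly once and then resumes at the oldest retained event.
A small self-contained demonstration (plain integers stand in for ResourceEvents):

    use tokio::sync::broadcast;

    #[tokio::main]
    async fn main() {
        // Capacity 2: older events are overwritten once the buffer is full.
        let (tx, mut rx) = broadcast::channel::<u64>(2);

        for i in 0..5 {
            let _ = tx.send(i);
        }
        drop(tx); // close the channel so the loop below terminates

        loop {
            match rx.recv().await {
                Ok(v) => println!("got {}", v),
                // The receiver missed n events; since they cannot be replayed,
                // a controller re-lists everything (reconcile_all) instead.
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    println!("lagged, {} events skipped", n);
                }
                Err(broadcast::error::RecvError::Closed) => break,
            }
        }
        // Prints: "lagged, 3 events skipped", then "got 3", "got 4".
    }

This is why keeping reconcile_all around is not just a startup convenience: it is
the recovery path whenever the bus drops events.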
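Note: bind_pod publishes its MODIFIED event with `let _ = self.event_tx.send(event)`
because broadcast::Sender::send only errors when no receiver is currently
subscribed; the scheduler treats that as "nobody is listening yet" rather than a
failure. Illustrated with the stock tokio API:

    use tokio::sync::broadcast;

    fn main() {
        let (tx, rx) = broadcast::channel::<&'static str>(16);

        // With a live receiver, send succeeds and reports the receiver count.
        assert_eq!(tx.send("pod-bound").unwrap(), 1);

        // With no receivers the event is dropped and send returns Err;
        // ignoring the result makes publishing best-effort.
        drop(rx);
        assert!(tx.send("pod-bound").is_err());
    }

The storage write and the versioned commit have already happened by the time the
event is sent, so a dropped event costs at most one reconcile delay, not data loss.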