diff --git a/Cargo.lock b/Cargo.lock index 105feb0..200a280 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1118,9 +1118,14 @@ dependencies = [ name = "reddwarf-scheduler" version = "0.1.0" dependencies = [ + "k8s-openapi", "miette", "reddwarf-core", + "reddwarf-storage", + "serde", + "serde_json", "tempfile", + "thiserror 2.0.18", "tokio", "tracing", ] diff --git a/crates/reddwarf-scheduler/Cargo.toml b/crates/reddwarf-scheduler/Cargo.toml index 12b2c0d..2ff0b59 100644 --- a/crates/reddwarf-scheduler/Cargo.toml +++ b/crates/reddwarf-scheduler/Cargo.toml @@ -9,9 +9,14 @@ rust-version.workspace = true [dependencies] reddwarf-core = { workspace = true } +reddwarf-storage = { workspace = true } +k8s-openapi = { workspace = true } tokio = { workspace = true } miette = { workspace = true } +thiserror = { workspace = true } tracing = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } [dev-dependencies] tempfile = { workspace = true } diff --git a/crates/reddwarf-scheduler/src/error.rs b/crates/reddwarf-scheduler/src/error.rs new file mode 100644 index 0000000..f257192 --- /dev/null +++ b/crates/reddwarf-scheduler/src/error.rs @@ -0,0 +1,85 @@ +// Allow unused assignments for diagnostic fields - they're used by the macros +#![allow(unused_assignments)] + +use miette::Diagnostic; +use thiserror::Error; + +/// Scheduler error type +#[derive(Error, Debug, Diagnostic)] +pub enum SchedulerError { + /// No suitable nodes found + #[error("No suitable nodes found for pod {pod_name}")] + #[diagnostic( + code(scheduler::no_suitable_nodes), + help("Check node resources, taints, and pod requirements") + )] + NoSuitableNodes { + pod_name: String, + reason: String, + }, + + /// Scheduling failed + #[error("Scheduling failed: {message}")] + #[diagnostic( + code(scheduler::scheduling_failed), + help("{suggestion}") + )] + SchedulingFailed { + message: String, + suggestion: String, + }, + + /// Storage error + #[error("Storage error: {0}")] + #[diagnostic( 
+ code(scheduler::storage_error), + help("Check the underlying storage system") + )] + StorageError(#[from] reddwarf_storage::StorageError), + + /// Core error + #[error("Core error: {0}")] + #[diagnostic( + code(scheduler::core_error), + help("This is an internal error") + )] + CoreError(#[from] reddwarf_core::ReddwarfError), + + /// Internal error + #[error("Internal error: {message}")] + #[diagnostic( + code(scheduler::internal_error), + help("This is likely a bug. Please report it") + )] + InternalError { + message: String, + }, +} + +/// Result type for scheduler operations +pub type Result<T> = std::result::Result<T, SchedulerError>; + +impl SchedulerError { + /// Create a NoSuitableNodes error + pub fn no_suitable_nodes(pod_name: impl Into<String>, reason: impl Into<String>) -> Self { + Self::NoSuitableNodes { + pod_name: pod_name.into(), + reason: reason.into(), + } + } + + /// Create a SchedulingFailed error + pub fn scheduling_failed(message: impl Into<String>, suggestion: impl Into<String>) -> Self { + Self::SchedulingFailed { + message: message.into(), + suggestion: suggestion.into(), + } + } + + /// Create an InternalError + pub fn internal_error(message: impl Into<String>) -> Self { + Self::InternalError { + message: message.into(), + } + } +} diff --git a/crates/reddwarf-scheduler/src/filter.rs b/crates/reddwarf-scheduler/src/filter.rs new file mode 100644 index 0000000..244832a --- /dev/null +++ b/crates/reddwarf-scheduler/src/filter.rs @@ -0,0 +1,335 @@ +use crate::types::{FilterResult, ResourceQuantities, SchedulingContext}; +use reddwarf_core::Node; +use tracing::debug; + +/// Filter predicate trait +pub trait FilterPredicate: Send + Sync { + /// Filter a node for the given pod + fn filter(&self, context: &SchedulingContext, node: &Node) -> FilterResult; + + /// Name of the filter + fn name(&self) -> &str; +} + +/// Filter for pod resource requirements +pub struct PodFitsResources; + +impl FilterPredicate for PodFitsResources { + fn filter(&self, context: &SchedulingContext, node: &Node) -> FilterResult { 
let node_name = node + .metadata + .name + .as_ref() + .unwrap_or(&"unknown".to_string()) + .clone(); + + // Get node allocatable resources + let allocatable = node + .status + .as_ref() + .and_then(|s| s.allocatable.as_ref()) + .cloned() + .unwrap_or_default(); + + let node_resources = ResourceQuantities::from_k8s_resource_map(&allocatable); + + // Get pod requested resources + let pod_spec = match &context.pod.spec { + Some(spec) => spec, + None => return FilterResult::fail(node_name, "Pod has no spec".to_string()), + }; + + let mut total_cpu = 0i64; + let mut total_memory = 0i64; + + for container in &pod_spec.containers { + if let Some(resources) = &container.resources { + if let Some(requests) = &resources.requests { + total_cpu += requests + .get("cpu") + .and_then(|s| ResourceQuantities::parse_cpu(&s.0).ok()) + .unwrap_or(0); + + total_memory += requests + .get("memory") + .and_then(|s| ResourceQuantities::parse_memory(&s.0).ok()) + .unwrap_or(0); + } + } + } + + debug!( + "Node {} has CPU: {} milli, Memory: {} bytes", + node_name, node_resources.cpu_millicores, node_resources.memory_bytes + ); + debug!( + "Pod requests CPU: {} milli, Memory: {} bytes", + total_cpu, total_memory + ); + + // Check if node has enough resources + if total_cpu > node_resources.cpu_millicores { + return FilterResult::fail( + node_name, + format!( + "Insufficient CPU: requested {} milli, available {} milli", + total_cpu, node_resources.cpu_millicores + ), + ); + } + + if total_memory > node_resources.memory_bytes { + return FilterResult::fail( + node_name, + format!( + "Insufficient memory: requested {} bytes, available {} bytes", + total_memory, node_resources.memory_bytes + ), + ); + } + + FilterResult::pass(node_name) + } + + fn name(&self) -> &str { + "PodFitsResources" + } +} + +/// Filter for node selector +pub struct NodeSelectorMatch; + +impl FilterPredicate for NodeSelectorMatch { + fn filter(&self, context: &SchedulingContext, node: &Node) -> FilterResult { + let 
node_name = node + .metadata + .name + .as_ref() + .unwrap_or(&"unknown".to_string()) + .clone(); + + let pod_spec = match &context.pod.spec { + Some(spec) => spec, + None => return FilterResult::pass(node_name), + }; + + // Get node selector from pod + let node_selector = match &pod_spec.node_selector { + Some(selector) => selector, + None => return FilterResult::pass(node_name), // No selector = pass + }; + + // Get node labels + let node_labels = node.metadata.labels.as_ref(); + + // Check if all selector labels match + for (key, value) in node_selector { + let node_value = node_labels.and_then(|labels| labels.get(key)); + + if node_value != Some(value) { + return FilterResult::fail( + node_name, + format!("Node selector mismatch: {}={}", key, value), + ); + } + } + + FilterResult::pass(node_name) + } + + fn name(&self) -> &str { + "NodeSelectorMatch" + } +} + +/// Filter for taints and tolerations +pub struct TaintToleration; + +impl FilterPredicate for TaintToleration { + fn filter(&self, context: &SchedulingContext, node: &Node) -> FilterResult { + let node_name = node + .metadata + .name + .as_ref() + .unwrap_or(&"unknown".to_string()) + .clone(); + + // Get node taints + let taints = match node.spec.as_ref().and_then(|s| s.taints.as_ref()) { + Some(t) => t, + None => return FilterResult::pass(node_name), // No taints = pass + }; + + // Get pod tolerations + let pod_spec = match &context.pod.spec { + Some(spec) => spec, + None => return FilterResult::pass(node_name), + }; + + let tolerations = match &pod_spec.tolerations { + Some(t) => t, + None => { + // No tolerations but node has taints = fail + if !taints.is_empty() { + return FilterResult::fail( + node_name, + "Node has taints but pod has no tolerations".to_string(), + ); + } + return FilterResult::pass(node_name); + } + }; + + // Check if pod tolerates all taints + for taint in taints { + let taint_key = &taint.key; + let taint_effect = &taint.effect; + + let mut tolerated = false; + + for toleration 
in tolerations { + // Check if toleration matches taint + let toleration_key = toleration.key.as_ref(); + let toleration_effect = toleration.effect.as_ref(); + + if toleration_key.as_ref() == Some(&taint_key) + && (toleration_effect.is_none() || toleration_effect.as_ref() == Some(&taint_effect)) + { + tolerated = true; + break; + } + } + + if !tolerated { + return FilterResult::fail( + node_name, + format!("Pod does not tolerate taint: {}={}", taint_key, taint_effect), + ); + } + } + + FilterResult::pass(node_name) + } + + fn name(&self) -> &str { + "TaintToleration" + } +} + +/// Get default filter predicates +pub fn default_filters() -> Vec<Box<dyn FilterPredicate>> { + vec![ + Box::new(PodFitsResources), + Box::new(NodeSelectorMatch), + Box::new(TaintToleration), + ] +} + +#[cfg(test)] +mod tests { + use super::*; + use reddwarf_core::{Node, Pod}; + use std::collections::BTreeMap; + + fn create_test_node(name: &str, cpu: &str, memory: &str) -> Node { + let mut node = Node::default(); + node.metadata.name = Some(name.to_string()); + node.status = Some(Default::default()); + node.status.as_mut().unwrap().allocatable = Some(BTreeMap::new()); + node.status + .as_mut() + .unwrap() + .allocatable + .as_mut() + .unwrap() + .insert( + "cpu".to_string(), + k8s_openapi::apimachinery::pkg::api::resource::Quantity(cpu.to_string()), + ); + node.status + .as_mut() + .unwrap() + .allocatable + .as_mut() + .unwrap() + .insert( + "memory".to_string(), + k8s_openapi::apimachinery::pkg::api::resource::Quantity(memory.to_string()), + ); + node + } + + fn create_test_pod(cpu: &str, memory: &str) -> Pod { + let mut pod = Pod::default(); + pod.metadata.name = Some("test-pod".to_string()); + pod.spec = Some(Default::default()); + pod.spec.as_mut().unwrap().containers = vec![Default::default()]; + pod.spec.as_mut().unwrap().containers[0].resources = Some(Default::default()); + pod.spec.as_mut().unwrap().containers[0] + .resources + .as_mut() + .unwrap() + .requests = Some(BTreeMap::new()); + 
pod.spec.as_mut().unwrap().containers[0] + .resources + .as_mut() + .unwrap() + .requests + .as_mut() + .unwrap() + .insert( + "cpu".to_string(), + k8s_openapi::apimachinery::pkg::api::resource::Quantity(cpu.to_string()), + ); + pod.spec.as_mut().unwrap().containers[0] + .resources + .as_mut() + .unwrap() + .requests + .as_mut() + .unwrap() + .insert( + "memory".to_string(), + k8s_openapi::apimachinery::pkg::api::resource::Quantity(memory.to_string()), + ); + pod + } + + #[test] + fn test_pod_fits_resources_pass() { + let node = create_test_node("node1", "4", "8Gi"); + let pod = create_test_pod("1", "1Gi"); + let context = SchedulingContext::new(pod, vec![node.clone()]); + + let filter = PodFitsResources; + let result = filter.filter(&context, &node); + + assert!(result.passed); + } + + #[test] + fn test_pod_fits_resources_fail_cpu() { + let node = create_test_node("node1", "1", "8Gi"); + let pod = create_test_pod("2", "1Gi"); + let context = SchedulingContext::new(pod, vec![node.clone()]); + + let filter = PodFitsResources; + let result = filter.filter(&context, &node); + + assert!(!result.passed); + assert!(result.reason.unwrap().contains("Insufficient CPU")); + } + + #[test] + fn test_pod_fits_resources_fail_memory() { + let node = create_test_node("node1", "4", "1Gi"); + let pod = create_test_pod("1", "2Gi"); + let context = SchedulingContext::new(pod, vec![node.clone()]); + + let filter = PodFitsResources; + let result = filter.filter(&context, &node); + + assert!(!result.passed); + assert!(result.reason.unwrap().contains("Insufficient memory")); + } +} diff --git a/crates/reddwarf-scheduler/src/lib.rs b/crates/reddwarf-scheduler/src/lib.rs index 5008a52..e1b2961 100644 --- a/crates/reddwarf-scheduler/src/lib.rs +++ b/crates/reddwarf-scheduler/src/lib.rs @@ -1 +1,18 @@ -// Placeholder for reddwarf-scheduler +//! Reddwarf Scheduler - Pod to Node scheduling +//! +//! This crate provides: +//! - Pod scheduling algorithm +//! 
- Filter predicates (resource requirements, node selectors) +//! - Scoring functions (least allocated) +//! - Pod binding to nodes + +pub mod error; +pub mod types; +pub mod filter; +pub mod score; +pub mod scheduler; + +// Re-export commonly used types +pub use error::{SchedulerError, Result}; +pub use scheduler::Scheduler; +pub use types::{SchedulingContext, FilterResult, ScoreResult}; diff --git a/crates/reddwarf-scheduler/src/scheduler.rs b/crates/reddwarf-scheduler/src/scheduler.rs new file mode 100644 index 0000000..1af9a71 --- /dev/null +++ b/crates/reddwarf-scheduler/src/scheduler.rs @@ -0,0 +1,399 @@ +use crate::filter::{default_filters, FilterPredicate}; +use crate::score::{calculate_weighted_score, default_scores, ScoreFunction}; +use crate::types::SchedulingContext; +use crate::{Result, SchedulerError}; +use reddwarf_core::{Node, Pod}; +use reddwarf_storage::{KeyEncoder, KVStore, RedbBackend}; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::sleep; +use tracing::{debug, error, info, warn}; + +/// Configuration for the scheduler +#[derive(Clone)] +pub struct SchedulerConfig { + /// Interval between scheduling cycles + pub schedule_interval: Duration, +} + +impl Default for SchedulerConfig { + fn default() -> Self { + Self { + schedule_interval: Duration::from_secs(1), + } + } +} + +/// Pod scheduler +pub struct Scheduler { + storage: Arc<RedbBackend>, + config: SchedulerConfig, + filters: Vec<Box<dyn FilterPredicate>>, + scorers: Vec<Box<dyn ScoreFunction>>, +} + +impl Scheduler { + /// Create a new scheduler + pub fn new(storage: Arc<RedbBackend>, config: SchedulerConfig) -> Self { + Self { + storage, + config, + filters: default_filters(), + scorers: default_scores(), + } + } + + /// Run the scheduler loop + pub async fn run(&self) -> Result<()> { + info!("Starting scheduler"); + + loop { + if let Err(e) = self.schedule_cycle().await { + error!("Scheduling cycle failed: {}", e); + } + + sleep(self.config.schedule_interval).await; + } + } + + /// Run a single scheduling cycle + async fn schedule_cycle(&self) 
-> Result<()> { + debug!("Running scheduling cycle"); + + // Get all unscheduled pods + let unscheduled_pods = self.get_unscheduled_pods().await?; + + if unscheduled_pods.is_empty() { + debug!("No unscheduled pods found"); + return Ok(()); + } + + info!("Found {} unscheduled pods", unscheduled_pods.len()); + + // Get all available nodes + let nodes = self.get_nodes().await?; + + if nodes.is_empty() { + warn!("No nodes available for scheduling"); + return Ok(()); + } + + info!("Found {} available nodes", nodes.len()); + + // Schedule each pod + for pod in unscheduled_pods { + let pod_name = pod + .metadata + .name + .as_ref() + .unwrap_or(&"unknown".to_string()) + .clone(); + + match self.schedule_pod(pod, &nodes).await { + Ok(node_name) => { + info!("Scheduled pod {} to node {}", pod_name, node_name); + } + Err(e) => { + error!("Failed to schedule pod {}: {}", pod_name, e); + } + } + } + + Ok(()) + } + + /// Get all unscheduled pods (spec.nodeName is empty) + async fn get_unscheduled_pods(&self) -> Result<Vec<Pod>> { + let prefix = KeyEncoder::encode_prefix("v1", "Pod", None); + let results = self.storage.as_ref().scan(prefix.as_bytes())?; + + let mut unscheduled = Vec::new(); + + for (_key, data) in results.iter() { + let pod: Pod = serde_json::from_slice(data) + .map_err(|e| SchedulerError::internal_error(format!("Failed to deserialize pod: {}", e)))?; + + // Check if pod is unscheduled + if let Some(spec) = &pod.spec { + if spec.node_name.is_none() { + unscheduled.push(pod); + } + } + } + + Ok(unscheduled) + } + + /// Get all nodes + async fn get_nodes(&self) -> Result<Vec<Node>> { + let prefix = KeyEncoder::encode_prefix("v1", "Node", None); + let results = self.storage.as_ref().scan(prefix.as_bytes())?; + + let mut nodes = Vec::new(); + + for (_key, data) in results.iter() { + let node: Node = serde_json::from_slice(data) + .map_err(|e| SchedulerError::internal_error(format!("Failed to deserialize node: {}", e)))?; + nodes.push(node); + } + + Ok(nodes) + } + + /// Schedule a 
single pod + async fn schedule_pod(&self, mut pod: Pod, nodes: &[Node]) -> Result<String> { + let pod_name = pod + .metadata + .name + .as_ref() + .ok_or_else(|| SchedulerError::internal_error("Pod has no name"))? + .clone(); + + let context = SchedulingContext::new(pod.clone(), nodes.to_vec()); + + // Phase 1: Filter nodes + let mut feasible_nodes = Vec::new(); + + for node in nodes { + let node_name = node + .metadata + .name + .as_ref() + .unwrap_or(&"unknown".to_string()) + .clone(); + + let mut passed = true; + + for filter in &self.filters { + let result = filter.filter(&context, node); + if !result.passed { + debug!( + "Node {} filtered out by {}: {}", + node_name, + filter.name(), + result.reason.unwrap_or_default() + ); + passed = false; + break; + } + } + + if passed { + feasible_nodes.push(node.clone()); + } + } + + if feasible_nodes.is_empty() { + return Err(SchedulerError::no_suitable_nodes( + pod_name, + "All nodes filtered out".to_string(), + )); + } + + info!( + "Pod {} has {} feasible nodes", + pod_name, + feasible_nodes.len() + ); + + // Phase 2: Score nodes + let mut node_scores: Vec<(String, i32)> = Vec::new(); + + for node in &feasible_nodes { + let node_name = node + .metadata + .name + .as_ref() + .unwrap_or(&"unknown".to_string()) + .clone(); + + let mut scores = Vec::new(); + + for scorer in &self.scorers { + let score = scorer.score(&context, node); + scores.push(score); + } + + let final_score = calculate_weighted_score(&scores); + node_scores.push((node_name, final_score)); + } + + // Phase 3: Select best node + node_scores.sort_by(|a, b| b.1.cmp(&a.1)); // Sort by score descending + + let best_node = node_scores + .first() + .ok_or_else(|| SchedulerError::internal_error("No nodes scored"))? 
+ .0 + .clone(); + + info!( + "Selected node {} for pod {} with score {}", + best_node, pod_name, node_scores[0].1 + ); + + // Phase 4: Bind pod to node + self.bind_pod(&mut pod, &best_node).await?; + + Ok(best_node) + } + + /// Bind a pod to a node (update spec.nodeName) + async fn bind_pod(&self, pod: &mut Pod, node_name: &str) -> Result<()> { + let pod_name = pod + .metadata + .name + .as_ref() + .ok_or_else(|| SchedulerError::internal_error("Pod has no name"))?; + let namespace = pod + .metadata + .namespace + .as_ref() + .ok_or_else(|| SchedulerError::internal_error("Pod has no namespace"))?; + + info!("Binding pod {} to node {}", pod_name, node_name); + + // Update pod spec + if let Some(spec) = &mut pod.spec { + spec.node_name = Some(node_name.to_string()); + } else { + return Err(SchedulerError::internal_error("Pod has no spec")); + } + + // Save updated pod + let key = reddwarf_core::ResourceKey::new( + reddwarf_core::GroupVersionKind::from_api_version_kind("v1", "Pod"), + namespace, + pod_name, + ); + + let storage_key = KeyEncoder::encode_resource_key(&key); + let data = serde_json::to_vec(&pod) + .map_err(|e| SchedulerError::internal_error(format!("Failed to serialize pod: {}", e)))?; + + self.storage.as_ref().put(storage_key.as_bytes(), &data)?; + + info!("Successfully bound pod {} to node {}", pod_name, node_name); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use reddwarf_storage::RedbBackend; + use std::collections::BTreeMap; + use tempfile::tempdir; + + fn create_test_node(name: &str, cpu: &str, memory: &str) -> Node { + let mut node = Node::default(); + node.metadata.name = Some(name.to_string()); + node.status = Some(Default::default()); + node.status.as_mut().unwrap().allocatable = Some(BTreeMap::new()); + node.status + .as_mut() + .unwrap() + .allocatable + .as_mut() + .unwrap() + .insert( + "cpu".to_string(), + k8s_openapi::apimachinery::pkg::api::resource::Quantity(cpu.to_string()), + ); + node.status + .as_mut() + 
.unwrap() + .allocatable + .as_mut() + .unwrap() + .insert( + "memory".to_string(), + k8s_openapi::apimachinery::pkg::api::resource::Quantity(memory.to_string()), + ); + node + } + + fn create_test_pod(name: &str, namespace: &str, cpu: &str, memory: &str) -> Pod { + let mut pod = Pod::default(); + pod.metadata.name = Some(name.to_string()); + pod.metadata.namespace = Some(namespace.to_string()); + pod.spec = Some(Default::default()); + pod.spec.as_mut().unwrap().containers = vec![Default::default()]; + pod.spec.as_mut().unwrap().containers[0].name = "test".to_string(); + pod.spec.as_mut().unwrap().containers[0].resources = Some(Default::default()); + pod.spec.as_mut().unwrap().containers[0] + .resources + .as_mut() + .unwrap() + .requests = Some(BTreeMap::new()); + pod.spec.as_mut().unwrap().containers[0] + .resources + .as_mut() + .unwrap() + .requests + .as_mut() + .unwrap() + .insert( + "cpu".to_string(), + k8s_openapi::apimachinery::pkg::api::resource::Quantity(cpu.to_string()), + ); + pod.spec.as_mut().unwrap().containers[0] + .resources + .as_mut() + .unwrap() + .requests + .as_mut() + .unwrap() + .insert( + "memory".to_string(), + k8s_openapi::apimachinery::pkg::api::resource::Quantity(memory.to_string()), + ); + pod + } + + #[tokio::test] + async fn test_schedule_pod_success() { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("test.redb"); + let storage = Arc::new(RedbBackend::new(&db_path).unwrap()); + + let scheduler = Scheduler::new(storage.clone(), SchedulerConfig::default()); + + // Create nodes + let nodes = vec![ + create_test_node("node1", "4", "8Gi"), + create_test_node("node2", "2", "4Gi"), + ]; + + // Create pod + let pod = create_test_pod("test-pod", "default", "1", "1Gi"); + + // Schedule pod + let result = scheduler.schedule_pod(pod, &nodes).await; + + assert!(result.is_ok()); + let node_name = result.unwrap(); + assert!(node_name == "node1" || node_name == "node2"); + } + + #[tokio::test] + async fn 
test_schedule_pod_no_suitable_nodes() { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("test.redb"); + let storage = Arc::new(RedbBackend::new(&db_path).unwrap()); + + let scheduler = Scheduler::new(storage, SchedulerConfig::default()); + + // Create node with insufficient resources + let nodes = vec![create_test_node("node1", "1", "1Gi")]; + + // Create pod that requires more resources + let pod = create_test_pod("test-pod", "default", "2", "2Gi"); + + // Schedule pod should fail + let result = scheduler.schedule_pod(pod, &nodes).await; + + assert!(result.is_err()); + } +} diff --git a/crates/reddwarf-scheduler/src/score.rs b/crates/reddwarf-scheduler/src/score.rs new file mode 100644 index 0000000..34aa9fb --- /dev/null +++ b/crates/reddwarf-scheduler/src/score.rs @@ -0,0 +1,292 @@ +use crate::types::{ResourceQuantities, SchedulingContext, ScoreResult}; +use reddwarf_core::Node; +use tracing::debug; + +/// Scoring function trait +pub trait ScoreFunction: Send + Sync { + /// Score a node for the given pod (0-100, higher is better) + fn score(&self, context: &SchedulingContext, node: &Node) -> ScoreResult; + + /// Name of the scoring function + fn name(&self) -> &str; +} + +/// Score based on least allocated resources +pub struct LeastAllocated; + +impl ScoreFunction for LeastAllocated { + fn score(&self, context: &SchedulingContext, node: &Node) -> ScoreResult { + let node_name = node + .metadata + .name + .as_ref() + .unwrap_or(&"unknown".to_string()) + .clone(); + + // Get node allocatable resources + let allocatable = node + .status + .as_ref() + .and_then(|s| s.allocatable.as_ref()) + .cloned() + .unwrap_or_default(); + + let node_resources = ResourceQuantities::from_k8s_resource_map(&allocatable); + + // If node has no resources, score 0 + if node_resources.cpu_millicores == 0 || node_resources.memory_bytes == 0 { + return ScoreResult::new(node_name, 0); + } + + // Get pod requested resources + let pod_spec = match &context.pod.spec { + 
Some(spec) => spec, + None => return ScoreResult::new(node_name, 0), + }; + + let mut total_cpu = 0i64; + let mut total_memory = 0i64; + + for container in &pod_spec.containers { + if let Some(resources) = &container.resources { + if let Some(requests) = &resources.requests { + total_cpu += requests + .get("cpu") + .and_then(|s| ResourceQuantities::parse_cpu(&s.0).ok()) + .unwrap_or(0); + + total_memory += requests + .get("memory") + .and_then(|s| ResourceQuantities::parse_memory(&s.0).ok()) + .unwrap_or(0); + } + } + } + + // Calculate remaining resources after scheduling this pod + let remaining_cpu = node_resources.cpu_millicores - total_cpu; + let remaining_memory = node_resources.memory_bytes - total_memory; + + // Calculate utilization percentage for each resource + let cpu_utilization = if node_resources.cpu_millicores > 0 { + ((node_resources.cpu_millicores - remaining_cpu) as f64 + / node_resources.cpu_millicores as f64) + * 100.0 + } else { + 100.0 + }; + + let memory_utilization = if node_resources.memory_bytes > 0 { + ((node_resources.memory_bytes - remaining_memory) as f64 + / node_resources.memory_bytes as f64) + * 100.0 + } else { + 100.0 + }; + + // Score is inverse of average utilization + // Lower utilization = higher score (prefer less loaded nodes) + let avg_utilization = (cpu_utilization + memory_utilization) / 2.0; + let score = (100.0 - avg_utilization).max(0.0).min(100.0) as i32; + + debug!( + "Node {} score: {} (CPU util: {:.1}%, Memory util: {:.1}%)", + node_name, score, cpu_utilization, memory_utilization + ); + + ScoreResult::new(node_name, score) + } + + fn name(&self) -> &str { + "LeastAllocated" + } +} + +/// Score based on balanced resource allocation +pub struct BalancedAllocation; + +impl ScoreFunction for BalancedAllocation { + fn score(&self, context: &SchedulingContext, node: &Node) -> ScoreResult { + let node_name = node + .metadata + .name + .as_ref() + .unwrap_or(&"unknown".to_string()) + .clone(); + + // Get node allocatable 
resources + let allocatable = node + .status + .as_ref() + .and_then(|s| s.allocatable.as_ref()) + .cloned() + .unwrap_or_default(); + + let node_resources = ResourceQuantities::from_k8s_resource_map(&allocatable); + + if node_resources.cpu_millicores == 0 || node_resources.memory_bytes == 0 { + return ScoreResult::new(node_name, 0); + } + + // Get pod requested resources + let pod_spec = match &context.pod.spec { + Some(spec) => spec, + None => return ScoreResult::new(node_name, 50), + }; + + let mut total_cpu = 0i64; + let mut total_memory = 0i64; + + for container in &pod_spec.containers { + if let Some(resources) = &container.resources { + if let Some(requests) = &resources.requests { + total_cpu += requests + .get("cpu") + .and_then(|s| ResourceQuantities::parse_cpu(&s.0).ok()) + .unwrap_or(0); + + total_memory += requests + .get("memory") + .and_then(|s| ResourceQuantities::parse_memory(&s.0).ok()) + .unwrap_or(0); + } + } + } + + // Calculate utilization after scheduling + let cpu_fraction = total_cpu as f64 / node_resources.cpu_millicores as f64; + let memory_fraction = total_memory as f64 / node_resources.memory_bytes as f64; + + // Prefer balanced resource usage (CPU and memory usage should be similar) + let variance = (cpu_fraction - memory_fraction).abs(); + let score = ((1.0 - variance) * 100.0).max(0.0).min(100.0) as i32; + + debug!( + "Node {} balanced allocation score: {} (variance: {:.3})", + node_name, score, variance + ); + + ScoreResult::new(node_name, score) + } + + fn name(&self) -> &str { + "BalancedAllocation" + } +} + +/// Get default scoring functions +pub fn default_scores() -> Vec<Box<dyn ScoreFunction>> { + vec![ + Box::new(LeastAllocated), + Box::new(BalancedAllocation), + ] +} + +/// Calculate weighted score from multiple scoring functions +pub fn calculate_weighted_score(scores: &[ScoreResult]) -> i32 { + if scores.is_empty() { + return 0; + } + + let total: i32 = scores.iter().map(|s| s.score).sum(); + total / scores.len() as i32 +} + +#[cfg(test)] +mod 
tests { + use super::*; + use reddwarf_core::{Node, Pod}; + use std::collections::BTreeMap; + + fn create_test_node(name: &str, cpu: &str, memory: &str) -> Node { + let mut node = Node::default(); + node.metadata.name = Some(name.to_string()); + node.status = Some(Default::default()); + node.status.as_mut().unwrap().allocatable = Some(BTreeMap::new()); + node.status + .as_mut() + .unwrap() + .allocatable + .as_mut() + .unwrap() + .insert( + "cpu".to_string(), + k8s_openapi::apimachinery::pkg::api::resource::Quantity(cpu.to_string()), + ); + node.status + .as_mut() + .unwrap() + .allocatable + .as_mut() + .unwrap() + .insert( + "memory".to_string(), + k8s_openapi::apimachinery::pkg::api::resource::Quantity(memory.to_string()), + ); + node + } + + fn create_test_pod(cpu: &str, memory: &str) -> Pod { + let mut pod = Pod::default(); + pod.metadata.name = Some("test-pod".to_string()); + pod.spec = Some(Default::default()); + pod.spec.as_mut().unwrap().containers = vec![Default::default()]; + pod.spec.as_mut().unwrap().containers[0].resources = Some(Default::default()); + pod.spec.as_mut().unwrap().containers[0] + .resources + .as_mut() + .unwrap() + .requests = Some(BTreeMap::new()); + pod.spec.as_mut().unwrap().containers[0] + .resources + .as_mut() + .unwrap() + .requests + .as_mut() + .unwrap() + .insert( + "cpu".to_string(), + k8s_openapi::apimachinery::pkg::api::resource::Quantity(cpu.to_string()), + ); + pod.spec.as_mut().unwrap().containers[0] + .resources + .as_mut() + .unwrap() + .requests + .as_mut() + .unwrap() + .insert( + "memory".to_string(), + k8s_openapi::apimachinery::pkg::api::resource::Quantity(memory.to_string()), + ); + pod + } + + #[test] + fn test_least_allocated() { + let node1 = create_test_node("node1", "4", "8Gi"); + let node2 = create_test_node("node2", "4", "8Gi"); + let pod = create_test_pod("1", "2Gi"); + + let context = SchedulingContext::new(pod, vec![node1.clone(), node2.clone()]); + let scorer = LeastAllocated; + + let score1 = 
scorer.score(&context, &node1); + let score2 = scorer.score(&context, &node2); + + // Both nodes should have same score (same resources, same request) + assert_eq!(score1.score, score2.score); + assert!(score1.score > 50); // Should prefer empty nodes + } + + #[test] + fn test_calculate_weighted_score() { + let scores = vec![ + ScoreResult::new("node1".to_string(), 80), + ScoreResult::new("node1".to_string(), 60), + ]; + + let weighted = calculate_weighted_score(&scores); + assert_eq!(weighted, 70); // (80 + 60) / 2 + } +} diff --git a/crates/reddwarf-scheduler/src/types.rs b/crates/reddwarf-scheduler/src/types.rs new file mode 100644 index 0000000..3460a35 --- /dev/null +++ b/crates/reddwarf-scheduler/src/types.rs @@ -0,0 +1,180 @@ +use reddwarf_core::{Node, Pod}; +use std::collections::HashMap; + +/// Scheduling context containing pod and available nodes +#[derive(Debug, Clone)] +pub struct SchedulingContext { + /// Pod to be scheduled + pub pod: Pod, + /// Available nodes + pub nodes: Vec<Node>, +} + +impl SchedulingContext { + /// Create a new scheduling context + pub fn new(pod: Pod, nodes: Vec<Node>) -> Self { + Self { pod, nodes } + } +} + +/// Result of filtering a node +#[derive(Debug, Clone)] +pub struct FilterResult { + /// Node name + pub node_name: String, + /// Whether the node passed the filter + pub passed: bool, + /// Reason for failure (if any) + pub reason: Option<String>, +} + +impl FilterResult { + /// Create a passing filter result + pub fn pass(node_name: String) -> Self { + Self { + node_name, + passed: true, + reason: None, + } + } + + /// Create a failing filter result + pub fn fail(node_name: String, reason: String) -> Self { + Self { + node_name, + passed: false, + reason: Some(reason), + } + } +} + +/// Result of scoring a node +#[derive(Debug, Clone)] +pub struct ScoreResult { + /// Node name + pub node_name: String, + /// Score (0-100, higher is better) + pub score: i32, +} + +impl ScoreResult { + /// Create a new score result + pub fn new(node_name: 
String, score: i32) -> Self { + Self { node_name, score } + } +} + +/// Resource quantities for nodes +#[derive(Debug, Clone, Default)] +pub struct ResourceQuantities { + /// CPU in millicores (1000 = 1 core) + pub cpu_millicores: i64, + /// Memory in bytes + pub memory_bytes: i64, +} + +impl ResourceQuantities { + /// Parse CPU string (e.g., "2", "1000m", "0.5") + pub fn parse_cpu(s: &str) -> Result<i64, String> { + if let Some(m) = s.strip_suffix('m') { + // Millicores + m.parse::<i64>() + .map_err(|e| format!("Invalid CPU millicore value: {}", e)) + } else if let Ok(cores) = s.parse::<f64>() { + // Cores as float + Ok((cores * 1000.0) as i64) + } else { + Err(format!("Invalid CPU format: {}", s)) + } + } + + /// Parse memory string (e.g., "128Mi", "1Gi", "1024") + pub fn parse_memory(s: &str) -> Result<i64, String> { + if let Some(num) = s.strip_suffix("Ki") { + Ok(num.parse::<i64>().map_err(|e| e.to_string())? * 1024) + } else if let Some(num) = s.strip_suffix("Mi") { + Ok(num.parse::<i64>().map_err(|e| e.to_string())? * 1024 * 1024) + } else if let Some(num) = s.strip_suffix("Gi") { + Ok(num.parse::<i64>().map_err(|e| e.to_string())? 
* 1024 * 1024 * 1024) + } else { + // Plain bytes + s.parse::<i64>().map_err(|e| e.to_string()) + } + } + + /// Get CPU and memory from a resource map (k8s-openapi format) + pub fn from_k8s_resource_map( + resources: &std::collections::BTreeMap<String, k8s_openapi::apimachinery::pkg::api::resource::Quantity>, + ) -> Self { + let cpu_millicores = resources + .get("cpu") + .and_then(|q| Self::parse_cpu(&q.0).ok()) + .unwrap_or(0); + + let memory_bytes = resources + .get("memory") + .and_then(|q| Self::parse_memory(&q.0).ok()) + .unwrap_or(0); + + Self { + cpu_millicores, + memory_bytes, + } + } + + /// Get CPU and memory from a resource map (test format) + pub fn from_resource_map(resources: &HashMap<String, String>) -> Self { + let cpu_millicores = resources + .get("cpu") + .and_then(|s| Self::parse_cpu(s).ok()) + .unwrap_or(0); + + let memory_bytes = resources + .get("memory") + .and_then(|s| Self::parse_memory(s).ok()) + .unwrap_or(0); + + Self { + cpu_millicores, + memory_bytes, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_cpu() { + assert_eq!(ResourceQuantities::parse_cpu("1").unwrap(), 1000); + assert_eq!(ResourceQuantities::parse_cpu("0.5").unwrap(), 500); + assert_eq!(ResourceQuantities::parse_cpu("100m").unwrap(), 100); + assert_eq!(ResourceQuantities::parse_cpu("2").unwrap(), 2000); + } + + #[test] + fn test_parse_memory() { + assert_eq!(ResourceQuantities::parse_memory("1024").unwrap(), 1024); + assert_eq!(ResourceQuantities::parse_memory("1Ki").unwrap(), 1024); + assert_eq!( + ResourceQuantities::parse_memory("128Mi").unwrap(), + 128 * 1024 * 1024 + ); + assert_eq!( + ResourceQuantities::parse_memory("1Gi").unwrap(), + 1024 * 1024 * 1024 + ); + } + + #[test] + fn test_filter_result() { + let pass = FilterResult::pass("node1".to_string()); + assert!(pass.passed); + assert!(pass.reason.is_none()); + + let fail = FilterResult::fail("node2".to_string(), "Insufficient CPU".to_string()); + assert!(!fail.passed); + assert_eq!(fail.reason, Some("Insufficient CPU".to_string())); + } +}