Implement next phase (partial)

Signed-off-by: Till Wegmueller <toasterson@gmail.com>
This commit is contained in:
Till Wegmueller 2026-01-28 23:16:43 +01:00
parent 149321f092
commit 205f040407
No known key found for this signature in database
8 changed files with 1319 additions and 1 deletions

5
Cargo.lock generated
View file

@ -1118,9 +1118,14 @@ dependencies = [
name = "reddwarf-scheduler"
version = "0.1.0"
dependencies = [
"k8s-openapi",
"miette",
"reddwarf-core",
"reddwarf-storage",
"serde",
"serde_json",
"tempfile",
"thiserror 2.0.18",
"tokio",
"tracing",
]

View file

@ -9,9 +9,14 @@ rust-version.workspace = true
[dependencies]
reddwarf-core = { workspace = true }
reddwarf-storage = { workspace = true }
k8s-openapi = { workspace = true }
tokio = { workspace = true }
miette = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
[dev-dependencies]
tempfile = { workspace = true }

View file

@ -0,0 +1,85 @@
// Allow unused assignments for diagnostic fields - they're used by the macros
#![allow(unused_assignments)]
use miette::Diagnostic;
use thiserror::Error;
/// Scheduler error type.
///
/// Each variant carries a `miette` diagnostic code and help text so failures
/// render with actionable context when reported to an operator.
#[derive(Error, Debug, Diagnostic)]
pub enum SchedulerError {
    /// No node passed every filter predicate for the pod.
    #[error("No suitable nodes found for pod {pod_name}")]
    #[diagnostic(
        code(scheduler::no_suitable_nodes),
        help("Check node resources, taints, and pod requirements")
    )]
    NoSuitableNodes {
        // Name of the pod that could not be placed.
        pod_name: String,
        // Why filtering rejected every node.
        // NOTE(review): `reason` is not interpolated into the #[error] message
        // above — confirm whether it should appear in the rendered text.
        reason: String,
    },
    /// Scheduling failed for a reason other than node feasibility.
    #[error("Scheduling failed: {message}")]
    #[diagnostic(
        code(scheduler::scheduling_failed),
        help("{suggestion}")
    )]
    SchedulingFailed {
        // What went wrong.
        message: String,
        // Operator-facing remediation hint, surfaced via the diagnostic help.
        suggestion: String,
    },
    /// Error propagated from the storage layer (converted via `#[from]`).
    #[error("Storage error: {0}")]
    #[diagnostic(
        code(scheduler::storage_error),
        help("Check the underlying storage system")
    )]
    StorageError(#[from] reddwarf_storage::StorageError),
    /// Error propagated from reddwarf-core (converted via `#[from]`).
    #[error("Core error: {0}")]
    #[diagnostic(
        code(scheduler::core_error),
        help("This is an internal error")
    )]
    CoreError(#[from] reddwarf_core::ReddwarfError),
    /// Invariant violation inside the scheduler itself.
    #[error("Internal error: {message}")]
    #[diagnostic(
        code(scheduler::internal_error),
        help("This is likely a bug. Please report it")
    )]
    InternalError {
        // Description of the broken invariant.
        message: String,
    },
}
/// Result type for scheduler operations
pub type Result<T> = std::result::Result<T, SchedulerError>;
impl SchedulerError {
/// Create a NoSuitableNodes error
pub fn no_suitable_nodes(pod_name: impl Into<String>, reason: impl Into<String>) -> Self {
Self::NoSuitableNodes {
pod_name: pod_name.into(),
reason: reason.into(),
}
}
/// Create a SchedulingFailed error
pub fn scheduling_failed(message: impl Into<String>, suggestion: impl Into<String>) -> Self {
Self::SchedulingFailed {
message: message.into(),
suggestion: suggestion.into(),
}
}
/// Create an InternalError
pub fn internal_error(message: impl Into<String>) -> Self {
Self::InternalError {
message: message.into(),
}
}
}

View file

@ -0,0 +1,335 @@
use crate::types::{FilterResult, ResourceQuantities, SchedulingContext};
use reddwarf_core::Node;
use tracing::debug;
/// A single feasibility check ("predicate") applied to every candidate node.
///
/// `Send + Sync` so one set of boxed filters can be shared by a scheduler
/// running on a multi-threaded async runtime.
pub trait FilterPredicate: Send + Sync {
    /// Decide whether `node` can host the pod in `context`; failures carry a
    /// human-readable reason.
    fn filter(&self, context: &SchedulingContext, node: &Node) -> FilterResult;
    /// Name of the filter, used in debug logging when a node is rejected.
    fn name(&self) -> &str;
}
/// Feasibility filter: the node's allocatable CPU and memory must cover the
/// sum of the pod's per-container resource requests.
pub struct PodFitsResources;

impl FilterPredicate for PodFitsResources {
    fn filter(&self, context: &SchedulingContext, node: &Node) -> FilterResult {
        // Fall back to "unknown" when the node carries no name.
        let node_name = node
            .metadata
            .name
            .clone()
            .unwrap_or_else(|| "unknown".to_string());

        // Allocatable capacity from the node status (empty map when absent).
        let allocatable = node
            .status
            .as_ref()
            .and_then(|s| s.allocatable.as_ref())
            .cloned()
            .unwrap_or_default();
        let node_resources = ResourceQuantities::from_k8s_resource_map(&allocatable);

        // A pod without a spec cannot be evaluated.
        let pod_spec = match &context.pod.spec {
            Some(spec) => spec,
            None => return FilterResult::fail(node_name, "Pod has no spec".to_string()),
        };

        // Sum "cpu"/"memory" requests across all containers; missing or
        // unparsable entries count as zero.
        let mut total_cpu = 0i64;
        let mut total_memory = 0i64;
        for container in &pod_spec.containers {
            let requests = container
                .resources
                .as_ref()
                .and_then(|r| r.requests.as_ref());
            if let Some(requests) = requests {
                let cpu = requests
                    .get("cpu")
                    .and_then(|s| ResourceQuantities::parse_cpu(&s.0).ok());
                let memory = requests
                    .get("memory")
                    .and_then(|s| ResourceQuantities::parse_memory(&s.0).ok());
                total_cpu += cpu.unwrap_or(0);
                total_memory += memory.unwrap_or(0);
            }
        }

        debug!(
            "Node {} has CPU: {} milli, Memory: {} bytes",
            node_name, node_resources.cpu_millicores, node_resources.memory_bytes
        );
        debug!(
            "Pod requests CPU: {} milli, Memory: {} bytes",
            total_cpu, total_memory
        );

        // Reject as soon as either resource is insufficient.
        if total_cpu > node_resources.cpu_millicores {
            return FilterResult::fail(
                node_name,
                format!(
                    "Insufficient CPU: requested {} milli, available {} milli",
                    total_cpu, node_resources.cpu_millicores
                ),
            );
        }
        if total_memory > node_resources.memory_bytes {
            return FilterResult::fail(
                node_name,
                format!(
                    "Insufficient memory: requested {} bytes, available {} bytes",
                    total_memory, node_resources.memory_bytes
                ),
            );
        }

        FilterResult::pass(node_name)
    }

    fn name(&self) -> &str {
        "PodFitsResources"
    }
}
/// Feasibility filter: every key/value pair in the pod's `nodeSelector` must
/// appear, with an equal value, among the node's labels.
pub struct NodeSelectorMatch;

impl FilterPredicate for NodeSelectorMatch {
    fn filter(&self, context: &SchedulingContext, node: &Node) -> FilterResult {
        let node_name = node
            .metadata
            .name
            .clone()
            .unwrap_or_else(|| "unknown".to_string());

        // A pod with no spec, or no selector, is unconstrained and passes.
        let node_selector = match context
            .pod
            .spec
            .as_ref()
            .and_then(|s| s.node_selector.as_ref())
        {
            Some(selector) => selector,
            None => return FilterResult::pass(node_name),
        };

        let node_labels = node.metadata.labels.as_ref();

        // Every selector entry must match a node label exactly; report the
        // first mismatch encountered.
        for (key, value) in node_selector {
            let label_matches = node_labels
                .and_then(|labels| labels.get(key))
                .map(|label_value| label_value == value)
                .unwrap_or(false);
            if !label_matches {
                return FilterResult::fail(
                    node_name,
                    format!("Node selector mismatch: {}={}", key, value),
                );
            }
        }

        FilterResult::pass(node_name)
    }

    fn name(&self) -> &str {
        "NodeSelectorMatch"
    }
}
/// Feasibility filter: every taint on the node must be tolerated by the pod.
///
/// NOTE(review): matching only compares toleration `key` and (optionally)
/// `effect`. Kubernetes semantics also involve `operator` ("Exists"/"Equal")
/// and `value` matching, and an empty key with operator Exists tolerates
/// everything — confirm whether that behavior is needed here.
pub struct TaintToleration;

impl FilterPredicate for TaintToleration {
    fn filter(&self, context: &SchedulingContext, node: &Node) -> FilterResult {
        // Fall back to "unknown" when the node carries no name.
        let node_name = node
            .metadata
            .name
            .as_ref()
            .unwrap_or(&"unknown".to_string())
            .clone();
        // An untainted node accepts any pod.
        let taints = match node.spec.as_ref().and_then(|s| s.taints.as_ref()) {
            Some(t) => t,
            None => return FilterResult::pass(node_name), // No taints = pass
        };
        // A pod without a spec has no tolerations to check; treat as passing.
        let pod_spec = match &context.pod.spec {
            Some(spec) => spec,
            None => return FilterResult::pass(node_name),
        };
        let tolerations = match &pod_spec.tolerations {
            Some(t) => t,
            None => {
                // No tolerations but node has taints = fail.
                if !taints.is_empty() {
                    return FilterResult::fail(
                        node_name,
                        "Node has taints but pod has no tolerations".to_string(),
                    );
                }
                return FilterResult::pass(node_name);
            }
        };
        // Every taint must be matched by at least one toleration.
        for taint in taints {
            let taint_key = &taint.key;
            let taint_effect = &taint.effect;
            let mut tolerated = false;
            for toleration in tolerations {
                let toleration_key = toleration.key.as_ref();
                let toleration_effect = toleration.effect.as_ref();
                // Key must match; an absent toleration effect tolerates the
                // taint regardless of its effect.
                if toleration_key.as_ref() == Some(&taint_key)
                    && (toleration_effect.is_none() || toleration_effect.as_ref() == Some(&taint_effect))
                {
                    tolerated = true;
                    break;
                }
            }
            if !tolerated {
                return FilterResult::fail(
                    node_name,
                    format!("Pod does not tolerate taint: {}={}", taint_key, taint_effect),
                );
            }
        }
        FilterResult::pass(node_name)
    }

    fn name(&self) -> &str {
        "TaintToleration"
    }
}
/// Build the standard filter chain, applied to every candidate node in order.
pub fn default_filters() -> Vec<Box<dyn FilterPredicate>> {
    let mut filters: Vec<Box<dyn FilterPredicate>> = Vec::with_capacity(3);
    filters.push(Box::new(PodFitsResources));
    filters.push(Box::new(NodeSelectorMatch));
    filters.push(Box::new(TaintToleration));
    filters
}
#[cfg(test)]
mod tests {
    use super::*;
    use reddwarf_core::{Node, Pod};
    use std::collections::BTreeMap;

    /// Build a Node named `name` whose allocatable map holds the given
    /// "cpu" and "memory" quantity strings.
    fn create_test_node(name: &str, cpu: &str, memory: &str) -> Node {
        let mut node = Node::default();
        node.metadata.name = Some(name.to_string());
        node.status = Some(Default::default());
        node.status.as_mut().unwrap().allocatable = Some(BTreeMap::new());
        node.status
            .as_mut()
            .unwrap()
            .allocatable
            .as_mut()
            .unwrap()
            .insert(
                "cpu".to_string(),
                k8s_openapi::apimachinery::pkg::api::resource::Quantity(cpu.to_string()),
            );
        node.status
            .as_mut()
            .unwrap()
            .allocatable
            .as_mut()
            .unwrap()
            .insert(
                "memory".to_string(),
                k8s_openapi::apimachinery::pkg::api::resource::Quantity(memory.to_string()),
            );
        node
    }

    /// Build a single-container Pod requesting the given "cpu" and "memory".
    fn create_test_pod(cpu: &str, memory: &str) -> Pod {
        let mut pod = Pod::default();
        pod.metadata.name = Some("test-pod".to_string());
        pod.spec = Some(Default::default());
        pod.spec.as_mut().unwrap().containers = vec![Default::default()];
        pod.spec.as_mut().unwrap().containers[0].resources = Some(Default::default());
        pod.spec.as_mut().unwrap().containers[0]
            .resources
            .as_mut()
            .unwrap()
            .requests = Some(BTreeMap::new());
        pod.spec.as_mut().unwrap().containers[0]
            .resources
            .as_mut()
            .unwrap()
            .requests
            .as_mut()
            .unwrap()
            .insert(
                "cpu".to_string(),
                k8s_openapi::apimachinery::pkg::api::resource::Quantity(cpu.to_string()),
            );
        pod.spec.as_mut().unwrap().containers[0]
            .resources
            .as_mut()
            .unwrap()
            .requests
            .as_mut()
            .unwrap()
            .insert(
                "memory".to_string(),
                k8s_openapi::apimachinery::pkg::api::resource::Quantity(memory.to_string()),
            );
        pod
    }

    #[test]
    fn test_pod_fits_resources_pass() {
        // 4 CPU / 8Gi node comfortably covers a 1 CPU / 1Gi request.
        let node = create_test_node("node1", "4", "8Gi");
        let pod = create_test_pod("1", "1Gi");
        let context = SchedulingContext::new(pod, vec![node.clone()]);
        let filter = PodFitsResources;
        let result = filter.filter(&context, &node);
        assert!(result.passed);
    }

    #[test]
    fn test_pod_fits_resources_fail_cpu() {
        // Requesting 2 CPUs on a 1-CPU node must fail with a CPU reason.
        let node = create_test_node("node1", "1", "8Gi");
        let pod = create_test_pod("2", "1Gi");
        let context = SchedulingContext::new(pod, vec![node.clone()]);
        let filter = PodFitsResources;
        let result = filter.filter(&context, &node);
        assert!(!result.passed);
        assert!(result.reason.unwrap().contains("Insufficient CPU"));
    }

    #[test]
    fn test_pod_fits_resources_fail_memory() {
        // Requesting 2Gi on a 1Gi node must fail with a memory reason.
        let node = create_test_node("node1", "4", "1Gi");
        let pod = create_test_pod("1", "2Gi");
        let context = SchedulingContext::new(pod, vec![node.clone()]);
        let filter = PodFitsResources;
        let result = filter.filter(&context, &node);
        assert!(!result.passed);
        assert!(result.reason.unwrap().contains("Insufficient memory"));
    }
}

View file

@ -1 +1,18 @@
// Placeholder for reddwarf-scheduler
//! Reddwarf Scheduler - Pod to Node scheduling
//!
//! This crate provides:
//! - Pod scheduling algorithm
//! - Filter predicates (resource requirements, node selectors)
//! - Scoring functions (least allocated)
//! - Pod binding to nodes
pub mod error;
pub mod types;
pub mod filter;
pub mod score;
pub mod scheduler;
// Re-export commonly used types
pub use error::{SchedulerError, Result};
pub use scheduler::Scheduler;
pub use types::{SchedulingContext, FilterResult, ScoreResult};

View file

@ -0,0 +1,399 @@
use crate::filter::{default_filters, FilterPredicate};
use crate::score::{calculate_weighted_score, default_scores, ScoreFunction};
use crate::types::SchedulingContext;
use crate::{Result, SchedulerError};
use reddwarf_core::{Node, Pod};
use reddwarf_storage::{KeyEncoder, KVStore, RedbBackend};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tracing::{debug, error, info, warn};
/// Configuration for the scheduler.
#[derive(Clone)]
pub struct SchedulerConfig {
    /// Delay between consecutive scheduling cycles.
    pub schedule_interval: Duration,
}

impl Default for SchedulerConfig {
    /// Default: run one scheduling cycle per second.
    fn default() -> Self {
        Self {
            schedule_interval: Duration::from_secs(1),
        }
    }
}
/// Pod scheduler: periodically assigns unscheduled pods to the best feasible node.
pub struct Scheduler {
    // Shared key-value store holding serialized Pods and Nodes.
    storage: Arc<RedbBackend>,
    // Timing configuration for the scheduling loop.
    config: SchedulerConfig,
    // Feasibility predicates applied to each candidate node.
    filters: Vec<Box<dyn FilterPredicate>>,
    // Scoring functions used to rank feasible nodes.
    scorers: Vec<Box<dyn ScoreFunction>>,
}
impl Scheduler {
    /// Create a new scheduler using the default filter and scoring chains.
    pub fn new(storage: Arc<RedbBackend>, config: SchedulerConfig) -> Self {
        Self {
            storage,
            config,
            filters: default_filters(),
            scorers: default_scores(),
        }
    }

    /// Run the scheduler loop forever, sleeping `schedule_interval` between
    /// cycles. Cycle errors are logged and do not terminate the loop, so this
    /// only returns if the loop is externally cancelled.
    pub async fn run(&self) -> Result<()> {
        info!("Starting scheduler");
        loop {
            if let Err(e) = self.schedule_cycle().await {
                error!("Scheduling cycle failed: {}", e);
            }
            sleep(self.config.schedule_interval).await;
        }
    }

    /// Run a single scheduling cycle: load unscheduled pods and all nodes,
    /// then try to place each pod. Per-pod failures are logged, not propagated.
    ///
    /// NOTE(review): every pod in a cycle is scheduled against the same node
    /// snapshot — resources consumed by a pod bound earlier in the cycle are
    /// not deducted for later pods. Confirm this over-commit risk is intended.
    async fn schedule_cycle(&self) -> Result<()> {
        debug!("Running scheduling cycle");
        // Get all unscheduled pods
        let unscheduled_pods = self.get_unscheduled_pods().await?;
        if unscheduled_pods.is_empty() {
            debug!("No unscheduled pods found");
            return Ok(());
        }
        info!("Found {} unscheduled pods", unscheduled_pods.len());
        // Get all available nodes
        let nodes = self.get_nodes().await?;
        if nodes.is_empty() {
            warn!("No nodes available for scheduling");
            return Ok(());
        }
        info!("Found {} available nodes", nodes.len());
        // Schedule each pod independently; one failure does not stop the rest.
        for pod in unscheduled_pods {
            let pod_name = pod
                .metadata
                .name
                .as_ref()
                .unwrap_or(&"unknown".to_string())
                .clone();
            match self.schedule_pod(pod, &nodes).await {
                Ok(node_name) => {
                    info!("Scheduled pod {} to node {}", pod_name, node_name);
                }
                Err(e) => {
                    error!("Failed to schedule pod {}: {}", pod_name, e);
                }
            }
        }
        Ok(())
    }

    /// Get all pods whose `spec.nodeName` is unset (i.e. not yet scheduled).
    /// Scans the "v1/Pod" key prefix and deserializes each entry from JSON.
    async fn get_unscheduled_pods(&self) -> Result<Vec<Pod>> {
        let prefix = KeyEncoder::encode_prefix("v1", "Pod", None);
        let results = self.storage.as_ref().scan(prefix.as_bytes())?;
        let mut unscheduled = Vec::new();
        for (_key, data) in results.iter() {
            let pod: Pod = serde_json::from_slice(data)
                .map_err(|e| SchedulerError::internal_error(format!("Failed to deserialize pod: {}", e)))?;
            // Keep only pods with a spec and no assigned node.
            if let Some(spec) = &pod.spec {
                if spec.node_name.is_none() {
                    unscheduled.push(pod);
                }
            }
        }
        Ok(unscheduled)
    }

    /// Get all nodes by scanning the "v1/Node" key prefix.
    async fn get_nodes(&self) -> Result<Vec<Node>> {
        let prefix = KeyEncoder::encode_prefix("v1", "Node", None);
        let results = self.storage.as_ref().scan(prefix.as_bytes())?;
        let mut nodes = Vec::new();
        for (_key, data) in results.iter() {
            let node: Node = serde_json::from_slice(data)
                .map_err(|e| SchedulerError::internal_error(format!("Failed to deserialize node: {}", e)))?;
            nodes.push(node);
        }
        Ok(nodes)
    }

    /// Schedule a single pod: filter nodes, score the survivors, pick the
    /// highest-scoring node, and bind the pod to it. Returns the chosen
    /// node's name, or `NoSuitableNodes` when every node is filtered out.
    async fn schedule_pod(&self, mut pod: Pod, nodes: &[Node]) -> Result<String> {
        let pod_name = pod
            .metadata
            .name
            .as_ref()
            .ok_or_else(|| SchedulerError::internal_error("Pod has no name"))?
            .clone();
        let context = SchedulingContext::new(pod.clone(), nodes.to_vec());
        // Phase 1: Filter — a node must pass every predicate to be feasible.
        let mut feasible_nodes = Vec::new();
        for node in nodes {
            let node_name = node
                .metadata
                .name
                .as_ref()
                .unwrap_or(&"unknown".to_string())
                .clone();
            let mut passed = true;
            for filter in &self.filters {
                let result = filter.filter(&context, node);
                if !result.passed {
                    debug!(
                        "Node {} filtered out by {}: {}",
                        node_name,
                        filter.name(),
                        result.reason.unwrap_or_default()
                    );
                    passed = false;
                    break;
                }
            }
            if passed {
                feasible_nodes.push(node.clone());
            }
        }
        if feasible_nodes.is_empty() {
            return Err(SchedulerError::no_suitable_nodes(
                pod_name,
                "All nodes filtered out".to_string(),
            ));
        }
        info!(
            "Pod {} has {} feasible nodes",
            pod_name,
            feasible_nodes.len()
        );
        // Phase 2: Score — average each node's scores into one final value.
        let mut node_scores: Vec<(String, i32)> = Vec::new();
        for node in &feasible_nodes {
            let node_name = node
                .metadata
                .name
                .as_ref()
                .unwrap_or(&"unknown".to_string())
                .clone();
            let mut scores = Vec::new();
            for scorer in &self.scorers {
                let score = scorer.score(&context, node);
                scores.push(score);
            }
            let final_score = calculate_weighted_score(&scores);
            node_scores.push((node_name, final_score));
        }
        // Phase 3: Select best node — highest score first; ties keep sort order.
        node_scores.sort_by(|a, b| b.1.cmp(&a.1)); // Sort by score descending
        let best_node = node_scores
            .first()
            .ok_or_else(|| SchedulerError::internal_error("No nodes scored"))?
            .0
            .clone();
        info!(
            "Selected node {} for pod {} with score {}",
            best_node, pod_name, node_scores[0].1
        );
        // Phase 4: Bind pod to node (persists the updated pod).
        self.bind_pod(&mut pod, &best_node).await?;
        Ok(best_node)
    }

    /// Bind a pod to a node by setting `spec.nodeName` and writing the whole
    /// pod back to storage under its resource key.
    ///
    /// NOTE(review): this is a blind overwrite (no version check); a
    /// concurrent update to the pod between read and write would be lost —
    /// confirm whether optimistic concurrency is needed here.
    async fn bind_pod(&self, pod: &mut Pod, node_name: &str) -> Result<()> {
        let pod_name = pod
            .metadata
            .name
            .as_ref()
            .ok_or_else(|| SchedulerError::internal_error("Pod has no name"))?;
        let namespace = pod
            .metadata
            .namespace
            .as_ref()
            .ok_or_else(|| SchedulerError::internal_error("Pod has no namespace"))?;
        info!("Binding pod {} to node {}", pod_name, node_name);
        // Update pod spec
        if let Some(spec) = &mut pod.spec {
            spec.node_name = Some(node_name.to_string());
        } else {
            return Err(SchedulerError::internal_error("Pod has no spec"));
        }
        // Save updated pod under the canonical v1/Pod resource key.
        let key = reddwarf_core::ResourceKey::new(
            reddwarf_core::GroupVersionKind::from_api_version_kind("v1", "Pod"),
            namespace,
            pod_name,
        );
        let storage_key = KeyEncoder::encode_resource_key(&key);
        let data = serde_json::to_vec(&pod)
            .map_err(|e| SchedulerError::internal_error(format!("Failed to serialize pod: {}", e)))?;
        self.storage.as_ref().put(storage_key.as_bytes(), &data)?;
        info!("Successfully bound pod {} to node {}", pod_name, node_name);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use reddwarf_storage::RedbBackend;
    use std::collections::BTreeMap;
    use tempfile::tempdir;

    /// Build a Node named `name` with the given allocatable "cpu"/"memory".
    fn create_test_node(name: &str, cpu: &str, memory: &str) -> Node {
        let mut node = Node::default();
        node.metadata.name = Some(name.to_string());
        node.status = Some(Default::default());
        node.status.as_mut().unwrap().allocatable = Some(BTreeMap::new());
        node.status
            .as_mut()
            .unwrap()
            .allocatable
            .as_mut()
            .unwrap()
            .insert(
                "cpu".to_string(),
                k8s_openapi::apimachinery::pkg::api::resource::Quantity(cpu.to_string()),
            );
        node.status
            .as_mut()
            .unwrap()
            .allocatable
            .as_mut()
            .unwrap()
            .insert(
                "memory".to_string(),
                k8s_openapi::apimachinery::pkg::api::resource::Quantity(memory.to_string()),
            );
        node
    }

    /// Build a namespaced single-container Pod with the given requests.
    fn create_test_pod(name: &str, namespace: &str, cpu: &str, memory: &str) -> Pod {
        let mut pod = Pod::default();
        pod.metadata.name = Some(name.to_string());
        pod.metadata.namespace = Some(namespace.to_string());
        pod.spec = Some(Default::default());
        pod.spec.as_mut().unwrap().containers = vec![Default::default()];
        pod.spec.as_mut().unwrap().containers[0].name = "test".to_string();
        pod.spec.as_mut().unwrap().containers[0].resources = Some(Default::default());
        pod.spec.as_mut().unwrap().containers[0]
            .resources
            .as_mut()
            .unwrap()
            .requests = Some(BTreeMap::new());
        pod.spec.as_mut().unwrap().containers[0]
            .resources
            .as_mut()
            .unwrap()
            .requests
            .as_mut()
            .unwrap()
            .insert(
                "cpu".to_string(),
                k8s_openapi::apimachinery::pkg::api::resource::Quantity(cpu.to_string()),
            );
        pod.spec.as_mut().unwrap().containers[0]
            .resources
            .as_mut()
            .unwrap()
            .requests
            .as_mut()
            .unwrap()
            .insert(
                "memory".to_string(),
                k8s_openapi::apimachinery::pkg::api::resource::Quantity(memory.to_string()),
            );
        pod
    }

    #[tokio::test]
    async fn test_schedule_pod_success() {
        // Fresh redb store in a temp dir; the tempdir guard keeps it alive.
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.redb");
        let storage = Arc::new(RedbBackend::new(&db_path).unwrap());
        let scheduler = Scheduler::new(storage.clone(), SchedulerConfig::default());
        // Create nodes
        let nodes = vec![
            create_test_node("node1", "4", "8Gi"),
            create_test_node("node2", "2", "4Gi"),
        ];
        // Create pod
        let pod = create_test_pod("test-pod", "default", "1", "1Gi");
        // Schedule pod — either node is acceptable; only feasibility is asserted.
        let result = scheduler.schedule_pod(pod, &nodes).await;
        assert!(result.is_ok());
        let node_name = result.unwrap();
        assert!(node_name == "node1" || node_name == "node2");
    }

    #[tokio::test]
    async fn test_schedule_pod_no_suitable_nodes() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.redb");
        let storage = Arc::new(RedbBackend::new(&db_path).unwrap());
        let scheduler = Scheduler::new(storage, SchedulerConfig::default());
        // Create node with insufficient resources
        let nodes = vec![create_test_node("node1", "1", "1Gi")];
        // Create pod that requires more resources
        let pod = create_test_pod("test-pod", "default", "2", "2Gi");
        // Schedule pod should fail
        let result = scheduler.schedule_pod(pod, &nodes).await;
        assert!(result.is_err());
    }
}

View file

@ -0,0 +1,292 @@
use crate::types::{ResourceQuantities, SchedulingContext, ScoreResult};
use reddwarf_core::Node;
use tracing::debug;
/// A single scoring strategy used to rank nodes that passed filtering.
///
/// `Send + Sync` so one set of boxed scorers can be shared across threads.
pub trait ScoreFunction: Send + Sync {
    /// Score `node` for the pod in `context` on a 0-100 scale (higher is better).
    fn score(&self, context: &SchedulingContext, node: &Node) -> ScoreResult;
    /// Name of the scoring function (used in debug logging).
    fn name(&self) -> &str;
}
/// Scorer that prefers nodes where this pod's requests consume the smallest
/// fraction of allocatable capacity.
///
/// NOTE(review): only the candidate pod's requests are counted — usage from
/// pods already running on the node is not considered here.
pub struct LeastAllocated;

impl ScoreFunction for LeastAllocated {
    fn score(&self, context: &SchedulingContext, node: &Node) -> ScoreResult {
        let node_name = node
            .metadata
            .name
            .as_ref()
            .unwrap_or(&"unknown".to_string())
            .clone();
        // Get node allocatable resources (empty map when absent).
        let allocatable = node
            .status
            .as_ref()
            .and_then(|s| s.allocatable.as_ref())
            .cloned()
            .unwrap_or_default();
        let node_resources = ResourceQuantities::from_k8s_resource_map(&allocatable);
        // A node reporting zero CPU or memory cannot be ranked; score 0.
        if node_resources.cpu_millicores == 0 || node_resources.memory_bytes == 0 {
            return ScoreResult::new(node_name, 0);
        }
        // A pod without a spec requests nothing we can measure; score 0.
        let pod_spec = match &context.pod.spec {
            Some(spec) => spec,
            None => return ScoreResult::new(node_name, 0),
        };
        // Sum "cpu"/"memory" requests across all containers (unparsable = 0).
        let mut total_cpu = 0i64;
        let mut total_memory = 0i64;
        for container in &pod_spec.containers {
            if let Some(resources) = &container.resources {
                if let Some(requests) = &resources.requests {
                    total_cpu += requests
                        .get("cpu")
                        .and_then(|s| ResourceQuantities::parse_cpu(&s.0).ok())
                        .unwrap_or(0);
                    total_memory += requests
                        .get("memory")
                        .and_then(|s| ResourceQuantities::parse_memory(&s.0).ok())
                        .unwrap_or(0);
                }
            }
        }
        // Remaining capacity after (hypothetically) placing this pod.
        let remaining_cpu = node_resources.cpu_millicores - total_cpu;
        let remaining_memory = node_resources.memory_bytes - total_memory;
        // Utilization percentages. Note the algebra: (alloc - remaining)/alloc
        // reduces to requested/allocatable, i.e. this pod's share of the node.
        let cpu_utilization = if node_resources.cpu_millicores > 0 {
            ((node_resources.cpu_millicores - remaining_cpu) as f64
                / node_resources.cpu_millicores as f64)
                * 100.0
        } else {
            100.0
        };
        let memory_utilization = if node_resources.memory_bytes > 0 {
            ((node_resources.memory_bytes - remaining_memory) as f64
                / node_resources.memory_bytes as f64)
                * 100.0
        } else {
            100.0
        };
        // Score is 100 minus the average utilization, clamped to 0..=100:
        // lower utilization = higher score (prefer less loaded nodes).
        let avg_utilization = (cpu_utilization + memory_utilization) / 2.0;
        let score = (100.0 - avg_utilization).max(0.0).min(100.0) as i32;
        debug!(
            "Node {} score: {} (CPU util: {:.1}%, Memory util: {:.1}%)",
            node_name, score, cpu_utilization, memory_utilization
        );
        ScoreResult::new(node_name, score)
    }

    fn name(&self) -> &str {
        "LeastAllocated"
    }
}
/// Scorer that prefers nodes where the pod's CPU and memory request fractions
/// are similar, avoiding nodes that would become lopsided in one resource.
pub struct BalancedAllocation;

impl ScoreFunction for BalancedAllocation {
    fn score(&self, context: &SchedulingContext, node: &Node) -> ScoreResult {
        let node_name = node
            .metadata
            .name
            .as_ref()
            .unwrap_or(&"unknown".to_string())
            .clone();
        // Get node allocatable resources (empty map when absent).
        let allocatable = node
            .status
            .as_ref()
            .and_then(|s| s.allocatable.as_ref())
            .cloned()
            .unwrap_or_default();
        let node_resources = ResourceQuantities::from_k8s_resource_map(&allocatable);
        // A node reporting zero CPU or memory cannot be ranked; score 0.
        if node_resources.cpu_millicores == 0 || node_resources.memory_bytes == 0 {
            return ScoreResult::new(node_name, 0);
        }
        // A pod with no spec is neutral here; give the midpoint score.
        let pod_spec = match &context.pod.spec {
            Some(spec) => spec,
            None => return ScoreResult::new(node_name, 50),
        };
        // Sum "cpu"/"memory" requests across all containers (unparsable = 0).
        let mut total_cpu = 0i64;
        let mut total_memory = 0i64;
        for container in &pod_spec.containers {
            if let Some(resources) = &container.resources {
                if let Some(requests) = &resources.requests {
                    total_cpu += requests
                        .get("cpu")
                        .and_then(|s| ResourceQuantities::parse_cpu(&s.0).ok())
                        .unwrap_or(0);
                    total_memory += requests
                        .get("memory")
                        .and_then(|s| ResourceQuantities::parse_memory(&s.0).ok())
                        .unwrap_or(0);
                }
            }
        }
        // Fraction of each resource this pod would consume.
        let cpu_fraction = total_cpu as f64 / node_resources.cpu_millicores as f64;
        let memory_fraction = total_memory as f64 / node_resources.memory_bytes as f64;
        // Smaller |cpu - memory| gap = better balance = higher score, 0..=100.
        let variance = (cpu_fraction - memory_fraction).abs();
        let score = ((1.0 - variance) * 100.0).max(0.0).min(100.0) as i32;
        debug!(
            "Node {} balanced allocation score: {} (variance: {:.3})",
            node_name, score, variance
        );
        ScoreResult::new(node_name, score)
    }

    fn name(&self) -> &str {
        "BalancedAllocation"
    }
}
/// Build the standard scoring chain: least-allocated plus balanced allocation.
pub fn default_scores() -> Vec<Box<dyn ScoreFunction>> {
    let mut scorers: Vec<Box<dyn ScoreFunction>> = Vec::with_capacity(2);
    scorers.push(Box::new(LeastAllocated));
    scorers.push(Box::new(BalancedAllocation));
    scorers
}
/// Combine the results of several scoring functions into one node score.
///
/// Currently an unweighted mean with integer (truncating) division; an empty
/// slice yields 0.
pub fn calculate_weighted_score(scores: &[ScoreResult]) -> i32 {
    if scores.is_empty() {
        return 0;
    }
    let mut total = 0i32;
    for result in scores {
        total += result.score;
    }
    total / scores.len() as i32
}
#[cfg(test)]
mod tests {
    use super::*;
    use reddwarf_core::{Node, Pod};
    use std::collections::BTreeMap;

    /// Build a Node named `name` with the given allocatable "cpu"/"memory".
    fn create_test_node(name: &str, cpu: &str, memory: &str) -> Node {
        let mut node = Node::default();
        node.metadata.name = Some(name.to_string());
        node.status = Some(Default::default());
        node.status.as_mut().unwrap().allocatable = Some(BTreeMap::new());
        node.status
            .as_mut()
            .unwrap()
            .allocatable
            .as_mut()
            .unwrap()
            .insert(
                "cpu".to_string(),
                k8s_openapi::apimachinery::pkg::api::resource::Quantity(cpu.to_string()),
            );
        node.status
            .as_mut()
            .unwrap()
            .allocatable
            .as_mut()
            .unwrap()
            .insert(
                "memory".to_string(),
                k8s_openapi::apimachinery::pkg::api::resource::Quantity(memory.to_string()),
            );
        node
    }

    /// Build a single-container Pod requesting the given "cpu" and "memory".
    fn create_test_pod(cpu: &str, memory: &str) -> Pod {
        let mut pod = Pod::default();
        pod.metadata.name = Some("test-pod".to_string());
        pod.spec = Some(Default::default());
        pod.spec.as_mut().unwrap().containers = vec![Default::default()];
        pod.spec.as_mut().unwrap().containers[0].resources = Some(Default::default());
        pod.spec.as_mut().unwrap().containers[0]
            .resources
            .as_mut()
            .unwrap()
            .requests = Some(BTreeMap::new());
        pod.spec.as_mut().unwrap().containers[0]
            .resources
            .as_mut()
            .unwrap()
            .requests
            .as_mut()
            .unwrap()
            .insert(
                "cpu".to_string(),
                k8s_openapi::apimachinery::pkg::api::resource::Quantity(cpu.to_string()),
            );
        pod.spec.as_mut().unwrap().containers[0]
            .resources
            .as_mut()
            .unwrap()
            .requests
            .as_mut()
            .unwrap()
            .insert(
                "memory".to_string(),
                k8s_openapi::apimachinery::pkg::api::resource::Quantity(memory.to_string()),
            );
        pod
    }

    #[test]
    fn test_least_allocated() {
        // Two identical nodes must score identically for the same pod.
        let node1 = create_test_node("node1", "4", "8Gi");
        let node2 = create_test_node("node2", "4", "8Gi");
        let pod = create_test_pod("1", "2Gi");
        let context = SchedulingContext::new(pod, vec![node1.clone(), node2.clone()]);
        let scorer = LeastAllocated;
        let score1 = scorer.score(&context, &node1);
        let score2 = scorer.score(&context, &node2);
        // Both nodes should have same score (same resources, same request)
        assert_eq!(score1.score, score2.score);
        assert!(score1.score > 50); // Should prefer empty nodes
    }

    #[test]
    fn test_calculate_weighted_score() {
        // The combined score is the (truncating) mean of the inputs.
        let scores = vec![
            ScoreResult::new("node1".to_string(), 80),
            ScoreResult::new("node1".to_string(), 60),
        ];
        let weighted = calculate_weighted_score(&scores);
        assert_eq!(weighted, 70); // (80 + 60) / 2
    }
}

View file

@ -0,0 +1,180 @@
use reddwarf_core::{Node, Pod};
use std::collections::HashMap;
/// Scheduling context: the pod being placed plus a snapshot of candidate nodes.
#[derive(Debug, Clone)]
pub struct SchedulingContext {
    /// Pod awaiting placement.
    pub pod: Pod,
    /// Snapshot of candidate nodes at the start of the scheduling attempt.
    pub nodes: Vec<Node>,
}

impl SchedulingContext {
    /// Bundle a pod with its candidate node snapshot.
    pub fn new(pod: Pod, nodes: Vec<Node>) -> Self {
        Self { pod, nodes }
    }
}
/// Outcome of running one filter predicate against one node.
#[derive(Debug, Clone)]
pub struct FilterResult {
    /// Name of the node that was evaluated.
    pub node_name: String,
    /// True when the node is feasible for the pod.
    pub passed: bool,
    /// Failure explanation; always `None` when the node passed.
    pub reason: Option<String>,
}

impl FilterResult {
    /// Build a passing result for `node_name` (no reason attached).
    pub fn pass(node_name: String) -> Self {
        FilterResult {
            node_name,
            passed: true,
            reason: None,
        }
    }

    /// Build a failing result for `node_name` carrying `reason`.
    pub fn fail(node_name: String, reason: String) -> Self {
        FilterResult {
            node_name,
            passed: false,
            reason: Some(reason),
        }
    }
}
/// Outcome of running one scoring function against one node.
#[derive(Debug, Clone)]
pub struct ScoreResult {
    /// Name of the node that was scored.
    pub node_name: String,
    /// Node score in `0..=100`; higher means more preferred.
    pub score: i32,
}

impl ScoreResult {
    /// Pair a node name with its score.
    pub fn new(node_name: String, score: i32) -> Self {
        ScoreResult { node_name, score }
    }
}
/// Resource quantities for a node or pod, normalized to integer units.
#[derive(Debug, Clone, Default)]
pub struct ResourceQuantities {
    /// CPU in millicores (1000 = 1 core).
    pub cpu_millicores: i64,
    /// Memory in bytes.
    pub memory_bytes: i64,
}

impl ResourceQuantities {
    /// Parse a Kubernetes CPU quantity (e.g. "2", "1000m", "0.5") into millicores.
    ///
    /// # Errors
    /// Returns a message when the string is neither an `m`-suffixed integer
    /// nor a plain decimal number of cores.
    pub fn parse_cpu(s: &str) -> Result<i64, String> {
        if let Some(m) = s.strip_suffix('m') {
            // Already in millicores.
            m.parse::<i64>()
                .map_err(|e| format!("Invalid CPU millicore value: {}", e))
        } else if let Ok(cores) = s.parse::<f64>() {
            // Whole or fractional cores. Round to the nearest millicore instead
            // of truncating so e.g. "4.106" maps to 4106m rather than 4105m
            // (float representation of the product can land just below the
            // exact value).
            Ok((cores * 1000.0).round() as i64)
        } else {
            Err(format!("Invalid CPU format: {}", s))
        }
    }

    /// Parse a Kubernetes memory quantity into bytes.
    ///
    /// Supports plain byte counts and the binary suffixes Ki/Mi/Gi/Ti/Pi/Ei.
    /// NOTE(review): decimal SI suffixes (k/M/G) and exponent forms ("129e6")
    /// are not supported — confirm callers never produce them.
    ///
    /// # Errors
    /// Returns the integer-parse error message for malformed numbers, or an
    /// overflow message when the scaled value exceeds `i64::MAX`.
    pub fn parse_memory(s: &str) -> Result<i64, String> {
        // Binary suffixes recognized by Kubernetes, with their multipliers.
        const BINARY_SUFFIXES: [(&str, i64); 6] = [
            ("Ki", 1 << 10),
            ("Mi", 1 << 20),
            ("Gi", 1 << 30),
            ("Ti", 1 << 40),
            ("Pi", 1 << 50),
            ("Ei", 1 << 60),
        ];
        for &(suffix, multiplier) in BINARY_SUFFIXES.iter() {
            if let Some(num) = s.strip_suffix(suffix) {
                let value = num.parse::<i64>().map_err(|e| e.to_string())?;
                // checked_mul guards against silent overflow for huge quantities.
                return value
                    .checked_mul(multiplier)
                    .ok_or_else(|| format!("Memory quantity overflows i64: {}", s));
            }
        }
        // No recognized suffix: plain bytes.
        s.parse::<i64>().map_err(|e| e.to_string())
    }

    /// Extract CPU and memory from a k8s-openapi resource map; missing or
    /// unparsable entries default to zero.
    pub fn from_k8s_resource_map(
        resources: &std::collections::BTreeMap<String, k8s_openapi::apimachinery::pkg::api::resource::Quantity>,
    ) -> Self {
        let cpu_millicores = resources
            .get("cpu")
            .and_then(|q| Self::parse_cpu(&q.0).ok())
            .unwrap_or(0);
        let memory_bytes = resources
            .get("memory")
            .and_then(|q| Self::parse_memory(&q.0).ok())
            .unwrap_or(0);
        Self {
            cpu_millicores,
            memory_bytes,
        }
    }

    /// Extract CPU and memory from a plain string map (test convenience);
    /// missing or unparsable entries default to zero.
    pub fn from_resource_map(resources: &HashMap<String, String>) -> Self {
        let cpu_millicores = resources
            .get("cpu")
            .and_then(|s| Self::parse_cpu(s).ok())
            .unwrap_or(0);
        let memory_bytes = resources
            .get("memory")
            .and_then(|s| Self::parse_memory(s).ok())
            .unwrap_or(0);
        Self {
            cpu_millicores,
            memory_bytes,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_cpu() {
        // Whole/fractional cores scale to millicores; "m" values pass through.
        assert_eq!(ResourceQuantities::parse_cpu("1").unwrap(), 1000);
        assert_eq!(ResourceQuantities::parse_cpu("0.5").unwrap(), 500);
        assert_eq!(ResourceQuantities::parse_cpu("100m").unwrap(), 100);
        assert_eq!(ResourceQuantities::parse_cpu("2").unwrap(), 2000);
    }

    #[test]
    fn test_parse_memory() {
        // Plain numbers are bytes; Ki/Mi/Gi are binary (1024-based) multiples.
        assert_eq!(ResourceQuantities::parse_memory("1024").unwrap(), 1024);
        assert_eq!(ResourceQuantities::parse_memory("1Ki").unwrap(), 1024);
        assert_eq!(
            ResourceQuantities::parse_memory("128Mi").unwrap(),
            128 * 1024 * 1024
        );
        assert_eq!(
            ResourceQuantities::parse_memory("1Gi").unwrap(),
            1024 * 1024 * 1024
        );
    }

    #[test]
    fn test_filter_result() {
        // pass() carries no reason; fail() records the supplied reason.
        let pass = FilterResult::pass("node1".to_string());
        assert!(pass.passed);
        assert!(pass.reason.is_none());
        let fail = FilterResult::fail("node2".to_string(), "Insufficient CPU".to_string());
        assert!(!fail.passed);
        assert_eq!(fail.reason, Some("Insufficient CPU".to_string()));
    }
}