use std::{collections::HashMap, sync::Arc, time::Duration}; use dashmap::DashMap; use miette::Result; use tokio::sync::{mpsc, Semaphore}; use tokio::task::JoinSet; use tracing::{error, info}; use crate::hypervisor::{Hypervisor, VmSpec, JobContext}; use crate::persist; use sea_orm::DatabaseConnection; pub struct Scheduler { hv: Arc, tx: mpsc::Sender, rx: mpsc::Receiver, global_sem: Arc, label_sems: Arc, } type DashmapType = DashMap>; pub struct SchedItem { pub spec: VmSpec, pub ctx: JobContext, } impl Scheduler { pub fn new(hv: H, max_concurrency: usize, capacity_map: &HashMap) -> Self { let (tx, rx) = mpsc::channel::(max_concurrency * 4); let label_sems = DashMap::new(); for (label, cap) in capacity_map.iter() { label_sems.insert(label.clone(), Arc::new(Semaphore::new(*cap))); } Self { hv: Arc::new(hv), tx, rx, global_sem: Arc::new(Semaphore::new(max_concurrency)), label_sems: Arc::new(label_sems), } } pub fn sender(&self) -> mpsc::Sender { self.tx.clone() } pub async fn run(self) -> Result<()> { let Scheduler { hv, mut rx, global_sem, label_sems, .. } = self; let mut handles = Vec::new(); while let Some(item) = rx.recv().await { let hv = hv.clone(); let global = global_sem.clone(); let label_sems = label_sems.clone(); let handle = tokio::spawn(async move { // Acquire global and label permits (owned permits so they live inside the task) let _g = match global.acquire_owned().await { Ok(p) => p, Err(_) => return, }; let label_key = item.spec.label.clone(); let sem_arc = if let Some(entry) = label_sems.get(&label_key) { entry.clone() } else { let s = Arc::new(Semaphore::new(1)); label_sems.insert(label_key.clone(), s.clone()); s }; let _l = match sem_arc.acquire_owned().await { Ok(p) => p, Err(_) => return, }; // Provision and run match hv.prepare(&item.spec, &item.ctx).await { Ok(handle) => { if let Err(e) = hv.start(&handle).await { error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to start VM"); return; } info!(request_id = %item.ctx.request_id, label = %label_key, "vm started (workload execution placeholder)"); // Placeholder job runtime tokio::time::sleep(Duration::from_secs(1)).await; // Stop and destroy if let Err(e) = hv.stop(&handle, Duration::from_secs(10)).await { error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to stop VM"); } if let Err(e) = hv.destroy(handle).await { error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to destroy VM"); } } Err(e) => { error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to prepare VM"); return; } } }); handles.push(handle); } // Wait for all in-flight tasks to finish for h in handles { let _ = h.await; } Ok(()) } }