From f904cb88b291100ed05c980a7d06cd37b19b26cf3ab3497b7c05c308130cef19 Mon Sep 17 00:00:00 2001 From: Till Wegmueller Date: Sun, 9 Nov 2025 17:59:04 +0100 Subject: [PATCH] Relax filesystem permissions for VM directories, overlays, and logs to support host libvirt/qemu access. Introduce dead-letter queue support with enriched error messages for failed jobs. Signed-off-by: Till Wegmueller --- crates/common/src/lib.rs | 4 +- crates/common/src/messages.rs | 38 +++++++++ crates/common/src/mq.rs | 111 ++++++++++++++++++-------- crates/orchestrator/src/hypervisor.rs | 14 +++- crates/orchestrator/src/main.rs | 6 +- crates/orchestrator/src/scheduler.rs | 28 ++++++- 6 files changed, 158 insertions(+), 43 deletions(-) diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index d0aff82..979f636 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -5,8 +5,8 @@ pub mod telemetry; pub mod config; pub use job::{Job, Step, Workflow, parse_workflow_file, parse_workflow_str}; -pub use messages::{JobRequest, JobResult, SourceSystem}; -pub use mq::{MqConfig, consume_jobs, consume_jobs_until, publish_job, publish_job_result}; +pub use messages::{JobRequest, JobResult, SourceSystem, DeadLetter}; +pub use mq::{MqConfig, consume_jobs, consume_jobs_until, publish_job, publish_job_result, publish_deadletter}; pub use telemetry::{TelemetryGuard, init_tracing}; pub use config::AppConfig; diff --git a/crates/common/src/messages.rs b/crates/common/src/messages.rs index 863f8b9..0b8ba5d 100644 --- a/crates/common/src/messages.rs +++ b/crates/common/src/messages.rs @@ -120,3 +120,41 @@ impl JobResult { } } } + +/// Combined dead-letter message that includes the original JobRequest and +/// an error summary so operators/tooling can inspect both together. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeadLetter { + #[serde(default = "default_deadletter_schema")] + pub schema_version: String, // e.g., "deadletter.v1" + pub request_id: Uuid, + pub occurred_at: OffsetDateTime, + /// Stage where the failure occurred (e.g., "prepare", "start", "other"). + pub stage: String, + /// Human-readable error summary (single-line preferred). + pub error: String, + /// Original job request that triggered the failure. + pub original: JobRequest, + /// Optional bag of extra diagnostic info. + #[serde(default)] + pub extra: Option<HashMap<String, String>>, +} + +fn default_deadletter_schema() -> String { + "deadletter.v1".to_string() +} + +impl DeadLetter { + pub fn new(stage: impl Into<String>, error: impl Into<String>, original: JobRequest) -> Self { + let request_id = original.request_id; + Self { + schema_version: default_deadletter_schema(), + request_id, + occurred_at: OffsetDateTime::now_utc(), + stage: stage.into(), + error: error.into(), + original, + extra: None, + } + } +} diff --git a/crates/common/src/mq.rs b/crates/common/src/mq.rs index 5b808ef..f04c7a2 100644 --- a/crates/common/src/mq.rs +++ b/crates/common/src/mq.rs @@ -14,7 +14,7 @@ use miette::{IntoDiagnostic as _, Result}; use tracing::Instrument; use tracing::{error, info, instrument, warn}; -use crate::messages::{JobRequest, JobResult}; +use crate::messages::{JobRequest, JobResult, DeadLetter}; /// Pretty-print an AMQP message body for logs. /// - If valid UTF-8 JSON, pretty-format it. 
@@ -236,6 +242,42 @@ pub struct DeliveryMeta { pub delivery_tag: u64, } +#[instrument(skip(cfg, dl))] +pub async fn publish_deadletter(cfg: &MqConfig, dl: &DeadLetter) -> Result<()> { + let conn = connect(cfg).await?; + let channel = conn.create_channel().await.into_diagnostic()?; + declare_topology(&channel, cfg).await?; + + // Enable confirms + channel + .confirm_select(ConfirmSelectOptions::default()) + .await + .into_diagnostic()?; + + let body = serde_json::to_vec(dl).into_diagnostic()?; + let props = BasicProperties::default() + .with_content_type(ShortString::from("application/json")) + .with_type(ShortString::from(dl.schema_version.clone())) + .with_delivery_mode(2u8.into()); + + let confirm = channel + .basic_publish( + &cfg.dlx, + "", + BasicPublishOptions { + mandatory: true, + immediate: false, + }, + &body, + props, + ) + .await + .into_diagnostic()?; + + confirm.await.into_diagnostic()?; + Ok(()) +} + #[instrument(skip(cfg, handler))] pub async fn consume_jobs<F>(cfg: &MqConfig, handler: F) -> Result<()> where @@ -355,50 +391,55 @@ pub async fn publish_job_result(cfg: &MqConfig, result: &JobResult) -> Result<()> { let conn = connect(cfg).await?; let channel = conn.create_channel().await.into_diagnostic()?; - - // Ensure main exchange exists - channel - .exchange_declare( - &cfg.exchange, - lapin::ExchangeKind::Direct, - ExchangeDeclareOptions { - durable: true, - auto_delete: false, - internal: false, - nowait: false, - passive: false, - }, - FieldTable::default(), - ) - .await - .into_diagnostic()?; - - // Enable publisher confirms - channel - .confirm_select(ConfirmSelectOptions::default()) - .await - .into_diagnostic()?; - - let payload = serde_json::to_vec(result).into_diagnostic()?; + declare_topology(&channel, cfg).await?; let props = BasicProperties::default() - .with_content_type("application/json".into()) - .with_content_encoding("utf-8".into()) - .with_type(ShortString::from(result.schema_version.clone())) - 
.with_delivery_mode(2u8.into()); + .with_content_type(ShortString::from("application/json")); - // Route by schema version; default routing key for results - let routing_key = "jobresult.v1"; + let body = serde_json::to_vec(result).into_diagnostic()?; let confirm = channel .basic_publish( &cfg.exchange, - routing_key, + "jobresult.v1", BasicPublishOptions { mandatory: true, - ..Default::default() + immediate: false, }, - &payload, + &body, + props, + ) + .await + .into_diagnostic()?; + + confirm.await.into_diagnostic()?; + Ok(()) +} + +/// Publish a raw message body to the DLX so it lands in the configured DLQ. +pub async fn publish_to_dlx_raw(cfg: &MqConfig, body: &[u8]) -> Result<()> { + let conn = connect(cfg).await?; + let channel = conn.create_channel().await.into_diagnostic()?; + declare_topology(&channel, cfg).await?; + + // Enable confirms + channel + .confirm_select(ConfirmSelectOptions::default()) + .await + .into_diagnostic()?; + + let props = BasicProperties::default() + .with_content_type(ShortString::from("application/json")); + + let confirm = channel + .basic_publish( + &cfg.dlx, + "", + BasicPublishOptions { + mandatory: true, + immediate: false, + }, + body, props, ) .await diff --git a/crates/orchestrator/src/hypervisor.rs b/crates/orchestrator/src/hypervisor.rs index c5d7344..a9cad78 100644 --- a/crates/orchestrator/src/hypervisor.rs +++ b/crates/orchestrator/src/hypervisor.rs @@ -284,7 +284,8 @@ impl LibvirtHypervisor { std::env::temp_dir().join("solstice-libvirt").join(id) }; let _ = std::fs::create_dir_all(&dir); - let _ = std::fs::set_permissions(&dir, std::fs::Permissions::from_mode(0o700)); + // Make directory broadly accessible so host qemu (libvirt) can create/read files + let _ = std::fs::set_permissions(&dir, std::fs::Permissions::from_mode(0o777)); dir } } @@ -367,6 +368,8 @@ impl Hypervisor for LibvirtHypervisor { .await .into_diagnostic()??; let _ = status; // appease compiler if unused + // Relax permissions on overlay so host qemu 
can access it + let _ = std::fs::set_permissions(&overlay, std::fs::Permissions::from_mode(0o666)); // Build NoCloud seed ISO if user_data provided let mut seed_iso: Option<PathBuf> = None; @@ -415,8 +418,15 @@ seed_iso = Some(iso_path); } - // Serial console log file path + // Relax permissions on seed ISO if created (readable by host qemu) + if let Some(ref p) = seed_iso { + let _ = std::fs::set_permissions(p, std::fs::Permissions::from_mode(0o644)); + } + + // Serial console log file path (pre-create with permissive perms for libvirt) let console_log = work_dir.join("console.log"); + let _ = std::fs::OpenOptions::new().create(true).append(true).open(&console_log); + let _ = std::fs::set_permissions(&console_log, std::fs::Permissions::from_mode(0o666)); let console_log_str = console_log.display().to_string(); info!(domain = %id, console = %console_log_str, "serial console will be logged to file"); diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index 7a713a3..700e1e8 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -17,7 +17,6 @@ use hypervisor::{JobContext, RouterHypervisor, VmSpec}; use scheduler::{SchedItem, Scheduler}; use std::sync::Arc; use tokio::sync::Notify; -use std::net::SocketAddr as _; #[derive(Parser, Debug)] #[command( @@ -185,6 +184,7 @@ async fn main() -> Result<()> { &capacity_map, persist.clone(), Duration::from_secs(opts.vm_placeholder_run_secs), + Arc::new(mq_cfg.clone()), ); let sched_shutdown = Arc::new(Notify::new()); let sched_tx = sched.sender(); @@ -235,6 +235,8 @@ async fn main() -> Result<()> { image.defaults.as_ref().and_then(|d| d.ram_mb).unwrap_or(2048), image.defaults.as_ref().and_then(|d| d.disk_gb).unwrap_or(40), ); + // Keep original job to enrich dead-letter messages if scheduling fails early + let original = Some(job.clone()); let spec = VmSpec { label: label_resolved.clone(), image_path: image.local_path.clone(), @@ -254,7 
+256,7 @@ async fn main() -> Result<()> { commit_sha: job.commit_sha, workflow_job_id: job.workflow_job_id, }; - sched_tx.send(SchedItem { spec, ctx }).await.into_diagnostic()?; + sched_tx.send(SchedItem { spec, ctx, original }).await.into_diagnostic()?; Ok(()) // ack on accept } }).await diff --git a/crates/orchestrator/src/scheduler.rs b/crates/orchestrator/src/scheduler.rs index 5acdb0c..93489d0 100644 --- a/crates/orchestrator/src/scheduler.rs +++ b/crates/orchestrator/src/scheduler.rs @@ -4,11 +4,13 @@ use dashmap::DashMap; use miette::Result; use tokio::sync::{Notify, Semaphore, mpsc}; use tracing::{error, info, warn}; +use common::{publish_deadletter, DeadLetter, JobRequest}; use crate::hypervisor::{BackendTag, Hypervisor, JobContext, VmSpec}; use crate::persist::{JobState, Persist, VmPersistState}; pub struct Scheduler { + mq_cfg: Arc<common::MqConfig>, hv: Arc<dyn Hypervisor>, tx: mpsc::Sender<SchedItem>, rx: mpsc::Receiver<SchedItem>, @@ -23,6 +25,8 @@ type DashmapType = DashMap<String, Arc<Semaphore>>; pub struct SchedItem { pub spec: VmSpec, pub ctx: JobContext, + /// Original JobRequest for enriching dead-letter messages on early failure + pub original: Option<JobRequest>, } impl Scheduler { @@ -32,6 +36,7 @@ impl Scheduler { capacity_map: &HashMap<String, usize>, persist: Arc<Persist>, placeholder_runtime: Duration, + mq_cfg: Arc<common::MqConfig>, ) -> Self { let (tx, rx) = mpsc::channel::<SchedItem>(max_concurrency * 4); let label_sems = DashMap::new(); @@ -39,6 +44,7 @@ label_sems.insert(label.clone(), Arc::new(Semaphore::new(*cap))); } Self { + mq_cfg, hv: Arc::new(hv), tx, rx, @@ -55,6 +61,7 @@ pub async fn run_with_shutdown(self, shutdown: Arc<Notify>) -> Result<()> { let Scheduler { + mq_cfg, hv, mut rx, global_sem, @@ -81,6 +88,7 @@ let label_sems = label_sems.clone(); let persist = persist.clone(); let shutdown = shutdown.clone(); + let mq_cfg_in = mq_cfg.clone(); let handle = tokio::spawn(async move { // Acquire global and label permits (owned permits so they live inside the task) let _g = match global.acquire_owned().await { Ok(p) => p, 
Err(_) => return }; @@ -105,6 +113,13 @@ impl Scheduler { if let Err(e) = hv.start(&h).await { error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to start VM"); let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), JobState::Failed).await; + // Publish combined dead-letter with original request and error + if let Some(ref orig) = item.original { + let dl = DeadLetter::new("start", format!("{}", e), orig.clone()); + if let Err(pe) = publish_deadletter(&mq_cfg_in, &dl).await { + warn!(error = %pe, request_id = %item.ctx.request_id, "failed to publish dead-letter"); + } + } info!(request_id = %item.ctx.request_id, label = %label_key, "job finished: {} {}", item.ctx.request_id, "failed"); return; } @@ -184,6 +199,13 @@ impl Scheduler { Err(e) => { error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to prepare VM"); let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), JobState::Failed).await; + // Publish combined dead-letter with original request and error + if let Some(ref orig) = item.original { + let dl = DeadLetter::new("prepare", format!("{}", e), orig.clone()); + if let Err(pe) = publish_deadletter(&mq_cfg_in, &dl).await { + warn!(error = %pe, request_id = %item.ctx.request_id, "failed to publish dead-letter"); + } + } info!(request_id = %item.ctx.request_id, label = %label_key, "job finished: {} {}", item.ctx.request_id, "failed"); return; } @@ -338,7 +360,7 @@ mod tests { let mut caps = HashMap::new(); caps.insert("x".to_string(), 10); - let sched = Scheduler::new(hv, 2, &caps, persist, Duration::from_millis(10)); + let sched = Scheduler::new(hv, 2, &caps, persist, Duration::from_millis(10), Arc::new(common::MqConfig::default())); let tx = sched.sender(); let run = tokio::spawn(async move { let _ = sched.run().await; @@ -349,6 +371,7 @@ mod tests { .send(SchedItem { 
spec: make_spec("x"), ctx: make_ctx(), + original: None, }) .await; } @@ -375,7 +398,7 @@ mod tests { caps.insert("a".to_string(), 1); caps.insert("b".to_string(), 2); - let sched = Scheduler::new(hv, 4, &caps, persist, Duration::from_millis(10)); + let sched = Scheduler::new(hv, 4, &caps, persist, Duration::from_millis(10), Arc::new(common::MqConfig::default())); let tx = sched.sender(); let run = tokio::spawn(async move { let _ = sched.run().await; @@ -386,6 +409,7 @@ mod tests { .send(SchedItem { spec: make_spec("a"), ctx: make_ctx(), + original: None, }) .await; }