diff --git a/crates/ciadm/src/main.rs b/crates/ciadm/src/main.rs index 2def2f2..003cf1d 100644 --- a/crates/ciadm/src/main.rs +++ b/crates/ciadm/src/main.rs @@ -36,10 +36,16 @@ async fn main() -> Result<()> { let _t = common::init_tracing("ciadm")?; let cli = Cli::parse(); match cli.command { - Commands::Trigger { repo, r#ref, workflow } => { + Commands::Trigger { + repo, + r#ref, + workflow, + } => { info!(%repo, %r#ref, %workflow, "trigger requested"); // TODO: Call orchestrator API to enqueue job - println!("Triggered job for {repo}@{ref} using {workflow}", r#ref = r#ref); + println!( + "Triggered job for {repo}@{ref} using {workflow}", + ); } Commands::Status { job_id } => { info!(%job_id, "status requested"); diff --git a/crates/cidev/src/main.rs b/crates/cidev/src/main.rs index c565986..60cb41f 100644 --- a/crates/cidev/src/main.rs +++ b/crates/cidev/src/main.rs @@ -2,7 +2,11 @@ use clap::{Parser, Subcommand}; use miette::Result; #[derive(Parser, Debug)] -#[command(name = "cidev", version, about = "Solstice CI Dev CLI — validate and inspect KDL workflows")] +#[command( + name = "cidev", + version, + about = "Solstice CI Dev CLI — validate and inspect KDL workflows" +)] struct Cli { #[command(subcommand)] command: Command, @@ -11,11 +15,22 @@ struct Cli { #[derive(Subcommand, Debug)] enum Command { /// Validate a workflow KDL file - Validate { #[arg(long)] path: String }, + Validate { + #[arg(long)] + path: String, + }, /// List jobs in a workflow - List { #[arg(long)] path: String }, + List { + #[arg(long)] + path: String, + }, /// Show a job's steps (by job id) - Show { #[arg(long)] path: String, #[arg(long)] job: String }, + Show { + #[arg(long)] + path: String, + #[arg(long)] + job: String, + }, } #[tokio::main(flavor = "multi_thread")] @@ -25,14 +40,24 @@ async fn main() -> Result<()> { match cli.command { Command::Validate { path } => { let wf = common::parse_workflow_file(&path)?; - println!("OK: parsed workflow{} with {} job(s)", - wf.name.as_ref().map(|n| format!(" '{n}'")).unwrap_or_default(), - wf.jobs.len()); + println!( + "OK: parsed workflow{} with {} job(s)", + wf.name + .as_ref() + .map(|n| format!(" '{n}'")) + .unwrap_or_default(), + wf.jobs.len() + ); } Command::List { path } => { let wf = common::parse_workflow_file(&path)?; for (id, job) in wf.jobs { - println!("{id}{}", job.runs_on.map(|ro| format!(" (runs_on: {ro})")).unwrap_or_default()); + println!( + "{id}{}", + job.runs_on + .map(|ro| format!(" (runs_on: {ro})")) + .unwrap_or_default() + ); } } Command::Show { path, job } => { @@ -40,7 +65,9 @@ async fn main() -> Result<()> { match wf.jobs.get(&job) { Some(j) => { println!("Job: {}", j.id); - if let Some(ro) = &j.runs_on { println!("runs_on: {ro}"); } + if let Some(ro) = &j.runs_on { + println!("runs_on: {ro}"); + } for (i, s) in j.steps.iter().enumerate() { let name = s.name.as_deref().unwrap_or("(unnamed)"); println!("- Step {}/{}: {}", i + 1, j.steps.len(), name); diff --git a/crates/common/src/job.rs b/crates/common/src/job.rs index 998f5e9..a8feb63 100644 --- a/crates/common/src/job.rs +++ b/crates/common/src/job.rs @@ -1,6 +1,6 @@ -use std::{collections::BTreeMap, fs, path::Path}; use kdl::{KdlDocument, KdlNode}; use miette::{IntoDiagnostic, Report, Result}; +use std::{collections::BTreeMap, fs, path::Path}; #[derive(Debug, Clone)] pub struct Workflow { diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 4344f8c..e3d48e0 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -1,12 +1,12 @@ -pub mod telemetry; pub mod job; pub mod messages; pub mod mq; +pub mod telemetry; -pub use telemetry::{init_tracing, TelemetryGuard}; -pub use job::{Workflow, Job, Step, parse_workflow_str, parse_workflow_file}; +pub use job::{Job, Step, Workflow, parse_workflow_file, parse_workflow_str}; pub use messages::{JobRequest, JobResult, SourceSystem}; -pub use mq::{MqConfig, publish_job, publish_job_result, consume_jobs, consume_jobs_until}; +pub use mq::{MqConfig, consume_jobs, consume_jobs_until, publish_job, publish_job_result}; +pub use telemetry::{TelemetryGuard, init_tracing}; // Generated gRPC module for runner <-> orchestrator pub mod runner { diff --git a/crates/common/src/messages.rs b/crates/common/src/messages.rs index 0193866..6326c8b 100644 --- a/crates/common/src/messages.rs +++ b/crates/common/src/messages.rs @@ -27,7 +27,9 @@ pub struct JobRequest { pub submitted_at: OffsetDateTime, } -fn default_jobrequest_schema() -> String { "jobrequest.v1".to_string() } +fn default_jobrequest_schema() -> String { + "jobrequest.v1".to_string() +} #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -38,7 +40,11 @@ pub enum SourceSystem { } impl JobRequest { - pub fn new(source: SourceSystem, repo_url: impl Into, commit_sha: impl Into) -> Self { + pub fn new( + source: SourceSystem, + repo_url: impl Into, + commit_sha: impl Into, + ) -> Self { Self { schema_version: default_jobrequest_schema(), request_id: Uuid::new_v4(), @@ -73,10 +79,19 @@ pub struct JobResult { pub completed_at: OffsetDateTime, } -fn default_jobresult_schema() -> String { "jobresult.v1".to_string() } +fn default_jobresult_schema() -> String { + "jobresult.v1".to_string() +} impl JobResult { - pub fn new(request_id: Uuid, repo_url: String, commit_sha: String, success: bool, exit_code: i32, summary: Option) -> Self { + pub fn new( + request_id: Uuid, + repo_url: String, + commit_sha: String, + success: bool, + exit_code: i32, + summary: Option, + ) -> Self { Self { schema_version: default_jobresult_schema(), request_id, diff --git a/crates/common/src/mq.rs b/crates/common/src/mq.rs index c93c3dc..8a918e0 100644 --- a/crates/common/src/mq.rs +++ b/crates/common/src/mq.rs @@ -2,16 +2,17 @@ use std::time::Duration; use futures_util::StreamExt; use lapin::{ + BasicProperties, Channel, Connection, ConnectionProperties, Consumer, options::{ - BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicQosOptions, - ConfirmSelectOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions, + BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, + BasicQosOptions, ConfirmSelectOptions, ExchangeDeclareOptions, QueueBindOptions, + QueueDeclareOptions, }, types::{AMQPValue, FieldTable, LongString, ShortString}, - BasicProperties, Channel, Connection, ConnectionProperties, Consumer, }; use miette::{IntoDiagnostic as _, Result}; -use tracing::{error, info, instrument, warn}; use tracing::Instrument; +use tracing::{error, info, instrument, warn}; use crate::messages::{JobRequest, JobResult}; @@ -31,11 +32,15 @@ impl Default for MqConfig { Self { url: std::env::var("AMQP_URL").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()), exchange: std::env::var("AMQP_EXCHANGE").unwrap_or_else(|_| "solstice.jobs".into()), - routing_key: std::env::var("AMQP_ROUTING_KEY").unwrap_or_else(|_| "jobrequest.v1".into()), + routing_key: std::env::var("AMQP_ROUTING_KEY") + .unwrap_or_else(|_| "jobrequest.v1".into()), queue: std::env::var("AMQP_QUEUE").unwrap_or_else(|_| "solstice.jobs.v1".into()), dlx: std::env::var("AMQP_DLX").unwrap_or_else(|_| "solstice.dlx".into()), dlq: std::env::var("AMQP_DLQ").unwrap_or_else(|_| "solstice.jobs.v1.dlq".into()), - prefetch: std::env::var("AMQP_PREFETCH").ok().and_then(|s| s.parse().ok()).unwrap_or(64), + prefetch: std::env::var("AMQP_PREFETCH") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(64), } } } @@ -71,7 +76,13 @@ pub async fn declare_topology(channel: &Channel, cfg: &MqConfig) -> Result<()> { .exchange_declare( &cfg.dlx, lapin::ExchangeKind::Fanout, - ExchangeDeclareOptions { durable: true, auto_delete: false, internal: false, nowait: false, passive: false }, + ExchangeDeclareOptions { + durable: true, + auto_delete: false, + internal: false, + nowait: false, + passive: false, + }, FieldTable::default(), ) .await @@ -86,7 +97,13 @@ pub async fn declare_topology(channel: &Channel, cfg: &MqConfig) -> Result<()> { channel .queue_declare( &cfg.dlq, - QueueDeclareOptions { durable: true, auto_delete: false, exclusive: false, nowait: false, passive: false }, + QueueDeclareOptions { + durable: true, + auto_delete: false, + exclusive: false, + nowait: false, + passive: false, + }, dlq_args, ) .await @@ -113,7 +130,13 @@ pub async fn declare_topology(channel: &Channel, cfg: &MqConfig) -> Result<()> { channel .queue_declare( &cfg.queue, - QueueDeclareOptions { durable: true, auto_delete: false, exclusive: false, nowait: false, passive: false }, + QueueDeclareOptions { + durable: true, + auto_delete: false, + exclusive: false, + nowait: false, + passive: false, + }, q_args, ) .await @@ -159,7 +182,10 @@ pub async fn publish_job(cfg: &MqConfig, job: &JobRequest) -> Result<()> { .basic_publish( &cfg.exchange, &cfg.routing_key, - BasicPublishOptions { mandatory: true, ..Default::default() }, + BasicPublishOptions { + mandatory: true, + ..Default::default() + }, &payload, props, ) @@ -206,7 +232,10 @@ where .basic_consume( &cfg.queue, "orchestrator", - BasicConsumeOptions { no_ack: false, ..Default::default() }, + BasicConsumeOptions { + no_ack: false, + ..Default::default() + }, FieldTable::default(), ) .await @@ -215,7 +244,8 @@ where info!(queue = %cfg.queue, prefetch = cfg.prefetch, "consumer started"); tokio::pin!(consumer); - let mut shutdown: core::pin::Pin + Send>> = Box::pin(shutdown); + let mut shutdown: core::pin::Pin + Send>> = + Box::pin(shutdown); 'consume: loop { tokio::select! { @@ -272,12 +302,12 @@ where // Close channel and connection to stop heartbeats and background tasks match tokio::time::timeout(Duration::from_secs(2), channel.close(200, "shutdown")).await { - Ok(Ok(_)) => {}, + Ok(Ok(_)) => {} Ok(Err(e)) => warn!(error = %e, "failed to close AMQP channel"), Err(_) => warn!("timeout while closing AMQP channel"), } match tokio::time::timeout(Duration::from_secs(2), conn.close(200, "shutdown")).await { - Ok(Ok(_)) => {}, + Ok(Ok(_)) => {} Ok(Err(e)) => warn!(error = %e, "failed to close AMQP connection"), Err(_) => warn!("timeout while closing AMQP connection"), } @@ -286,7 +316,6 @@ where Ok(()) } - #[instrument(skip(cfg, result))] pub async fn publish_job_result(cfg: &MqConfig, result: &JobResult) -> Result<()> { let conn = connect(cfg).await?; @@ -297,7 +326,13 @@ pub async fn publish_job_result(cfg: &MqConfig, result: &JobResult) -> Result<() .exchange_declare( &cfg.exchange, lapin::ExchangeKind::Direct, - ExchangeDeclareOptions { durable: true, auto_delete: false, internal: false, nowait: false, passive: false }, + ExchangeDeclareOptions { + durable: true, + auto_delete: false, + internal: false, + nowait: false, + passive: false, + }, FieldTable::default(), ) .await @@ -324,7 +359,10 @@ pub async fn publish_job_result(cfg: &MqConfig, result: &JobResult) -> Result<() .basic_publish( &cfg.exchange, routing_key, - BasicPublishOptions { mandatory: true, ..Default::default() }, + BasicPublishOptions { + mandatory: true, + ..Default::default() + }, &payload, props, ) diff --git a/crates/common/src/telemetry.rs b/crates/common/src/telemetry.rs index c739a9b..1ae6805 100644 --- a/crates/common/src/telemetry.rs +++ b/crates/common/src/telemetry.rs @@ -15,13 +15,14 @@ pub fn init_tracing(_service_name: &str) -> miette::Result { .with_writer(nb_writer) .with_ansi(atty::is(atty::Stream::Stderr)); - let filter = EnvFilter::try_from_default_env() - .unwrap_or_else(|_| EnvFilter::new("info")); + let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); tracing_subscriber::registry() .with(filter) .with(fmt_layer) .init(); - Ok(TelemetryGuard { _guard: Some(guard) }) + Ok(TelemetryGuard { + _guard: Some(guard), + }) } diff --git a/crates/forge-integration/src/main.rs b/crates/forge-integration/src/main.rs index ed1ee78..60e485f 100644 --- a/crates/forge-integration/src/main.rs +++ b/crates/forge-integration/src/main.rs @@ -2,12 +2,12 @@ use std::net::SocketAddr; use std::sync::Arc; use axum::{ + Router, body::Bytes, extract::State, http::{HeaderMap, StatusCode}, response::IntoResponse, routing::post, - Router, }; use clap::{Parser, Subcommand}; use hmac::{Hmac, Mac}; @@ -33,7 +33,11 @@ enum Cmd { } #[derive(Parser, Debug)] -#[command(name = "solstice-forge", version, about = "Solstice CI — Forge Integration Layer")] +#[command( + name = "solstice-forge", + version, + about = "Solstice CI — Forge Integration Layer" +)] struct Opts { /// HTTP bind address for webhooks (e.g., 0.0.0.0:8080) #[arg(long, env = "HTTP_ADDR", default_value = "0.0.0.0:8080")] @@ -87,12 +91,25 @@ async fn main() -> Result<()> { // Apply AMQP overrides if provided let mut mq_cfg = common::MqConfig::default(); - if let Some(u) = opts.amqp_url { mq_cfg.url = u; } - if let Some(x) = opts.amqp_exchange { mq_cfg.exchange = x; } - if let Some(q) = opts.amqp_queue { mq_cfg.queue = q; } - if let Some(rk) = opts.amqp_routing_key { mq_cfg.routing_key = rk; } + if let Some(u) = opts.amqp_url { + mq_cfg.url = u; + } + if let Some(x) = opts.amqp_exchange { + mq_cfg.exchange = x; + } + if let Some(q) = opts.amqp_queue { + mq_cfg.queue = q; + } + if let Some(rk) = opts.amqp_routing_key { + mq_cfg.routing_key = rk; + } - if let Some(Cmd::Enqueue { repo_url, commit_sha, runs_on }) = opts.cmd { + if let Some(Cmd::Enqueue { + repo_url, + commit_sha, + runs_on, + }) = opts.cmd + { let mut jr = common::JobRequest::new(common::SourceSystem::Manual, repo_url, commit_sha); jr.runs_on = runs_on; common::publish_job(&mq_cfg, &jr).await?; @@ -101,10 +118,15 @@ async fn main() -> Result<()> { } if opts.webhook_secret.is_none() { - warn!("WEBHOOK_SECRET is not set — accepting webhooks without signature validation (dev mode)"); + warn!( + "WEBHOOK_SECRET is not set — accepting webhooks without signature validation (dev mode)" + ); } - let state = Arc::new(AppState { mq_cfg, webhook_secret: opts.webhook_secret }); + let state = Arc::new(AppState { + mq_cfg, + webhook_secret: opts.webhook_secret, + }); // Leak the path string to satisfy 'static requirement for axum route API let path: &'static str = Box::leak(opts.webhook_path.clone().into_boxed_str()); @@ -114,9 +136,12 @@ async fn main() -> Result<()> { .with_state(state.clone()); let addr: SocketAddr = opts.http_addr.parse().expect("invalid HTTP_ADDR"); - axum::serve(tokio::net::TcpListener::bind(addr).await.expect("bind"), router) - .await - .expect("server error"); + axum::serve( + tokio::net::TcpListener::bind(addr).await.expect("bind"), + router, + ) + .await + .expect("server error"); Ok(()) } diff --git a/crates/github-integration/src/main.rs b/crates/github-integration/src/main.rs index 6870e44..04c2bce 100644 --- a/crates/github-integration/src/main.rs +++ b/crates/github-integration/src/main.rs @@ -3,7 +3,11 @@ use miette::Result; use tracing::{info, warn}; #[derive(Parser, Debug)] -#[command(name = "solstice-github", version, about = "Solstice CI — GitHub Integration (GitHub App)")] +#[command( + name = "solstice-github", + version, + about = "Solstice CI — GitHub Integration (GitHub App)" +)] struct Opts { /// HTTP bind address for GitHub webhooks (e.g., 0.0.0.0:8081) #[arg(long, env = "HTTP_ADDR", default_value = "0.0.0.0:8081")] diff --git a/crates/migration/src/lib.rs b/crates/migration/src/lib.rs index ff25d5a..a75b140 100644 --- a/crates/migration/src/lib.rs +++ b/crates/migration/src/lib.rs @@ -26,20 +26,35 @@ mod m2025_10_25_000001_create_jobs { Table::create() .table(Jobs::Table) .if_not_exists() - .col(ColumnDef::new(Jobs::RequestId).uuid().not_null().primary_key()) + .col( + ColumnDef::new(Jobs::RequestId) + .uuid() + .not_null() + .primary_key(), + ) .col(ColumnDef::new(Jobs::RepoUrl).string().not_null()) .col(ColumnDef::new(Jobs::CommitSha).string().not_null()) .col(ColumnDef::new(Jobs::RunsOn).string().null()) .col(ColumnDef::new(Jobs::State).string().not_null()) - .col(ColumnDef::new(Jobs::CreatedAt).timestamp_with_time_zone().not_null()) - .col(ColumnDef::new(Jobs::UpdatedAt).timestamp_with_time_zone().not_null()) + .col( + ColumnDef::new(Jobs::CreatedAt) + .timestamp_with_time_zone() + .not_null(), + ) + .col( + ColumnDef::new(Jobs::UpdatedAt) + .timestamp_with_time_zone() + .not_null(), + ) .to_owned(), ) .await } async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { - manager.drop_table(Table::drop().table(Jobs::Table).to_owned()).await + manager + .drop_table(Table::drop().table(Jobs::Table).to_owned()) + .await } } @@ -76,15 +91,25 @@ mod m2025_10_25_000002_create_vms { .col(ColumnDef::new(Vms::SeedPath).string().null()) .col(ColumnDef::new(Vms::Backend).string().not_null()) .col(ColumnDef::new(Vms::State).string().not_null()) - .col(ColumnDef::new(Vms::CreatedAt).timestamp_with_time_zone().not_null()) - .col(ColumnDef::new(Vms::UpdatedAt).timestamp_with_time_zone().not_null()) + .col( + ColumnDef::new(Vms::CreatedAt) + .timestamp_with_time_zone() + .not_null(), + ) + .col( + ColumnDef::new(Vms::UpdatedAt) + .timestamp_with_time_zone() + .not_null(), + ) .to_owned(), ) .await } async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { - manager.drop_table(Table::drop().table(Vms::Table).to_owned()).await + manager + .drop_table(Table::drop().table(Vms::Table).to_owned()) + .await } } diff --git a/crates/orchestrator/src/config.rs b/crates/orchestrator/src/config.rs index f055027..6449a69 100644 --- a/crates/orchestrator/src/config.rs +++ b/crates/orchestrator/src/config.rs @@ -1,4 +1,8 @@ -use std::{collections::BTreeMap, fs, path::{Path, PathBuf}}; +use std::{ + collections::BTreeMap, + fs, + path::{Path, PathBuf}, +}; use miette::{IntoDiagnostic as _, Result}; use serde::Deserialize; @@ -45,7 +49,6 @@ pub struct ImageDefaults { pub disk_gb: Option, } - #[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum Decompress { @@ -54,7 +57,9 @@ pub enum Decompress { } impl Default for Decompress { - fn default() -> Self { Decompress::None } + fn default() -> Self { + Decompress::None + } } impl OrchestratorConfig { @@ -65,8 +70,7 @@ impl OrchestratorConfig { }; // Use blocking read via spawn_blocking to avoid blocking Tokio let cfg: OrchestratorConfig = task::spawn_blocking(move || { - let builder = config::Config::builder() - .add_source(config::File::from(path)); + let builder = config::Config::builder().add_source(config::File::from(path)); let cfg = builder.build().into_diagnostic()?; cfg.try_deserialize().into_diagnostic() }) @@ -113,10 +117,16 @@ pub async fn ensure_images(cfg: &OrchestratorConfig) -> Result<()> { let resp = reqwest::get(&image.source).await.into_diagnostic()?; let status = resp.status(); if !status.is_success() { - miette::bail!("failed to download {url}: {status}", url = image.source, status = status); + miette::bail!( + "failed to download {url}: {status}", + url = image.source, + status = status + ); } let bytes = resp.bytes().await.into_diagnostic()?; - tokio::fs::write(&tmp_path, &bytes).await.into_diagnostic()?; + tokio::fs::write(&tmp_path, &bytes) + .await + .into_diagnostic()?; // Decompress or move into place match image.decompress.unwrap_or(Decompress::None) { @@ -146,7 +156,6 @@ pub async fn ensure_images(cfg: &OrchestratorConfig) -> Result<()> { Ok(()) } - #[cfg(test)] mod tests { use super::*; @@ -172,7 +181,10 @@ mod tests { // resolve default assert_eq!(cfg.resolve_label(None), Some("openindiana-hipster")); // alias mapping - assert_eq!(cfg.resolve_label(Some("illumos-latest")), Some("openindiana-hipster")); + assert_eq!( + cfg.resolve_label(Some("illumos-latest")), + Some("openindiana-hipster") + ); // image for canonical key let img = cfg.image_for("openindiana-hipster").expect("image exists"); assert!(img.nocloud); diff --git a/crates/orchestrator/src/grpc.rs b/crates/orchestrator/src/grpc.rs index a756478..f0eb372 100644 --- a/crates/orchestrator/src/grpc.rs +++ b/crates/orchestrator/src/grpc.rs @@ -1,18 +1,23 @@ -use std::net::SocketAddr; use futures_util::StreamExt; use miette::{IntoDiagnostic as _, Result}; +use std::net::SocketAddr; use tonic::{Request, Response, Status}; use tracing::{error, info, warn}; +use common::runner::v1::{ + Ack, LogItem, + runner_server::{Runner, RunnerServer}, +}; use common::{MqConfig, publish_job_result}; -use common::runner::v1::{runner_server::{Runner, RunnerServer}, LogItem, Ack}; pub struct RunnerSvc { mq_cfg: MqConfig, } impl RunnerSvc { - pub fn new(mq_cfg: MqConfig) -> Self { Self { mq_cfg } } + pub fn new(mq_cfg: MqConfig) -> Self { + Self { mq_cfg } + } } #[tonic::async_trait] @@ -28,7 +33,12 @@ impl Runner for RunnerSvc { let mut exit_code: i32 = 0; let mut success: bool = true; - while let Some(item) = stream.next().await.transpose().map_err(|e| Status::internal(e.to_string()))? { + while let Some(item) = stream + .next() + .await + .transpose() + .map_err(|e| Status::internal(e.to_string()))? + { // Correlate request id if req_id.is_none() { match uuid::Uuid::parse_str(&item.request_id) { @@ -58,20 +68,38 @@ impl Runner for RunnerSvc { } // Publish final status if we have enough context - if let (Some(id), Some(repo), Some(sha)) = (req_id.as_ref(), repo_url.as_ref(), commit_sha.as_ref()) { - let result = common::messages::JobResult::new(id.clone(), repo.clone(), sha.clone(), success, exit_code, None); + if let (Some(id), Some(repo), Some(sha)) = + (req_id.as_ref(), repo_url.as_ref(), commit_sha.as_ref()) + { + let result = common::messages::JobResult::new( + id.clone(), + repo.clone(), + sha.clone(), + success, + exit_code, + None, + ); if let Err(e) = publish_job_result(&self.mq_cfg, &result).await { error!(error = %e, request_id = %id, "failed to publish JobResult"); } } else { - warn!(have_req_id = req_id.is_some(), have_repo = repo_url.is_some(), have_sha = commit_sha.is_some(), "missing context for JobResult; skipping publish"); + warn!( + have_req_id = req_id.is_some(), + have_repo = repo_url.is_some(), + have_sha = commit_sha.is_some(), + "missing context for JobResult; skipping publish" + ); } Ok(Response::new(Ack { ok: true })) } } -pub async fn serve_with_shutdown(addr: SocketAddr, mq_cfg: MqConfig, shutdown: impl std::future::Future) -> Result<()> { +pub async fn serve_with_shutdown( + addr: SocketAddr, + mq_cfg: MqConfig, + shutdown: impl std::future::Future, +) -> Result<()> { info!(%addr, "gRPC server starting"); tonic::transport::Server::builder() .add_service(RunnerServer::new(RunnerSvc::new(mq_cfg))) diff --git a/crates/orchestrator/src/hypervisor.rs b/crates/orchestrator/src/hypervisor.rs index cbbc7da..7945d98 100644 --- a/crates/orchestrator/src/hypervisor.rs +++ b/crates/orchestrator/src/hypervisor.rs @@ -1,12 +1,18 @@ -use std::{path::PathBuf, time::Duration}; - use async_trait::async_trait; -use miette::{Result, IntoDiagnostic as _}; +use miette::{IntoDiagnostic as _, Result}; +use std::os::unix::prelude::PermissionsExt; +use std::{path::PathBuf, time::Duration}; use tracing::info; // Backend tag is used internally to remember which backend handled this VM. #[derive(Debug, Clone, Copy)] -pub enum BackendTag { Noop, #[cfg(all(target_os = "linux", feature = "libvirt"))] Libvirt, #[cfg(target_os = "illumos")] Zones } +pub enum BackendTag { + Noop, + #[cfg(all(target_os = "linux", feature = "libvirt"))] + Libvirt, + #[cfg(target_os = "illumos")] + Zones, +} #[derive(Debug, Clone)] pub struct VmSpec { @@ -51,7 +57,9 @@ pub trait Hypervisor: Send + Sync { async fn start(&self, vm: &VmHandle) -> Result<()>; async fn stop(&self, vm: &VmHandle, graceful_timeout: Duration) -> Result<()>; async fn destroy(&self, vm: VmHandle) -> Result<()>; - async fn state(&self, _vm: &VmHandle) -> Result { Ok(VmState::Prepared) } + async fn state(&self, _vm: &VmHandle) -> Result { + Ok(VmState::Prepared) + } } /// A router that delegates to the correct backend implementation per job. @@ -70,16 +78,27 @@ impl RouterHypervisor { { return RouterHypervisor { noop: NoopHypervisor::default(), - libvirt: Some(LibvirtHypervisor { uri: libvirt_uri, network: libvirt_network }), + libvirt: Some(LibvirtHypervisor { + uri: libvirt_uri, + network: libvirt_network, + }), }; } #[cfg(target_os = "illumos")] { - return RouterHypervisor { noop: NoopHypervisor::default(), zones: Some(ZonesHypervisor) }; + return RouterHypervisor { + noop: NoopHypervisor::default(), + zones: Some(ZonesHypervisor), + }; } - #[cfg(all(not(target_os = "illumos"), not(all(target_os = "linux", feature = "libvirt"))))] + #[cfg(all( + not(target_os = "illumos"), + not(all(target_os = "linux", feature = "libvirt")) + ))] { - return RouterHypervisor { noop: NoopHypervisor::default() }; + return RouterHypervisor { + noop: NoopHypervisor::default(), + }; } } } @@ -89,11 +108,15 @@ impl Hypervisor for RouterHypervisor { async fn prepare(&self, spec: &VmSpec, ctx: &JobContext) -> Result { #[cfg(all(target_os = "linux", feature = "libvirt"))] { - if let Some(ref hv) = self.libvirt { return hv.prepare(spec, ctx).await; } + if let Some(ref hv) = self.libvirt { + return hv.prepare(spec, ctx).await; + } } #[cfg(target_os = "illumos")] { - if let Some(ref hv) = self.zones { return hv.prepare(spec, ctx).await; } + if let Some(ref hv) = self.zones { + return hv.prepare(spec, ctx).await; + } } self.noop.prepare(spec, ctx).await } @@ -101,11 +124,19 @@ impl Hypervisor for RouterHypervisor { match vm.backend { #[cfg(all(target_os = "linux", feature = "libvirt"))] BackendTag::Libvirt => { - if let Some(ref hv) = self.libvirt { hv.start(vm).await } else { self.noop.start(vm).await } + if let Some(ref hv) = self.libvirt { + hv.start(vm).await + } else { + self.noop.start(vm).await + } } #[cfg(target_os = "illumos")] BackendTag::Zones => { - if let Some(ref hv) = self.zones { hv.start(vm).await } else { self.noop.start(vm).await } + if let Some(ref hv) = self.zones { + hv.start(vm).await + } else { + self.noop.start(vm).await + } } _ => self.noop.start(vm).await, } @@ -114,11 +145,19 @@ impl Hypervisor for RouterHypervisor { match vm.backend { #[cfg(all(target_os = "linux", feature = "libvirt"))] BackendTag::Libvirt => { - if let Some(ref hv) = self.libvirt { hv.stop(vm, t).await } else { self.noop.stop(vm, t).await } + if let Some(ref hv) = self.libvirt { + hv.stop(vm, t).await + } else { + self.noop.stop(vm, t).await + } } #[cfg(target_os = "illumos")] BackendTag::Zones => { - if let Some(ref hv) = self.zones { hv.stop(vm, t).await } else { self.noop.stop(vm, t).await } + if let Some(ref hv) = self.zones { + hv.stop(vm, t).await + } else { + self.noop.stop(vm, t).await + } } _ => self.noop.stop(vm, t).await, } @@ -127,11 +166,19 @@ impl Hypervisor for RouterHypervisor { match vm.backend { #[cfg(all(target_os = "linux", feature = "libvirt"))] BackendTag::Libvirt => { - if let Some(ref hv) = self.libvirt { hv.destroy(vm).await } else { self.noop.destroy(vm).await } + if let Some(ref hv) = self.libvirt { + hv.destroy(vm).await + } else { + self.noop.destroy(vm).await + } } #[cfg(target_os = "illumos")] BackendTag::Zones => { - if let Some(ref hv) = self.zones { hv.destroy(vm).await } else { self.noop.destroy(vm).await } + if let Some(ref hv) = self.zones { + hv.destroy(vm).await + } else { + self.noop.destroy(vm).await + } } _ => self.noop.destroy(vm).await, } @@ -140,11 +187,19 @@ impl Hypervisor for RouterHypervisor { match vm.backend { #[cfg(all(target_os = "linux", feature = "libvirt"))] BackendTag::Libvirt => { - if let Some(ref hv) = self.libvirt { hv.state(vm).await } else { Ok(VmState::Prepared) } + if let Some(ref hv) = self.libvirt { + hv.state(vm).await + } else { + Ok(VmState::Prepared) + } } #[cfg(target_os = "illumos")] BackendTag::Zones => { - if let Some(ref hv) = self.zones { hv.state(vm).await } else { Ok(VmState::Prepared) } + if let Some(ref hv) = self.zones { + hv.state(vm).await + } else { + Ok(VmState::Prepared) + } } _ => Ok(VmState::Prepared), } @@ -160,9 +215,17 @@ impl Hypervisor for NoopHypervisor { async fn prepare(&self, spec: &VmSpec, ctx: &JobContext) -> Result { let id = format!("noop-{}", ctx.request_id); let work_dir = std::env::temp_dir().join("solstice-noop").join(&id); - tokio::fs::create_dir_all(&work_dir).await.into_diagnostic()?; + tokio::fs::create_dir_all(&work_dir) + .await + .into_diagnostic()?; info!(id = %id, label = %spec.label, image = ?spec.image_path, "noop prepare"); - Ok(VmHandle { id, backend: BackendTag::Noop, work_dir, overlay_path: None, seed_iso_path: None }) + Ok(VmHandle { + id, + backend: BackendTag::Noop, + work_dir, + overlay_path: None, + seed_iso_path: None, + }) } async fn start(&self, vm: &VmHandle) -> Result<()> { info!(id = %vm.id, "noop start"); @@ -214,7 +277,8 @@ impl Hypervisor for LibvirtHypervisor { let net_name = self.network.clone(); tokio::task::spawn_blocking(move || -> miette::Result<()> { use virt::{connect::Connect, network::Network}; - let conn = Connect::open(Some(&uri)).map_err(|e| miette::miette!("libvirt connect failed: {e}"))?; + let conn = Connect::open(Some(&uri)) + .map_err(|e| miette::miette!("libvirt connect failed: {e}"))?; if let Ok(net) = Network::lookup_by_name(&conn, &net_name) { // If not active, try to create (activate). Then set autostart. let active = net.is_active().unwrap_or(false); @@ -224,7 +288,9 @@ impl Hypervisor for LibvirtHypervisor { let _ = net.set_autostart(true); } Ok(()) - }).await.into_diagnostic()??; + }) + .await + .into_diagnostic()??; // Create qcow2 overlay let overlay = work_dir.join("overlay.qcow2"); @@ -255,30 +321,43 @@ impl Hypervisor for LibvirtHypervisor { }; let out = Command::new("qemu-img") - .args(["create","-f","qcow2","-F"]) + .args(["create", "-f", "qcow2", "-F"]) .arg(&base_fmt) - .args(["-b"]) + .args(["-b"]) .arg(&base) .arg(&overlay) .arg(&size_arg) .output() .map_err(|e| miette::miette!("qemu-img not found or failed: {e}"))?; - if !out.status.success() { return Err(miette::miette!("qemu-img create failed: {}", String::from_utf8_lossy(&out.stderr))); } + if !out.status.success() { + return Err(miette::miette!( + "qemu-img create failed: {}", + String::from_utf8_lossy(&out.stderr) + )); + } Ok(()) } - }).await.into_diagnostic()??; + }) + .await + .into_diagnostic()??; let _ = status; // appease compiler if unused // Build NoCloud seed ISO if user_data provided let mut seed_iso: Option = None; if let Some(ref user_data) = spec.user_data { let seed_dir = work_dir.join("seed"); - tokio::fs::create_dir_all(&seed_dir).await.into_diagnostic()?; + tokio::fs::create_dir_all(&seed_dir) + .await + .into_diagnostic()?; let ud_path = seed_dir.join("user-data"); let md_path = seed_dir.join("meta-data"); - tokio::fs::write(&ud_path, user_data).await.into_diagnostic()?; + tokio::fs::write(&ud_path, user_data) + .await + .into_diagnostic()?; let meta = format!("instance-id: {}\nlocal-hostname: {}\n", id, id); - tokio::fs::write(&md_path, meta.as_bytes()).await.into_diagnostic()?; + tokio::fs::write(&md_path, meta.as_bytes()) + .await + .into_diagnostic()?; // mkisofs or genisoimage let iso_path = work_dir.join("seed.iso"); @@ -288,17 +367,25 @@ impl Hypervisor for LibvirtHypervisor { move || -> miette::Result<()> { let try_mk = |bin: &str| -> std::io::Result { Command::new(bin) - .args(["-V","cidata","-J","-R","-o"]) + .args(["-V", "cidata", "-J", "-R", "-o"]) .arg(&iso_path) .arg(&seed_dir) .output() }; - let out = try_mk("mkisofs").or_else(|_| try_mk("genisoimage")) + let out = try_mk("mkisofs") + .or_else(|_| try_mk("genisoimage")) .map_err(|e| miette::miette!("mkisofs/genisoimage not found: {e}"))?; - if !out.status.success() { return Err(miette::miette!("mkisofs failed: {}", String::from_utf8_lossy(&out.stderr))); } + if !out.status.success() { + return Err(miette::miette!( + "mkisofs failed: {}", + String::from_utf8_lossy(&out.stderr) + )); + } Ok(()) } - }).await.into_diagnostic()??; + }) + .await + .into_diagnostic()??; seed_iso = Some(iso_path); } @@ -310,8 +397,10 @@ impl Hypervisor for LibvirtHypervisor { let seed_str = seed_iso.as_ref().map(|p| p.display().to_string()); let net = self.network.clone(); let cdrom = seed_str.map(|p| format!("\n \n \n \n \n", p)).unwrap_or_default(); - format!("\n{}\n{}\n{}\n\n hvm\n \n\n\n\n \n \n \n \n \n {}\n \n \n \n \n \n \n \n \n \n \n \n\ndestroy\ndestroy\n", - id, mem, vcpus, overlay_str, cdrom, net) + format!( + "\n{}\n{}\n{}\n\n hvm\n \n\n\n\n \n \n \n \n \n {}\n \n \n \n \n \n \n \n \n \n \n \n\ndestroy\ndestroy\n", + id, mem, vcpus, overlay_str, cdrom, net + ) }; // Define via virt crate @@ -319,13 +408,23 @@ impl Hypervisor for LibvirtHypervisor { let xml_clone = xml.clone(); tokio::task::spawn_blocking(move || -> miette::Result<()> { use virt::{connect::Connect, domain::Domain}; - let conn = Connect::open(Some(&uri2)).map_err(|e| miette::miette!("libvirt connect failed: {e}"))?; - let _dom = Domain::define_xml(&conn, &xml_clone).map_err(|e| miette::miette!("define domain failed: {e}"))?; + let conn = Connect::open(Some(&uri2)) + .map_err(|e| miette::miette!("libvirt connect failed: {e}"))?; + let _dom = Domain::define_xml(&conn, &xml_clone) + .map_err(|e| miette::miette!("define domain failed: {e}"))?; Ok(()) - }).await.into_diagnostic()??; + }) + .await + .into_diagnostic()??; info!(domain = %id, image = ?spec.image_path, cpu = spec.cpu, ram_mb = spec.ram_mb, "libvirt prepared"); - Ok(VmHandle { id, backend: BackendTag::Libvirt, work_dir, overlay_path: Some(overlay), seed_iso_path: seed_iso }) + Ok(VmHandle { + id, + backend: BackendTag::Libvirt, + work_dir, + overlay_path: Some(overlay), + seed_iso_path: seed_iso, + }) } async fn start(&self, vm: &VmHandle) -> Result<()> { @@ -333,12 +432,17 @@ impl Hypervisor for LibvirtHypervisor { let uri = self.uri.clone(); tokio::task::spawn_blocking(move || -> miette::Result<()> { use virt::{connect::Connect, domain::Domain}; - let conn = Connect::open(Some(&uri)).map_err(|e| miette::miette!("libvirt connect failed: {e}"))?; + let conn = Connect::open(Some(&uri)) + .map_err(|e| miette::miette!("libvirt connect failed: {e}"))?; // Lookup domain by name and start - let dom = Domain::lookup_by_name(&conn, &id).map_err(|e| miette::miette!("lookup domain failed: {e}"))?; - dom.create().map_err(|e| miette::miette!("domain start failed: {e}"))?; + let dom = Domain::lookup_by_name(&conn, &id) + .map_err(|e| miette::miette!("lookup domain failed: {e}"))?; + dom.create() + .map_err(|e| miette::miette!("domain start failed: {e}"))?; Ok(()) - }).await.into_diagnostic()??; + }) + .await + .into_diagnostic()??; info!(domain = %vm.id, "libvirt started"); Ok(()) } @@ -348,8 +452,10 @@ impl Hypervisor for LibvirtHypervisor { let uri = self.uri.clone(); tokio::task::spawn_blocking(move || -> miette::Result<()> { use virt::{connect::Connect, domain::Domain}; - let conn = Connect::open(Some(&uri)).map_err(|e| miette::miette!("libvirt connect failed: {e}"))?; - let dom = Domain::lookup_by_name(&conn, &id).map_err(|e| miette::miette!("lookup domain failed: {e}"))?; + let conn = Connect::open(Some(&uri)) + .map_err(|e| miette::miette!("libvirt connect failed: {e}"))?; + let dom = Domain::lookup_by_name(&conn, &id) + .map_err(|e| miette::miette!("lookup domain failed: {e}"))?; let _ = dom.shutdown(); let start = std::time::Instant::now(); while start.elapsed() < t { @@ -362,7 +468,9 @@ impl Hypervisor for LibvirtHypervisor { // Force destroy if still active let _ = dom.destroy(); Ok(()) - }).await.into_diagnostic()??; + }) + .await + .into_diagnostic()??; info!(domain = %vm.id, "libvirt stopped"); Ok(()) } @@ -373,15 +481,22 @@ impl Hypervisor for LibvirtHypervisor { let id_for_task = id.clone(); tokio::task::spawn_blocking(move || -> miette::Result<()> { use virt::{connect::Connect, domain::Domain}; - let conn = Connect::open(Some(&uri)).map_err(|e| miette::miette!("libvirt connect failed: {e}"))?; + let conn = Connect::open(Some(&uri)) + .map_err(|e| miette::miette!("libvirt connect failed: {e}"))?; if let Ok(dom) = Domain::lookup_by_name(&conn, &id_for_task) { let _ = dom.undefine(); } Ok(()) - }).await.into_diagnostic()??; + }) + .await + .into_diagnostic()??; // Cleanup files - if let Some(p) = vm.overlay_path.as_ref() { let _ = tokio::fs::remove_file(p).await; } - if let Some(p) = vm.seed_iso_path.as_ref() { let _ = tokio::fs::remove_file(p).await; } + if let Some(p) = vm.overlay_path.as_ref() { + let _ = tokio::fs::remove_file(p).await; + } + if let Some(p) = vm.seed_iso_path.as_ref() { + let _ = tokio::fs::remove_file(p).await; + } let _ = tokio::fs::remove_dir_all(&vm.work_dir).await; info!(domain = %id, "libvirt destroyed"); Ok(()) @@ -392,12 +507,20 @@ impl Hypervisor for LibvirtHypervisor { let uri = self.uri.clone(); let active = tokio::task::spawn_blocking(move || -> miette::Result { use virt::{connect::Connect, domain::Domain}; - let conn = Connect::open(Some(&uri)).map_err(|e| miette::miette!("libvirt connect failed: {e}"))?; - let dom = Domain::lookup_by_name(&conn, &id).map_err(|e| miette::miette!("lookup domain failed: {e}"))?; + let conn = Connect::open(Some(&uri)) + .map_err(|e| miette::miette!("libvirt connect failed: {e}"))?; + let dom = Domain::lookup_by_name(&conn, &id) + .map_err(|e| miette::miette!("lookup domain failed: {e}"))?; let active = dom.is_active().unwrap_or(false); Ok(active) - }).await.into_diagnostic()??; - Ok(if active { VmState::Running } else { VmState::Stopped }) + }) + .await + .into_diagnostic()??; + Ok(if active { + VmState::Running + } else { + VmState::Stopped + }) } } @@ -428,16 +551,25 @@ impl Hypervisor for ZonesHypervisor { let base = spec.image_path.clone(); let base_fmt = tokio::task::spawn_blocking(move || -> miette::Result { let out = Command::new("qemu-img") - .args(["info", "--output=json"]).arg(&base) + .args(["info", "--output=json"]) + .arg(&base) .output() .map_err(|e| miette::miette!("qemu-img not found or failed: {e}"))?; if !out.status.success() { - return Err(miette::miette!("qemu-img info failed: {}", String::from_utf8_lossy(&out.stderr))); + return Err(miette::miette!( + "qemu-img info failed: {}", + String::from_utf8_lossy(&out.stderr) + )); } let v: serde_json::Value = serde_json::from_slice(&out.stdout) .map_err(|e| miette::miette!("parse qemu-img info json failed: {e}"))?; - Ok(v.get("format").and_then(|f| f.as_str()).unwrap_or("raw").to_string()) - }).await.into_diagnostic()??; + Ok(v.get("format") + .and_then(|f| f.as_str()) + .unwrap_or("raw") + .to_string()) + }) + .await + .into_diagnostic()??; // Ensure raw image for bhyve: convert if needed let raw_path = if base_fmt != "raw" { @@ -446,16 +578,21 @@ impl Hypervisor for ZonesHypervisor { let dst = out_path.clone(); tokio::task::spawn_blocking(move || -> miette::Result<()> { let out = Command::new("qemu-img") - .args(["convert", "-O", "raw"]) + .args(["convert", "-O", "raw"]) .arg(&src) .arg(&dst) .output() .map_err(|e| miette::miette!("qemu-img convert failed to start: {e}"))?; if !out.status.success() { - return Err(miette::miette!("qemu-img convert failed: {}", String::from_utf8_lossy(&out.stderr))); + return Err(miette::miette!( + "qemu-img convert failed: {}", + String::from_utf8_lossy(&out.stderr) + )); } Ok(()) - }).await.into_diagnostic()??; + }) + .await + .into_diagnostic()??; info!(label = %spec.label, src = ?spec.image_path, out = ?out_path, "converted image to raw for bhyve"); out_path } else { @@ -463,9 +600,21 @@ impl Hypervisor for ZonesHypervisor { }; // Seed ISO creation left to future; for now, return handle with path in overlay_path - Ok(VmHandle { id, backend: BackendTag::Zones, work_dir, overlay_path: Some(raw_path), seed_iso_path: None }) + Ok(VmHandle { + id, + backend: BackendTag::Zones, + work_dir, + overlay_path: Some(raw_path), + seed_iso_path: None, + }) + } + async fn start(&self, _vm: &VmHandle) -> Result<()> { + Ok(()) + } + async fn stop(&self, _vm: &VmHandle, _t: Duration) -> Result<()> { + Ok(()) + } + async fn destroy(&self, _vm: VmHandle) -> Result<()> { + Ok(()) } - async fn start(&self, _vm: &VmHandle) -> Result<()> { Ok(()) } - async fn stop(&self, _vm: &VmHandle, _t: Duration) -> Result<()> { Ok(()) } - async fn destroy(&self, _vm: VmHandle) -> Result<()> { Ok(()) } } diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index 68bec51..aeae7ac 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -1,8 +1,8 @@ mod config; -mod hypervisor; -mod scheduler; -mod persist; mod grpc; +mod hypervisor; +mod persist; +mod scheduler; use std::{collections::HashMap, path::PathBuf, time::Duration}; @@ -10,15 +10,19 @@ use clap::Parser; use miette::{IntoDiagnostic as _, Result}; use tracing::{info, warn}; +use crate::persist::{JobState, Persist}; use config::OrchestratorConfig; -use hypervisor::{RouterHypervisor, VmSpec, JobContext}; -use scheduler::{Scheduler, SchedItem}; +use hypervisor::{JobContext, RouterHypervisor, VmSpec}; +use scheduler::{SchedItem, Scheduler}; use std::sync::Arc; use tokio::sync::Notify; -use crate::persist::{Persist, JobState}; #[derive(Parser, Debug)] -#[command(name = "solstice-orchestrator", version, about = "Solstice CI Orchestrator")] +#[command( + name = "solstice-orchestrator", + version, + about = "Solstice CI Orchestrator" +)] struct Opts { /// Path to orchestrator YAML config (image map) #[arg(long, env = "ORCH_CONFIG")] @@ -37,7 +41,11 @@ struct Opts { grpc_addr: String, /// Postgres connection string - #[arg(long, env = "DATABASE_URL", default_value = "postgres://user:pass@localhost:5432/solstice")] + #[arg( + long, + env = "DATABASE_URL", + default_value = "postgres://user:pass@localhost:5432/solstice" + )] database_url: String, /// RabbitMQ URL (AMQP) @@ -114,11 +122,13 @@ async fn main() -> Result<()> { let grpc_task = tokio::spawn(async move { let _ = crate::grpc::serve_with_shutdown(grpc_addr, mq_cfg_for_grpc, async move { let _ = grpc_shutdown_rx.await; - }).await; + }) + .await; }); // Orchestrator contact address for runner to dial back (can override via ORCH_CONTACT_ADDR) - let orch_contact = std::env::var("ORCH_CONTACT_ADDR").unwrap_or_else(|_| opts.grpc_addr.clone()); + let orch_contact = + std::env::var("ORCH_CONTACT_ADDR").unwrap_or_else(|_| opts.grpc_addr.clone()); // Scheduler let sched = Scheduler::new( @@ -177,7 +187,7 @@ async fn main() -> Result<()> { disk_gb, network: None, // libvirt network handled in backend nocloud: image.nocloud, - user_data: Some(make_cloud_init_userdata(&job.repo_url, &job.commit_sha, job.request_id, &orch_contact_val)), + user_data: Some(make_cloud_init_userdata(&job.repo_url, &job.commit_sha, job.request_id, &orch_contact_val)), }; if !spec.nocloud { warn!(label = %label_resolved, "image is not marked nocloud=true; cloud-init may not work"); @@ -219,22 +229,34 @@ fn parse_capacity_map(s: Option<&str>) -> HashMap { let mut m = HashMap::new(); if let Some(s) = s { for part in s.split(',') { - if part.trim().is_empty() { continue; } - if let Some((k,v)) = part.split_once('=') { + if part.trim().is_empty() { + continue; + } + if let Some((k, v)) = part.split_once('=') { let k = k.trim(); - if k.is_empty() { continue; } - if let Ok(n) = v.parse::() { m.insert(k.to_string(), n); } + if k.is_empty() { + continue; + } + if let Ok(n) = v.parse::() { + m.insert(k.to_string(), n); + } } } } m } -fn make_cloud_init_userdata(repo_url: &str, commit_sha: &str, request_id: uuid::Uuid, orch_addr: &str) -> Vec { +fn make_cloud_init_userdata( + repo_url: &str, + commit_sha: &str, + request_id: uuid::Uuid, + orch_addr: &str, +) -> Vec { // Allow local dev to inject one or more runner URLs that the VM can fetch. let runner_url = std::env::var("SOLSTICE_RUNNER_URL").unwrap_or_default(); let runner_urls = std::env::var("SOLSTICE_RUNNER_URLS").unwrap_or_default(); - let s = format!(r#"#cloud-config + let s = format!( + r#"#cloud-config write_files: - path: /etc/solstice/job.yaml permissions: '0644' @@ -311,18 +333,26 @@ write_files: (command -v poweroff >/dev/null 2>&1 && poweroff) || (command -v shutdown >/dev/null 2>&1 && shutdown -y -i5 -g0) || true runcmd: - [ /usr/local/bin/solstice-bootstrap.sh ] -"#, repo = repo_url, sha = commit_sha, req_id = request_id, orch_addr = orch_addr, runner_url = runner_url, runner_urls = runner_urls); +"#, + repo = repo_url, + sha = commit_sha, + req_id = request_id, + orch_addr = orch_addr, + runner_url = runner_url, + runner_urls = runner_urls + ); s.into_bytes() } - #[cfg(test)] mod tests { use super::*; #[test] fn test_parse_capacity_map_mixed_input() { - let m = parse_capacity_map(Some("illumos-latest=2, ubuntu-22.04=4, bad=, =3, other=notnum, foo=5, ,")); + let m = parse_capacity_map(Some( + "illumos-latest=2, ubuntu-22.04=4, bad=, =3, other=notnum, foo=5, ,", + )); assert_eq!(m.get("illumos-latest"), Some(&2)); assert_eq!(m.get("ubuntu-22.04"), Some(&4)); assert_eq!(m.get("foo"), Some(&5)); @@ -334,7 +364,12 @@ mod tests { #[test] fn test_make_cloud_init_userdata_includes_fields() { let req_id = uuid::Uuid::new_v4(); - let data = make_cloud_init_userdata("https://example.com/repo.git", "deadbeef", req_id, "127.0.0.1:50051"); + let data = make_cloud_init_userdata( + "https://example.com/repo.git", + "deadbeef", + req_id, + "127.0.0.1:50051", + ); let s = String::from_utf8(data).unwrap(); assert!(s.contains("#cloud-config")); assert!(s.contains("repo_url: https://example.com/repo.git")); diff --git a/crates/orchestrator/src/persist.rs b/crates/orchestrator/src/persist.rs index 05ac4c2..078dfd3 100644 --- a/crates/orchestrator/src/persist.rs +++ b/crates/orchestrator/src/persist.rs @@ -1,10 +1,13 @@ +use chrono::Utc; use miette::{IntoDiagnostic as _, Result}; -use sea_orm::{entity::prelude::*, Database, DatabaseConnection, Set, ActiveModelTrait, QueryFilter, ColumnTrait}; -use sea_orm::sea_query::{OnConflict, Expr}; +use sea_orm::sea_query::{Expr, OnConflict}; +use sea_orm::{ + ActiveModelTrait, ColumnTrait, Database, DatabaseConnection, QueryFilter, Set, + entity::prelude::*, +}; use sea_orm_migration::MigratorTrait; use tracing::{debug, info, warn}; use uuid::Uuid; -use chrono::Utc; /// Minimal persistence module for the Orchestrator with real upserts for jobs & vms. #[derive(Clone)] @@ -13,20 +16,40 @@ pub struct Persist { } #[derive(Debug, Clone, Copy)] -pub enum JobState { Queued, Running, Succeeded, Failed } +pub enum JobState { + Queued, + Running, + Succeeded, + Failed, +} impl JobState { fn as_str(self) -> &'static str { - match self { JobState::Queued => "queued", JobState::Running => "running", JobState::Succeeded => "succeeded", JobState::Failed => "failed" } + match self { + JobState::Queued => "queued", + JobState::Running => "running", + JobState::Succeeded => "succeeded", + JobState::Failed => "failed", + } } } #[derive(Debug, Clone, Copy)] -pub enum VmPersistState { Prepared, Running, Stopped, Destroyed } +pub enum VmPersistState { + Prepared, + Running, + Stopped, + Destroyed, +} impl VmPersistState { fn as_str(self) -> &'static str { - match self { VmPersistState::Prepared => "prepared", VmPersistState::Running => "running", VmPersistState::Stopped => "stopped", VmPersistState::Destroyed => "destroyed" } + match self { + VmPersistState::Prepared => "prepared", + VmPersistState::Running => "running", + VmPersistState::Stopped => "stopped", + VmPersistState::Destroyed => "destroyed", + } } } @@ -36,7 +59,6 @@ impl VmPersistState { mod jobs { use super::*; - #[derive(Clone, Debug, PartialEq, DeriveEntityModel)] #[sea_orm(table_name = "jobs")] @@ -59,7 +81,6 @@ mod jobs { mod vms { use super::*; - #[derive(Clone, Debug, PartialEq, DeriveEntityModel)] #[sea_orm(table_name = "vms")] @@ -120,7 +141,9 @@ impl Persist { Ok(Self { db: None }) } - pub fn is_enabled(&self) -> bool { self.db.is_some() } + pub fn is_enabled(&self) -> bool { + self.db.is_some() + } /// Upsert a job row by request_id. pub async fn record_job_state( @@ -182,8 +205,14 @@ impl Persist { let state_val = state.as_str(); let res = vms::Entity::update_many() - .col_expr(vms::Column::OverlayPath, Expr::value(overlay_path.map(|s| s.to_string()))) - .col_expr(vms::Column::SeedPath, Expr::value(seed_path.map(|s| s.to_string()))) + .col_expr( + vms::Column::OverlayPath, + Expr::value(overlay_path.map(|s| s.to_string())), + ) + .col_expr( + vms::Column::SeedPath, + Expr::value(seed_path.map(|s| s.to_string())), + ) .col_expr(vms::Column::Backend, Expr::value(backend_val)) .col_expr(vms::Column::State, Expr::value(state_val)) .col_expr(vms::Column::UpdatedAt, Expr::value(now)) @@ -216,8 +245,12 @@ mod tests { async fn sqlite_memory_db() -> DatabaseConnection { let mut opts = sea_orm::ConnectOptions::new("sqlite::memory:".to_string()); - opts.max_connections(1).min_connections(1).sqlx_logging(false); - let db = Database::connect(opts).await.expect("sqlite memory connect"); + opts.max_connections(1) + .min_connections(1) + .sqlx_logging(false); + let db = Database::connect(opts) + .await + .expect("sqlite memory connect"); // Create tables from entities to avoid using migrator (faster and avoids migration bookkeeping table) let backend = db.get_database_backend(); let schema = sea_orm::Schema::new(backend); @@ -233,18 +266,32 @@ mod tests { #[tokio::test] async fn test_job_upsert_sqlite() { let db = sqlite_memory_db().await; - let p = Persist { db: Some(db.clone()) }; + let p = Persist { + db: Some(db.clone()), + }; let req = Uuid::new_v4(); let repo = "https://example.com/repo.git"; let sha = "deadbeef"; // Insert queued - p.record_job_state(req, repo, sha, Some("illumos-latest"), JobState::Queued).await.expect("insert queued"); + p.record_job_state(req, repo, sha, Some("illumos-latest"), JobState::Queued) + .await + .expect("insert queued"); // Fetch - let row = jobs::Entity::find_by_id(req).one(&db).await.expect("query").expect("row exists"); + let row = jobs::Entity::find_by_id(req) + .one(&db) + .await + .expect("query") + .expect("row exists"); assert_eq!(row.state, "queued"); // Update to running - p.record_job_state(req, repo, sha, Some("illumos-latest"), JobState::Running).await.expect("update running"); - let row2 = jobs::Entity::find_by_id(req).one(&db).await.expect("query").expect("row exists"); + p.record_job_state(req, repo, sha, Some("illumos-latest"), JobState::Running) + .await + .expect("update running"); + let row2 = jobs::Entity::find_by_id(req) + .one(&db) + .await + .expect("query") + .expect("row exists"); assert_eq!(row2.state, "running"); assert!(row2.updated_at >= row.created_at); } @@ -252,13 +299,42 @@ mod tests { #[tokio::test] async fn test_vm_event_upsert_sqlite() { let db = sqlite_memory_db().await; - let p = Persist { db: Some(db.clone()) }; + let p = Persist { + db: Some(db.clone()), + }; let req = Uuid::new_v4(); let domain = format!("job-{}", req); // prepared -> running -> destroyed - p.record_vm_event(req, &domain, Some("/tmp/ovl.qcow2"), Some("/tmp/seed.iso"), Some("noop"), VmPersistState::Prepared).await.expect("prepared"); - p.record_vm_event(req, &domain, Some("/tmp/ovl.qcow2"), Some("/tmp/seed.iso"), Some("noop"), VmPersistState::Running).await.expect("running"); - p.record_vm_event(req, &domain, Some("/tmp/ovl.qcow2"), Some("/tmp/seed.iso"), Some("noop"), VmPersistState::Destroyed).await.expect("destroyed"); + p.record_vm_event( + req, + &domain, + Some("/tmp/ovl.qcow2"), + Some("/tmp/seed.iso"), + Some("noop"), + VmPersistState::Prepared, + ) + .await + .expect("prepared"); + p.record_vm_event( + req, + &domain, + Some("/tmp/ovl.qcow2"), + Some("/tmp/seed.iso"), + Some("noop"), + VmPersistState::Running, + ) + .await + .expect("running"); + p.record_vm_event( + req, + &domain, + Some("/tmp/ovl.qcow2"), + Some("/tmp/seed.iso"), + Some("noop"), + VmPersistState::Destroyed, + ) + .await + .expect("destroyed"); use sea_orm::QuerySelect; let rows = vms::Entity::find() .filter(vms::Column::RequestId.eq(req)) @@ -275,7 +351,6 @@ mod tests { } } - #[cfg(test)] mod noop_tests { use super::*; @@ -286,7 +361,18 @@ mod noop_tests { assert!(!p.is_enabled()); // Calls should succeed without DB let req = Uuid::new_v4(); - p.record_job_state(req, "https://x", "sha", Some("illumos"), JobState::Queued).await.expect("job noop ok"); - p.record_vm_event(req, "job-x", None, None, Some("noop"), VmPersistState::Prepared).await.expect("vm noop ok"); + p.record_job_state(req, "https://x", "sha", Some("illumos"), JobState::Queued) + .await + .expect("job noop ok"); + p.record_vm_event( + req, + "job-x", + None, + None, + Some("noop"), + VmPersistState::Prepared, + ) + .await + .expect("vm noop ok"); } } diff --git a/crates/orchestrator/src/scheduler.rs b/crates/orchestrator/src/scheduler.rs index 7a14b2b..b3336d1 100644 --- a/crates/orchestrator/src/scheduler.rs +++ b/crates/orchestrator/src/scheduler.rs @@ -2,11 +2,11 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use dashmap::DashMap; use miette::Result; -use tokio::sync::{mpsc, Semaphore, Notify}; +use tokio::sync::{Notify, Semaphore, mpsc}; use tracing::{error, info, warn}; -use crate::hypervisor::{Hypervisor, VmSpec, JobContext, BackendTag}; -use crate::persist::{Persist, VmPersistState, JobState}; +use crate::hypervisor::{BackendTag, Hypervisor, JobContext, VmSpec}; +use crate::persist::{JobState, Persist, VmPersistState}; pub struct Scheduler { hv: Arc, @@ -26,7 +26,13 @@ pub struct SchedItem { } impl Scheduler { - pub fn new(hv: H, max_concurrency: usize, capacity_map: &HashMap, persist: Arc, placeholder_runtime: Duration) -> Self { + pub fn new( + hv: H, + max_concurrency: usize, + capacity_map: &HashMap, + persist: Arc, + placeholder_runtime: Duration, + ) -> Self { let (tx, rx) = mpsc::channel::(max_concurrency * 4); let label_sems = DashMap::new(); for (label, cap) in capacity_map.iter() { @@ -43,10 +49,20 @@ impl Scheduler { } } - pub fn sender(&self) -> mpsc::Sender { self.tx.clone() } + pub fn sender(&self) -> mpsc::Sender { + self.tx.clone() + } pub async fn run_with_shutdown(self, shutdown: Arc) -> Result<()> { - let Scheduler { hv, mut rx, global_sem, label_sems, persist, placeholder_runtime, .. } = self; + let Scheduler { + hv, + mut rx, + global_sem, + label_sems, + persist, + placeholder_runtime, + .. + } = self; let mut handles = Vec::new(); let mut shutting_down = false; 'scheduler: loop { @@ -138,7 +154,9 @@ impl Scheduler { } } // Wait for all in-flight tasks to finish - for h in handles { let _ = h.await; } + for h in handles { + let _ = h.await; + } info!("scheduler: completed"); Ok(()) } @@ -149,14 +167,13 @@ impl Scheduler { } } - #[cfg(test)] mod tests { use super::*; - use std::path::PathBuf; - use std::sync::atomic::{AtomicUsize, Ordering}; use async_trait::async_trait; use dashmap::DashMap; + use std::path::PathBuf; + use std::sync::atomic::{AtomicUsize, Ordering}; use crate::hypervisor::{VmHandle, VmState}; @@ -172,7 +189,13 @@ mod tests { impl MockHypervisor { fn new(active_all: Arc, peak_all: Arc) -> Self { - Self { active_all, peak_all, per_curr: Arc::new(DashMap::new()), per_peak: Arc::new(DashMap::new()), id_to_label: Arc::new(DashMap::new()) } + Self { + active_all, + peak_all, + per_curr: Arc::new(DashMap::new()), + per_peak: Arc::new(DashMap::new()), + id_to_label: Arc::new(DashMap::new()), + } } fn update_peak(peak: &AtomicUsize, current: usize) { let mut prev = peak.load(Ordering::Relaxed); @@ -191,9 +214,16 @@ mod tests { let now = self.active_all.fetch_add(1, Ordering::SeqCst) + 1; Self::update_peak(&self.peak_all, now); // per-label current/peak - let entry = self.per_curr.entry(spec.label.clone()).or_insert_with(|| Arc::new(AtomicUsize::new(0))); + let entry = self + .per_curr + .entry(spec.label.clone()) + .or_insert_with(|| Arc::new(AtomicUsize::new(0))); let curr = entry.fetch_add(1, Ordering::SeqCst) + 1; - let peak_entry = self.per_peak.entry(spec.label.clone()).or_insert_with(|| Arc::new(AtomicUsize::new(0))).clone(); + let peak_entry = self + .per_peak + .entry(spec.label.clone()) + .or_insert_with(|| Arc::new(AtomicUsize::new(0))) + .clone(); Self::update_peak(&peak_entry, curr); let id = format!("job-{}", ctx.request_id); @@ -207,8 +237,12 @@ mod tests { seed_iso_path: None, }) } - async fn start(&self, _vm: &VmHandle) -> miette::Result<()> { Ok(()) } - async fn stop(&self, _vm: &VmHandle, _t: Duration) -> miette::Result<()> { Ok(()) } + async fn start(&self, _vm: &VmHandle) -> miette::Result<()> { + Ok(()) + } + async fn stop(&self, _vm: &VmHandle, _t: Duration) -> miette::Result<()> { + Ok(()) + } async fn destroy(&self, vm: VmHandle) -> miette::Result<()> { // decrement overall current self.active_all.fetch_sub(1, Ordering::SeqCst); @@ -220,7 +254,9 @@ mod tests { } Ok(()) } - async fn state(&self, _vm: &VmHandle) -> miette::Result { Ok(VmState::Prepared) } + async fn state(&self, _vm: &VmHandle) -> miette::Result { + Ok(VmState::Prepared) + } } fn make_spec(label: &str) -> VmSpec { @@ -237,7 +273,12 @@ mod tests { } fn make_ctx() -> JobContext { - JobContext { request_id: uuid::Uuid::new_v4(), repo_url: "https://example.com/r.git".into(), commit_sha: "deadbeef".into(), workflow_job_id: None } + JobContext { + request_id: uuid::Uuid::new_v4(), + repo_url: "https://example.com/r.git".into(), + commit_sha: "deadbeef".into(), + workflow_job_id: None, + } } #[tokio::test(flavor = "multi_thread")] @@ -253,15 +294,25 @@ mod tests { let sched = Scheduler::new(hv, 2, &caps, persist, Duration::from_millis(10)); let tx = sched.sender(); - let run = tokio::spawn(async move { let _ = sched.run().await; }); + let run = tokio::spawn(async move { + let _ = sched.run().await; + }); for _ in 0..5 { - let _ = tx.send(SchedItem { spec: make_spec("x"), ctx: make_ctx() }).await; + let _ = tx + .send(SchedItem { + spec: make_spec("x"), + ctx: make_ctx(), + }) + .await; } drop(tx); // Allow time for tasks to execute under concurrency limits tokio::time::sleep(Duration::from_millis(500)).await; - assert!(hv_probe.peak_all.load(Ordering::SeqCst) <= 2, "peak should not exceed global limit"); + assert!( + hv_probe.peak_all.load(Ordering::SeqCst) <= 2, + "peak should not exceed global limit" + ); // Stop the scheduler task run.abort(); } @@ -280,19 +331,46 @@ mod tests { let sched = Scheduler::new(hv, 4, &caps, persist, Duration::from_millis(10)); let tx = sched.sender(); - let run = tokio::spawn(async move { let _ = sched.run().await; }); + let run = tokio::spawn(async move { + let _ = sched.run().await; + }); - for _ in 0..3 { let _ = tx.send(SchedItem { spec: make_spec("a"), ctx: make_ctx() }).await; } - for _ in 0..3 { let _ = tx.send(SchedItem { spec: make_spec("b"), ctx: make_ctx() }).await; } + for _ in 0..3 { + let _ = tx + .send(SchedItem { + spec: make_spec("a"), + ctx: make_ctx(), + }) + .await; + } + for _ in 0..3 { + let _ = tx + .send(SchedItem { + spec: make_spec("b"), + ctx: make_ctx(), + }) + .await; + } drop(tx); tokio::time::sleep(Duration::from_millis(800)).await; // read per-label peaks - let a_peak = hv_probe.per_peak.get("a").map(|p| p.load(Ordering::SeqCst)).unwrap_or(0); - let b_peak = hv_probe.per_peak.get("b").map(|p| p.load(Ordering::SeqCst)).unwrap_or(0); + let a_peak = hv_probe + .per_peak + .get("a") + .map(|p| p.load(Ordering::SeqCst)) + .unwrap_or(0); + let b_peak = hv_probe + .per_peak + .get("b") + .map(|p| p.load(Ordering::SeqCst)) + .unwrap_or(0); assert!(a_peak <= 1, "label a peak should be <= 1, got {}", a_peak); assert!(b_peak <= 2, "label b peak should be <= 2, got {}", b_peak); - assert!(hv_probe.peak_all.load(Ordering::SeqCst) <= 4, "global peak should respect global limit"); + assert!( + hv_probe.peak_all.load(Ordering::SeqCst) <= 4, + "global peak should respect global limit" + ); run.abort(); } } diff --git a/crates/workflow-runner/src/main.rs b/crates/workflow-runner/src/main.rs index 0eaf7fc..dd22a58 100644 --- a/crates/workflow-runner/src/main.rs +++ b/crates/workflow-runner/src/main.rs @@ -1,15 +1,23 @@ use clap::Parser; +use common::runner::v1::{JobEnd, LogChunk, LogItem, log_item::Event, runner_client::RunnerClient}; use miette::{IntoDiagnostic as _, Result}; use serde::Deserialize; -use tokio::{fs, process::Command, io::{AsyncBufReadExt, BufReader}}; use std::process::Stdio; +use tokio::sync::mpsc; +use tokio::{ + fs, + io::{AsyncBufReadExt, BufReader}, + process::Command, +}; use tokio_stream::wrappers::ReceiverStream; use tracing::{error, info, warn}; -use tokio::sync::mpsc; -use common::runner::v1::{runner_client::RunnerClient, log_item::Event, LogItem, LogChunk, JobEnd}; #[derive(Parser, Debug)] -#[command(name = "solstice-runner", version, about = "Solstice CI Workflow Runner (VM agent)")] +#[command( + name = "solstice-runner", + version, + about = "Solstice CI Workflow Runner (VM agent)" +)] struct Opts { /// Optional path to workflow KDL file (for local testing only) #[arg(long, env = "SOL_WORKFLOW_PATH")] @@ -23,7 +31,8 @@ struct JobFile { } async fn read_job_file() -> Result { - let path = std::env::var("SOLSTICE_JOB_FILE").unwrap_or_else(|_| "/etc/solstice/job.yaml".into()); + let path = + std::env::var("SOLSTICE_JOB_FILE").unwrap_or_else(|_| "/etc/solstice/job.yaml".into()); let bytes = fs::read(&path).await.into_diagnostic()?; let jf: JobFile = serde_yaml::from_slice(&bytes).into_diagnostic()?; Ok(jf) @@ -31,7 +40,12 @@ async fn read_job_file() -> Result { async fn run_shell(cmd: &str) -> Result { info!(%cmd, "exec"); - let status = Command::new("/bin/sh").arg("-lc").arg(cmd).status().await.into_diagnostic()?; + let status = Command::new("/bin/sh") + .arg("-lc") + .arg(cmd) + .status() + .await + .into_diagnostic()?; Ok(status.code().unwrap_or(1)) } @@ -40,15 +54,23 @@ async fn ensure_repo(repo: &str, sha: &str, workdir: &str) -> Result<()> { // Use system git to avoid libgit2 cross issues let cmds = vec![ format!("cd {workdir} && git init"), - format!("cd {workdir} && git remote remove origin >/dev/null 2>&1 || true && git remote add origin {repo}"), + format!( + "cd {workdir} && git remote remove origin >/dev/null 2>&1 || true && git remote add origin {repo}" + ), format!("cd {workdir} && git fetch --depth=1 origin {sha}"), format!("cd {workdir} && git checkout -q FETCH_HEAD"), ]; - for c in cmds { let _ = run_shell(&c).await?; } + for c in cmds { + let _ = run_shell(&c).await?; + } Ok(()) } -async fn run_job_script_streamed(workdir: &str, tx: Option>, request_id: &str) -> Result { +async fn run_job_script_streamed( + workdir: &str, + tx: Option>, + request_id: &str, +) -> Result { let script = format!("{}/.solstice/job.sh", workdir); if !fs::try_exists(&script).await.into_diagnostic()? { warn!(path = %script, "job script not found"); @@ -57,7 +79,8 @@ async fn run_job_script_streamed(workdir: &str, tx: Option let _ = run_shell(&format!("chmod +x {} || true", script)).await?; let mut cmd = Command::new("/bin/sh"); - cmd.arg("-lc").arg(format!("cd {workdir} && {}", script)) + cmd.arg("-lc") + .arg(format!("cd {workdir} && {}", script)) .stdout(Stdio::piped()) .stderr(Stdio::piped()); let mut child = cmd.spawn().into_diagnostic()?; @@ -69,7 +92,15 @@ async fn run_job_script_streamed(workdir: &str, tx: Option let req = request_id.to_string(); tokio::spawn(async move { while let Ok(Some(line)) = lines.next_line().await { - let _ = tx2.send(LogItem { request_id: req.clone(), event: Some(Event::Log(LogChunk { line, stderr: false })) }).await; + let _ = tx2 + .send(LogItem { + request_id: req.clone(), + event: Some(Event::Log(LogChunk { + line, + stderr: false, + })), + }) + .await; } }); } @@ -79,7 +110,12 @@ async fn run_job_script_streamed(workdir: &str, tx: Option let req = request_id.to_string(); tokio::spawn(async move { while let Ok(Some(line)) = lines.next_line().await { - let _ = tx2.send(LogItem { request_id: req.clone(), event: Some(Event::Log(LogChunk { line, stderr: true })) }).await; + let _ = tx2 + .send(LogItem { + request_id: req.clone(), + event: Some(Event::Log(LogChunk { line, stderr: true })), + }) + .await; } }); } @@ -122,7 +158,7 @@ async fn main() -> Result<()> { let stream = ReceiverStream::new(rx); // Spawn client task tokio::spawn(async move { - match RunnerClient::connect(format!("http://{addr}" )).await { + match RunnerClient::connect(format!("http://{addr}")).await { Ok(mut client) => { let _ = client.stream_logs(stream).await; // ignore result } @@ -134,16 +170,39 @@ async fn main() -> Result<()> { tx_opt = Some(tx); // Send a first line if let Some(ref tx) = tx_opt { - let _ = tx.send(LogItem { request_id: req_id.clone(), event: Some(Event::Log(LogChunk { line: format!("runner starting: repo={repo} sha={sha}"), stderr: false })) }).await; + let _ = tx + .send(LogItem { + request_id: req_id.clone(), + event: Some(Event::Log(LogChunk { + line: format!("runner starting: repo={repo} sha={sha}"), + stderr: false, + })), + }) + .await; } } ensure_repo(&repo, &sha, &workdir).await?; - let code = run_job_script_streamed(&workdir, tx_opt.clone(), request_id.as_deref().unwrap_or("")).await?; + let code = run_job_script_streamed( + &workdir, + tx_opt.clone(), + request_id.as_deref().unwrap_or(""), + ) + .await?; // Send JobEnd if streaming enabled if let (Some(tx), Some(req_id)) = (tx_opt.clone(), request_id.clone()) { - let _ = tx.send(LogItem { request_id: req_id.clone(), event: Some(Event::End(JobEnd { exit_code: code, success: code == 0, repo_url: repo.clone(), commit_sha: sha.clone() })) }).await; + let _ = tx + .send(LogItem { + request_id: req_id.clone(), + event: Some(Event::End(JobEnd { + exit_code: code, + success: code == 0, + repo_url: repo.clone(), + commit_sha: sha.clone(), + })), + }) + .await; // Give the client task a brief moment to flush tokio::time::sleep(std::time::Duration::from_millis(50)).await; }