diff --git a/crates/ciadm/src/main.rs b/crates/ciadm/src/main.rs index 003cf1d..dcc8195 100644 --- a/crates/ciadm/src/main.rs +++ b/crates/ciadm/src/main.rs @@ -43,9 +43,7 @@ async fn main() -> Result<()> { } => { info!(%repo, %r#ref, %workflow, "trigger requested"); // TODO: Call orchestrator API to enqueue job - println!( - "Triggered job for {repo}@{ref} using {workflow}", - ); + println!("Triggered job for {repo}@{ref} using {workflow}",); } Commands::Status { job_id } => { info!(%job_id, "status requested"); diff --git a/crates/common/src/config.rs b/crates/common/src/config.rs index 4acab58..b76edcb 100644 --- a/crates/common/src/config.rs +++ b/crates/common/src/config.rs @@ -1,7 +1,7 @@ +use kdl::{KdlDocument, KdlValue}; +use miette::{IntoDiagnostic as _, Result}; use std::collections::HashMap; use std::path::PathBuf; -use miette::{IntoDiagnostic as _, Result}; -use kdl::{KdlDocument, KdlValue}; /// Internal application configuration aggregated from env and KDL. #[derive(Clone, Debug)] @@ -19,10 +19,18 @@ impl AppConfig { pub fn load(service: &str) -> Result { let kdl_map = load_kdl_kv(service)?; - let grpc_addr = std::env::var("GRPC_ADDR").ok().or_else(|| kdl_map.get("GRPC_ADDR").cloned()); - let http_addr = std::env::var("HTTP_ADDR").ok().or_else(|| kdl_map.get("HTTP_ADDR").cloned()); - let database_url = std::env::var("DATABASE_URL").ok().or_else(|| kdl_map.get("DATABASE_URL").cloned()); - let otlp_endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok().or_else(|| kdl_map.get("OTEL_EXPORTER_OTLP_ENDPOINT").cloned()); + let grpc_addr = std::env::var("GRPC_ADDR") + .ok() + .or_else(|| kdl_map.get("GRPC_ADDR").cloned()); + let http_addr = std::env::var("HTTP_ADDR") + .ok() + .or_else(|| kdl_map.get("HTTP_ADDR").cloned()); + let database_url = std::env::var("DATABASE_URL") + .ok() + .or_else(|| kdl_map.get("DATABASE_URL").cloned()); + let otlp_endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") + .ok() + .or_else(|| kdl_map.get("OTEL_EXPORTER_OTLP_ENDPOINT").cloned()); // Build MQ config from env with KDL fallbacks, then defaults let url = std::env::var("AMQP_URL") @@ -66,9 +74,25 @@ impl AppConfig { .or_else(|| std::env::var("RESULTS_QUEUE").ok()) .unwrap_or_else(|| "solstice.results.v1".into()); - let mq = crate::mq::MqConfig { url, exchange, routing_key, queue, results_routing_key, results_queue, dlx, dlq, prefetch }; + let mq = crate::mq::MqConfig { + url, + exchange, + routing_key, + queue, + results_routing_key, + results_queue, + dlx, + dlq, + prefetch, + }; - Ok(Self { grpc_addr, http_addr, database_url, otlp_endpoint, mq }) + Ok(Self { + grpc_addr, + http_addr, + database_url, + otlp_endpoint, + mq, + }) } } @@ -78,7 +102,9 @@ fn load_kdl_kv(service: &str) -> Result> { let svc = PathBuf::from(format!("/etc/solstice/{}.kdl", service)); let mut map = HashMap::new(); for path in [global, svc] { - if !path.exists() { continue; } + if !path.exists() { + continue; + } let s = std::fs::read_to_string(&path).into_diagnostic()?; let doc: KdlDocument = s.parse().into_diagnostic()?; for node in doc.nodes() { diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 979f636..a9d3d5c 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -1,14 +1,16 @@ +pub mod config; pub mod job; pub mod messages; pub mod mq; pub mod telemetry; -pub mod config; -pub use job::{Job, Step, Workflow, parse_workflow_file, parse_workflow_str}; -pub use messages::{JobRequest, JobResult, SourceSystem, DeadLetter}; -pub use mq::{MqConfig, consume_jobs, consume_jobs_until, publish_job, publish_job_result, publish_deadletter}; -pub use telemetry::{TelemetryGuard, init_tracing}; pub use config::AppConfig; +pub use job::{Job, Step, Workflow, parse_workflow_file, parse_workflow_str}; +pub use messages::{DeadLetter, JobRequest, JobResult, SourceSystem}; +pub use mq::{ + MqConfig, consume_jobs, consume_jobs_until, publish_deadletter, publish_job, publish_job_result, +}; +pub use telemetry::{TelemetryGuard, init_tracing}; // Generated gRPC module for runner <-> orchestrator pub mod runner { diff --git a/crates/common/src/mq.rs b/crates/common/src/mq.rs index 211d2cf..911e6c8 100644 --- a/crates/common/src/mq.rs +++ b/crates/common/src/mq.rs @@ -14,7 +14,7 @@ use miette::{IntoDiagnostic as _, Result}; use tracing::Instrument; use tracing::{error, info, instrument, warn}; -use crate::messages::{JobRequest, JobResult, DeadLetter}; +use crate::messages::{DeadLetter, JobRequest, JobResult}; /// Pretty-print an AMQP message body for logs. /// - If valid UTF-8 JSON, pretty-format it. @@ -22,7 +22,12 @@ use crate::messages::{JobRequest, JobResult, DeadLetter}; /// - Otherwise, return a hex preview with ASCII sidecar. pub fn pretty_amqp_body(data: &[u8]) -> String { // Try JSON first when it looks like JSON - let looks_json = data.iter().skip_while(|b| b.is_ascii_whitespace()).next().map(|b| *b == b'{' || *b == b'[').unwrap_or(false); + let looks_json = data + .iter() + .skip_while(|b| b.is_ascii_whitespace()) + .next() + .map(|b| *b == b'{' || *b == b'[') + .unwrap_or(false); if looks_json { if let Ok(v) = serde_json::from_slice::(data) { if let Ok(s) = serde_json::to_string_pretty(&v) { @@ -42,7 +47,11 @@ pub fn pretty_amqp_body(data: &[u8]) -> String { let mut ascii = String::with_capacity(max); for &b in &data[..max] { hex.push_str(&format!("{:02x}", b)); - ascii.push(if b.is_ascii_graphic() || b == b' ' { b as char } else { '.' }); + ascii.push(if b.is_ascii_graphic() || b == b' ' { + b as char + } else { + '.' + }); } if data.len() > max { format!("<{} bytes> hex:{}… ascii:{}…", data.len(), hex, ascii) @@ -69,7 +78,8 @@ impl Default for MqConfig { Self { url: std::env::var("AMQP_URL").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()), exchange: std::env::var("AMQP_EXCHANGE").unwrap_or_else(|_| "solstice.jobs".into()), - routing_key: std::env::var("AMQP_ROUTING_KEY").unwrap_or_else(|_| "jobrequest.v1".into()), + routing_key: std::env::var("AMQP_ROUTING_KEY") + .unwrap_or_else(|_| "jobrequest.v1".into()), queue: std::env::var("AMQP_QUEUE").unwrap_or_else(|_| "solstice.jobs.v1".into()), results_routing_key: std::env::var("AMQP_RESULTS_ROUTING_KEY") .or_else(|_| std::env::var("RESULTS_ROUTING_KEY")) @@ -79,7 +89,10 @@ impl Default for MqConfig { .unwrap_or_else(|_| "solstice.results.v1".into()), dlx: std::env::var("AMQP_DLX").unwrap_or_else(|_| "solstice.dlx".into()), dlq: std::env::var("AMQP_DLQ").unwrap_or_else(|_| "solstice.jobs.v1.dlq".into()), - prefetch: std::env::var("AMQP_PREFETCH").ok().and_then(|s| s.parse().ok()).unwrap_or(64), + prefetch: std::env::var("AMQP_PREFETCH") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(64), } } } @@ -424,8 +437,7 @@ pub async fn publish_job_result(cfg: &MqConfig, result: &JobResult) -> Result<() let channel = conn.create_channel().await.into_diagnostic()?; declare_topology(&channel, cfg).await?; - let props = BasicProperties::default() - .with_content_type(ShortString::from("application/json")); + let props = BasicProperties::default().with_content_type(ShortString::from("application/json")); let body = serde_json::to_vec(result).into_diagnostic()?; @@ -459,8 +471,7 @@ pub async fn publish_to_dlx_raw(cfg: &MqConfig, body: &[u8]) -> Result<()> { .await .into_diagnostic()?; - let props = BasicProperties::default() - .with_content_type(ShortString::from("application/json")); + let props = BasicProperties::default().with_content_type(ShortString::from("application/json")); let confirm = channel .basic_publish( diff --git a/crates/forge-integration/src/main.rs b/crates/forge-integration/src/main.rs index 1256b5c..dd63490 100644 --- a/crates/forge-integration/src/main.rs +++ b/crates/forge-integration/src/main.rs @@ -1,6 +1,6 @@ +use aws_sdk_s3::primitives::ByteStream; use std::net::SocketAddr; use std::sync::Arc; -use aws_sdk_s3::primitives::ByteStream; use axum::{ Router, @@ -11,12 +11,12 @@ use axum::{ routing::post, }; use clap::{Parser, Subcommand}; +use futures_util::StreamExt; use hmac::{Hmac, Mac}; -use miette::{Result, IntoDiagnostic}; +use miette::{IntoDiagnostic, Result}; use serde::Deserialize; use sha2::Sha256; use tracing::{error, info, warn}; -use futures_util::StreamExt; #[derive(Subcommand, Debug)] enum Cmd { @@ -228,8 +228,20 @@ async fn post_commit_status( description: Option<&str>, ) -> Result<()> { // Extract owner/repo from repo_url (supports https://.../owner/repo.git and ssh://git@host/owner/repo.git) - let (owner, repo) = parse_owner_repo(repo_url).ok_or_else(|| miette::miette!("cannot parse owner/repo from repo_url: {repo_url}"))?; - post_commit_status_owner(base, token, &owner, &repo, sha, context, state, target_url, description).await + let (owner, repo) = parse_owner_repo(repo_url) + .ok_or_else(|| miette::miette!("cannot parse owner/repo from repo_url: {repo_url}"))?; + post_commit_status_owner( + base, + token, + &owner, + &repo, + sha, + context, + state, + target_url, + description, + ) + .await } async fn post_commit_status_owner( @@ -243,15 +255,26 @@ async fn post_commit_status_owner( target_url: Option<&str>, description: Option<&str>, ) -> Result<()> { - let api = format!("{}/repos/{}/{}/statuses/{}", base.trim_end_matches('/'), owner, repo, sha); + let api = format!( + "{}/repos/{}/{}/statuses/{}", + base.trim_end_matches('/'), + owner, + repo, + sha + ); let mut body = serde_json::json!({ "state": state, "context": context, }); - if let Some(u) = target_url { body["target_url"] = serde_json::Value::String(u.to_string()); } - if let Some(d) = description { body["description"] = serde_json::Value::String(d.to_string()); } + if let Some(u) = target_url { + body["target_url"] = serde_json::Value::String(u.to_string()); + } + if let Some(d) = description { + body["description"] = serde_json::Value::String(d.to_string()); + } let client = reqwest::Client::new(); - let resp = client.post(&api) + let resp = client + .post(&api) .bearer_auth(token) .json(&body) .send() @@ -296,11 +319,18 @@ async fn consume_job_results(state: Arc) -> Result<()> { .into_diagnostic()?; // Declare results queue and bind to routing key jobresult.v1 - let results_queue = std::env::var("RESULTS_QUEUE").unwrap_or_else(|_| "solstice.results.v1".into()); + let results_queue = + std::env::var("RESULTS_QUEUE").unwrap_or_else(|_| "solstice.results.v1".into()); channel .queue_declare( &results_queue, - lapin::options::QueueDeclareOptions { durable: true, auto_delete: false, exclusive: false, nowait: false, passive: false }, + lapin::options::QueueDeclareOptions { + durable: true, + auto_delete: false, + exclusive: false, + nowait: false, + passive: false, + }, lapin::types::FieldTable::default(), ) .await @@ -325,7 +355,10 @@ async fn consume_job_results(state: Arc) -> Result<()> { .basic_consume( &results_queue, "forge-integration", - lapin::options::BasicConsumeOptions { no_ack: false, ..Default::default() }, + lapin::options::BasicConsumeOptions { + no_ack: false, + ..Default::default() + }, lapin::types::FieldTable::default(), ) .await @@ -337,7 +370,8 @@ async fn consume_job_results(state: Arc) -> Result<()> { match delivery { Ok(d) => { let tag = d.delivery_tag; - let res: Result = serde_json::from_slice(&d.data).into_diagnostic(); + let res: Result = + serde_json::from_slice(&d.data).into_diagnostic(); match res { Ok(jobres) => { if let Err(e) = handle_job_result(&state, &jobres).await { @@ -371,8 +405,16 @@ async fn handle_job_result(state: &AppState, jobres: &common::messages::JobResul // Fetch logs let mut log_text: Option = None; if let Some(base) = state.orch_http_base.as_ref() { - let url = format!("{}/jobs/{}/logs", base.trim_end_matches('/'), jobres.request_id); - let resp = reqwest::Client::new().get(&url).send().await.into_diagnostic()?; + let url = format!( + "{}/jobs/{}/logs", + base.trim_end_matches('/'), + jobres.request_id + ); + let resp = reqwest::Client::new() + .get(&url) + .send() + .await + .into_diagnostic()?; if resp.status().is_success() { let txt = resp.text().await.into_diagnostic()?; log_text = Some(txt); @@ -383,22 +425,45 @@ async fn handle_job_result(state: &AppState, jobres: &common::messages::JobResul // Upload to S3 if configured and we have logs let mut target_url: Option = None; - if let (Some(endpoint), Some(bucket), Some(text)) = (state.s3_endpoint.as_ref(), state.s3_bucket.as_ref(), log_text.as_ref()) { - if let Ok(url) = upload_to_s3(endpoint, bucket, &format!("logs/{}/{}.txt", jobres.repo_url.replace(':', "/").replace('/', "_"), jobres.request_id), text.as_bytes()).await { + if let (Some(endpoint), Some(bucket), Some(text)) = ( + state.s3_endpoint.as_ref(), + state.s3_bucket.as_ref(), + log_text.as_ref(), + ) { + if let Ok(url) = upload_to_s3( + endpoint, + bucket, + &format!( + "logs/{}/{}.txt", + jobres.repo_url.replace(':', "/").replace('/', "_"), + jobres.request_id + ), + text.as_bytes(), + ) + .await + { target_url = Some(url); } } // Fallback to orchestrator log URL if upload not done if target_url.is_none() { if let Some(base) = state.orch_http_base.as_ref() { - target_url = Some(format!("{}/jobs/{}/logs", base.trim_end_matches('/'), jobres.request_id)); + target_url = Some(format!( + "{}/jobs/{}/logs", + base.trim_end_matches('/'), + jobres.request_id + )); } } // Post final status to Forgejo let state_str = if jobres.success { "success" } else { "failure" }; if let (Some(base), Some(token)) = (state.forgejo_base.as_ref(), state.forgejo_token.as_ref()) { - let desc = if jobres.success { Some("Job succeeded") } else { Some("Job failed") }; + let desc = if jobres.success { + Some("Job succeeded") + } else { + Some("Job failed") + }; // Prefer explicit owner/repo from JobResult when available if let (Some(owner), Some(repo)) = (jobres.repo_owner.as_ref(), jobres.repo_name.as_ref()) { let _ = post_commit_status_owner( @@ -411,7 +476,8 @@ async fn handle_job_result(state: &AppState, jobres: &common::messages::JobResul state_str, target_url.as_deref(), desc, - ).await; + ) + .await; } else { let _ = post_commit_status( base, @@ -422,7 +488,8 @@ async fn handle_job_result(state: &AppState, jobres: &common::messages::JobResul state_str, target_url.as_deref(), desc, - ).await; + ) + .await; } } @@ -430,7 +497,9 @@ async fn handle_job_result(state: &AppState, jobres: &common::messages::JobResul } async fn upload_to_s3(endpoint: &str, bucket: &str, key: &str, bytes: &[u8]) -> Result { - let loader = aws_config::defaults(aws_config::BehaviorVersion::latest()).load().await; + let loader = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .load() + .await; // Override endpoint and enforce path-style let conf = aws_sdk_s3::config::Builder::from(&loader) .endpoint_url(endpoint) @@ -454,19 +523,28 @@ async fn upload_to_s3(endpoint: &str, bucket: &str, key: &str, bytes: &[u8]) -> fn parse_owner_repo(repo_url: &str) -> Option<(String, String)> { // Strip .git let url = repo_url.trim_end_matches(".git"); - if let Some(rest) = url.strip_prefix("https://").or_else(|| url.strip_prefix("http://")) { + if let Some(rest) = url + .strip_prefix("https://") + .or_else(|| url.strip_prefix("http://")) + { let parts: Vec<&str> = rest.split('/').collect(); - if parts.len() >= 3 { return Some((parts[1].to_string(), parts[2].to_string())); } + if parts.len() >= 3 { + return Some((parts[1].to_string(), parts[2].to_string())); + } } else if let Some(rest) = url.strip_prefix("ssh://") { // ssh://git@host/owner/repo let after_host = rest.splitn(2, '/').nth(1)?; let parts: Vec<&str> = after_host.split('/').collect(); - if parts.len() >= 2 { return Some((parts[0].to_string(), parts[1].to_string())); } + if parts.len() >= 2 { + return Some((parts[0].to_string(), parts[1].to_string())); + } } else if let Some(idx) = url.find(':') { // git@host:owner/repo - let after = &url[idx+1..]; + let after = &url[idx + 1..]; let parts: Vec<&str> = after.split('/').collect(); - if parts.len() >= 2 { return Some((parts[0].to_string(), parts[1].to_string())); } + if parts.len() >= 2 { + return Some((parts[0].to_string(), parts[1].to_string())); + } } None } @@ -475,7 +553,9 @@ fn parse_runs_on_map(s: &str) -> std::collections::HashMap { let mut map = std::collections::HashMap::new(); for part in s.split(',') { let p = part.trim(); - if p.is_empty() { continue; } + if p.is_empty() { + continue; + } if let Some((k, v)) = p.split_once('=') { let key = k.trim().to_string(); let val = v.trim().to_string(); @@ -503,13 +583,19 @@ fn infer_runs_on(state: &AppState, repo_url: &str, labels: Option<&[String]>) -> // patterns: "runs-on: label", "runs-on=label", or "runs-on-label" if let Some(rest) = lower.strip_prefix("runs-on:") { let label = rest.trim(); - if !label.is_empty() { return Some(label.to_string()); } + if !label.is_empty() { + return Some(label.to_string()); + } } else if let Some(rest) = lower.strip_prefix("runs-on=") { let label = rest.trim(); - if !label.is_empty() { return Some(label.to_string()); } + if !label.is_empty() { + return Some(label.to_string()); + } } else if let Some(rest) = lower.strip_prefix("runs-on-") { let label = rest.trim(); - if !label.is_empty() { return Some(label.to_string()); } + if !label.is_empty() { + return Some(label.to_string()); + } } } } @@ -604,7 +690,7 @@ async fn handle_push(state: Arc, body: Bytes) -> StatusCode { let repo_url = pick_repo_url(&payload.repository); let sha = payload.after; - match enqueue_jobs(&state, repo_url.clone(), sha.clone(), None).await { + match enqueue_jobs(&state, repo_url.clone(), sha.clone(), None).await { Ok(jobs) => { if let (Some(base), Some(token), Some(orch)) = ( state.forgejo_base.as_ref(), @@ -619,7 +705,11 @@ async fn handle_push(state: Arc, body: Bytes) -> StatusCode { &first.commit_sha, &state.forge_context, "pending", - Some(&format!("{}/jobs/{}/logs", orch.trim_end_matches('/'), first.request_id)), + Some(&format!( + "{}/jobs/{}/logs", + orch.trim_end_matches('/'), + first.request_id + )), Some("Solstice jobs queued"), ) .await; @@ -649,7 +739,9 @@ struct PrHead { } #[derive(Debug, Deserialize)] -struct Label { name: String } +struct Label { + name: String, +} #[derive(Debug, Deserialize)] struct PullRequest { @@ -714,22 +806,42 @@ fn pick_repo_url_pr(repo: &PrRepoInfo) -> String { .to_string() } -struct ParsedJob { id: String, runs_on: Option, script: Option } +struct ParsedJob { + id: String, + runs_on: Option, + script: Option, +} -async fn fetch_workflow_kdl(base: Option<&str>, token: Option<&str>, owner: &str, repo: &str, sha: &str) -> Result> { +async fn fetch_workflow_kdl( + base: Option<&str>, + token: Option<&str>, + owner: &str, + repo: &str, + sha: &str, +) -> Result> { // Try Forgejo API: GET /repos/{owner}/{repo}/contents/.solstice/workflow.kdl?ref={sha} if let Some(base) = base { - let url = format!("{}/repos/{}/{}/contents/.solstice/workflow.kdl?ref={}", base.trim_end_matches('/'), owner, repo, sha); + let url = format!( + "{}/repos/{}/{}/contents/.solstice/workflow.kdl?ref={}", + base.trim_end_matches('/'), + owner, + repo, + sha + ); let client = reqwest::Client::new(); let mut req = client.get(&url); - if let Some(tok) = token { req = req.bearer_auth(tok); } + if let Some(tok) = token { + req = req.bearer_auth(tok); + } let resp = req.send().await.into_diagnostic()?; if resp.status().is_success() { let v: serde_json::Value = resp.json().await.into_diagnostic()?; if let Some(enc) = v.get("encoding").and_then(|e| e.as_str()) { if enc.eq_ignore_ascii_case("base64") { if let Some(content) = v.get("content").and_then(|c| c.as_str()) { - let decoded = base64::engine::general_purpose::STANDARD.decode(content.replace('\n', "")).into_diagnostic()?; + let decoded = base64::engine::general_purpose::STANDARD + .decode(content.replace('\n', "")) + .into_diagnostic()?; let s = String::from_utf8(decoded).into_diagnostic()?; return Ok(Some(s)); } @@ -756,15 +868,24 @@ fn parse_workflow_jobs(kdl: &str) -> Vec { let mut depth = if l.ends_with('{') { 1 } else { 0 }; while let Some((_j, ln)) = lines.peek().cloned() { let t = ln.trim(); - if t.ends_with('{') { depth += 1; } + if t.ends_with('{') { + depth += 1; + } if t.starts_with('}') { - if depth == 0 { break; } + if depth == 0 { + break; + } depth -= 1; - if depth == 0 { lines.next(); break; } + if depth == 0 { + lines.next(); + break; + } } // within job block: look for step or script lines; allow `script path="..."` or `step name="..." run="..."` if t.starts_with("script ") && t.contains("path=") { - if let Some(p) = capture_attr(t, "path") { script = Some(p); } + if let Some(p) = capture_attr(t, "path") { + script = Some(p); + } } // Also allow runs_on within block as override if t.contains("runs_on=") && runs_on.is_none() { @@ -773,7 +894,11 @@ fn parse_workflow_jobs(kdl: &str) -> Vec { lines.next(); } if let Some(id_val) = id { - out.push(ParsedJob { id: id_val, runs_on, script }); + out.push(ParsedJob { + id: id_val, + runs_on, + script, + }); } } } @@ -785,17 +910,26 @@ fn capture_attr(line: &str, key: &str) -> Option { let pattern1 = format!("{}=\"", key); if let Some(start) = line.find(&pattern1) { let rest = &line[start + pattern1.len()..]; - if let Some(end) = rest.find('"') { return Some(rest[..end].to_string()); } + if let Some(end) = rest.find('"') { + return Some(rest[..end].to_string()); + } } let pattern2 = format!("{}='", key); if let Some(start) = line.find(&pattern2) { let rest = &line[start + pattern2.len()..]; - if let Some(end) = rest.find('\'') { return Some(rest[..end].to_string()); } + if let Some(end) = rest.find('\'') { + return Some(rest[..end].to_string()); + } } None } -async fn enqueue_jobs(state: &Arc, repo_url: String, commit_sha: String, labels: Option>) -> Result> { +async fn enqueue_jobs( + state: &Arc, + repo_url: String, + commit_sha: String, + labels: Option>, +) -> Result> { use uuid::Uuid; if repo_url.is_empty() { miette::bail!("missing repo_url in webhook payload"); @@ -812,7 +946,15 @@ async fn enqueue_jobs(state: &Arc, repo_url: String, commit_sha: Strin // Attempt to fetch and parse .solstice/workflow.kdl at the commit let mut published: Vec = Vec::new(); if let (Some(owner), Some(repo)) = (base.repo_owner.clone(), base.repo_name.clone()) { - if let Ok(Some(kdl)) = fetch_workflow_kdl(state.forgejo_base.as_deref(), state.forgejo_token.as_deref(), &owner, &repo, &base.commit_sha).await { + if let Ok(Some(kdl)) = fetch_workflow_kdl( + state.forgejo_base.as_deref(), + state.forgejo_token.as_deref(), + &owner, + &repo, + &base.commit_sha, + ) + .await + { let jobs = parse_workflow_jobs(&kdl); if !jobs.is_empty() { let gid = Uuid::new_v4(); @@ -823,10 +965,9 @@ async fn enqueue_jobs(state: &Arc, repo_url: String, commit_sha: Strin jr.workflow_path = Some(".solstice/workflow.kdl".to_string()); jr.workflow_job_id = Some(pj.id); // runs_on precedence: job-specific -> inferred (labels/map/default) - jr.runs_on = pj - .runs_on - .clone() - .or_else(|| infer_runs_on(state, &jr.repo_url, labels.as_ref().map(|v| v.as_slice()))); + jr.runs_on = pj.runs_on.clone().or_else(|| { + infer_runs_on(state, &jr.repo_url, labels.as_ref().map(|v| v.as_slice())) + }); jr.script_path = pj.script.clone(); common::publish_job(&state.mq_cfg, &jr).await?; diff --git a/crates/github-integration/src/main.rs b/crates/github-integration/src/main.rs index 7749b23..4f1e0f3 100644 --- a/crates/github-integration/src/main.rs +++ b/crates/github-integration/src/main.rs @@ -1,8 +1,8 @@ -use std::net::SocketAddr; +use axum::{Router, http::StatusCode, response::IntoResponse, routing::post}; use clap::Parser; use miette::Result; +use std::net::SocketAddr; use tracing::{info, warn}; -use axum::{Router, routing::post, response::IntoResponse, http::StatusCode}; #[derive(Parser, Debug)] #[command( @@ -47,7 +47,9 @@ async fn main() -> Result<()> { let app = Router::new().route(path, post(handle_github_webhook)); let addr: SocketAddr = opts.http_addr.parse().expect("invalid HTTP_ADDR"); - warn!("github-integration webhook endpoint is active but handler is minimal; implement GitHub App flow"); + warn!( + "github-integration webhook endpoint is active but handler is minimal; implement GitHub App flow" + ); axum::serve( tokio::net::TcpListener::bind(addr).await.expect("bind"), diff --git a/crates/logs-service/src/main.rs b/crates/logs-service/src/main.rs index a5dd33e..a356235 100644 --- a/crates/logs-service/src/main.rs +++ b/crates/logs-service/src/main.rs @@ -1,12 +1,21 @@ -use axum::{extract::Path, http::StatusCode, response::{IntoResponse, Response}, routing::get, Json, Router}; +use axum::{ + Json, Router, + extract::Path, + http::StatusCode, + response::{IntoResponse, Response}, + routing::get, +}; use clap::Parser; use miette::{IntoDiagnostic as _, Result}; -use sea_orm::{entity::prelude::*, Database, DatabaseConnection, QueryOrder, ColumnTrait, QueryFilter, QuerySelect}; use sea_orm::sea_query::Expr; +use sea_orm::{ + ColumnTrait, Database, DatabaseConnection, QueryFilter, QueryOrder, QuerySelect, + entity::prelude::*, +}; use sea_orm_migration::MigratorTrait; use serde::Serialize; -use std::net::SocketAddr; use std::collections::BTreeMap; +use std::net::SocketAddr; use tracing::{info, warn}; use uuid::Uuid; @@ -50,19 +59,26 @@ struct Opts { } #[derive(Clone)] -struct AppState { db: DatabaseConnection } +struct AppState { + db: DatabaseConnection, +} #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { let _t = common::init_tracing("solstice-logs-service")?; let opts = Opts::parse(); - let db = Database::connect(opts.database_url).await.into_diagnostic()?; + let db = Database::connect(opts.database_url) + .await + .into_diagnostic()?; migration::Migrator::up(&db, None).await.into_diagnostic()?; let state = AppState { db }; let router = Router::new() .route("/jobs/{request_id}/logs", get(list_logs)) - .route("/jobs/{request_id}/logs/{category}", get(get_logs_by_category)) + .route( + "/jobs/{request_id}/logs/{category}", + get(get_logs_by_category), + ) .route("/jobs", get(list_jobs_grouped)) .with_state(state); @@ -126,8 +142,13 @@ struct LogCategorySummary { last_ts: chrono::DateTime, } -async fn list_logs(Path(request_id): Path, axum::extract::State(state): axum::extract::State) -> Response { - let Ok(id) = Uuid::parse_str(&request_id) else { return StatusCode::BAD_REQUEST.into_response(); }; +async fn list_logs( + Path(request_id): Path, + axum::extract::State(state): axum::extract::State, +) -> Response { + let Ok(id) = Uuid::parse_str(&request_id) else { + return StatusCode::BAD_REQUEST.into_response(); + }; let query = job_logs::Entity::find() .select_only() @@ -140,11 +161,14 @@ async fn list_logs(Path(request_id): Path, axum::extract::State(state): .order_by_asc(job_logs::Column::Category); // Aggregate basic stats per category - let tuples: miette::Result, chrono::DateTime)>> = query - .into_tuple() - .all(&state.db) - .await - .into_diagnostic(); + let tuples: miette::Result< + Vec<( + String, + i64, + chrono::DateTime, + chrono::DateTime, + )>, + > = query.into_tuple().all(&state.db).await.into_diagnostic(); // Separately fetch categories that have any error (portable across backends) let errs_res: miette::Result> = job_logs::Entity::find() @@ -173,12 +197,20 @@ async fn list_logs(Path(request_id): Path, axum::extract::State(state): .collect(); Json::>(out).into_response() } - (Err(e), _) | (_, Err(e)) => { warn!(error = %e, request_id = %id, "failed to query log categories"); StatusCode::INTERNAL_SERVER_ERROR.into_response() } + (Err(e), _) | (_, Err(e)) => { + warn!(error = %e, request_id = %id, "failed to query log categories"); + StatusCode::INTERNAL_SERVER_ERROR.into_response() + } } } -async fn get_logs_by_category(Path((request_id, category)): Path<(String, String)>, axum::extract::State(state): axum::extract::State) -> Response { - let Ok(id) = Uuid::parse_str(&request_id) else { return StatusCode::BAD_REQUEST.into_response(); }; +async fn get_logs_by_category( + Path((request_id, category)): Path<(String, String)>, + axum::extract::State(state): axum::extract::State, +) -> Response { + let Ok(id) = Uuid::parse_str(&request_id) else { + return StatusCode::BAD_REQUEST.into_response(); + }; let rows = job_logs::Entity::find() .filter(job_logs::Column::RequestId.eq(id)) .filter(job_logs::Column::Category.eq(category.clone())) @@ -195,20 +227,30 @@ async fn get_logs_by_category(Path((request_id, category)): Path<(String, String text.push_str("[stderr] "); } text.push_str(&r.line); - if !text.ends_with('\n') { text.push('\n'); } + if !text.ends_with('\n') { + text.push('\n'); + } } ( StatusCode::OK, - [(axum::http::header::CONTENT_TYPE, "text/plain; charset=utf-8")], + [( + axum::http::header::CONTENT_TYPE, + "text/plain; charset=utf-8", + )], text, - ).into_response() + ) + .into_response() + } + Err(e) => { + warn!(error = %e, request_id = %id, "failed to read logs"); + StatusCode::INTERNAL_SERVER_ERROR.into_response() } - Err(e) => { warn!(error = %e, request_id = %id, "failed to read logs"); StatusCode::INTERNAL_SERVER_ERROR.into_response() } } } - -async fn list_jobs_grouped(axum::extract::State(state): axum::extract::State) -> Response { +async fn list_jobs_grouped( + axum::extract::State(state): axum::extract::State, +) -> Response { // Fetch all jobs ordered by most recently updated first let rows_res: miette::Result> = jobs::Entity::find() .order_by_desc(jobs::Column::UpdatedAt) @@ -231,7 +273,10 @@ async fn list_jobs_grouped(axum::extract::State(state): axum::extract::State = items .into_iter() .map(|j| JobSummary { @@ -239,7 +284,9 @@ async fn list_jobs_grouped(axum::extract::State(state): axum::extract::State Result<(), DbErr> { // Drop the composite index manager - .drop_index(Index::drop().name("idx_job_logs_req_cat_seq").table(JobLogs::Table).to_owned()) + .drop_index( + Index::drop() + .name("idx_job_logs_req_cat_seq") + .table(JobLogs::Table) + .to_owned(), + ) .await?; // Drop added columns diff --git a/crates/orchestrator/src/config.rs b/crates/orchestrator/src/config.rs index 56574e2..a2f1d1f 100644 --- a/crates/orchestrator/src/config.rs +++ b/crates/orchestrator/src/config.rs @@ -127,22 +127,29 @@ pub async fn ensure_images(cfg: &OrchestratorConfig) -> Result<()> { match image.decompress.unwrap_or(Decompress::None) { Decompress::None => { // Copy to temporary then atomically move into place - tokio::fs::copy(&src_path, &tmp_path).await.into_diagnostic()?; - tokio::fs::rename(&tmp_path, &image.local_path).await.into_diagnostic()?; + tokio::fs::copy(&src_path, &tmp_path) + .await + .into_diagnostic()?; + tokio::fs::rename(&tmp_path, &image.local_path) + .await + .into_diagnostic()?; } Decompress::Zstd => { let src = src_path.clone(); let tmp_out = tmp_path.clone(); task::spawn_blocking(move || -> miette::Result<()> { let infile = fs::File::open(&src).into_diagnostic()?; - let mut decoder = zstd::stream::read::Decoder::new(infile).into_diagnostic()?; + let mut decoder = + zstd::stream::read::Decoder::new(infile).into_diagnostic()?; let mut outfile = fs::File::create(&tmp_out).into_diagnostic()?; std::io::copy(&mut decoder, &mut outfile).into_diagnostic()?; Ok(()) }) .await .into_diagnostic()??; - tokio::fs::rename(&tmp_path, &image.local_path).await.into_diagnostic()?; + tokio::fs::rename(&tmp_path, &image.local_path) + .await + .into_diagnostic()?; } } } else { @@ -158,19 +165,24 @@ pub async fn ensure_images(cfg: &OrchestratorConfig) -> Result<()> { ); } let bytes = resp.bytes().await.into_diagnostic()?; - tokio::fs::write(&tmp_path, &bytes).await.into_diagnostic()?; + tokio::fs::write(&tmp_path, &bytes) + .await + .into_diagnostic()?; // Decompress or move into place match image.decompress.unwrap_or(Decompress::None) { Decompress::None => { - tokio::fs::rename(&tmp_path, &image.local_path).await.into_diagnostic()?; + tokio::fs::rename(&tmp_path, &image.local_path) + .await + .into_diagnostic()?; } Decompress::Zstd => { let src = tmp_path.clone(); let dst = image.local_path.clone(); task::spawn_blocking(move || -> miette::Result<()> { let infile = fs::File::open(&src).into_diagnostic()?; - let mut decoder = zstd::stream::read::Decoder::new(infile).into_diagnostic()?; + let mut decoder = + zstd::stream::read::Decoder::new(infile).into_diagnostic()?; let mut outfile = fs::File::create(&dst).into_diagnostic()?; std::io::copy(&mut decoder, &mut outfile).into_diagnostic()?; // remove compressed temp diff --git a/crates/orchestrator/src/error.rs b/crates/orchestrator/src/error.rs index 6c1c2b7..b733ad5 100644 --- a/crates/orchestrator/src/error.rs +++ b/crates/orchestrator/src/error.rs @@ -36,7 +36,11 @@ pub enum OrchestratorError { SshHandshake(#[source] ssh2::Error), #[error("ssh auth failed for {user}: {source}")] - SshAuth { user: String, #[source] source: ssh2::Error }, + SshAuth { + user: String, + #[source] + source: ssh2::Error, + }, #[error("ssh not authenticated")] SshNotAuthenticated, @@ -80,5 +84,7 @@ pub enum OrchestratorError { // Helper conversions for common external error types into anyhow::Error where needed. impl From for OrchestratorError { - fn from(e: virt::error::Error) -> Self { OrchestratorError::LibvirtDomain(anyhow::Error::new(e)) } + fn from(e: virt::error::Error) -> Self { + OrchestratorError::LibvirtDomain(anyhow::Error::new(e)) + } } diff --git a/crates/orchestrator/src/http.rs b/crates/orchestrator/src/http.rs index c5581ca..fabfed1 100644 --- a/crates/orchestrator/src/http.rs +++ b/crates/orchestrator/src/http.rs @@ -1,4 +1,10 @@ -use axum::{extract::Path, http::StatusCode, response::{IntoResponse, Response}, routing::get, Router}; +use axum::{ + Router, + extract::Path, + http::StatusCode, + response::{IntoResponse, Response}, + routing::get, +}; use std::net::SocketAddr; use std::sync::Arc; use tracing::{info, warn}; @@ -24,7 +30,11 @@ async fn get_logs_moved( ) -> Response { let base = std::env::var("LOGS_BASE_URL").ok(); let msg = if let Some(b) = base.as_ref() { - format!("Logs have moved: {}/jobs/{}/logs", b.trim_end_matches('/'), request_id) + format!( + "Logs have moved: {}/jobs/{}/logs", + b.trim_end_matches('/'), + request_id + ) } else { "Logs endpoint moved to logs-service; set LOGS_BASE_URL to enable 302 redirects".to_string() }; @@ -34,15 +44,22 @@ async fn get_logs_moved( StatusCode::MOVED_PERMANENTLY, [(axum::http::header::LOCATION, loc.as_str())], msg, - ).into_response(); + ) + .into_response(); } (StatusCode::GONE, msg).into_response() } -pub async fn serve(addr: SocketAddr, persist: Arc, shutdown: impl std::future::Future) { +pub async fn serve( + addr: SocketAddr, + persist: Arc, + shutdown: impl std::future::Future, +) { let app = build_router(persist); info!(%addr, "http server starting"); - let listener = tokio::net::TcpListener::bind(addr).await.expect("bind http"); + let listener = tokio::net::TcpListener::bind(addr) + .await + .expect("bind http"); let server = axum::serve(listener, app); let _ = tokio::select! { _ = server => {}, diff --git a/crates/orchestrator/src/hypervisor.rs b/crates/orchestrator/src/hypervisor.rs index d503d9a..fe4ffb7 100644 --- a/crates/orchestrator/src/hypervisor.rs +++ b/crates/orchestrator/src/hypervisor.rs @@ -451,7 +451,10 @@ ssh_authorized_keys: // Serial console log file path (pre-create with permissive perms for libvirt) let console_log = work_dir.join("console.log"); - let _ = std::fs::OpenOptions::new().create(true).append(true).open(&console_log); + let _ = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&console_log); let _ = std::fs::set_permissions(&console_log, std::fs::Permissions::from_mode(0o666)); let console_log_str = console_log.display().to_string(); info!(domain = %id, console = %console_log_str, "serial console will be logged to file"); @@ -550,7 +553,8 @@ ssh_authorized_keys: .map_err(|e| miette::miette!("libvirt connect failed: {e}"))?; let dom = Domain::lookup_by_name(&conn, &id) .map_err(|e| miette::miette!("lookup domain failed: {e}"))?; - dom.suspend().map_err(|e| miette::miette!("domain suspend failed: {e}"))?; + dom.suspend() + .map_err(|e| miette::miette!("domain suspend failed: {e}"))?; Ok(()) }) .await diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index 42c67ed..6c6ccda 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -1,23 +1,23 @@ mod config; +mod error; +mod http; mod hypervisor; mod persist; mod scheduler; -mod http; -mod error; -use std::{collections::HashMap, path::PathBuf, time::Duration}; #[cfg(unix)] use std::os::unix::fs::PermissionsExt; +use std::{collections::HashMap, path::PathBuf, time::Duration}; use clap::Parser; use miette::{IntoDiagnostic as _, Result}; -use tracing::{info, warn, debug}; +use tracing::{debug, info, warn}; -use crate::persist::{JobState, Persist}; use crate::error::OrchestratorError; +use crate::persist::{JobState, Persist}; use config::OrchestratorConfig; use hypervisor::{JobContext, RouterHypervisor, VmSpec}; -use scheduler::{SchedItem, Scheduler, ExecConfig}; +use scheduler::{ExecConfig, SchedItem, Scheduler}; use std::sync::Arc; use tokio::sync::Notify; @@ -37,7 +37,11 @@ struct Opts { max_concurrency: usize, /// Skip persistence initialization (faster startup; disables DB writes) - #[arg(long = "skip-persistence", env = "ORCH_SKIP_PERSIST", default_value_t = false)] + #[arg( + long = "skip-persistence", + env = "ORCH_SKIP_PERSIST", + default_value_t = false + )] skip_persistence: bool, /// Per-label capacity map (e.g., illumos-latest=2,ubuntu-22.04=4) @@ -45,11 +49,7 @@ struct Opts { capacity_map: Option, /// Postgres connection string (if empty, persistence is disabled) - #[arg( - long, - env = "DATABASE_URL", - default_value = "" - )] + #[arg(long, env = "DATABASE_URL", default_value = "")] database_url: String, /// RabbitMQ URL (AMQP) @@ -88,17 +88,28 @@ struct Opts { #[arg(long, env = "HTTP_ADDR", default_value = "0.0.0.0:8081")] http_addr: String, - /// Local path to Linux runner binary to upload - #[arg(long, env = "RUNNER_LINUX_PATH", default_value = "./target/release/solstice-runner-linux")] + #[arg( + long, + env = "RUNNER_LINUX_PATH", + default_value = "./target/release/solstice-runner-linux" + )] runner_linux_path: String, /// Local path to illumos runner binary to upload - #[arg(long, env = "RUNNER_ILLUMOS_PATH", default_value = "./target/release/solstice-runner-illumos")] + #[arg( + long, + env = "RUNNER_ILLUMOS_PATH", + default_value = "./target/release/solstice-runner-illumos" + )] runner_illumos_path: String, /// Remote path where runner will be uploaded and executed - #[arg(long, env = "REMOTE_RUNNER_PATH", default_value = "/usr/local/bin/solstice-runner")] + #[arg( + long, + env = "REMOTE_RUNNER_PATH", + default_value = "/usr/local/bin/solstice-runner" + )] remote_runner_path: String, /// SSH connect timeout (seconds) @@ -112,7 +123,6 @@ struct Opts { #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { - // Load internal config (preloads KDL -> env, then reads env) let app_cfg = common::AppConfig::load("orchestrator")?; let _t = common::init_tracing("solstice-orchestrator")?; @@ -384,7 +394,8 @@ async fn main() -> Result<()> { let http_task = tokio::spawn(async move { http::serve(http_addr, persist_for_http, async move { let _ = http_shutdown_rx.await; - }).await; + }) + .await; }); // Wait for ctrl-c @@ -429,7 +440,6 @@ fn parse_capacity_map(s: Option<&str>) -> HashMap { m } - fn make_cloud_init_userdata( repo_url: &str, commit_sha: &str, @@ -440,9 +450,15 @@ fn make_cloud_init_userdata( group_id: Option, ) -> Vec { let mut extra = String::new(); - if let Some(j) = workflow_job_id { extra.push_str(&format!(" workflow_job_id: {}\n", j)); } - if let Some(s) = script_path { extra.push_str(&format!(" script_path: {}\n", s)); } - if let Some(g) = group_id { extra.push_str(&format!(" group_id: {}\n", g)); } + if let Some(j) = workflow_job_id { + extra.push_str(&format!(" workflow_job_id: {}\n", j)); + } + if let Some(s) = script_path { + extra.push_str(&format!(" script_path: {}\n", s)); + } + if let Some(g) = group_id { + extra.push_str(&format!(" group_id: {}\n", g)); + } let s = format!( r#"#cloud-config users: @@ -487,7 +503,6 @@ mod tests { assert!(m.get("other").is_none()); } - #[test] fn test_make_cloud_init_userdata_includes_fields() { let req_id = uuid::Uuid::new_v4(); diff --git a/crates/orchestrator/src/persist.rs b/crates/orchestrator/src/persist.rs index a304870..fa99afb 100644 --- a/crates/orchestrator/src/persist.rs +++ b/crates/orchestrator/src/persist.rs @@ -1,12 +1,9 @@ +use crate::error::OrchestratorError; use chrono::Utc; use miette::{IntoDiagnostic as _, Result}; -use crate::error::OrchestratorError; -use sea_orm::sea_query::{Expr, OnConflict}; -use sea_orm::{ - entity::prelude::*, ColumnTrait, Database, DatabaseConnection, QueryFilter, - Set, -}; use sea_orm::QueryOrder; +use sea_orm::sea_query::{Expr, OnConflict}; +use sea_orm::{ColumnTrait, Database, DatabaseConnection, QueryFilter, Set, entity::prelude::*}; use sea_orm_migration::MigratorTrait; use tracing::{debug, info, warn}; use uuid::Uuid; @@ -155,8 +152,15 @@ mod job_ssh_keys { impl Persist { /// Save per-job SSH keys (public + private OpenSSH format). No-op if persistence disabled. - pub async fn save_job_ssh_keys(&self, request_id: Uuid, public_key: &str, private_key: &str) -> Result<()> { - let Some(db) = self.db.as_ref() else { return Ok(()); }; + pub async fn save_job_ssh_keys( + &self, + request_id: Uuid, + public_key: &str, + private_key: &str, + ) -> Result<()> { + let Some(db) = self.db.as_ref() else { + return Ok(()); + }; let now = Utc::now(); let am = job_ssh_keys::ActiveModel { request_id: Set(request_id), @@ -182,8 +186,13 @@ impl Persist { /// Load per-job SSH keys; returns None if absent or persistence disabled. pub async fn get_job_ssh_keys(&self, request_id: Uuid) -> Result> { - let Some(db) = self.db.as_ref() else { return Ok(None); }; - let row = job_ssh_keys::Entity::find_by_id(request_id).one(db).await.into_diagnostic()?; + let Some(db) = self.db.as_ref() else { + return Ok(None); + }; + let row = job_ssh_keys::Entity::find_by_id(request_id) + .one(db) + .await + .into_diagnostic()?; Ok(row.map(|r| (r.public_key, r.private_key))) } /// Initialize persistence. @@ -198,7 +207,8 @@ impl Persist { opts.max_connections(1) .min_connections(1) .sqlx_logging(false); - let db = Database::connect(opts).await + let db = Database::connect(opts) + .await .map_err(|e| OrchestratorError::DbConnect(e.into())) .into_diagnostic()?; migration::Migrator::up(&db, None) @@ -266,17 +276,29 @@ impl Persist { // Keep original JSON for reference fields_json = Some(line.to_string()); if let Some(c) = val.get("category").and_then(|v| v.as_str()) { - if !c.is_empty() { category = c.to_string(); } + if !c.is_empty() { + category = c.to_string(); + } } if let Some(l) = val.get("level").and_then(|v| v.as_str()) { level_str = Some(l.to_string()); } // Prefer common keys for message - if let Some(m) = val.get("msg").or_else(|| val.get("message")).and_then(|v| v.as_str()) { + if let Some(m) = val + .get("msg") + .or_else(|| val.get("message")) + .and_then(|v| v.as_str()) + { msg_line = m.to_string(); } } - let level = level_str.or_else(|| if stderr { Some("error".to_string()) } else { Some("info".to_string()) }); + let level = level_str.or_else(|| { + if stderr { + Some("error".to_string()) + } else { + Some("info".to_string()) + } + }); let has_error = stderr || level.as_deref() == Some("error"); let am = job_logs::ActiveModel { request_id: Set(request_id), @@ -289,7 +311,10 @@ impl Persist { fields: Set(fields_json), has_error: Set(has_error), }; - job_logs::Entity::insert(am).exec(db).await.into_diagnostic()?; + job_logs::Entity::insert(am) + .exec(db) + .await + .into_diagnostic()?; Ok(()) } diff --git a/crates/orchestrator/src/scheduler.rs b/crates/orchestrator/src/scheduler.rs index 9c28f7c..8c5ca46 100644 --- a/crates/orchestrator/src/scheduler.rs +++ b/crates/orchestrator/src/scheduler.rs @@ -1,17 +1,17 @@ -use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; +use crate::error::OrchestratorError; +use common::{DeadLetter, JobRequest, messages::JobResult, publish_deadletter, publish_job_result}; use dashmap::DashMap; use miette::{IntoDiagnostic as _, Result}; -use crate::error::OrchestratorError; +use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; use tokio::sync::{Notify, Semaphore, mpsc}; use tracing::{error, info, warn}; -use common::{publish_deadletter, publish_job_result, DeadLetter, JobRequest, messages::JobResult}; use crate::hypervisor::{BackendTag, Hypervisor, JobContext, VmSpec}; use crate::persist::{JobState, Persist, VmPersistState}; +use tokio::fs::File; use tokio::io::{AsyncBufReadExt, BufReader}; use uuid::Uuid; -use tokio::fs::File; pub struct Scheduler { mq_cfg: Arc, @@ -51,7 +51,8 @@ fn strip_ansi(input: &str) -> String { let mut i = 0; while i < bytes.len() { let b = bytes[i]; - if b == 0x1b { // ESC + if b == 0x1b { + // ESC if i + 1 < bytes.len() { let b1 = bytes[i + 1]; // CSI: ESC [ ... final byte in 0x40..=0x7E @@ -59,7 +60,10 @@ fn strip_ansi(input: &str) -> String { i += 2; while i < bytes.len() { let c = bytes[i]; - if (0x40..=0x7E).contains(&c) { i += 1; break; } + if (0x40..=0x7E).contains(&c) { + i += 1; + break; + } i += 1; } continue; @@ -68,8 +72,14 @@ fn strip_ansi(input: &str) -> String { if b1 == b']' { i += 2; while i < bytes.len() { - if bytes[i] == 0x07 { i += 1; break; } - if bytes[i] == 0x1b && i + 1 < bytes.len() && bytes[i + 1] == b'\\' { i += 2; break; } + if bytes[i] == 0x07 { + i += 1; + break; + } + if bytes[i] == 0x1b && i + 1 < bytes.len() && bytes[i + 1] == b'\\' { + i += 2; + break; + } i += 1; } continue; @@ -268,9 +278,9 @@ impl Scheduler { }); } }, - Err(e) => { + Err(e) => { failure_summary = Some(format!("ssh/runner error: {}", e)); - warn!(error = %e, request_id = %item.ctx.request_id, ip = %ip, "ssh runner execution failed"); + warn!(error = %e, request_id = %item.ctx.request_id, ip = %ip, "ssh runner execution failed"); } } } @@ -322,10 +332,10 @@ impl Scheduler { let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), JobState::Failed).await; // Publish combined dead-letter with original request and error if let Some(ref orig) = item.original { - let dl = DeadLetter::new("prepare", format!("{}", e), orig.clone()); - if let Err(pe) = publish_deadletter(&mq_cfg_in, &dl).await { - warn!(error = %pe, request_id = %item.ctx.request_id, "failed to publish dead-letter"); - } + let dl = DeadLetter::new("prepare", format!("{}", e), orig.clone()); + if let Err(pe) = publish_deadletter(&mq_cfg_in, &dl).await { + warn!(error = %pe, request_id = %item.ctx.request_id, "failed to publish dead-letter"); + } } info!(request_id = %item.ctx.request_id, label = %label_key, "job finished: {} {}", item.ctx.request_id, "failed"); return; @@ -349,9 +359,13 @@ impl Scheduler { } // Tail the VM serial console file and record lines into the job log until cancelled. -async fn tail_console_to_joblog(persist: Arc, request_id: Uuid, console_path: PathBuf) -> miette::Result<()> { - use tokio::time::{sleep, Duration}; +async fn tail_console_to_joblog( + persist: Arc, + request_id: Uuid, + console_path: PathBuf, +) -> miette::Result<()> { use miette::IntoDiagnostic as _; + use tokio::time::{Duration, sleep}; // Negative sequence numbers for early console logs so they sort before runner logs (which start at 0). let mut seq: i64 = -1_000_000; // ample headroom @@ -390,7 +404,11 @@ async fn tail_console_to_joblog(persist: Arc, request_id: Uuid, console } // Snapshot the entire console log file once and persist its lines with negative seq numbers. -async fn snapshot_console_to_joblog(persist: Arc, request_id: Uuid, console_path: PathBuf) -> miette::Result<()> { +async fn snapshot_console_to_joblog( + persist: Arc, + request_id: Uuid, + console_path: PathBuf, +) -> miette::Result<()> { use miette::IntoDiagnostic as _; match tokio::fs::read_to_string(&console_path).await { Ok(content) => { @@ -416,9 +434,17 @@ async fn snapshot_console_to_joblog(persist: Arc, request_id: Uuid, con } #[cfg(all(target_os = "linux", feature = "libvirt"))] -async fn discover_guest_ip_virsh(domain: &str, timeout: Duration, libvirt_uri: &str, libvirt_network: Option<&str>) -> Option { - use tokio::{task, time::{sleep, Instant, Duration}}; +async fn discover_guest_ip_virsh( + domain: &str, + timeout: Duration, + libvirt_uri: &str, + libvirt_network: Option<&str>, +) -> Option { use std::process::Command; + use tokio::{ + task, + time::{Duration, Instant, sleep}, + }; use tracing::{debug, warn}; fn parse_ipv4_from_text(s: &str) -> Option { @@ -432,12 +458,18 @@ async fn discover_guest_ip_virsh(domain: &str, timeout: Duration, libvirt_uri: & } let mut cur = String::new(); for ch in line.chars() { - if ch.is_ascii_digit() || ch == '.' { cur.push(ch); } else { - if cur.split('.').count() == 4 { return Some(cur.clone()); } + if ch.is_ascii_digit() || ch == '.' { + cur.push(ch); + } else { + if cur.split('.').count() == 4 { + return Some(cur.clone()); + } cur.clear(); } } - if cur.split('.').count() == 4 { return Some(cur); } + if cur.split('.').count() == 4 { + return Some(cur); + } } None } @@ -446,7 +478,10 @@ async fn discover_guest_ip_virsh(domain: &str, timeout: Duration, libvirt_uri: & let s = String::from_utf8_lossy(b); let s = s.trim(); let mut out = s.to_string(); - if out.len() > 800 { out.truncate(800); out.push_str("…"); } + if out.len() > 800 { + out.truncate(800); + out.push_str("…"); + } out } @@ -472,17 +507,29 @@ async fn discover_guest_ip_virsh(domain: &str, timeout: Duration, libvirt_uri: & let mut cmd = Command::new("virsh"); cmd.args(&args_vec); cmd.output() - }).await { + }) + .await + { Ok(Ok(out)) => { let ok = out.status.success(); let status = out.status.code(); let stdout = preview_bytes(&out.stdout); let stderr = preview_bytes(&out.stderr); - Attempt { cmd: cmd_desc, ok, status, stdout, stderr } - } - other => { - Attempt { cmd: cmd_desc, ok: false, status: None, stdout: String::new(), stderr: format!("spawn error: {:?}", other) } + Attempt { + cmd: cmd_desc, + ok, + status, + stdout, + stderr, + } } + other => Attempt { + cmd: cmd_desc, + ok: false, + status: None, + stdout: String::new(), + stderr: format!("spawn error: {:?}", other), + }, } } @@ -493,11 +540,17 @@ async fn discover_guest_ip_virsh(domain: &str, timeout: Duration, libvirt_uri: & // 1) Try domifaddr via agent then lease then default for source in [Some("agent"), Some("lease"), None] { let mut args = vec!["domifaddr", domain]; - if let Some(src) = source { args.push("--source"); args.push(src); } + if let Some(src) = source { + args.push("--source"); + args.push(src); + } let att = run_cmd(&args, libvirt_uri).await; debug!(domain=%domain, method=%format!("domifaddr/{:?}", source), ok=att.ok, status=?att.status, stdout=%att.stdout, stderr=%att.stderr, cmd=%att.cmd, "virsh attempt"); if att.ok { - if let Some(ip) = parse_ipv4_from_text(&att.stdout) { debug!(domain=%domain, method=%format!("domifaddr/{:?}", source), ip=%ip, "discovered IP"); return Some(ip); } + if let Some(ip) = parse_ipv4_from_text(&att.stdout) { + debug!(domain=%domain, method=%format!("domifaddr/{:?}", source), ip=%ip, "discovered IP"); + return Some(ip); + } } last_attempts.push(att); } @@ -508,7 +561,8 @@ async fn discover_guest_ip_virsh(domain: &str, timeout: Duration, libvirt_uri: & let att_domiflist = run_cmd(&["domiflist", domain], libvirt_uri).await; debug!(domain=%domain, method="domiflist", ok=att_domiflist.ok, status=?att_domiflist.status, stdout=%att_domiflist.stdout, stderr=%att_domiflist.stderr, cmd=%att_domiflist.cmd, "virsh attempt"); if att_domiflist.ok { - for line in att_domiflist.stdout.lines().skip(2) { // skip header lines + for line in att_domiflist.stdout.lines().skip(2) { + // skip header lines let cols: Vec<&str> = line.split_whitespace().collect(); if cols.len() >= 5 { // columns: Interface Type Source Model MAC @@ -521,12 +575,19 @@ async fn discover_guest_ip_virsh(domain: &str, timeout: Duration, libvirt_uri: & last_attempts.push(att_domiflist); // Fallback to env if domiflist didn't give a Source network if net_name.is_none() { - if let Some(n) = libvirt_network { if !n.is_empty() { net_name = Some(n.to_string()); } } + if let Some(n) = libvirt_network { + if !n.is_empty() { + net_name = Some(n.to_string()); + } + } } // 2a) Parse leases file if we have network and MAC if let (Some(net), Some(mac_s)) = (net_name.clone(), mac.clone()) { let path = format!("/var/lib/libvirt/dnsmasq/{}.leases", net); - let content_opt = task::spawn_blocking(move || std::fs::read_to_string(path)).await.ok().and_then(|r| r.ok()); + let content_opt = task::spawn_blocking(move || std::fs::read_to_string(path)) + .await + .ok() + .and_then(|r| r.ok()); if let Some(content) = content_opt { let mut best_ip: Option = None; let mut best_epoch: i64 = -1; @@ -541,7 +602,10 @@ async fn discover_guest_ip_virsh(domain: &str, timeout: Duration, libvirt_uri: & } } } - if let Some(ip) = best_ip { debug!(domain=%domain, method="dnsmasq.leases", ip=%ip, "discovered IP"); return Some(ip); } + if let Some(ip) = best_ip { + debug!(domain=%domain, method="dnsmasq.leases", ip=%ip, "discovered IP"); + return Some(ip); + } } } // 2b) Try virsh net-dhcp-leases @@ -552,7 +616,10 @@ async fn discover_guest_ip_virsh(domain: &str, timeout: Duration, libvirt_uri: & if let Some(ref mac_s) = mac { for line in att_leases.stdout.lines() { if line.to_ascii_lowercase().contains(mac_s.as_str()) { - if let Some(ip) = parse_ipv4_from_text(line) { debug!(domain=%domain, method="net-dhcp-leases", ip=%ip, "discovered IP"); return Some(ip); } + if let Some(ip) = parse_ipv4_from_text(line) { + debug!(domain=%domain, method="net-dhcp-leases", ip=%ip, "discovered IP"); + return Some(ip); + } } } } @@ -585,7 +652,7 @@ async fn run_job_via_ssh_with_retry( request_id: uuid::Uuid, timeout: Duration, ) -> miette::Result<(bool, i32, Vec<(bool, String)>)> { - use tokio::time::{sleep, Instant}; + use tokio::time::{Instant, sleep}; use tracing::warn; let deadline = Instant::now() + timeout; @@ -603,7 +670,9 @@ async fn run_job_via_ssh_with_retry( repo_url.clone(), commit_sha.clone(), request_id, - ).await { + ) + .await + { Ok(r) => return Ok(r), Err(e) => { let now = Instant::now(); @@ -759,7 +828,10 @@ async fn run_job_via_ssh_owned( #[cfg(all(target_os = "linux", feature = "libvirt"))] fn is_illumos_label(label: &str) -> bool { let l = label.to_ascii_lowercase(); - l.contains("illumos") || l.contains("omnios") || l.contains("openindiana") || l.contains("oi-hipster") + l.contains("illumos") + || l.contains("omnios") + || l.contains("openindiana") + || l.contains("oi-hipster") } #[cfg(all(target_os = "linux", feature = "libvirt"))] @@ -913,7 +985,15 @@ mod tests { libvirt_uri: "qemu:///system".into(), libvirt_network: "default".into(), }; - let sched = Scheduler::new(hv, 2, &caps, persist, Duration::from_millis(10), Arc::new(common::MqConfig::default()), exec); + let sched = Scheduler::new( + hv, + 2, + &caps, + persist, + Duration::from_millis(10), + Arc::new(common::MqConfig::default()), + exec, + ); let tx = sched.sender(); let run = tokio::spawn(async move { let _ = sched.run_with_shutdown(Arc::new(Notify::new())).await; @@ -960,7 +1040,15 @@ mod tests { libvirt_uri: "qemu:///system".into(), libvirt_network: "default".into(), }; - let sched = Scheduler::new(hv, 4, &caps, persist, Duration::from_millis(10), Arc::new(common::MqConfig::default()), exec); + let sched = Scheduler::new( + hv, + 4, + &caps, + persist, + Duration::from_millis(10), + Arc::new(common::MqConfig::default()), + exec, + ); let tx = sched.sender(); let run = tokio::spawn(async move { let _ = sched.run_with_shutdown(Arc::new(Notify::new())).await; diff --git a/crates/workflow-runner/src/main.rs b/crates/workflow-runner/src/main.rs index 9addaaf..d1918f2 100644 --- a/crates/workflow-runner/src/main.rs +++ b/crates/workflow-runner/src/main.rs @@ -1,8 +1,8 @@ use clap::Parser; use miette::{IntoDiagnostic as _, Result}; use serde::Deserialize; -use std::process::Stdio; use std::collections::VecDeque; +use std::process::Stdio; use std::sync::{Arc, Mutex}; use tokio::{ fs, @@ -97,11 +97,14 @@ async fn has_cmd(name: &str) -> bool { } async fn check_writable(dir: &str) -> bool { - if fs::create_dir_all(dir).await.is_err() { return false; } + if fs::create_dir_all(dir).await.is_err() { + return false; + } let test_path = format!("{}/.solstice-writecheck", dir.trim_end_matches('/')); match fs::write(&test_path, b"ok").await { Ok(_) => { - let _ = fs::remove_file(&test_path).await; true + let _ = fs::remove_file(&test_path).await; + true } Err(_) => false, } @@ -110,11 +113,23 @@ async fn check_writable(dir: &str) -> bool { fn parse_repo_host_port(repo: &str) -> Option<(String, u16)> { let r = repo.trim(); if let Some(rest) = r.strip_prefix("https://") { - let host = rest.split('/').next()?.split('@').last()?.split(':').next()?; // ignore embedded user + let host = rest + .split('/') + .next()? + .split('@') + .last()? + .split(':') + .next()?; // ignore embedded user return Some((host.to_string(), 443)); } if let Some(rest) = r.strip_prefix("http://") { - let host = rest.split('/').next()?.split('@').last()?.split(':').next()?; + let host = rest + .split('/') + .next()? + .split('@') + .last()? + .split(':') + .next()?; return Some((host.to_string(), 80)); } if let Some(rest) = r.strip_prefix("ssh://") { @@ -125,18 +140,26 @@ fn parse_repo_host_port(repo: &str) -> Option<(String, u16)> { } // scp-like: user@host:owner/repo.git if let Some(at) = r.find('@') { - if let Some(colon) = r[at+1..].find(':') { // ensure host: - let host = &r[at+1..at+1+colon]; - if !host.is_empty() { return Some((host.to_string(), 22)); } + if let Some(colon) = r[at + 1..].find(':') { + // ensure host: + let host = &r[at + 1..at + 1 + colon]; + if !host.is_empty() { + return Some((host.to_string(), 22)); + } } } None } async fn check_network_connect(host: &str, port: u16, timeout_ms: u64) -> bool { - use tokio::time::{timeout, Duration}; - match timeout(Duration::from_millis(timeout_ms), tokio::net::TcpStream::connect((host, port))).await { - Ok(Ok(_stream)) => { true } + use tokio::time::{Duration, timeout}; + match timeout( + Duration::from_millis(timeout_ms), + tokio::net::TcpStream::connect((host, port)), + ) + .await + { + Ok(Ok(_stream)) => true, _ => false, } } @@ -148,14 +171,28 @@ async fn preflight(repo: &str, workdir: &str) -> Result<()> { let has_wget = has_cmd("wget").await; let has_tar = has_cmd("tar").await; let has_gtar = has_cmd("gtar").await; - for (tool, ok) in [("git", has_git), ("curl", has_curl), ("wget", has_wget), ("tar", has_tar), ("gtar", has_gtar)] { + for (tool, ok) in [ + ("git", has_git), + ("curl", has_curl), + ("wget", has_wget), + ("tar", has_tar), + ("gtar", has_gtar), + ] { let lvl = if ok { "info" } else { "warn" }; let msg = if ok { format!("tool {tool}: available") } else { format!("tool {tool}: missing") }; - println!("{}", ndjson_line("tool_check", lvl, &msg, Some(serde_json::json!({"available": ok, "tool": tool})))); + println!( + "{}", + ndjson_line( + "tool_check", + lvl, + &msg, + Some(serde_json::json!({"available": ok, "tool": tool})) + ) + ); } let can_clone = has_git || ((has_tar || has_gtar) && (has_curl || has_wget)); let lvl = if can_clone { "info" } else { "error" }; @@ -175,14 +212,26 @@ async fn preflight(repo: &str, workdir: &str) -> Result<()> { ) ); if !can_clone { - return Err(miette::miette!("no available method to fetch repository: need git or (tar and (curl|wget))")); + return Err(miette::miette!( + "no available method to fetch repository: need git or (tar and (curl|wget))" + )); } // Workdir writability let writable = check_writable(workdir).await; let lvl = if writable { "info" } else { "error" }; - println!("{}", ndjson_line("env_setup", lvl, "workdir writable", Some(serde_json::json!({"path": workdir, "writable": writable})))); - if !writable { return Err(miette::miette!("workdir is not writable: {}", workdir)); } + println!( + "{}", + ndjson_line( + "env_setup", + lvl, + "workdir writable", + Some(serde_json::json!({"path": workdir, "writable": writable})) + ) + ); + if !writable { + return Err(miette::miette!("workdir is not writable: {}", workdir)); + } // Network reachability (best-effort) if let Some((host, port)) = parse_repo_host_port(repo) { @@ -205,9 +254,17 @@ async fn preflight(repo: &str, workdir: &str) -> Result<()> { async fn fetch_repo_via_archive(repo_https: &str, sha: &str, workdir: &str) -> Result<()> { // Announce chosen method - println!("{}", ndjson_line("env_setup", "info", "fetch via http archive", Some(serde_json::json!({ - "url": format!("{}/archive/{}.tar.gz", repo_https.trim_end_matches('.').trim_end_matches(".git"), sha) - })))); + println!( + "{}", + ndjson_line( + "env_setup", + "info", + "fetch via http archive", + Some(serde_json::json!({ + "url": format!("{}/archive/{}.tar.gz", repo_https.trim_end_matches('.').trim_end_matches(".git"), sha) + })) + ) + ); // Gitea/Codeberg archive URL pattern: https://codeberg.org///archive/.tar.gz let base = repo_https.trim_end_matches('.').trim_end_matches(".git"); let url = format!("{}/archive/{}.tar.gz", base, sha); @@ -216,7 +273,10 @@ async fn fetch_repo_via_archive(repo_https: &str, sha: &str, workdir: &str) -> R let tar_bin = if has_cmd("gtar").await { "gtar" } else { "tar" }; // Check if we should allow insecure TLS (last resort) - let insecure = std::env::var("SOLSTICE_ALLOW_INSECURE").ok().map(|v| v == "1" || v.eq_ignore_ascii_case("true")).unwrap_or(false); + let insecure = std::env::var("SOLSTICE_ALLOW_INSECURE") + .ok() + .map(|v| v == "1" || v.eq_ignore_ascii_case("true")) + .unwrap_or(false); let curl_flags = if insecure { "-fSLk" } else { "-fSL" }; // Try curl | tar, then wget | tar @@ -234,13 +294,21 @@ async fn fetch_repo_via_archive(repo_https: &str, sha: &str, workdir: &str) -> R } // On illumos/SunOS images, curl/wget may be missing or CA bundle absent. Try to install tools and CA certs, then retry. - let os = std::env::var("SOLSTICE_OS_OVERRIDE").ok().unwrap_or_else(|| { - // Best-effort OS detection - std::env::consts::OS.to_string() - }); + let os = std::env::var("SOLSTICE_OS_OVERRIDE") + .ok() + .unwrap_or_else(|| { + // Best-effort OS detection + std::env::consts::OS.to_string() + }); // Prefer uname if available - let uname = Command::new("/bin/sh").arg("-lc").arg("uname -s 2>/dev/null || echo unknown").output().await.ok() - .and_then(|o| String::from_utf8(o.stdout).ok()).unwrap_or_default(); + let uname = Command::new("/bin/sh") + .arg("-lc") + .arg("uname -s 2>/dev/null || echo unknown") + .output() + .await + .ok() + .and_then(|o| String::from_utf8(o.stdout).ok()) + .unwrap_or_default(); let is_sunos = uname.trim() == "SunOS" || os == "solaris"; if is_sunos { // Try IPS (pkg) first, then pkgin @@ -274,10 +342,14 @@ async fn fetch_repo_via_archive(repo_https: &str, sha: &str, workdir: &str) -> R } } - Err(miette::miette!("failed to fetch repo archive via HTTP for {url}")) + Err(miette::miette!( + "failed to fetch repo archive via HTTP for {url}" + )) } -fn is_hex(s: &str) -> bool { s.chars().all(|c| c.is_ascii_hexdigit()) } +fn is_hex(s: &str) -> bool { + s.chars().all(|c| c.is_ascii_hexdigit()) +} async fn ensure_repo(repo: &str, sha: &str, workdir: &str) -> Result<()> { fs::create_dir_all(workdir).await.into_diagnostic()?; @@ -311,12 +383,18 @@ async fn ensure_repo(repo: &str, sha: &str, workdir: &str) -> Result<()> { let cmds = vec![ // Re-initialize repository to ensure correct object format format!("cd {workdir} && rm -rf .git || true"), - if let Some(fmt) = obj_fmt { format!("cd {workdir} && git init --object-format={fmt}") } else { format!("cd {workdir} && git init") }, + if let Some(fmt) = obj_fmt { + format!("cd {workdir} && git init --object-format={fmt}") + } else { + format!("cd {workdir} && git init") + }, format!( "cd {workdir} && git remote remove origin >/dev/null 2>&1 || true && git remote add origin {repo_eff}" ), // Use protocol v2 features when available and keep it light - format!("cd {workdir} && git -c protocol.version=2 fetch --filter=blob:none --depth=1 --no-tags origin {sha}"), + format!( + "cd {workdir} && git -c protocol.version=2 fetch --filter=blob:none --depth=1 --no-tags origin {sha}" + ), // Checkout the requested commit in detached HEAD format!("cd {workdir} && git checkout -q --detach {sha}"), ]; @@ -339,17 +417,32 @@ async fn ensure_repo(repo: &str, sha: &str, workdir: &str) -> Result<()> { async fn run_job_script(workdir: &str, script_override: Option<&str>) -> Result { // Determine the script to execute: prefer override from job.yaml, else default .solstice/job.sh let script = if let Some(path) = script_override { - if path.starts_with('/') { path.to_string() } else { format!("{}/{}", workdir, path.trim_start_matches("./")) } + if path.starts_with('/') { + path.to_string() + } else { + format!("{}/{}", workdir, path.trim_start_matches("./")) + } } else { format!("{}/.solstice/job.sh", workdir) }; if !fs::try_exists(&script).await.into_diagnostic()? { warn!(path = %script, "job script not found"); - eprintln!("{}", ndjson_line("job_run", "error", &format!("job script not found at {}", script), None)); + eprintln!( + "{}", + ndjson_line( + "job_run", + "error", + &format!("job script not found at {}", script), + None + ) + ); return Ok(1); } // Emit explicit pre-exec line to aid diagnostics - println!("{}", ndjson_line("job_run", "info", &format!("executing {}", script), None)); + println!( + "{}", + ndjson_line("job_run", "info", &format!("executing {}", script), None) + ); let _ = run_shell(&format!("chmod +x {} || true", script)).await?; let mut cmd = Command::new("/bin/sh"); @@ -371,11 +464,21 @@ async fn run_job_script(workdir: &str, script_override: Option<&str>) -> Result< match reader.read_until(b'\n', &mut buf).await { Ok(0) => break, Ok(_) => { - let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string(); + let line = String::from_utf8_lossy(&buf) + .trim_end_matches(['\n', '\r']) + .to_string(); println!("{}", ndjson_line("job_run", "info", &line, None)); } Err(e) => { - eprintln!("{}", ndjson_line("job_run", "error", &format!("error reading stdout: {}", e), None)); + eprintln!( + "{}", + ndjson_line( + "job_run", + "error", + &format!("error reading stdout: {}", e), + None + ) + ); break; } } @@ -391,15 +494,27 @@ async fn run_job_script(workdir: &str, script_override: Option<&str>) -> Result< match reader.read_until(b'\n', &mut buf).await { Ok(0) => break, Ok(_) => { - let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string(); + let line = String::from_utf8_lossy(&buf) + .trim_end_matches(['\n', '\r']) + .to_string(); eprintln!("{}", ndjson_line("job_run", "error", &line, None)); if let Ok(mut dq) = last_err2.lock() { - if dq.len() == 20 { dq.pop_front(); } + if dq.len() == 20 { + dq.pop_front(); + } dq.push_back(line); } } Err(e) => { - eprintln!("{}", ndjson_line("job_run", "error", &format!("error reading stderr: {}", e), None)); + eprintln!( + "{}", + ndjson_line( + "job_run", + "error", + &format!("error reading stderr: {}", e), + None + ) + ); break; } } @@ -412,7 +527,15 @@ async fn run_job_script(workdir: &str, script_override: Option<&str>) -> Result< if code != 0 { // Emit a concise failure summary (structured) - eprintln!("{}", ndjson_line("job_run", "error", &format!("job script exited with code {}", code), None)); + eprintln!( + "{}", + ndjson_line( + "job_run", + "error", + &format!("job script exited with code {}", code), + None + ) + ); // Include recent stderr lines for context (structured) let lines: Vec = last_err @@ -421,9 +544,20 @@ async fn run_job_script(workdir: &str, script_override: Option<&str>) -> Result< .map(|dq| dq.iter().cloned().collect()) .unwrap_or_default(); if lines.is_empty() { - eprintln!("{}", ndjson_line("job_run", "warn", "no stderr lines were captured from the script", None)); + eprintln!( + "{}", + ndjson_line( + "job_run", + "warn", + "no stderr lines were captured from the script", + None + ) + ); } else { - eprintln!("{}", ndjson_line("job_run", "info", "recent stderr lines follow", None)); + eprintln!( + "{}", + ndjson_line("job_run", "info", "recent stderr lines follow", None) + ); for l in lines { eprintln!("{}", ndjson_line("job_run", "error", &l, None)); } @@ -434,21 +568,31 @@ async fn run_job_script(workdir: &str, script_override: Option<&str>) -> Result< } #[derive(Debug)] -struct WorkflowStep { name: String, run: String } +struct WorkflowStep { + name: String, + run: String, +} #[derive(Debug)] -struct WorkflowJob { setup: Option, steps: Vec } +struct WorkflowJob { + setup: Option, + steps: Vec, +} fn capture_attr(line: &str, key: &str) -> Option { let pattern1 = format!("{}=\"", key); if let Some(start) = line.find(&pattern1) { let rest = &line[start + pattern1.len()..]; - if let Some(end) = rest.find('"') { return Some(rest[..end].to_string()); } + if let Some(end) = rest.find('"') { + return Some(rest[..end].to_string()); + } } let pattern2 = format!("{}='", key); if let Some(start) = line.find(&pattern2) { let rest = &line[start + pattern2.len()..]; - if let Some(end) = rest.find('\'') { return Some(rest[..end].to_string()); } + if let Some(end) = rest.find('\'') { + return Some(rest[..end].to_string()); + } } None } @@ -463,18 +607,31 @@ fn parse_workflow_for_job(kdl: &str, wanted_job: Option<&str>) -> Option = Vec::new(); let mut setup: Option = None; // If this job is the one we want (or no preference and it's the first job), collect its setup and steps - let take_this = match (wanted_job, id.as_deref()) { (Some(w), Some(i)) => w == i, (None, Some(_)) => true, _ => false }; + let take_this = match (wanted_job, id.as_deref()) { + (Some(w), Some(i)) => w == i, + (None, Some(_)) => true, + _ => false, + }; while let Some(peek) = lines.peek() { let t = peek.trim(); - if t.ends_with('{') { depth += 1; } + if t.ends_with('{') { + depth += 1; + } if t.starts_with('}') { - if depth == 0 { break; } + if depth == 0 { + break; + } depth -= 1; - if depth == 0 { lines.next(); break; } + if depth == 0 { + lines.next(); + break; + } } if take_this { if setup.is_none() && t.starts_with("setup ") && t.contains("path=") { - if let Some(p) = capture_attr(t, "path") { setup = Some(p); } + if let Some(p) = capture_attr(t, "path") { + setup = Some(p); + } } if t.starts_with("step ") && t.contains("run=") { let name = capture_attr(t, "name").unwrap_or_else(|| "unnamed".into()); @@ -485,7 +642,9 @@ fn parse_workflow_for_job(kdl: &str, wanted_job: Option<&str>) -> Option) -> Option Result { // Announce step start - println!("{}", ndjson_line( - "step", - "info", - &format!("starting step: {}", step.name), - Some(serde_json::json!({"step_name": step.name, "step_index": idx, "total_steps": total})) - )); + println!( + "{}", + ndjson_line( + "step", + "info", + &format!("starting step: {}", step.name), + Some( + serde_json::json!({"step_name": step.name, "step_index": idx, "total_steps": total}) + ) + ) + ); // Build command and spawn let mut cmd = Command::new("/bin/sh"); @@ -509,7 +673,8 @@ async fn run_step(workdir: &str, step: &WorkflowStep, idx: usize, total: usize) let mut child = cmd.spawn().into_diagnostic()?; // Stream output with step fields - let extra = serde_json::json!({"step_name": step.name, "step_index": idx, "total_steps": total}); + let extra = + serde_json::json!({"step_name": step.name, "step_index": idx, "total_steps": total}); if let Some(stdout) = child.stdout.take() { let mut reader = BufReader::new(stdout); @@ -520,11 +685,24 @@ async fn run_step(workdir: &str, step: &WorkflowStep, idx: usize, total: usize) match reader.read_until(b'\n', &mut buf).await { Ok(0) => break, Ok(_) => { - let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string(); - println!("{}", ndjson_line("step_run", "info", &line, Some(extra_out.clone()))); + let line = String::from_utf8_lossy(&buf) + .trim_end_matches(['\n', '\r']) + .to_string(); + println!( + "{}", + ndjson_line("step_run", "info", &line, Some(extra_out.clone())) + ); } Err(e) => { - eprintln!("{}", ndjson_line("step_run", "error", &format!("error reading stdout: {}", e), Some(extra_out.clone()))); + eprintln!( + "{}", + ndjson_line( + "step_run", + "error", + &format!("error reading stdout: {}", e), + Some(extra_out.clone()) + ) + ); break; } } @@ -540,11 +718,24 @@ async fn run_step(workdir: &str, step: &WorkflowStep, idx: usize, total: usize) match reader.read_until(b'\n', &mut buf).await { Ok(0) => break, Ok(_) => { - let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string(); - eprintln!("{}", ndjson_line("step_run", "error", &line, Some(extra_err.clone()))); + let line = String::from_utf8_lossy(&buf) + .trim_end_matches(['\n', '\r']) + .to_string(); + eprintln!( + "{}", + ndjson_line("step_run", "error", &line, Some(extra_err.clone())) + ); } Err(e) => { - eprintln!("{}", ndjson_line("step_run", "error", &format!("error reading stderr: {}", e), Some(extra_err.clone()))); + eprintln!( + "{}", + ndjson_line( + "step_run", + "error", + &format!("error reading stderr: {}", e), + Some(extra_err.clone()) + ) + ); break; } } @@ -555,33 +746,62 @@ async fn run_step(workdir: &str, step: &WorkflowStep, idx: usize, total: usize) let status = child.wait().await.into_diagnostic()?; let code = status.code().unwrap_or(1); if code != 0 { - eprintln!("{}", ndjson_line("step", "error", &format!("step failed: {} (exit {})", step.name, code), Some(extra))); + eprintln!( + "{}", + ndjson_line( + "step", + "error", + &format!("step failed: {} (exit {})", step.name, code), + Some(extra) + ) + ); } else { - println!("{}", ndjson_line("step", "info", &format!("completed step: {}", step.name), Some(serde_json::json!({"step_name": step.name, "step_index": idx, "total_steps": total, "exit_code": code})))); + println!( + "{}", + ndjson_line( + "step", + "info", + &format!("completed step: {}", step.name), + Some( + serde_json::json!({"step_name": step.name, "step_index": idx, "total_steps": total, "exit_code": code}) + ) + ) + ); } Ok(code) } async fn run_workflow_if_present(workdir: &str) -> Result> { let path = format!("{}/.solstice/workflow.kdl", workdir); - if !fs::try_exists(&path).await.into_diagnostic()? { return Ok(None); } + if !fs::try_exists(&path).await.into_diagnostic()? { + return Ok(None); + } let kdl = fs::read_to_string(&path).await.into_diagnostic()?; // Determine selected job id from job.yaml let jf = read_job_file().await.ok(); let job_id = jf.and_then(|j| j.workflow_job_id); - let job = match parse_workflow_for_job(&kdl, job_id.as_deref()) { Some(j) => j, None => return Ok(None) }; + let job = match parse_workflow_for_job(&kdl, job_id.as_deref()) { + Some(j) => j, + None => return Ok(None), + }; // Run setup if present if let Some(setup_path) = job.setup.as_deref() { let code = run_setup_script(workdir, setup_path).await?; - if code != 0 { return Ok(Some(code)); } + if code != 0 { + return Ok(Some(code)); + } } - if job.steps.is_empty() { return Ok(None); } + if job.steps.is_empty() { + return Ok(None); + } let total = job.steps.len(); for (i, step) in job.steps.iter().enumerate() { let code = run_step(workdir, step, i + 1, total).await?; - if code != 0 { return Ok(Some(code)); } + if code != 0 { + return Ok(Some(code)); + } } Ok(Some(0)) } @@ -605,24 +825,59 @@ async fn main() -> Result<()> { info!(%repo, %sha, "runner starting"); // Workdir selection: prefer explicit SOLSTICE_WORKDIR, otherwise default to "$HOME/work" - let workdir = std::env::var("SOLSTICE_WORKDIR").ok().or_else(|| { - std::env::var("HOME").ok().map(|home| format!("{}/work", home)) - }).unwrap_or_else(|| "/root/work".into()); + let workdir = std::env::var("SOLSTICE_WORKDIR") + .ok() + .or_else(|| { + std::env::var("HOME") + .ok() + .map(|home| format!("{}/work", home)) + }) + .unwrap_or_else(|| "/root/work".into()); // Emit startup environment and tool checks - let uname = Command::new("/bin/sh").arg("-lc").arg("uname -a || echo unknown").output().await.ok() - .and_then(|o| String::from_utf8(o.stdout).ok()).unwrap_or_else(|| "unknown".into()); + let uname = Command::new("/bin/sh") + .arg("-lc") + .arg("uname -a || echo unknown") + .output() + .await + .ok() + .and_then(|o| String::from_utf8(o.stdout).ok()) + .unwrap_or_else(|| "unknown".into()); let uname_trim = uname.trim().to_string(); - println!("{}", ndjson_line("env", "info", &format!("system: {}", uname_trim), Some(serde_json::json!({"uname": uname_trim})))); + println!( + "{}", + ndjson_line( + "env", + "info", + &format!("system: {}", uname_trim), + Some(serde_json::json!({"uname": uname_trim})) + ) + ); // Preflight environment checks (tools, workdir, network) if let Err(e) = preflight(&repo, &workdir).await { - eprintln!("{}", ndjson_line("env_setup", "error", &format!("preflight failed: {}", e), None)); + eprintln!( + "{}", + ndjson_line( + "env_setup", + "error", + &format!("preflight failed: {}", e), + None + ) + ); std::process::exit(1); } // Announce workspace - println!("{}", ndjson_line("env_setup", "info", "workdir", Some(serde_json::json!({"path": workdir})))); + println!( + "{}", + ndjson_line( + "env_setup", + "info", + "workdir", + Some(serde_json::json!({"path": workdir})) + ) + ); let code = match ensure_repo(&repo, &sha, &workdir).await { Ok(_) => { @@ -637,7 +892,15 @@ async fn main() -> Result<()> { } } Err(e) => { - eprintln!("{}", ndjson_line("env_setup", "error", &format!("failed to prepare repo: {}", e), None)); + eprintln!( + "{}", + ndjson_line( + "env_setup", + "error", + &format!("failed to prepare repo: {}", e), + None + ) + ); 1 } }; @@ -651,7 +914,6 @@ async fn main() -> Result<()> { Ok(()) } - // Execute a setup script before workflow steps. Similar to run_job_script but with different categories. async fn run_setup_script(workdir: &str, setup_rel_or_abs: &str) -> Result { // Resolve path @@ -661,20 +923,26 @@ async fn run_setup_script(workdir: &str, setup_rel_or_abs: &str) -> Result format!("{}/{}", workdir, setup_rel_or_abs.trim_start_matches("./")) }; // Announce - println!("{}", ndjson_line( - "setup", - "info", - &format!("executing setup script: {}", setup_rel_or_abs), - Some(serde_json::json!({"path": setup_rel_or_abs})) - )); + println!( + "{}", + ndjson_line( + "setup", + "info", + &format!("executing setup script: {}", setup_rel_or_abs), + Some(serde_json::json!({"path": setup_rel_or_abs})) + ) + ); if !fs::try_exists(&script).await.into_diagnostic()? { - eprintln!("{}", ndjson_line( - "setup", - "error", - &format!("setup script not found at {}", script), - Some(serde_json::json!({"path": setup_rel_or_abs})) - )); + eprintln!( + "{}", + ndjson_line( + "setup", + "error", + &format!("setup script not found at {}", script), + Some(serde_json::json!({"path": setup_rel_or_abs})) + ) + ); return Ok(1); } @@ -696,11 +964,21 @@ async fn run_setup_script(workdir: &str, setup_rel_or_abs: &str) -> Result match reader.read_until(b'\n', &mut buf).await { Ok(0) => break, Ok(_) => { - let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string(); + let line = String::from_utf8_lossy(&buf) + .trim_end_matches(['\n', '\r']) + .to_string(); println!("{}", ndjson_line("setup_run", "info", &line, None)); } Err(e) => { - eprintln!("{}", ndjson_line("setup_run", "error", &format!("error reading stdout: {}", e), None)); + eprintln!( + "{}", + ndjson_line( + "setup_run", + "error", + &format!("error reading stdout: {}", e), + None + ) + ); break; } } @@ -715,11 +993,21 @@ async fn run_setup_script(workdir: &str, setup_rel_or_abs: &str) -> Result match reader.read_until(b'\n', &mut buf).await { Ok(0) => break, Ok(_) => { - let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string(); + let line = String::from_utf8_lossy(&buf) + .trim_end_matches(['\n', '\r']) + .to_string(); eprintln!("{}", ndjson_line("setup_run", "error", &line, None)); } Err(e) => { - eprintln!("{}", ndjson_line("setup_run", "error", &format!("error reading stderr: {}", e), None)); + eprintln!( + "{}", + ndjson_line( + "setup_run", + "error", + &format!("error reading stderr: {}", e), + None + ) + ); break; } } @@ -730,9 +1018,25 @@ async fn run_setup_script(workdir: &str, setup_rel_or_abs: &str) -> Result let status = child.wait().await.into_diagnostic()?; let code = status.code().unwrap_or(1); if code != 0 { - eprintln!("{}", ndjson_line("setup", "error", &format!("setup script exited with code {}", code), Some(serde_json::json!({"path": setup_rel_or_abs, "exit_code": code})))); + eprintln!( + "{}", + ndjson_line( + "setup", + "error", + &format!("setup script exited with code {}", code), + Some(serde_json::json!({"path": setup_rel_or_abs, "exit_code": code})) + ) + ); } else { - println!("{}", ndjson_line("setup", "info", &format!("completed setup: {}", setup_rel_or_abs), Some(serde_json::json!({"exit_code": code})))); + println!( + "{}", + ndjson_line( + "setup", + "info", + &format!("completed setup: {}", setup_rel_or_abs), + Some(serde_json::json!({"exit_code": code})) + ) + ); } Ok(code) }