use clap::Parser; use miette::{IntoDiagnostic as _, Result}; use serde::Deserialize; use std::process::Stdio; use std::collections::VecDeque; use std::sync::{Arc, Mutex}; use tokio::{ fs, io::{AsyncBufReadExt, BufReader}, process::Command, }; use tracing::{error, info, warn}; fn ndjson_line(category: &str, level: &str, msg: &str, extra: Option) -> String { let mut obj = serde_json::json!({ "category": category, "level": level, "msg": msg, }); if let Some(ext) = extra { if let Some(map) = obj.as_object_mut() { if let Some(eo) = ext.as_object() { for (k, v) in eo.iter() { map.insert(k.clone(), v.clone()); } } } } obj.to_string() } #[derive(Parser, Debug)] #[command( name = "solstice-runner", version, about = "Solstice CI Workflow Runner (VM agent)" )] struct Opts { /// Optional path to workflow KDL file (for local testing only) #[arg(long, env = "SOL_WORKFLOW_PATH")] workflow: Option, } #[derive(Debug, Deserialize)] struct JobFile { repo_url: String, commit_sha: String, #[serde(default)] workflow_job_id: Option, #[serde(default)] script_path: Option, #[serde(default)] group_id: Option, } async fn read_job_file() -> Result { let path = std::env::var("SOLSTICE_JOB_FILE").unwrap_or_else(|_| "/etc/solstice/job.yaml".into()); let bytes = fs::read(&path).await.into_diagnostic()?; let jf: JobFile = serde_yaml::from_slice(&bytes).into_diagnostic()?; Ok(jf) } async fn run_shell(cmd: &str) -> Result { info!(%cmd, "exec"); let status = Command::new("/bin/sh") .arg("-lc") .arg(cmd) .status() .await .into_diagnostic()?; let code = status.code().unwrap_or(1); if code != 0 { return Err(miette::miette!("command failed ({code}): {cmd}")); } Ok(code) } fn to_https_if_codeberg(repo: &str) -> String { // Convert SSH Codeberg URL to HTTPS for anonymous fetches if let Some(rest) = repo.strip_prefix("ssh://git@codeberg.org/") { return format!("https://codeberg.org/{rest}"); } repo.to_string() } async fn has_cmd(name: &str) -> bool { tokio::process::Command::new("/bin/sh") .arg("-lc") .arg(format!("command -v {} >/dev/null 2>&1", name)) .status() .await .ok() .and_then(|s| s.code()) .map(|c| c == 0) .unwrap_or(false) } async fn check_writable(dir: &str) -> bool { if fs::create_dir_all(dir).await.is_err() { return false; } let test_path = format!("{}/.solstice-writecheck", dir.trim_end_matches('/')); match fs::write(&test_path, b"ok").await { Ok(_) => { let _ = fs::remove_file(&test_path).await; true } Err(_) => false, } } fn parse_repo_host_port(repo: &str) -> Option<(String, u16)> { let r = repo.trim(); if let Some(rest) = r.strip_prefix("https://") { let host = rest.split('/').next()?.split('@').last()?.split(':').next()?; // ignore embedded user return Some((host.to_string(), 443)); } if let Some(rest) = r.strip_prefix("http://") { let host = rest.split('/').next()?.split('@').last()?.split(':').next()?; return Some((host.to_string(), 80)); } if let Some(rest) = r.strip_prefix("ssh://") { // ssh://[user@]host/owner/repo let first = rest.split('/').next()?; let host = first.split('@').last()?.split(':').next()?; return Some((host.to_string(), 22)); } // scp-like: user@host:owner/repo.git if let Some(at) = r.find('@') { if let Some(colon) = r[at+1..].find(':') { // ensure host: let host = &r[at+1..at+1+colon]; if !host.is_empty() { return Some((host.to_string(), 22)); } } } None } async fn check_network_connect(host: &str, port: u16, timeout_ms: u64) -> bool { use tokio::time::{timeout, Duration}; match timeout(Duration::from_millis(timeout_ms), tokio::net::TcpStream::connect((host, port))).await { Ok(Ok(_stream)) => { true } _ => false, } } async fn preflight(repo: &str, workdir: &str) -> Result<()> { // Tool availability let has_git = has_cmd("git").await; let has_curl = has_cmd("curl").await; let has_wget = has_cmd("wget").await; let has_tar = has_cmd("tar").await; for (tool, ok) in [("git", has_git), ("curl", has_curl), ("wget", has_wget), ("tar", has_tar)] { let lvl = if ok { "info" } else { "warn" }; let msg = if ok { format!("tool {tool}: available") } else { format!("tool {tool}: missing") }; println!("{}", ndjson_line("tool_check", lvl, &msg, Some(serde_json::json!({"available": ok, "tool": tool})))); } let can_clone = has_git || (has_tar && (has_curl || has_wget)); let lvl = if can_clone { "info" } else { "error" }; println!( "{}", ndjson_line( "env_setup", lvl, "clone capability", Some(serde_json::json!({ "git": has_git, "tar": has_tar, "curl": has_curl, "wget": has_wget, "can_clone": can_clone })) ) ); if !can_clone { return Err(miette::miette!("no available method to fetch repository: need git or (tar and (curl|wget))")); } // Workdir writability let writable = check_writable(workdir).await; let lvl = if writable { "info" } else { "error" }; println!("{}", ndjson_line("env_setup", lvl, "workdir writable", Some(serde_json::json!({"path": workdir, "writable": writable})))); if !writable { return Err(miette::miette!("workdir is not writable: {}", workdir)); } // Network reachability (best-effort) if let Some((host, port)) = parse_repo_host_port(repo) { let ok = check_network_connect(&host, port, 2000).await; let lvl = if ok { "info" } else { "warn" }; let status_msg = if ok { "reachable" } else { "unreachable" }; println!( "{}", ndjson_line( "env", lvl, &format!("network: {}:{} {}", host, port, status_msg), Some(serde_json::json!({"host": host, "port": port, "reachable": ok})) ) ); } Ok(()) } async fn fetch_repo_via_archive(repo_https: &str, sha: &str, workdir: &str) -> Result<()> { // Announce chosen method println!("{}", ndjson_line("env_setup", "info", "fetch via http archive", Some(serde_json::json!({ "url": format!("{}/archive/{}.tar.gz", repo_https.trim_end_matches('.').trim_end_matches(".git"), sha) })))); // Gitea/Codeberg archive URL pattern: https://codeberg.org///archive/.tar.gz let base = repo_https.trim_end_matches('.').trim_end_matches(".git"); let url = format!("{}/archive/{}.tar.gz", base, sha); // Check if we should allow insecure TLS (last resort) let insecure = std::env::var("SOLSTICE_ALLOW_INSECURE").ok().map(|v| v == "1" || v.eq_ignore_ascii_case("true")).unwrap_or(false); let curl_flags = if insecure { "-fSLk" } else { "-fSL" }; // Try curl | tar, then wget | tar let cmd_curl = format!( "mkdir -p {workdir} && curl {curl_flags} {url} | tar -xz -C {workdir} --strip-components=1" ); if run_shell(&cmd_curl).await.is_ok() { return Ok(()); } let cmd_wget = format!( "mkdir -p {workdir} && wget -qO- {url} | tar -xz -C {workdir} --strip-components=1" ); if run_shell(&cmd_wget).await.is_ok() { return Ok(()); } // On illumos/SunOS images, curl/wget may be missing or CA bundle absent. Try to install tools and CA certs, then retry. let os = std::env::var("SOLSTICE_OS_OVERRIDE").ok().unwrap_or_else(|| { // Best-effort OS detection std::env::consts::OS.to_string() }); // Prefer uname if available let uname = Command::new("/bin/sh").arg("-lc").arg("uname -s 2>/dev/null || echo unknown").output().await.ok() .and_then(|o| String::from_utf8(o.stdout).ok()).unwrap_or_default(); let is_sunos = uname.trim() == "SunOS" || os == "solaris"; if is_sunos { // Try IPS (pkg) first, then pkgin let _ = run_shell("sudo pkg refresh || true").await; // curl if run_shell("sudo pkg install -v web/curl").await.is_err() { let _ = run_shell("sudo pkgin -y install curl").await; } // CA certificates (package name may differ per distro) let _ = run_shell("sudo pkg install -v web/ca-certificates || sudo pkg install -v library/security/ca-certificates || true").await; let _ = run_shell("sudo pkgin -y install mozilla-rootcerts || true").await; let _ = run_shell("sudo mozilla-rootcerts install || true").await; // Retry with curl and wget if run_shell(&cmd_curl).await.is_ok() { return Ok(()); } if run_shell(&cmd_wget).await.is_ok() { return Ok(()); } // As a last resort with explicit opt-in, try curl --insecure if insecure { let cmd_curl_insecure = format!( "mkdir -p {workdir} && curl -fSLk {url} | tar -xz -C {workdir} --strip-components=1" ); if run_shell(&cmd_curl_insecure).await.is_ok() { warn!("used curl --insecure to fetch repo archive on SunOS"); return Ok(()); } } } Err(miette::miette!("failed to fetch repo archive via HTTP for {url}")) } fn is_hex(s: &str) -> bool { s.chars().all(|c| c.is_ascii_hexdigit()) } async fn ensure_repo(repo: &str, sha: &str, workdir: &str) -> Result<()> { fs::create_dir_all(workdir).await.into_diagnostic()?; let repo_eff = to_https_if_codeberg(repo); // Prefer git when available; fall back to archive download when git is missing or fetch fails. if has_cmd("git").await { // Determine object format: if SHA looks like 64-hex, assume sha256 object format let sha_len = sha.len(); let is_sha256 = sha_len == 64 && is_hex(sha); let obj_fmt = if is_sha256 { Some("sha256") } else { None }; // Emit NDJSON about chosen method println!( "{}", ndjson_line( "env_setup", "info", &format!( "fetch via git (object-format={})", obj_fmt.unwrap_or("sha1") ), Some(serde_json::json!({ "method": "git", "object_format": obj_fmt.unwrap_or("sha1"), "sha_len": sha_len, "detached": true })) ) ); let cmds = vec![ // Re-initialize repository to ensure correct object format format!("cd {workdir} && rm -rf .git || true"), if let Some(fmt) = obj_fmt { format!("cd {workdir} && git init --object-format={fmt}") } else { format!("cd {workdir} && git init") }, format!( "cd {workdir} && git remote remove origin >/dev/null 2>&1 || true && git remote add origin {repo_eff}" ), // Use protocol v2 features when available and keep it light format!("cd {workdir} && git -c protocol.version=2 fetch --filter=blob:none --depth=1 --no-tags origin {sha}"), // Checkout the requested commit in detached HEAD format!("cd {workdir} && git checkout -q --detach {sha}"), ]; for c in cmds { match run_shell(&c).await { Ok(_) => {} Err(e) => { // Try archive fallback once on any git failure warn!(error = %e, "git path failed; attempting archive fallback"); return fetch_repo_via_archive(&repo_eff, sha, workdir).await; } } } Ok(()) } else { fetch_repo_via_archive(&repo_eff, sha, workdir).await } } async fn run_job_script(workdir: &str, script_override: Option<&str>) -> Result { // Determine the script to execute: prefer override from job.yaml, else default .solstice/job.sh let script = if let Some(path) = script_override { if path.starts_with('/') { path.to_string() } else { format!("{}/{}", workdir, path.trim_start_matches("./")) } } else { format!("{}/.solstice/job.sh", workdir) }; if !fs::try_exists(&script).await.into_diagnostic()? { warn!(path = %script, "job script not found"); eprintln!("{}", ndjson_line("job_run", "error", &format!("job script not found at {}", script), None)); return Ok(1); } // Emit explicit pre-exec line to aid diagnostics println!("{}", ndjson_line("job_run", "info", &format!("executing {}", script), None)); let _ = run_shell(&format!("chmod +x {} || true", script)).await?; let mut cmd = Command::new("/bin/sh"); cmd.arg("-lc") .arg(format!("set -ex; cd {workdir}; {}", script)) .stdout(Stdio::piped()) .stderr(Stdio::piped()); let mut child = cmd.spawn().into_diagnostic()?; // Buffer the last N stderr lines for failure summary let last_err: Arc>> = Arc::new(Mutex::new(VecDeque::with_capacity(20))); // Attach readers to child stdout/stderr so logs stream as NDJSON categorized under job_run if let Some(stdout) = child.stdout.take() { let mut reader = BufReader::new(stdout); tokio::spawn(async move { loop { let mut buf = Vec::with_capacity(256); match reader.read_until(b'\n', &mut buf).await { Ok(0) => break, Ok(_) => { let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string(); println!("{}", ndjson_line("job_run", "info", &line, None)); } Err(e) => { eprintln!("{}", ndjson_line("job_run", "error", &format!("error reading stdout: {}", e), None)); break; } } } }); } if let Some(stderr) = child.stderr.take() { let mut reader = BufReader::new(stderr); let last_err2 = last_err.clone(); tokio::spawn(async move { loop { let mut buf = Vec::with_capacity(256); match reader.read_until(b'\n', &mut buf).await { Ok(0) => break, Ok(_) => { let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string(); eprintln!("{}", ndjson_line("job_run", "error", &line, None)); if let Ok(mut dq) = last_err2.lock() { if dq.len() == 20 { dq.pop_front(); } dq.push_back(line); } } Err(e) => { eprintln!("{}", ndjson_line("job_run", "error", &format!("error reading stderr: {}", e), None)); break; } } } }); } let status = child.wait().await.into_diagnostic()?; let code = status.code().unwrap_or(1); if code != 0 { // Emit a concise failure summary (structured) eprintln!("{}", ndjson_line("job_run", "error", &format!("job script exited with code {}", code), None)); // Include recent stderr lines for context (structured) let lines: Vec = last_err .lock() .ok() .map(|dq| dq.iter().cloned().collect()) .unwrap_or_default(); if lines.is_empty() { eprintln!("{}", ndjson_line("job_run", "warn", "no stderr lines were captured from the script", None)); } else { eprintln!("{}", ndjson_line("job_run", "info", "recent stderr lines follow", None)); for l in lines { eprintln!("{}", ndjson_line("job_run", "error", &l, None)); } } } Ok(code) } #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { let _t = common::init_tracing("solstice-workflow-runner")?; let _opts = Opts::parse(); // Try env overrides first for robustness let repo = std::env::var("SOLSTICE_REPO_URL").ok(); let sha = std::env::var("SOLSTICE_COMMIT_SHA").ok(); let (repo, sha) = match (repo, sha) { (Some(r), Some(s)) => (r, s), _ => { let jf = read_job_file().await?; (jf.repo_url, jf.commit_sha) } }; info!(%repo, %sha, "runner starting"); // Workdir selection: prefer explicit SOLSTICE_WORKDIR, otherwise default to "$HOME/work" let workdir = std::env::var("SOLSTICE_WORKDIR").ok().or_else(|| { std::env::var("HOME").ok().map(|home| format!("{}/work", home)) }).unwrap_or_else(|| "/root/work".into()); // Emit startup environment and tool checks let uname = Command::new("/bin/sh").arg("-lc").arg("uname -a || echo unknown").output().await.ok() .and_then(|o| String::from_utf8(o.stdout).ok()).unwrap_or_else(|| "unknown".into()); let uname_trim = uname.trim().to_string(); println!("{}", ndjson_line("env", "info", &format!("system: {}", uname_trim), Some(serde_json::json!({"uname": uname_trim})))); // Preflight environment checks (tools, workdir, network) if let Err(e) = preflight(&repo, &workdir).await { eprintln!("{}", ndjson_line("env_setup", "error", &format!("preflight failed: {}", e), None)); std::process::exit(1); } // Announce workspace println!("{}", ndjson_line("env_setup", "info", "workdir", Some(serde_json::json!({"path": workdir})))); let code = match ensure_repo(&repo, &sha, &workdir).await { Ok(_) => { // Read job.yaml to get optional script override let jf = read_job_file().await.ok(); let script_override = jf.as_ref().and_then(|j| j.script_path.as_deref()); // proceed to run job script run_job_script(&workdir, script_override).await? } Err(e) => { eprintln!("{}", ndjson_line("env_setup", "error", &format!("failed to prepare repo: {}", e), None)); 1 } }; if code != 0 { error!(exit_code = code, "job script failed"); std::process::exit(code); } info!("job complete"); Ok(()) }