use clap::Parser; use miette::{IntoDiagnostic as _, Result}; use serde::Deserialize; use std::collections::VecDeque; use std::process::Stdio; use std::sync::{Arc, Mutex}; use tokio::{ fs, io::{AsyncBufReadExt, BufReader}, process::Command, }; use tracing::{error, info, warn}; fn ndjson_line(category: &str, level: &str, msg: &str, extra: Option) -> String { let mut obj = serde_json::json!({ "category": category, "level": level, "msg": msg, }); if let Some(ext) = extra { if let Some(map) = obj.as_object_mut() { if let Some(eo) = ext.as_object() { for (k, v) in eo.iter() { map.insert(k.clone(), v.clone()); } } } } obj.to_string() } #[derive(Parser, Debug)] #[command( name = "solstice-runner", version, about = "Solstice CI Workflow Runner (VM agent)" )] struct Opts { /// Optional path to workflow KDL file (for local testing only) #[arg(long, env = "SOL_WORKFLOW_PATH")] workflow: Option, } #[derive(Debug, Deserialize)] struct JobFile { repo_url: String, commit_sha: String, #[serde(default)] workflow_job_id: Option, #[serde(default)] script_path: Option, #[serde(default)] group_id: Option, } async fn read_job_file() -> Result { let path = std::env::var("SOLSTICE_JOB_FILE").unwrap_or_else(|_| "/etc/solstice/job.yaml".into()); let bytes = fs::read(&path).await.into_diagnostic()?; let jf: JobFile = serde_yaml::from_slice(&bytes).into_diagnostic()?; Ok(jf) } async fn run_shell(cmd: &str) -> Result { info!(%cmd, "exec"); let status = Command::new("/bin/sh") .arg("-lc") .arg(cmd) .status() .await .into_diagnostic()?; let code = status.code().unwrap_or(1); if code != 0 { return Err(miette::miette!("command failed ({code}): {cmd}")); } Ok(code) } fn to_https_if_codeberg(repo: &str) -> String { // Convert SSH Codeberg URL to HTTPS for anonymous fetches if let Some(rest) = repo.strip_prefix("ssh://git@codeberg.org/") { return format!("https://codeberg.org/{rest}"); } repo.to_string() } async fn has_cmd(name: &str) -> bool { tokio::process::Command::new("/bin/sh") .arg("-lc") .arg(format!("command -v {} >/dev/null 2>&1", name)) .status() .await .ok() .and_then(|s| s.code()) .map(|c| c == 0) .unwrap_or(false) } async fn check_writable(dir: &str) -> bool { if fs::create_dir_all(dir).await.is_err() { return false; } let test_path = format!("{}/.solstice-writecheck", dir.trim_end_matches('/')); match fs::write(&test_path, b"ok").await { Ok(_) => { let _ = fs::remove_file(&test_path).await; true } Err(_) => false, } } fn parse_repo_host_port(repo: &str) -> Option<(String, u16)> { let r = repo.trim(); if let Some(rest) = r.strip_prefix("https://") { let host = rest .split('/') .next()? .split('@') .last()? .split(':') .next()?; // ignore embedded user return Some((host.to_string(), 443)); } if let Some(rest) = r.strip_prefix("http://") { let host = rest .split('/') .next()? .split('@') .last()? .split(':') .next()?; return Some((host.to_string(), 80)); } if let Some(rest) = r.strip_prefix("ssh://") { // ssh://[user@]host/owner/repo let first = rest.split('/').next()?; let host = first.split('@').last()?.split(':').next()?; return Some((host.to_string(), 22)); } // scp-like: user@host:owner/repo.git if let Some(at) = r.find('@') { if let Some(colon) = r[at + 1..].find(':') { // ensure host: let host = &r[at + 1..at + 1 + colon]; if !host.is_empty() { return Some((host.to_string(), 22)); } } } None } async fn check_network_connect(host: &str, port: u16, timeout_ms: u64) -> bool { use tokio::time::{Duration, timeout}; match timeout( Duration::from_millis(timeout_ms), tokio::net::TcpStream::connect((host, port)), ) .await { Ok(Ok(_stream)) => true, _ => false, } } async fn preflight(repo: &str, workdir: &str) -> Result<()> { // Tool availability let has_git = has_cmd("git").await; let has_curl = has_cmd("curl").await; let has_wget = has_cmd("wget").await; let has_tar = has_cmd("tar").await; let has_gtar = has_cmd("gtar").await; for (tool, ok) in [ ("git", has_git), ("curl", has_curl), ("wget", has_wget), ("tar", has_tar), ("gtar", has_gtar), ] { let lvl = if ok { "info" } else { "warn" }; let msg = if ok { format!("tool {tool}: available") } else { format!("tool {tool}: missing") }; println!( "{}", ndjson_line( "tool_check", lvl, &msg, Some(serde_json::json!({"available": ok, "tool": tool})) ) ); } let can_clone = has_git || ((has_tar || has_gtar) && (has_curl || has_wget)); let lvl = if can_clone { "info" } else { "error" }; println!( "{}", ndjson_line( "env_setup", lvl, "clone capability", Some(serde_json::json!({ "git": has_git, "tar": has_tar, "curl": has_curl, "wget": has_wget, "can_clone": can_clone })) ) ); if !can_clone { return Err(miette::miette!( "no available method to fetch repository: need git or (tar and (curl|wget))" )); } // Workdir writability let writable = check_writable(workdir).await; let lvl = if writable { "info" } else { "error" }; println!( "{}", ndjson_line( "env_setup", lvl, "workdir writable", Some(serde_json::json!({"path": workdir, "writable": writable})) ) ); if !writable { return Err(miette::miette!("workdir is not writable: {}", workdir)); } // Network reachability (best-effort) if let Some((host, port)) = parse_repo_host_port(repo) { let ok = check_network_connect(&host, port, 2000).await; let lvl = if ok { "info" } else { "warn" }; let status_msg = if ok { "reachable" } else { "unreachable" }; println!( "{}", ndjson_line( "env", lvl, &format!("network: {}:{} {}", host, port, status_msg), Some(serde_json::json!({"host": host, "port": port, "reachable": ok})) ) ); } Ok(()) } async fn fetch_repo_via_archive(repo_https: &str, sha: &str, workdir: &str) -> Result<()> { // Announce chosen method println!( "{}", ndjson_line( "env_setup", "info", "fetch via http archive", Some(serde_json::json!({ "url": format!("{}/archive/{}.tar.gz", repo_https.trim_end_matches('.').trim_end_matches(".git"), sha) })) ) ); // Gitea/Codeberg archive URL pattern: https://codeberg.org///archive/.tar.gz let base = repo_https.trim_end_matches('.').trim_end_matches(".git"); let url = format!("{}/archive/{}.tar.gz", base, sha); // Prefer GNU tar (gtar) when available (illumos' tar is not compatible with -z/--strip-components) let tar_bin = if has_cmd("gtar").await { "gtar" } else { "tar" }; // Check if we should allow insecure TLS (last resort) let insecure = std::env::var("SOLSTICE_ALLOW_INSECURE") .ok() .map(|v| v == "1" || v.eq_ignore_ascii_case("true")) .unwrap_or(false); let curl_flags = if insecure { "-fSLk" } else { "-fSL" }; // Try curl | tar, then wget | tar let cmd_curl = format!( "mkdir -p {workdir} && curl {curl_flags} {url} | {tar_bin} -xz -C {workdir} --strip-components=1" ); if run_shell(&cmd_curl).await.is_ok() { return Ok(()); } let cmd_wget = format!( "mkdir -p {workdir} && wget -qO- {url} | {tar_bin} -xz -C {workdir} --strip-components=1" ); if run_shell(&cmd_wget).await.is_ok() { return Ok(()); } // On illumos/SunOS images, curl/wget may be missing or CA bundle absent. Try to install tools and CA certs, then retry. let os = std::env::var("SOLSTICE_OS_OVERRIDE") .ok() .unwrap_or_else(|| { // Best-effort OS detection std::env::consts::OS.to_string() }); // Prefer uname if available let uname = Command::new("/bin/sh") .arg("-lc") .arg("uname -s 2>/dev/null || echo unknown") .output() .await .ok() .and_then(|o| String::from_utf8(o.stdout).ok()) .unwrap_or_default(); let is_sunos = uname.trim() == "SunOS" || os == "solaris"; if is_sunos { // Try IPS (pkg) first, then pkgin let _ = run_shell("sudo pkg refresh || true").await; // curl if run_shell("sudo pkg install -v web/curl").await.is_err() { let _ = run_shell("sudo pkgin -y install curl").await; } // CA certificates (package name may differ per distro) let _ = run_shell("sudo pkg install -v web/ca-certificates || sudo pkg install -v library/security/ca-certificates || true").await; let _ = run_shell("sudo pkgin -y install mozilla-rootcerts || true").await; let _ = run_shell("sudo mozilla-rootcerts install || true").await; // Retry with curl and wget if run_shell(&cmd_curl).await.is_ok() { return Ok(()); } if run_shell(&cmd_wget).await.is_ok() { return Ok(()); } // As a last resort with explicit opt-in, try curl --insecure if insecure { let cmd_curl_insecure = format!( "mkdir -p {workdir} && curl -fSLk {url} | {tar_bin} -xz -C {workdir} --strip-components=1" ); if run_shell(&cmd_curl_insecure).await.is_ok() { warn!("used curl --insecure to fetch repo archive on SunOS"); return Ok(()); } } } Err(miette::miette!( "failed to fetch repo archive via HTTP for {url}" )) } fn is_hex(s: &str) -> bool { s.chars().all(|c| c.is_ascii_hexdigit()) } async fn ensure_repo(repo: &str, sha: &str, workdir: &str) -> Result<()> { fs::create_dir_all(workdir).await.into_diagnostic()?; let repo_eff = to_https_if_codeberg(repo); // Prefer git when available; fall back to archive download when git is missing or fetch fails. if has_cmd("git").await { // Determine object format: if SHA looks like 64-hex, assume sha256 object format let sha_len = sha.len(); let is_sha256 = sha_len == 64 && is_hex(sha); let obj_fmt = if is_sha256 { Some("sha256") } else { None }; // Emit NDJSON about chosen method println!( "{}", ndjson_line( "env_setup", "info", &format!( "fetch via git (object-format={})", obj_fmt.unwrap_or("sha1") ), Some(serde_json::json!({ "method": "git", "object_format": obj_fmt.unwrap_or("sha1"), "sha_len": sha_len, "detached": true })) ) ); let cmds = vec![ // Re-initialize repository to ensure correct object format format!("cd {workdir} && rm -rf .git || true"), if let Some(fmt) = obj_fmt { format!("cd {workdir} && git init --object-format={fmt}") } else { format!("cd {workdir} && git init") }, format!( "cd {workdir} && git remote remove origin >/dev/null 2>&1 || true && git remote add origin {repo_eff}" ), // Use protocol v2 features when available and keep it light format!( "cd {workdir} && git -c protocol.version=2 fetch --filter=blob:none --depth=1 --no-tags origin {sha}" ), // Checkout the requested commit in detached HEAD format!("cd {workdir} && git checkout -q --detach {sha}"), ]; for c in cmds { match run_shell(&c).await { Ok(_) => {} Err(e) => { // Try archive fallback once on any git failure warn!(error = %e, "git path failed; attempting archive fallback"); return fetch_repo_via_archive(&repo_eff, sha, workdir).await; } } } Ok(()) } else { fetch_repo_via_archive(&repo_eff, sha, workdir).await } } async fn run_job_script(workdir: &str, script_override: Option<&str>) -> Result { // Determine the script to execute: prefer override from job.yaml, else default .solstice/job.sh let script = if let Some(path) = script_override { if path.starts_with('/') { path.to_string() } else { format!("{}/{}", workdir, path.trim_start_matches("./")) } } else { format!("{}/.solstice/job.sh", workdir) }; if !fs::try_exists(&script).await.into_diagnostic()? { warn!(path = %script, "job script not found"); eprintln!( "{}", ndjson_line( "job_run", "error", &format!("job script not found at {}", script), None ) ); return Ok(1); } // Emit explicit pre-exec line to aid diagnostics println!( "{}", ndjson_line("job_run", "info", &format!("executing {}", script), None) ); let _ = run_shell(&format!("chmod +x {} || true", script)).await?; let mut cmd = Command::new("/bin/sh"); cmd.arg("-lc") .arg(format!("set -ex; cd {workdir}; {}", script)) .stdout(Stdio::piped()) .stderr(Stdio::piped()); let mut child = cmd.spawn().into_diagnostic()?; // Buffer the last N stderr lines for failure summary let last_err: Arc>> = Arc::new(Mutex::new(VecDeque::with_capacity(20))); // Attach readers to child stdout/stderr so logs stream as NDJSON categorized under job_run if let Some(stdout) = child.stdout.take() { let mut reader = BufReader::new(stdout); tokio::spawn(async move { loop { let mut buf = Vec::with_capacity(256); match reader.read_until(b'\n', &mut buf).await { Ok(0) => break, Ok(_) => { let line = String::from_utf8_lossy(&buf) .trim_end_matches(['\n', '\r']) .to_string(); println!("{}", ndjson_line("job_run", "info", &line, None)); } Err(e) => { eprintln!( "{}", ndjson_line( "job_run", "error", &format!("error reading stdout: {}", e), None ) ); break; } } } }); } if let Some(stderr) = child.stderr.take() { let mut reader = BufReader::new(stderr); let last_err2 = last_err.clone(); tokio::spawn(async move { loop { let mut buf = Vec::with_capacity(256); match reader.read_until(b'\n', &mut buf).await { Ok(0) => break, Ok(_) => { let line = String::from_utf8_lossy(&buf) .trim_end_matches(['\n', '\r']) .to_string(); eprintln!("{}", ndjson_line("job_run", "error", &line, None)); if let Ok(mut dq) = last_err2.lock() { if dq.len() == 20 { dq.pop_front(); } dq.push_back(line); } } Err(e) => { eprintln!( "{}", ndjson_line( "job_run", "error", &format!("error reading stderr: {}", e), None ) ); break; } } } }); } let status = child.wait().await.into_diagnostic()?; let code = status.code().unwrap_or(1); if code != 0 { // Emit a concise failure summary (structured) eprintln!( "{}", ndjson_line( "job_run", "error", &format!("job script exited with code {}", code), None ) ); // Include recent stderr lines for context (structured) let lines: Vec = last_err .lock() .ok() .map(|dq| dq.iter().cloned().collect()) .unwrap_or_default(); if lines.is_empty() { eprintln!( "{}", ndjson_line( "job_run", "warn", "no stderr lines were captured from the script", None ) ); } else { eprintln!( "{}", ndjson_line("job_run", "info", "recent stderr lines follow", None) ); for l in lines { eprintln!("{}", ndjson_line("job_run", "error", &l, None)); } } } Ok(code) } #[derive(Debug)] struct WorkflowStep { name: String, run: String, } #[derive(Debug)] struct WorkflowJob { setup: Option, steps: Vec, } fn capture_attr(line: &str, key: &str) -> Option { let pattern1 = format!("{}=\"", key); if let Some(start) = line.find(&pattern1) { let rest = &line[start + pattern1.len()..]; if let Some(end) = rest.find('"') { return Some(rest[..end].to_string()); } } let pattern2 = format!("{}='", key); if let Some(start) = line.find(&pattern2) { let rest = &line[start + pattern2.len()..]; if let Some(end) = rest.find('\'') { return Some(rest[..end].to_string()); } } None } fn parse_workflow_for_job(kdl: &str, wanted_job: Option<&str>) -> Option { let mut lines = kdl.lines().peekable(); while let Some(line) = lines.next() { let l = line.trim(); if l.starts_with("job ") && l.contains("id=") { let id = capture_attr(l, "id"); let mut depth = if l.ends_with('{') { 1 } else { 0 }; let mut steps: Vec = Vec::new(); let mut setup: Option = None; // If this job is the one we want (or no preference and it's the first job), collect its setup and steps let take_this = match (wanted_job, id.as_deref()) { (Some(w), Some(i)) => w == i, (None, Some(_)) => true, _ => false, }; while let Some(peek) = lines.peek() { let t = peek.trim(); if t.ends_with('{') { depth += 1; } if t.starts_with('}') { if depth == 0 { break; } depth -= 1; if depth == 0 { lines.next(); break; } } if take_this { if setup.is_none() && t.starts_with("setup ") && t.contains("path=") { if let Some(p) = capture_attr(t, "path") { setup = Some(p); } } if t.starts_with("step ") && t.contains("run=") { let name = capture_attr(t, "name").unwrap_or_else(|| "unnamed".into()); if let Some(run) = capture_attr(t, "run") { steps.push(WorkflowStep { name, run }); } } } lines.next(); } if take_this { return Some(WorkflowJob { setup, steps }); } } } None } fn slugify_step_name(name: &str) -> String { // Lowercase, replace non-alphanumeric with '-', collapse dashes, trim. let mut out = String::with_capacity(name.len()); let mut prev_dash = false; for ch in name.chars() { let c = ch.to_ascii_lowercase(); if c.is_ascii_alphanumeric() { out.push(c); prev_dash = false; } else if !prev_dash { out.push('-'); prev_dash = true; } } // trim leading/trailing '-' let s = out.trim_matches('-').to_string(); if s.is_empty() { "step".to_string() } else { s } } async fn run_step(workdir: &str, step: &WorkflowStep, idx: usize, total: usize) -> Result { // Derive per-step category from the step name (slugified) let step_slug = slugify_step_name(&step.name); let step_category = format!("step:{}", step_slug); // Announce step start in the step-specific category println!( "{}", ndjson_line( &step_category, "info", &format!("starting step: {}", step.name), Some( serde_json::json!({"step_name": step.name, "step_index": idx, "total_steps": total}) ) ) ); // Build command and spawn let mut cmd = Command::new("/bin/sh"); cmd.arg("-lc") .arg(format!("set -e; cd {workdir}; {}", step.run)) .stdout(Stdio::piped()) .stderr(Stdio::piped()); let mut child = cmd.spawn().into_diagnostic()?; // Stream output with step fields; use per-step category for both stdout and stderr let extra = serde_json::json!({"step_name": step.name, "step_index": idx, "total_steps": total}); let step_cat_clone_out = step_category.clone(); if let Some(stdout) = child.stdout.take() { let mut reader = BufReader::new(stdout); let extra_out = extra.clone(); tokio::spawn(async move { let cat = step_cat_clone_out; loop { let mut buf = Vec::with_capacity(256); match reader.read_until(b'\n', &mut buf).await { Ok(0) => break, Ok(_) => { let line = String::from_utf8_lossy(&buf) .trim_end_matches(['\n', '\r']) .to_string(); println!( "{}", ndjson_line(&cat, "info", &line, Some(extra_out.clone())) ); } Err(e) => { eprintln!( "{}", ndjson_line( &cat, "error", &format!("error reading stdout: {}", e), Some(extra_out.clone()) ) ); break; } } } }); } if let Some(stderr) = child.stderr.take() { let mut reader = BufReader::new(stderr); let extra_err = extra.clone(); let step_cat_clone_err = step_category.clone(); tokio::spawn(async move { let cat = step_cat_clone_err; loop { let mut buf = Vec::with_capacity(256); match reader.read_until(b'\n', &mut buf).await { Ok(0) => break, Ok(_) => { let line = String::from_utf8_lossy(&buf) .trim_end_matches(['\n', '\r']) .to_string(); eprintln!( "{}", ndjson_line(&cat, "error", &line, Some(extra_err.clone())) ); } Err(e) => { eprintln!( "{}", ndjson_line( &cat, "error", &format!("error reading stderr: {}", e), Some(extra_err.clone()) ) ); break; } } } }); } let status = child.wait().await.into_diagnostic()?; let code = status.code().unwrap_or(1); if code != 0 { eprintln!( "{}", ndjson_line( &step_category, "error", &format!("step failed: {} (exit {})", step.name, code), Some(extra) ) ); } else { println!( "{}", ndjson_line( &step_category, "info", &format!("completed step: {}", step.name), Some( serde_json::json!({"step_name": step.name, "step_index": idx, "total_steps": total, "exit_code": code}) ) ) ); } Ok(code) } async fn run_workflow_if_present(workdir: &str) -> Result> { let path = format!("{}/.solstice/workflow.kdl", workdir); if !fs::try_exists(&path).await.into_diagnostic()? { return Ok(None); } let kdl = fs::read_to_string(&path).await.into_diagnostic()?; // Determine selected job id from job.yaml let jf = read_job_file().await.ok(); let job_id = jf.and_then(|j| j.workflow_job_id); let job = match parse_workflow_for_job(&kdl, job_id.as_deref()) { Some(j) => j, None => return Ok(None), }; // Run setup if present if let Some(setup_path) = job.setup.as_deref() { let code = run_setup_script(workdir, setup_path).await?; if code != 0 { return Ok(Some(code)); } } if job.steps.is_empty() { return Ok(None); } let total = job.steps.len(); for (i, step) in job.steps.iter().enumerate() { let code = run_step(workdir, step, i + 1, total).await?; if code != 0 { return Ok(Some(code)); } } Ok(Some(0)) } #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { let _t = common::init_tracing("solstice-workflow-runner")?; let _opts = Opts::parse(); // Try env overrides first for robustness let repo = std::env::var("SOLSTICE_REPO_URL").ok(); let sha = std::env::var("SOLSTICE_COMMIT_SHA").ok(); let (repo, sha) = match (repo, sha) { (Some(r), Some(s)) => (r, s), _ => { let jf = read_job_file().await?; (jf.repo_url, jf.commit_sha) } }; info!(%repo, %sha, "runner starting"); // Workdir selection: prefer explicit SOLSTICE_WORKDIR, otherwise default to "$HOME/work" let workdir = std::env::var("SOLSTICE_WORKDIR") .ok() .or_else(|| { std::env::var("HOME") .ok() .map(|home| format!("{}/work", home)) }) .unwrap_or_else(|| "/root/work".into()); // Emit startup environment and tool checks let uname = Command::new("/bin/sh") .arg("-lc") .arg("uname -a || echo unknown") .output() .await .ok() .and_then(|o| String::from_utf8(o.stdout).ok()) .unwrap_or_else(|| "unknown".into()); let uname_trim = uname.trim().to_string(); println!( "{}", ndjson_line( "env", "info", &format!("system: {}", uname_trim), Some(serde_json::json!({"uname": uname_trim})) ) ); // Preflight environment checks (tools, workdir, network) if let Err(e) = preflight(&repo, &workdir).await { eprintln!( "{}", ndjson_line( "env_setup", "error", &format!("preflight failed: {}", e), None ) ); std::process::exit(1); } // Announce workspace println!( "{}", ndjson_line( "env_setup", "info", "workdir", Some(serde_json::json!({"path": workdir})) ) ); let code = match ensure_repo(&repo, &sha, &workdir).await { Ok(_) => { // Prefer workflow.kdl when present; otherwise run legacy script match run_workflow_if_present(&workdir).await? { Some(code) => code, None => { let jf = read_job_file().await.ok(); let script_override = jf.as_ref().and_then(|j| j.script_path.as_deref()); run_job_script(&workdir, script_override).await? } } } Err(e) => { eprintln!( "{}", ndjson_line( "env_setup", "error", &format!("failed to prepare repo: {}", e), None ) ); 1 } }; if code != 0 { error!(exit_code = code, "workflow failed"); std::process::exit(code); } info!("job complete"); Ok(()) } // Execute a setup script before workflow steps. Similar to run_job_script but with different categories. async fn run_setup_script(workdir: &str, setup_rel_or_abs: &str) -> Result { // Resolve path let script = if setup_rel_or_abs.starts_with('/') { setup_rel_or_abs.to_string() } else { format!("{}/{}", workdir, setup_rel_or_abs.trim_start_matches("./")) }; // Announce println!( "{}", ndjson_line( "setup", "info", &format!("executing setup script: {}", setup_rel_or_abs), Some(serde_json::json!({"path": setup_rel_or_abs})) ) ); if !fs::try_exists(&script).await.into_diagnostic()? { eprintln!( "{}", ndjson_line( "setup", "error", &format!("setup script not found at {}", script), Some(serde_json::json!({"path": setup_rel_or_abs})) ) ); return Ok(1); } let _ = run_shell(&format!("chmod +x {} || true", script)).await?; let mut cmd = Command::new("/bin/sh"); cmd.arg("-lc") .arg(format!("set -e; cd {workdir}; {}", script)) .stdout(Stdio::piped()) .stderr(Stdio::piped()); let mut child = cmd.spawn().into_diagnostic()?; // Stream output as setup_run if let Some(stdout) = child.stdout.take() { let mut reader = BufReader::new(stdout); tokio::spawn(async move { loop { let mut buf = Vec::with_capacity(256); match reader.read_until(b'\n', &mut buf).await { Ok(0) => break, Ok(_) => { let line = String::from_utf8_lossy(&buf) .trim_end_matches(['\n', '\r']) .to_string(); println!("{}", ndjson_line("setup_run", "info", &line, None)); } Err(e) => { eprintln!( "{}", ndjson_line( "setup_run", "error", &format!("error reading stdout: {}", e), None ) ); break; } } } }); } if let Some(stderr) = child.stderr.take() { let mut reader = BufReader::new(stderr); tokio::spawn(async move { loop { let mut buf = Vec::with_capacity(256); match reader.read_until(b'\n', &mut buf).await { Ok(0) => break, Ok(_) => { let line = String::from_utf8_lossy(&buf) .trim_end_matches(['\n', '\r']) .to_string(); eprintln!("{}", ndjson_line("setup_run", "error", &line, None)); } Err(e) => { eprintln!( "{}", ndjson_line( "setup_run", "error", &format!("error reading stderr: {}", e), None ) ); break; } } } }); } let status = child.wait().await.into_diagnostic()?; let code = status.code().unwrap_or(1); if code != 0 { eprintln!( "{}", ndjson_line( "setup", "error", &format!("setup script exited with code {}", code), Some(serde_json::json!({"path": setup_rel_or_abs, "exit_code": code})) ) ); } else { println!( "{}", ndjson_line( "setup", "info", &format!("completed setup: {}", setup_rel_or_abs), Some(serde_json::json!({"exit_code": code})) ) ); } Ok(code) }