use clap::Parser; use common::runner::v1::{JobEnd, LogChunk, LogItem, log_item::Event, runner_client::RunnerClient}; use miette::{IntoDiagnostic as _, Result}; use serde::Deserialize; use std::process::Stdio; use tokio::sync::mpsc; use tokio::{ fs, io::{AsyncBufReadExt, BufReader}, process::Command, }; use tokio_stream::wrappers::ReceiverStream; use tracing::{error, info, warn}; #[derive(Parser, Debug)] #[command( name = "solstice-runner", version, about = "Solstice CI Workflow Runner (VM agent)" )] struct Opts { /// Optional path to workflow KDL file (for local testing only) #[arg(long, env = "SOL_WORKFLOW_PATH")] workflow: Option, } #[derive(Debug, Deserialize)] struct JobFile { repo_url: String, commit_sha: String, } async fn read_job_file() -> Result { let path = std::env::var("SOLSTICE_JOB_FILE").unwrap_or_else(|_| "/etc/solstice/job.yaml".into()); let bytes = fs::read(&path).await.into_diagnostic()?; let jf: JobFile = serde_yaml::from_slice(&bytes).into_diagnostic()?; Ok(jf) } async fn run_shell(cmd: &str) -> Result { info!(%cmd, "exec"); let status = Command::new("/bin/sh") .arg("-lc") .arg(cmd) .status() .await .into_diagnostic()?; let code = status.code().unwrap_or(1); if code != 0 { return Err(miette::miette!("command failed ({code}): {cmd}")); } Ok(code) } fn to_https_if_codeberg(repo: &str) -> String { // Convert SSH Codeberg URL to HTTPS for anonymous fetches if let Some(rest) = repo.strip_prefix("ssh://git@codeberg.org/") { return format!("https://codeberg.org/{rest}"); } repo.to_string() } async fn has_cmd(name: &str) -> bool { tokio::process::Command::new("/bin/sh") .arg("-lc") .arg(format!("command -v {} >/dev/null 2>&1", name)) .status() .await .ok() .and_then(|s| s.code()) .map(|c| c == 0) .unwrap_or(false) } async fn fetch_repo_via_archive(repo_https: &str, sha: &str, workdir: &str) -> Result<()> { // Gitea/Codeberg archive URL pattern: https://codeberg.org///archive/.tar.gz let base = repo_https.trim_end_matches('.').trim_end_matches(".git"); let url = format!("{}/archive/{}.tar.gz", base, sha); // Check if we should allow insecure TLS (last resort) let insecure = std::env::var("SOLSTICE_ALLOW_INSECURE").ok().map(|v| v == "1" || v.eq_ignore_ascii_case("true")).unwrap_or(false); let curl_flags = if insecure { "-fSLk" } else { "-fSL" }; // Try curl | tar, then wget | tar let cmd_curl = format!( "mkdir -p {workdir} && curl {curl_flags} {url} | tar -xz -C {workdir} --strip-components=1" ); if run_shell(&cmd_curl).await.is_ok() { return Ok(()); } let cmd_wget = format!( "mkdir -p {workdir} && wget -qO- {url} | tar -xz -C {workdir} --strip-components=1" ); if run_shell(&cmd_wget).await.is_ok() { return Ok(()); } // On illumos/SunOS images, curl/wget may be missing or CA bundle absent. Try to install tools and CA certs, then retry. let os = std::env::var("SOLSTICE_OS_OVERRIDE").ok().unwrap_or_else(|| { // Best-effort OS detection std::env::consts::OS.to_string() }); // Prefer uname if available let uname = Command::new("/bin/sh").arg("-lc").arg("uname -s 2>/dev/null || echo unknown").output().await.ok() .and_then(|o| String::from_utf8(o.stdout).ok()).unwrap_or_default(); let is_sunos = uname.trim() == "SunOS" || os == "solaris"; if is_sunos { // Try IPS (pkg) first, then pkgin let _ = run_shell("sudo pkg refresh || true").await; // curl if run_shell("sudo pkg install -v web/curl").await.is_err() { let _ = run_shell("sudo pkgin -y install curl").await; } // CA certificates (package name may differ per distro) let _ = run_shell("sudo pkg install -v web/ca-certificates || sudo pkg install -v library/security/ca-certificates || true").await; let _ = run_shell("sudo pkgin -y install mozilla-rootcerts || true").await; let _ = run_shell("sudo mozilla-rootcerts install || true").await; // Retry with curl and wget if run_shell(&cmd_curl).await.is_ok() { return Ok(()); } if run_shell(&cmd_wget).await.is_ok() { return Ok(()); } // As a last resort with explicit opt-in, try curl --insecure if insecure { let cmd_curl_insecure = format!( "mkdir -p {workdir} && curl -fSLk {url} | tar -xz -C {workdir} --strip-components=1" ); if run_shell(&cmd_curl_insecure).await.is_ok() { warn!("used curl --insecure to fetch repo archive on SunOS"); return Ok(()); } } } Err(miette::miette!("failed to fetch repo archive via HTTP for {url}")) } async fn ensure_repo(repo: &str, sha: &str, workdir: &str) -> Result<()> { fs::create_dir_all(workdir).await.into_diagnostic()?; let repo_eff = to_https_if_codeberg(repo); // Prefer git when available; fall back to archive download when git is missing or fetch fails. if has_cmd("git").await { let cmds = vec![ format!("cd {workdir} && git init"), format!( "cd {workdir} && git remote remove origin >/dev/null 2>&1 || true && git remote add origin {repo_eff}" ), format!("cd {workdir} && git fetch --depth=1 origin {sha}"), format!("cd {workdir} && git checkout -q FETCH_HEAD"), ]; for c in cmds { match run_shell(&c).await { Ok(_) => {} Err(e) => { // Try archive fallback once on any git failure warn!(error = %e, "git path failed; attempting archive fallback"); return fetch_repo_via_archive(&repo_eff, sha, workdir).await; } } } Ok(()) } else { fetch_repo_via_archive(&repo_eff, sha, workdir).await } } async fn run_job_script_streamed( workdir: &str, tx: Option>, request_id: &str, ) -> Result { let script = format!("{}/.solstice/job.sh", workdir); if !fs::try_exists(&script).await.into_diagnostic()? { warn!(path = %script, "job script not found"); if let Some(tx0) = tx.as_ref() { let _ = tx0 .send(LogItem { request_id: request_id.to_string(), event: Some(Event::Log(LogChunk { line: format!("[runner] job script not found at {}", script), stderr: true, })), }) .await; } else { eprintln!("[runner] job script not found at {}", script); } return Ok(1); } // Emit explicit pre-exec line to aid diagnostics if let Some(tx0) = tx.as_ref() { let _ = tx0 .send(LogItem { request_id: request_id.to_string(), event: Some(Event::Log(LogChunk { line: format!("[runner] executing {}", script), stderr: false, })), }) .await; } else { println!("[runner] executing {}", script); } let _ = run_shell(&format!("chmod +x {} || true", script)).await?; let mut cmd = Command::new("/bin/sh"); cmd.arg("-lc") .arg(format!("cd {workdir} && {}", script)) .stdout(Stdio::piped()) .stderr(Stdio::piped()); let mut child = cmd.spawn().into_diagnostic()?; if let Some(tx) = tx.clone() { if let Some(stdout) = child.stdout.take() { let mut reader = BufReader::new(stdout); let tx2 = tx.clone(); let req = request_id.to_string(); tokio::spawn(async move { loop { let mut buf = Vec::with_capacity(256); match reader.read_until(b'\n', &mut buf).await { Ok(0) => break, Ok(_) => { let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string(); // Always echo to console so serial captures logs even without gRPC println!("{}", line); let _ = tx2 .send(LogItem { request_id: req.clone(), event: Some(Event::Log(LogChunk { line, stderr: false, })), }) .await; } Err(e) => { let _ = tx2 .send(LogItem { request_id: req.clone(), event: Some(Event::Log(LogChunk { line: format!("[runner] error reading stdout: {e}"), stderr: true, })), }) .await; break; } } } }); } if let Some(stderr) = child.stderr.take() { let mut reader = BufReader::new(stderr); let tx2 = tx.clone(); let req = request_id.to_string(); tokio::spawn(async move { loop { let mut buf = Vec::with_capacity(256); match reader.read_until(b'\n', &mut buf).await { Ok(0) => break, Ok(_) => { let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string(); let _ = tx2 .send(LogItem { request_id: req.clone(), event: Some(Event::Log(LogChunk { line, stderr: true })), }) .await; } Err(e) => { let _ = tx2 .send(LogItem { request_id: req.clone(), event: Some(Event::Log(LogChunk { line: format!("[runner] error reading stderr: {e}"), stderr: true, })), }) .await; break; } } } }); } } else { // If no streaming, still attach to child I/O to avoid blocking let _ = child.stdout.take(); let _ = child.stderr.take(); } let status = child.wait().await.into_diagnostic()?; Ok(status.code().unwrap_or(1)) } #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { let _t = common::init_tracing("solstice-workflow-runner")?; let _opts = Opts::parse(); // Try env overrides first for robustness let repo = std::env::var("SOLSTICE_REPO_URL").ok(); let sha = std::env::var("SOLSTICE_COMMIT_SHA").ok(); let (repo, sha) = match (repo, sha) { (Some(r), Some(s)) => (r, s), _ => { let jf = read_job_file().await?; (jf.repo_url, jf.commit_sha) } }; info!(%repo, %sha, "runner starting"); let workdir = std::env::var("SOLSTICE_WORKDIR").unwrap_or_else(|_| "/root/work".into()); // Setup gRPC streaming if orchestrator address and request id are provided let orch_addr = std::env::var("SOLSTICE_ORCH_ADDR").ok(); let request_id_env = std::env::var("SOLSTICE_REQUEST_ID").ok(); // Use provided request id or empty string (server will warn if invalid) let request_id_effective = request_id_env.unwrap_or_default(); let mut tx_opt: Option> = None; let mut stream_handle: Option> = None; if let Some(addr) = orch_addr.clone() { match RunnerClient::connect(format!("http://{addr}")).await { Ok(mut client) => { let (tx, rx) = mpsc::channel::(512); let stream = ReceiverStream::new(rx); // Spawn client task and keep a handle so we can await graceful flush on shutdown. let handle = tokio::spawn(async move { if let Err(e) = client.stream_logs(stream).await { warn!(error = %e, "log stream to orchestrator terminated with error"); } }); stream_handle = Some(handle); tx_opt = Some(tx); } Err(e) => { // Explicitly inform console so serial capture gets this eprintln!("[runner] failed to connect to orchestrator gRPC at {}: {e}. Falling back to serial console only.", addr); } } } // Emit a first line visibly regardless of streaming println!("runner starting: repo={} sha={}", repo, sha); if let Some(ref tx) = tx_opt { let _ = tx .send(LogItem { request_id: request_id_effective.clone(), event: Some(Event::Log(LogChunk { line: format!("runner starting: repo={repo} sha={sha}"), stderr: false, })), }) .await; } let code = match ensure_repo(&repo, &sha, &workdir).await { Ok(_) => { // proceed to run job script run_job_script_streamed(&workdir, tx_opt.clone(), &request_id_effective).await? } Err(e) => { // Report checkout failure to orchestrator and return non-zero if let Some(tx) = tx_opt.clone() { let _ = tx .send(LogItem { request_id: request_id_effective.clone(), event: Some(Event::Log(LogChunk { line: format!("[runner] failed to prepare repo: {e}"), stderr: true, })), }) .await; } 1 } }; // Send JobEnd if streaming enabled if let Some(tx) = tx_opt.clone() { let _ = tx .send(LogItem { request_id: request_id_effective.clone(), event: Some(Event::End(JobEnd { exit_code: code, success: code == 0, repo_url: repo.clone(), commit_sha: sha.clone(), })), }) .await; // Drop the last sender to close the client stream and allow server to flush drop(tx); tx_opt = None; // Give the client task a brief moment to flush and then await it tokio::time::sleep(std::time::Duration::from_millis(150)).await; if let Some(h) = stream_handle { let _ = h.await; } } if code != 0 { error!(exit_code = code, "job script failed"); std::process::exit(code); } info!("job complete"); Ok(()) }