diff --git a/.solstice/job.sh b/.solstice/job.sh index 2f85b29..3fa753e 100755 --- a/.solstice/job.sh +++ b/.solstice/job.sh @@ -4,6 +4,11 @@ set -euo pipefail # The runner clones the repo at the requested commit and executes this script. # It attempts to ensure required tools (git, curl, protobuf compiler, Rust) exist. +# Ensure a sane HOME even under non-login shells with set -u +export HOME=${HOME:-/root} +# Quieter noninteractive installs where supported +export DEBIAN_FRONTEND=${DEBIAN_FRONTEND:-noninteractive} + log() { printf "[job] %s\n" "$*" >&2; } detect_pm() { @@ -71,7 +76,11 @@ ensure_rust() { log "installing Rust toolchain with rustup" curl -fsSL https://sh.rustup.rs | sh -s -- -y # shellcheck disable=SC1091 - source "$HOME/.cargo/env" + if [ -f "$HOME/.cargo/env" ]; then + . "$HOME/.cargo/env" + else + export PATH="$HOME/.cargo/bin:$PATH" + fi } main() { diff --git a/crates/orchestrator/src/grpc.rs b/crates/orchestrator/src/grpc.rs index f0eb372..9b1de33 100644 --- a/crates/orchestrator/src/grpc.rs +++ b/crates/orchestrator/src/grpc.rs @@ -32,7 +32,11 @@ impl Runner for RunnerSvc { let mut commit_sha: Option = None; let mut exit_code: i32 = 0; let mut success: bool = true; + let mut lines_stdout: usize = 0; + let mut lines_stderr: usize = 0; + let mut got_end: bool = false; + info!("runner log stream opened"); while let Some(item) = stream .next() .await @@ -42,7 +46,10 @@ impl Runner for RunnerSvc { // Correlate request id if req_id.is_none() { match uuid::Uuid::parse_str(&item.request_id) { - Ok(u) => req_id = Some(u), + Ok(u) => { + info!(request_id = %u, "runner log stream identified"); + req_id = Some(u) + } Err(_) => { warn!(request_id = %item.request_id, "invalid request_id from runner"); } @@ -52,8 +59,10 @@ impl Runner for RunnerSvc { match ev { common::runner::v1::log_item::Event::Log(chunk) => { if chunk.stderr { + lines_stderr += 1; info!(request_id = %item.request_id, line = %chunk.line, "runner:stderr"); } else { + lines_stdout += 1; info!(request_id = %item.request_id, line = %chunk.line, "runner:stdout"); } } @@ -62,11 +71,14 @@ impl Runner for RunnerSvc { success = end.success; repo_url = Some(end.repo_url); commit_sha = Some(end.commit_sha); + got_end = true; + info!(exit_code, success, "runner log stream received End"); } } } } + info!(lines_stdout, lines_stderr, got_end, "runner log stream closed"); // Publish final status if we have enough context if let (Some(id), Some(repo), Some(sha)) = (req_id.as_ref(), repo_url.as_ref(), commit_sha.as_ref()) diff --git a/crates/workflow-runner/src/main.rs b/crates/workflow-runner/src/main.rs index dd22a58..c099ca9 100644 --- a/crates/workflow-runner/src/main.rs +++ b/crates/workflow-runner/src/main.rs @@ -46,24 +46,107 @@ async fn run_shell(cmd: &str) -> Result { .status() .await .into_diagnostic()?; - Ok(status.code().unwrap_or(1)) + let code = status.code().unwrap_or(1); + if code != 0 { + return Err(miette::miette!("command failed ({code}): {cmd}")); + } + Ok(code) +} + +fn to_https_if_codeberg(repo: &str) -> String { + // Convert SSH Codeberg URL to HTTPS for anonymous fetches + if let Some(rest) = repo.strip_prefix("ssh://git@codeberg.org/") { + return format!("https://codeberg.org/{rest}"); + } + repo.to_string() +} + +async fn has_cmd(name: &str) -> bool { + tokio::process::Command::new("/bin/sh") + .arg("-lc") + .arg(format!("command -v {} >/dev/null 2>&1", name)) + .status() + .await + .ok() + .and_then(|s| s.code()) + .map(|c| c == 0) + .unwrap_or(false) +} + +async fn fetch_repo_via_archive(repo_https: &str, sha: &str, workdir: &str) -> Result<()> { + // Gitea/Codeberg archive URL pattern: https://codeberg.org///archive/.tar.gz + let base = repo_https.trim_end_matches('.').trim_end_matches(".git"); + let url = format!("{}/archive/{}.tar.gz", base, sha); + + // Try curl | tar, then wget | tar + let cmd_curl = format!( + "mkdir -p {workdir} && curl -fSL {url} | tar -xz -C {workdir} --strip-components=1" + ); + if run_shell(&cmd_curl).await.is_ok() { + return Ok(()); + } + let cmd_wget = format!( + "mkdir -p {workdir} && wget -qO- {url} | tar -xz -C {workdir} --strip-components=1" + ); + if run_shell(&cmd_wget).await.is_ok() { + return Ok(()); + } + + // On illumos/SunOS images, curl/wget may be missing. Try to install curl and retry. + let os = std::env::var("SOLSTICE_OS_OVERRIDE").ok().unwrap_or_else(|| { + // Best-effort OS detection + std::env::consts::OS.to_string() + }); + // Prefer uname if available + let uname = Command::new("/bin/sh").arg("-lc").arg("uname -s 2>/dev/null || echo unknown").output().await.ok() + .and_then(|o| String::from_utf8(o.stdout).ok()).unwrap_or_default(); + let is_sunos = uname.trim() == "SunOS" || os == "solaris"; + if is_sunos { + // Try IPS (pkg) first, then pkgin + let _ = run_shell("sudo pkg refresh || true").await; + if run_shell("sudo pkg install -v web/curl").await.is_err() { + let _ = run_shell("sudo pkgin -y install curl").await; + } + // Retry with curl + if run_shell(&cmd_curl).await.is_ok() { + return Ok(()); + } + if run_shell(&cmd_wget).await.is_ok() { + return Ok(()); + } + } + + Err(miette::miette!("failed to fetch repo archive via HTTP for {url}")) } async fn ensure_repo(repo: &str, sha: &str, workdir: &str) -> Result<()> { fs::create_dir_all(workdir).await.into_diagnostic()?; - // Use system git to avoid libgit2 cross issues - let cmds = vec![ - format!("cd {workdir} && git init"), - format!( - "cd {workdir} && git remote remove origin >/dev/null 2>&1 || true && git remote add origin {repo}" - ), - format!("cd {workdir} && git fetch --depth=1 origin {sha}"), - format!("cd {workdir} && git checkout -q FETCH_HEAD"), - ]; - for c in cmds { - let _ = run_shell(&c).await?; + let repo_eff = to_https_if_codeberg(repo); + + // Prefer git when available; fall back to archive download when git is missing or fetch fails. + if has_cmd("git").await { + let cmds = vec![ + format!("cd {workdir} && git init"), + format!( + "cd {workdir} && git remote remove origin >/dev/null 2>&1 || true && git remote add origin {repo_eff}" + ), + format!("cd {workdir} && git fetch --depth=1 origin {sha}"), + format!("cd {workdir} && git checkout -q FETCH_HEAD"), + ]; + for c in cmds { + match run_shell(&c).await { + Ok(_) => {} + Err(e) => { + // Try archive fallback once on any git failure + warn!(error = %e, "git path failed; attempting archive fallback"); + return fetch_repo_via_archive(&repo_eff, sha, workdir).await; + } + } + } + Ok(()) + } else { + fetch_repo_via_archive(&repo_eff, sha, workdir).await } - Ok(()) } async fn run_job_script_streamed( @@ -74,7 +157,30 @@ async fn run_job_script_streamed( let script = format!("{}/.solstice/job.sh", workdir); if !fs::try_exists(&script).await.into_diagnostic()? { warn!(path = %script, "job script not found"); - return Ok(0); + if let Some(tx0) = tx.as_ref() { + let _ = tx0 + .send(LogItem { + request_id: request_id.to_string(), + event: Some(Event::Log(LogChunk { + line: format!("[runner] job script not found at {}", script), + stderr: true, + })), + }) + .await; + } + return Ok(1); + } + // Emit explicit pre-exec line to aid diagnostics + if let Some(tx0) = tx.as_ref() { + let _ = tx0 + .send(LogItem { + request_id: request_id.to_string(), + event: Some(Event::Log(LogChunk { + line: format!("[runner] executing {}", script), + stderr: false, + })), + }) + .await; } let _ = run_shell(&format!("chmod +x {} || true", script)).await?; @@ -87,35 +193,73 @@ async fn run_job_script_streamed( if let Some(tx) = tx.clone() { if let Some(stdout) = child.stdout.take() { - let mut lines = BufReader::new(stdout).lines(); + let mut reader = BufReader::new(stdout); let tx2 = tx.clone(); let req = request_id.to_string(); tokio::spawn(async move { - while let Ok(Some(line)) = lines.next_line().await { - let _ = tx2 - .send(LogItem { - request_id: req.clone(), - event: Some(Event::Log(LogChunk { - line, - stderr: false, - })), - }) - .await; + loop { + let mut buf = Vec::with_capacity(256); + match reader.read_until(b'\n', &mut buf).await { + Ok(0) => break, + Ok(_) => { + let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string(); + let _ = tx2 + .send(LogItem { + request_id: req.clone(), + event: Some(Event::Log(LogChunk { + line, + stderr: false, + })), + }) + .await; + } + Err(e) => { + let _ = tx2 + .send(LogItem { + request_id: req.clone(), + event: Some(Event::Log(LogChunk { + line: format!("[runner] error reading stdout: {e}"), + stderr: true, + })), + }) + .await; + break; + } + } } }); } if let Some(stderr) = child.stderr.take() { - let mut lines = BufReader::new(stderr).lines(); + let mut reader = BufReader::new(stderr); let tx2 = tx.clone(); let req = request_id.to_string(); tokio::spawn(async move { - while let Ok(Some(line)) = lines.next_line().await { - let _ = tx2 - .send(LogItem { - request_id: req.clone(), - event: Some(Event::Log(LogChunk { line, stderr: true })), - }) - .await; + loop { + let mut buf = Vec::with_capacity(256); + match reader.read_until(b'\n', &mut buf).await { + Ok(0) => break, + Ok(_) => { + let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string(); + let _ = tx2 + .send(LogItem { + request_id: req.clone(), + event: Some(Event::Log(LogChunk { line, stderr: true })), + }) + .await; + } + Err(e) => { + let _ = tx2 + .send(LogItem { + request_id: req.clone(), + event: Some(Event::Log(LogChunk { + line: format!("[runner] error reading stderr: {e}"), + stderr: true, + })), + }) + .await; + break; + } + } } }); } @@ -151,28 +295,34 @@ async fn main() -> Result<()> { // Setup gRPC streaming if orchestrator address and request id are provided let orch_addr = std::env::var("SOLSTICE_ORCH_ADDR").ok(); - let request_id = std::env::var("SOLSTICE_REQUEST_ID").ok(); + let request_id_env = std::env::var("SOLSTICE_REQUEST_ID").ok(); + // Use provided request id or empty string (server will warn if invalid) + let request_id_effective = request_id_env.unwrap_or_default(); let mut tx_opt: Option> = None; - if let (Some(addr), Some(req_id)) = (orch_addr.clone(), request_id.clone()) { - let (tx, rx) = mpsc::channel::(256); + let mut stream_handle: Option> = None; + if let Some(addr) = orch_addr.clone() { + let (tx, rx) = mpsc::channel::(512); let stream = ReceiverStream::new(rx); - // Spawn client task - tokio::spawn(async move { + // Spawn client task and keep a handle so we can await graceful flush on shutdown. + let handle = tokio::spawn(async move { match RunnerClient::connect(format!("http://{addr}")).await { Ok(mut client) => { - let _ = client.stream_logs(stream).await; // ignore result + if let Err(e) = client.stream_logs(stream).await { + warn!(error = %e, "log stream to orchestrator terminated with error"); + } } Err(e) => { warn!(error = %e, "failed to connect to orchestrator gRPC; logs will not be streamed"); } } }); + stream_handle = Some(handle); tx_opt = Some(tx); // Send a first line if let Some(ref tx) = tx_opt { let _ = tx .send(LogItem { - request_id: req_id.clone(), + request_id: request_id_effective.clone(), event: Some(Event::Log(LogChunk { line: format!("runner starting: repo={repo} sha={sha}"), stderr: false, @@ -182,19 +332,33 @@ async fn main() -> Result<()> { } } - ensure_repo(&repo, &sha, &workdir).await?; - let code = run_job_script_streamed( - &workdir, - tx_opt.clone(), - request_id.as_deref().unwrap_or(""), - ) - .await?; + let code = match ensure_repo(&repo, &sha, &workdir).await { + Ok(_) => { + // proceed to run job script + run_job_script_streamed(&workdir, tx_opt.clone(), &request_id_effective).await? + } + Err(e) => { + // Report checkout failure to orchestrator and return non-zero + if let Some(tx) = tx_opt.clone() { + let _ = tx + .send(LogItem { + request_id: request_id_effective.clone(), + event: Some(Event::Log(LogChunk { + line: format!("[runner] failed to prepare repo: {e}"), + stderr: true, + })), + }) + .await; + } + 1 + } + }; // Send JobEnd if streaming enabled - if let (Some(tx), Some(req_id)) = (tx_opt.clone(), request_id.clone()) { + if let Some(tx) = tx_opt.clone() { let _ = tx .send(LogItem { - request_id: req_id.clone(), + request_id: request_id_effective.clone(), event: Some(Event::End(JobEnd { exit_code: code, success: code == 0, @@ -203,8 +367,14 @@ async fn main() -> Result<()> { })), }) .await; - // Give the client task a brief moment to flush - tokio::time::sleep(std::time::Duration::from_millis(50)).await; + // Drop the last sender to close the client stream and allow server to flush + drop(tx); + tx_opt = None; + // Give the client task a brief moment to flush and then await it + tokio::time::sleep(std::time::Duration::from_millis(150)).await; + if let Some(h) = stream_handle { + let _ = h.await; + } } if code != 0 {