// mirror of https://codeberg.org/Toasterson/solstice-ci.git
// synced 2026-04-10 13:20:41 +00:00
use clap::Parser;
|
|
use miette::{IntoDiagnostic as _, Result};
|
|
use serde::Deserialize;
|
|
use std::collections::VecDeque;
|
|
use std::process::Stdio;
|
|
use std::sync::{Arc, Mutex};
|
|
use tokio::{
|
|
fs,
|
|
io::{AsyncBufReadExt, BufReader},
|
|
process::Command,
|
|
};
|
|
use tracing::{error, info, warn};
|
|
|
|
/// Serialize one NDJSON log record with the given category, level and
/// message, optionally merged with extra key/value pairs.
///
/// Keys from `extra` overwrite the base keys on collision. A non-object
/// `extra` value is silently ignored (best effort), matching the rest of
/// the runner's tolerant logging style.
fn ndjson_line(category: &str, level: &str, msg: &str, extra: Option<serde_json::Value>) -> String {
    let mut record = serde_json::json!({
        "category": category,
        "level": level,
        "msg": msg,
    });
    // Merge extra fields into the top-level object; both sides must
    // actually be JSON objects for anything to happen.
    let extra_map = extra.as_ref().and_then(|e| e.as_object());
    if let (Some(dst), Some(src)) = (record.as_object_mut(), extra_map) {
        for (key, value) in src {
            dst.insert(key.clone(), value.clone());
        }
    }
    record.to_string()
}
|
|
|
|
/// Command-line options for the runner binary. Parsed in `main` but
/// currently only used for the optional local-testing workflow path.
#[derive(Parser, Debug)]
#[command(
    name = "solstice-runner",
    version,
    about = "Solstice CI Workflow Runner (VM agent)"
)]
struct Opts {
    /// Optional path to workflow KDL file (for local testing only)
    #[arg(long, env = "SOL_WORKFLOW_PATH")]
    workflow: Option<String>,
}
|
|
|
|
/// Job description deserialized from the YAML job file placed inside the VM
/// (default path: /etc/solstice/job.yaml; see `read_job_file`).
#[derive(Debug, Deserialize)]
struct JobFile {
    // Git remote to fetch the sources from.
    repo_url: String,
    // Commit to check out; a 64-char hex value is treated as a SHA-256
    // object name by `ensure_repo`, anything else as SHA-1.
    commit_sha: String,
    // Workflow job id to select from .solstice/workflow.kdl, if any.
    #[serde(default)]
    workflow_job_id: Option<String>,
    // Override for the legacy job script path (absolute, or relative to
    // the workdir); see `run_job_script`.
    #[serde(default)]
    script_path: Option<String>,
    // Opaque grouping id; not read anywhere in this file — presumably
    // consumed by the host side. TODO confirm.
    #[serde(default)]
    group_id: Option<String>,
}
|
|
|
|
/// Load and parse the job file, honoring the SOLSTICE_JOB_FILE env
/// override and falling back to /etc/solstice/job.yaml.
///
/// Errors if the file is missing/unreadable or is not valid YAML for
/// [`JobFile`].
async fn read_job_file() -> Result<JobFile> {
    let path =
        std::env::var("SOLSTICE_JOB_FILE").unwrap_or_else(|_| "/etc/solstice/job.yaml".into());
    let bytes = fs::read(&path).await.into_diagnostic()?;
    let jf: JobFile = serde_yaml::from_slice(&bytes).into_diagnostic()?;
    Ok(jf)
}
|
|
|
|
async fn run_shell(cmd: &str) -> Result<i32> {
|
|
info!(%cmd, "exec");
|
|
let status = Command::new("/bin/sh")
|
|
.arg("-lc")
|
|
.arg(cmd)
|
|
.status()
|
|
.await
|
|
.into_diagnostic()?;
|
|
let code = status.code().unwrap_or(1);
|
|
if code != 0 {
|
|
return Err(miette::miette!("command failed ({code}): {cmd}"));
|
|
}
|
|
Ok(code)
|
|
}
|
|
|
|
/// Rewrite a Codeberg SSH remote into its HTTPS equivalent so the
/// repository can be fetched anonymously (the VM has no SSH key).
///
/// Handles both the URL form (`ssh://git@codeberg.org/owner/repo`) and the
/// scp-style shorthand (`git@codeberg.org:owner/repo.git`); any other
/// remote is returned unchanged.
fn to_https_if_codeberg(repo: &str) -> String {
    if let Some(rest) = repo.strip_prefix("ssh://git@codeberg.org/") {
        return format!("https://codeberg.org/{rest}");
    }
    // scp-like shorthand used by `git clone git@host:owner/repo`
    if let Some(rest) = repo.strip_prefix("git@codeberg.org:") {
        return format!("https://codeberg.org/{rest}");
    }
    repo.to_string()
}
|
|
|
|
async fn has_cmd(name: &str) -> bool {
|
|
tokio::process::Command::new("/bin/sh")
|
|
.arg("-lc")
|
|
.arg(format!("command -v {} >/dev/null 2>&1", name))
|
|
.status()
|
|
.await
|
|
.ok()
|
|
.and_then(|s| s.code())
|
|
.map(|c| c == 0)
|
|
.unwrap_or(false)
|
|
}
|
|
|
|
async fn check_writable(dir: &str) -> bool {
|
|
if fs::create_dir_all(dir).await.is_err() {
|
|
return false;
|
|
}
|
|
let test_path = format!("{}/.solstice-writecheck", dir.trim_end_matches('/'));
|
|
match fs::write(&test_path, b"ok").await {
|
|
Ok(_) => {
|
|
let _ = fs::remove_file(&test_path).await;
|
|
true
|
|
}
|
|
Err(_) => false,
|
|
}
|
|
}
|
|
|
|
/// Extract the host to probe and the default port implied by a git
/// remote's scheme.
///
/// Supports `https://` (443), `http://` (80), `ssh://` (22) and the
/// scp-like `user@host:path` shorthand (22). Any embedded `user@` prefix
/// and `:port` suffix on the host component are stripped; the scheme's
/// *default* port is always returned (an explicit port in the URL is
/// ignored, as in the original implementation). Returns `None` when no
/// non-empty host can be determined.
fn parse_repo_host_port(repo: &str) -> Option<(String, u16)> {
    // First path segment -> drop any `user@` prefix -> drop any `:port`.
    // Returns None for an empty host so we never probe "".
    fn host_of(rest: &str) -> Option<String> {
        let host = rest.split('/').next()?.split('@').last()?.split(':').next()?;
        if host.is_empty() {
            None
        } else {
            Some(host.to_string())
        }
    }

    let r = repo.trim();
    for (scheme, port) in [("https://", 443u16), ("http://", 80), ("ssh://", 22)] {
        if let Some(rest) = r.strip_prefix(scheme) {
            return host_of(rest).map(|h| (h, port));
        }
    }
    // scp-like: user@host:owner/repo.git
    if let Some(at) = r.find('@') {
        if let Some(colon) = r[at + 1..].find(':') {
            let host = &r[at + 1..at + 1 + colon];
            if !host.is_empty() {
                return Some((host.to_string(), 22));
            }
        }
    }
    None
}
|
|
|
|
async fn check_network_connect(host: &str, port: u16, timeout_ms: u64) -> bool {
|
|
use tokio::time::{Duration, timeout};
|
|
match timeout(
|
|
Duration::from_millis(timeout_ms),
|
|
tokio::net::TcpStream::connect((host, port)),
|
|
)
|
|
.await
|
|
{
|
|
Ok(Ok(_stream)) => true,
|
|
_ => false,
|
|
}
|
|
}
|
|
|
|
/// Pre-flight environment checks run before any fetch is attempted:
/// tool availability, workdir writability, and (best-effort) network
/// reachability of the repository host.
///
/// Emits an NDJSON status line for each check. Hard-fails (returns Err)
/// only when no fetch method exists or the workdir cannot be written;
/// missing individual tools and an unreachable host are only warnings.
async fn preflight(repo: &str, workdir: &str) -> Result<()> {
    // Tool availability
    let has_git = has_cmd("git").await;
    let has_curl = has_cmd("curl").await;
    let has_wget = has_cmd("wget").await;
    let has_tar = has_cmd("tar").await;
    let has_gtar = has_cmd("gtar").await;
    for (tool, ok) in [
        ("git", has_git),
        ("curl", has_curl),
        ("wget", has_wget),
        ("tar", has_tar),
        ("gtar", has_gtar),
    ] {
        let lvl = if ok { "info" } else { "warn" };
        let msg = if ok {
            format!("tool {tool}: available")
        } else {
            format!("tool {tool}: missing")
        };
        println!(
            "{}",
            ndjson_line(
                "tool_check",
                lvl,
                &msg,
                Some(serde_json::json!({"available": ok, "tool": tool}))
            )
        );
    }
    // We can fetch either with git directly, or by downloading an archive
    // (curl or wget) and unpacking it (tar or gtar).
    let can_clone = has_git || ((has_tar || has_gtar) && (has_curl || has_wget));
    let lvl = if can_clone { "info" } else { "error" };
    println!(
        "{}",
        ndjson_line(
            "env_setup",
            lvl,
            "clone capability",
            Some(serde_json::json!({
                "git": has_git,
                "tar": has_tar,
                "curl": has_curl,
                "wget": has_wget,
                "can_clone": can_clone
            }))
        )
    );
    if !can_clone {
        return Err(miette::miette!(
            "no available method to fetch repository: need git or (tar and (curl|wget))"
        ));
    }

    // Workdir writability (creates the directory as a side effect)
    let writable = check_writable(workdir).await;
    let lvl = if writable { "info" } else { "error" };
    println!(
        "{}",
        ndjson_line(
            "env_setup",
            lvl,
            "workdir writable",
            Some(serde_json::json!({"path": workdir, "writable": writable}))
        )
    );
    if !writable {
        return Err(miette::miette!("workdir is not writable: {}", workdir));
    }

    // Network reachability (best-effort): 2s TCP connect probe to the host
    // implied by the repo URL; failure is only a warning since git may still
    // succeed (e.g. via a proxy). Skipped when no host can be parsed.
    if let Some((host, port)) = parse_repo_host_port(repo) {
        let ok = check_network_connect(&host, port, 2000).await;
        let lvl = if ok { "info" } else { "warn" };
        let status_msg = if ok { "reachable" } else { "unreachable" };
        println!(
            "{}",
            ndjson_line(
                "env",
                lvl,
                &format!("network: {}:{} {}", host, port, status_msg),
                Some(serde_json::json!({"host": host, "port": port, "reachable": ok}))
            )
        );
    }

    Ok(())
}
|
|
|
|
/// Fetch the repository as a Gitea/Codeberg tarball snapshot at `sha` and
/// unpack it into `workdir` — the fallback path when git is unavailable or
/// the git fetch failed.
///
/// Tries `curl | tar`, then `wget | tar`; on SunOS/illumos additionally
/// attempts to install curl and CA certificates via pkg/pkgin and retries,
/// with an opt-in insecure-TLS last resort (SOLSTICE_ALLOW_INSECURE=1).
/// Errors only after every method has failed.
async fn fetch_repo_via_archive(repo_https: &str, sha: &str, workdir: &str) -> Result<()> {
    // Announce chosen method
    println!(
        "{}",
        ndjson_line(
            "env_setup",
            "info",
            "fetch via http archive",
            Some(serde_json::json!({
                "url": format!("{}/archive/{}.tar.gz", repo_https.trim_end_matches('.').trim_end_matches(".git"), sha)
            }))
        )
    );
    // Gitea/Codeberg archive URL pattern: https://codeberg.org/<owner>/<repo>/archive/<sha>.tar.gz
    // (trailing '.' and '.git' are stripped from the remote first)
    let base = repo_https.trim_end_matches('.').trim_end_matches(".git");
    let url = format!("{}/archive/{}.tar.gz", base, sha);

    // Prefer GNU tar (gtar) when available (illumos' tar is not compatible with -z/--strip-components)
    let tar_bin = if has_cmd("gtar").await { "gtar" } else { "tar" };

    // Check if we should allow insecure TLS (last resort)
    let insecure = std::env::var("SOLSTICE_ALLOW_INSECURE")
        .ok()
        .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
        .unwrap_or(false);
    let curl_flags = if insecure { "-fSLk" } else { "-fSL" };

    // Try curl | tar, then wget | tar. --strip-components=1 drops the
    // top-level "<repo>-<sha>/" directory from the archive.
    let cmd_curl = format!(
        "mkdir -p {workdir} && curl {curl_flags} {url} | {tar_bin} -xz -C {workdir} --strip-components=1"
    );
    if run_shell(&cmd_curl).await.is_ok() {
        return Ok(());
    }
    let cmd_wget = format!(
        "mkdir -p {workdir} && wget -qO- {url} | {tar_bin} -xz -C {workdir} --strip-components=1"
    );
    if run_shell(&cmd_wget).await.is_ok() {
        return Ok(());
    }

    // On illumos/SunOS images, curl/wget may be missing or CA bundle absent. Try to install tools and CA certs, then retry.
    let os = std::env::var("SOLSTICE_OS_OVERRIDE")
        .ok()
        .unwrap_or_else(|| {
            // Best-effort OS detection
            std::env::consts::OS.to_string()
        });
    // Prefer uname if available
    let uname = Command::new("/bin/sh")
        .arg("-lc")
        .arg("uname -s 2>/dev/null || echo unknown")
        .output()
        .await
        .ok()
        .and_then(|o| String::from_utf8(o.stdout).ok())
        .unwrap_or_default();
    let is_sunos = uname.trim() == "SunOS" || os == "solaris";
    if is_sunos {
        // Try IPS (pkg) first, then pkgin
        let _ = run_shell("sudo pkg refresh || true").await;
        // curl
        if run_shell("sudo pkg install -v web/curl").await.is_err() {
            let _ = run_shell("sudo pkgin -y install curl").await;
        }
        // CA certificates (package name may differ per distro)
        let _ = run_shell("sudo pkg install -v web/ca-certificates || sudo pkg install -v library/security/ca-certificates || true").await;
        let _ = run_shell("sudo pkgin -y install mozilla-rootcerts || true").await;
        let _ = run_shell("sudo mozilla-rootcerts install || true").await;

        // Retry with curl and wget
        if run_shell(&cmd_curl).await.is_ok() {
            return Ok(());
        }
        if run_shell(&cmd_wget).await.is_ok() {
            return Ok(());
        }

        // As a last resort with explicit opt-in, try curl --insecure
        if insecure {
            let cmd_curl_insecure = format!(
                "mkdir -p {workdir} && curl -fSLk {url} | {tar_bin} -xz -C {workdir} --strip-components=1"
            );
            if run_shell(&cmd_curl_insecure).await.is_ok() {
                warn!("used curl --insecure to fetch repo archive on SunOS");
                return Ok(());
            }
        }
    }

    Err(miette::miette!(
        "failed to fetch repo archive via HTTP for {url}"
    ))
}
|
|
|
|
/// True iff `s` is non-empty and consists solely of ASCII hex digits.
///
/// The non-empty requirement guards against the vacuous truth of `all()`
/// on an empty iterator; `ensure_repo` uses this (with a length check) to
/// sniff whether a commit id looks like a SHA-256 object name.
fn is_hex(s: &str) -> bool {
    !s.is_empty() && s.chars().all(|c| c.is_ascii_hexdigit())
}
|
|
|
|
/// Materialize the repository at commit `sha` inside `workdir`.
///
/// Prefers a shallow, blob-filtered git fetch (with SHA-256 object format
/// when the commit id is 64 hex chars); any failure in the git command
/// sequence, or a missing git binary, falls back to downloading the
/// commit tarball via [`fetch_repo_via_archive`]. SSH Codeberg remotes
/// are rewritten to HTTPS first so no key is required.
async fn ensure_repo(repo: &str, sha: &str, workdir: &str) -> Result<()> {
    fs::create_dir_all(workdir).await.into_diagnostic()?;
    let repo_eff = to_https_if_codeberg(repo);

    // Prefer git when available; fall back to archive download when git is missing or fetch fails.
    if has_cmd("git").await {
        // Determine object format: if SHA looks like 64-hex, assume sha256 object format
        let sha_len = sha.len();
        let is_sha256 = sha_len == 64 && is_hex(sha);
        let obj_fmt = if is_sha256 { Some("sha256") } else { None };
        // Emit NDJSON about chosen method
        println!(
            "{}",
            ndjson_line(
                "env_setup",
                "info",
                &format!(
                    "fetch via git (object-format={})",
                    obj_fmt.unwrap_or("sha1")
                ),
                Some(serde_json::json!({
                    "method": "git",
                    "object_format": obj_fmt.unwrap_or("sha1"),
                    "sha_len": sha_len,
                    "detached": true
                }))
            )
        );

        // The command order matters: init must precede remote add, which
        // must precede fetch/checkout.
        let cmds = vec![
            // Re-initialize repository to ensure correct object format
            format!("cd {workdir} && rm -rf .git || true"),
            if let Some(fmt) = obj_fmt {
                format!("cd {workdir} && git init --object-format={fmt}")
            } else {
                format!("cd {workdir} && git init")
            },
            format!(
                "cd {workdir} && git remote remove origin >/dev/null 2>&1 || true && git remote add origin {repo_eff}"
            ),
            // Use protocol v2 features when available and keep it light
            format!(
                "cd {workdir} && git -c protocol.version=2 fetch --filter=blob:none --depth=1 --no-tags origin {sha}"
            ),
            // Checkout the requested commit in detached HEAD
            format!("cd {workdir} && git checkout -q --detach {sha}"),
        ];
        for c in cmds {
            match run_shell(&c).await {
                Ok(_) => {}
                Err(e) => {
                    // Try archive fallback once on any git failure
                    warn!(error = %e, "git path failed; attempting archive fallback");
                    return fetch_repo_via_archive(&repo_eff, sha, workdir).await;
                }
            }
        }
        Ok(())
    } else {
        fetch_repo_via_archive(&repo_eff, sha, workdir).await
    }
}
|
|
|
|
/// Execute the legacy job script and stream its output as NDJSON.
///
/// The script is `script_override` (absolute, or relative to `workdir`)
/// when given, else `<workdir>/.solstice/job.sh`. A missing script is
/// reported and returns Ok(1) rather than an error. stdout lines are
/// re-emitted as "job_run"/info records, stderr lines as "job_run"/error;
/// the last 20 stderr lines are buffered and replayed in a failure
/// summary when the script exits non-zero. Returns the script's exit
/// code (1 if signal-terminated).
async fn run_job_script(workdir: &str, script_override: Option<&str>) -> Result<i32> {
    // Determine the script to execute: prefer override from job.yaml, else default .solstice/job.sh
    let script = if let Some(path) = script_override {
        if path.starts_with('/') {
            path.to_string()
        } else {
            format!("{}/{}", workdir, path.trim_start_matches("./"))
        }
    } else {
        format!("{}/.solstice/job.sh", workdir)
    };
    if !fs::try_exists(&script).await.into_diagnostic()? {
        warn!(path = %script, "job script not found");
        eprintln!(
            "{}",
            ndjson_line(
                "job_run",
                "error",
                &format!("job script not found at {}", script),
                None
            )
        );
        return Ok(1);
    }
    // Emit explicit pre-exec line to aid diagnostics
    println!(
        "{}",
        ndjson_line("job_run", "info", &format!("executing {}", script), None)
    );
    // `|| true` keeps chmod failures non-fatal (script may already be +x).
    let _ = run_shell(&format!("chmod +x {} || true", script)).await?;

    // `set -ex` echoes each command to stderr and aborts on first failure.
    let mut cmd = Command::new("/bin/sh");
    cmd.arg("-lc")
        .arg(format!("set -ex; cd {workdir}; {}", script))
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());
    let mut child = cmd.spawn().into_diagnostic()?;

    // Buffer the last N stderr lines for failure summary
    let last_err: Arc<Mutex<VecDeque<String>>> = Arc::new(Mutex::new(VecDeque::with_capacity(20)));

    // Attach readers to child stdout/stderr so logs stream as NDJSON categorized under job_run.
    // read_until (not lines()) tolerates non-UTF-8 bytes via from_utf8_lossy.
    if let Some(stdout) = child.stdout.take() {
        let mut reader = BufReader::new(stdout);
        tokio::spawn(async move {
            loop {
                let mut buf = Vec::with_capacity(256);
                match reader.read_until(b'\n', &mut buf).await {
                    Ok(0) => break,
                    Ok(_) => {
                        let line = String::from_utf8_lossy(&buf)
                            .trim_end_matches(['\n', '\r'])
                            .to_string();
                        println!("{}", ndjson_line("job_run", "info", &line, None));
                    }
                    Err(e) => {
                        eprintln!(
                            "{}",
                            ndjson_line(
                                "job_run",
                                "error",
                                &format!("error reading stdout: {}", e),
                                None
                            )
                        );
                        break;
                    }
                }
            }
        });
    }
    if let Some(stderr) = child.stderr.take() {
        let mut reader = BufReader::new(stderr);
        let last_err2 = last_err.clone();
        tokio::spawn(async move {
            loop {
                let mut buf = Vec::with_capacity(256);
                match reader.read_until(b'\n', &mut buf).await {
                    Ok(0) => break,
                    Ok(_) => {
                        let line = String::from_utf8_lossy(&buf)
                            .trim_end_matches(['\n', '\r'])
                            .to_string();
                        eprintln!("{}", ndjson_line("job_run", "error", &line, None));
                        // Keep only the 20 most recent stderr lines.
                        if let Ok(mut dq) = last_err2.lock() {
                            if dq.len() == 20 {
                                dq.pop_front();
                            }
                            dq.push_back(line);
                        }
                    }
                    Err(e) => {
                        eprintln!(
                            "{}",
                            ndjson_line(
                                "job_run",
                                "error",
                                &format!("error reading stderr: {}", e),
                                None
                            )
                        );
                        break;
                    }
                }
            }
        });
    }

    let status = child.wait().await.into_diagnostic()?;
    let code = status.code().unwrap_or(1);

    if code != 0 {
        // Emit a concise failure summary (structured)
        eprintln!(
            "{}",
            ndjson_line(
                "job_run",
                "error",
                &format!("job script exited with code {}", code),
                None
            )
        );

        // Include recent stderr lines for context (structured)
        let lines: Vec<String> = last_err
            .lock()
            .ok()
            .map(|dq| dq.iter().cloned().collect())
            .unwrap_or_default();
        if lines.is_empty() {
            eprintln!(
                "{}",
                ndjson_line(
                    "job_run",
                    "warn",
                    "no stderr lines were captured from the script",
                    None
                )
            );
        } else {
            eprintln!(
                "{}",
                ndjson_line("job_run", "info", "recent stderr lines follow", None)
            );
            for l in lines {
                eprintln!("{}", ndjson_line("job_run", "error", &l, None));
            }
        }
    }

    Ok(code)
}
|
|
|
|
/// One `step` node parsed out of .solstice/workflow.kdl: a display name
/// plus the shell command to run.
#[derive(Debug)]
struct WorkflowStep {
    name: String,
    run: String,
}
|
|
|
|
/// One `job` node parsed out of .solstice/workflow.kdl: an optional setup
/// script path plus the ordered list of steps.
#[derive(Debug)]
struct WorkflowJob {
    setup: Option<String>,
    steps: Vec<WorkflowStep>,
}
|
|
|
|
/// Pull the value of a `key="value"` or `key='value'` attribute out of a
/// single line of KDL-ish text; double quotes are tried first, then
/// single quotes. Returns `None` when neither quoting style matches.
/// Matching is purely textual, so `key` may also match as a suffix of a
/// longer attribute name.
fn capture_attr(line: &str, key: &str) -> Option<String> {
    for quote in ['"', '\''] {
        let needle = format!("{key}={quote}");
        if let Some(pos) = line.find(&needle) {
            let tail = &line[pos + needle.len()..];
            if let Some(end) = tail.find(quote) {
                return Some(tail[..end].to_string());
            }
        }
    }
    None
}
|
|
|
|
/// Minimal line-oriented parser for .solstice/workflow.kdl: find the job
/// with id `wanted_job` (or the first job with an id when `wanted_job` is
/// None) and collect its optional `setup path="…"` and its `step … run="…"`
/// lines, in order.
///
/// This is not a real KDL parser: it tracks nesting depth only via lines
/// that end with '{' or start with '}', and attributes are matched
/// textually via `capture_attr`. Returns None when no matching job is
/// found.
fn parse_workflow_for_job(kdl: &str, wanted_job: Option<&str>) -> Option<WorkflowJob> {
    let mut lines = kdl.lines().peekable();
    while let Some(line) = lines.next() {
        let l = line.trim();
        if l.starts_with("job ") && l.contains("id=") {
            let id = capture_attr(l, "id");
            // depth counts open braces inside this job; starts at 1 when the
            // job line itself opens a block.
            let mut depth = if l.ends_with('{') { 1 } else { 0 };
            let mut steps: Vec<WorkflowStep> = Vec::new();
            let mut setup: Option<String> = None;
            // If this job is the one we want (or no preference and it's the first job), collect its setup and steps
            let take_this = match (wanted_job, id.as_deref()) {
                (Some(w), Some(i)) => w == i,
                (None, Some(_)) => true,
                _ => false,
            };
            // Walk the job body via peek() so the closing '}' of the *outer*
            // scope (depth already 0) is left unconsumed for the outer loop.
            while let Some(peek) = lines.peek() {
                let t = peek.trim();
                if t.ends_with('{') {
                    depth += 1;
                }
                if t.starts_with('}') {
                    if depth == 0 {
                        break;
                    }
                    depth -= 1;
                    if depth == 0 {
                        // Consume this job's own closing brace, then stop.
                        lines.next();
                        break;
                    }
                }
                if take_this {
                    // Only the first `setup` line wins.
                    if setup.is_none() && t.starts_with("setup ") && t.contains("path=") {
                        if let Some(p) = capture_attr(t, "path") {
                            setup = Some(p);
                        }
                    }
                    if t.starts_with("step ") && t.contains("run=") {
                        let name = capture_attr(t, "name").unwrap_or_else(|| "unnamed".into());
                        if let Some(run) = capture_attr(t, "run") {
                            steps.push(WorkflowStep { name, run });
                        }
                    }
                }
                lines.next();
            }
            if take_this {
                return Some(WorkflowJob { setup, steps });
            }
        }
    }
    None
}
|
|
|
|
/// Turn an arbitrary step name into a compact slug: lowercase ASCII
/// alphanumerics with each run of other characters collapsed into a
/// single dash, leading/trailing dashes removed, and "step" as the
/// fallback when nothing survives.
fn slugify_step_name(name: &str) -> String {
    let mut slug = String::with_capacity(name.len());
    for raw in name.chars() {
        let ch = raw.to_ascii_lowercase();
        if ch.is_ascii_alphanumeric() {
            slug.push(ch);
        } else if !slug.ends_with('-') {
            // First separator after an alphanumeric run (or at the start);
            // subsequent separators are collapsed.
            slug.push('-');
        }
    }
    let trimmed = slug.trim_matches('-');
    if trimmed.is_empty() {
        "step".to_string()
    } else {
        trimmed.to_string()
    }
}
|
|
|
|
/// Run one workflow step's shell command and stream its output as NDJSON
/// under a per-step category ("step:<slugified-name>").
///
/// `idx` is the 1-based step number and `total` the step count; both are
/// attached to every emitted record. stdout lines become info records,
/// stderr lines error records. Returns the step's exit code (1 if
/// signal-terminated); a non-zero exit is reported but not turned into
/// an Err — the caller decides whether to stop.
async fn run_step(workdir: &str, step: &WorkflowStep, idx: usize, total: usize) -> Result<i32> {
    // Derive per-step category from the step name (slugified)
    let step_slug = slugify_step_name(&step.name);
    let step_category = format!("step:{}", step_slug);

    // Announce step start in the step-specific category
    println!(
        "{}",
        ndjson_line(
            &step_category,
            "info",
            &format!("starting step: {}", step.name),
            Some(
                serde_json::json!({"step_name": step.name, "step_index": idx, "total_steps": total})
            )
        )
    );

    // Build command and spawn (`set -e` aborts the step on first failure)
    let mut cmd = Command::new("/bin/sh");
    cmd.arg("-lc")
        .arg(format!("set -e; cd {workdir}; {}", step.run))
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());
    let mut child = cmd.spawn().into_diagnostic()?;

    // Stream output with step fields; use per-step category for both stdout and stderr
    let extra =
        serde_json::json!({"step_name": step.name, "step_index": idx, "total_steps": total});
    let step_cat_clone_out = step_category.clone();
    if let Some(stdout) = child.stdout.take() {
        let mut reader = BufReader::new(stdout);
        let extra_out = extra.clone();
        tokio::spawn(async move {
            let cat = step_cat_clone_out;
            loop {
                let mut buf = Vec::with_capacity(256);
                // read_until + from_utf8_lossy tolerates non-UTF-8 output
                match reader.read_until(b'\n', &mut buf).await {
                    Ok(0) => break,
                    Ok(_) => {
                        let line = String::from_utf8_lossy(&buf)
                            .trim_end_matches(['\n', '\r'])
                            .to_string();
                        println!(
                            "{}",
                            ndjson_line(&cat, "info", &line, Some(extra_out.clone()))
                        );
                    }
                    Err(e) => {
                        eprintln!(
                            "{}",
                            ndjson_line(
                                &cat,
                                "error",
                                &format!("error reading stdout: {}", e),
                                Some(extra_out.clone())
                            )
                        );
                        break;
                    }
                }
            }
        });
    }
    if let Some(stderr) = child.stderr.take() {
        let mut reader = BufReader::new(stderr);
        let extra_err = extra.clone();
        let step_cat_clone_err = step_category.clone();
        tokio::spawn(async move {
            let cat = step_cat_clone_err;
            loop {
                let mut buf = Vec::with_capacity(256);
                match reader.read_until(b'\n', &mut buf).await {
                    Ok(0) => break,
                    Ok(_) => {
                        let line = String::from_utf8_lossy(&buf)
                            .trim_end_matches(['\n', '\r'])
                            .to_string();
                        eprintln!(
                            "{}",
                            ndjson_line(&cat, "error", &line, Some(extra_err.clone()))
                        );
                    }
                    Err(e) => {
                        eprintln!(
                            "{}",
                            ndjson_line(
                                &cat,
                                "error",
                                &format!("error reading stderr: {}", e),
                                Some(extra_err.clone())
                            )
                        );
                        break;
                    }
                }
            }
        });
    }

    let status = child.wait().await.into_diagnostic()?;
    let code = status.code().unwrap_or(1);
    if code != 0 {
        eprintln!(
            "{}",
            ndjson_line(
                &step_category,
                "error",
                &format!("step failed: {} (exit {})", step.name, code),
                Some(extra)
            )
        );
    } else {
        println!(
            "{}",
            ndjson_line(
                &step_category,
                "info",
                &format!("completed step: {}", step.name),
                Some(
                    serde_json::json!({"step_name": step.name, "step_index": idx, "total_steps": total, "exit_code": code})
                )
            )
        );
    }
    Ok(code)
}
|
|
|
|
/// Run the workflow described by `<workdir>/.solstice/workflow.kdl` when it
/// exists.
///
/// Returns Ok(None) when no workflow applies (file absent, no matching job,
/// or a matching job with zero steps — in which case the caller falls back
/// to the legacy job script), and Ok(Some(code)) with the first non-zero
/// exit code (or 0) when a workflow ran. The job is selected via
/// `workflow_job_id` from the job file, when readable.
async fn run_workflow_if_present(workdir: &str) -> Result<Option<i32>> {
    let path = format!("{}/.solstice/workflow.kdl", workdir);
    if !fs::try_exists(&path).await.into_diagnostic()? {
        return Ok(None);
    }
    let kdl = fs::read_to_string(&path).await.into_diagnostic()?;
    // Determine selected job id from job.yaml
    let jf = read_job_file().await.ok();
    let job_id = jf.and_then(|j| j.workflow_job_id);
    let job = match parse_workflow_for_job(&kdl, job_id.as_deref()) {
        Some(j) => j,
        None => return Ok(None),
    };

    // Run setup if present; a failing setup short-circuits the steps.
    if let Some(setup_path) = job.setup.as_deref() {
        let code = run_setup_script(workdir, setup_path).await?;
        if code != 0 {
            return Ok(Some(code));
        }
    }

    if job.steps.is_empty() {
        return Ok(None);
    }
    let total = job.steps.len();
    // Steps run sequentially; stop at the first failure.
    for (i, step) in job.steps.iter().enumerate() {
        let code = run_step(workdir, step, i + 1, total).await?;
        if code != 0 {
            return Ok(Some(code));
        }
    }
    Ok(Some(0))
}
|
|
|
|
/// Runner entry point: resolve the job (env overrides, then job file),
/// run preflight checks, fetch the repository, then execute either the
/// KDL workflow or the legacy job script. Exits the process with the
/// job's exit code on failure.
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> {
    // `common` is a project-local crate; presumably returns a guard that
    // must stay alive for the tracing subscriber — TODO confirm.
    let _t = common::init_tracing("solstice-workflow-runner")?;
    // Parsed for --version/--help side effects; fields currently unused.
    let _opts = Opts::parse();

    // Try env overrides first for robustness
    let repo = std::env::var("SOLSTICE_REPO_URL").ok();
    let sha = std::env::var("SOLSTICE_COMMIT_SHA").ok();

    // Both overrides must be present together; otherwise read the job file.
    let (repo, sha) = match (repo, sha) {
        (Some(r), Some(s)) => (r, s),
        _ => {
            let jf = read_job_file().await?;
            (jf.repo_url, jf.commit_sha)
        }
    };

    info!(%repo, %sha, "runner starting");
    // Workdir selection: prefer explicit SOLSTICE_WORKDIR, otherwise default to "$HOME/work"
    let workdir = std::env::var("SOLSTICE_WORKDIR")
        .ok()
        .or_else(|| {
            std::env::var("HOME")
                .ok()
                .map(|home| format!("{}/work", home))
        })
        .unwrap_or_else(|| "/root/work".into());

    // Emit startup environment and tool checks
    let uname = Command::new("/bin/sh")
        .arg("-lc")
        .arg("uname -a || echo unknown")
        .output()
        .await
        .ok()
        .and_then(|o| String::from_utf8(o.stdout).ok())
        .unwrap_or_else(|| "unknown".into());
    let uname_trim = uname.trim().to_string();
    println!(
        "{}",
        ndjson_line(
            "env",
            "info",
            &format!("system: {}", uname_trim),
            Some(serde_json::json!({"uname": uname_trim}))
        )
    );

    // Preflight environment checks (tools, workdir, network)
    if let Err(e) = preflight(&repo, &workdir).await {
        eprintln!(
            "{}",
            ndjson_line(
                "env_setup",
                "error",
                &format!("preflight failed: {}", e),
                None
            )
        );
        std::process::exit(1);
    }

    // Announce workspace
    println!(
        "{}",
        ndjson_line(
            "env_setup",
            "info",
            "workdir",
            Some(serde_json::json!({"path": workdir}))
        )
    );

    let code = match ensure_repo(&repo, &sha, &workdir).await {
        Ok(_) => {
            // Prefer workflow.kdl when present; otherwise run legacy script
            match run_workflow_if_present(&workdir).await? {
                Some(code) => code,
                None => {
                    let jf = read_job_file().await.ok();
                    let script_override = jf.as_ref().and_then(|j| j.script_path.as_deref());
                    run_job_script(&workdir, script_override).await?
                }
            }
        }
        Err(e) => {
            eprintln!(
                "{}",
                ndjson_line(
                    "env_setup",
                    "error",
                    &format!("failed to prepare repo: {}", e),
                    None
                )
            );
            1
        }
    };

    if code != 0 {
        error!(exit_code = code, "workflow failed");
        // Propagate the job's exit code to the host via the process status.
        std::process::exit(code);
    }

    info!("job complete");
    Ok(())
}
|
|
|
|
// Execute a setup script before workflow steps. Similar to run_job_script but with different categories.
|
|
// Execute a setup script before workflow steps. Similar to run_job_script but with different categories.
/// Runs the workflow's setup script (`setup_rel_or_abs`, absolute or
/// relative to `workdir`), streaming stdout/stderr as "setup_run" NDJSON
/// records and summarizing under "setup". A missing script returns Ok(1);
/// otherwise the script's exit code is returned (1 if signal-terminated).
/// Unlike run_job_script, no stderr tail is buffered for a summary.
async fn run_setup_script(workdir: &str, setup_rel_or_abs: &str) -> Result<i32> {
    // Resolve path
    let script = if setup_rel_or_abs.starts_with('/') {
        setup_rel_or_abs.to_string()
    } else {
        format!("{}/{}", workdir, setup_rel_or_abs.trim_start_matches("./"))
    };
    // Announce
    println!(
        "{}",
        ndjson_line(
            "setup",
            "info",
            &format!("executing setup script: {}", setup_rel_or_abs),
            Some(serde_json::json!({"path": setup_rel_or_abs}))
        )
    );

    if !fs::try_exists(&script).await.into_diagnostic()? {
        eprintln!(
            "{}",
            ndjson_line(
                "setup",
                "error",
                &format!("setup script not found at {}", script),
                Some(serde_json::json!({"path": setup_rel_or_abs}))
            )
        );
        return Ok(1);
    }

    // `|| true` keeps chmod failures non-fatal (script may already be +x).
    let _ = run_shell(&format!("chmod +x {} || true", script)).await?;

    // `set -e` aborts the setup on the first failing command.
    let mut cmd = Command::new("/bin/sh");
    cmd.arg("-lc")
        .arg(format!("set -e; cd {workdir}; {}", script))
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());
    let mut child = cmd.spawn().into_diagnostic()?;

    // Stream output as setup_run
    if let Some(stdout) = child.stdout.take() {
        let mut reader = BufReader::new(stdout);
        tokio::spawn(async move {
            loop {
                let mut buf = Vec::with_capacity(256);
                // read_until + from_utf8_lossy tolerates non-UTF-8 output
                match reader.read_until(b'\n', &mut buf).await {
                    Ok(0) => break,
                    Ok(_) => {
                        let line = String::from_utf8_lossy(&buf)
                            .trim_end_matches(['\n', '\r'])
                            .to_string();
                        println!("{}", ndjson_line("setup_run", "info", &line, None));
                    }
                    Err(e) => {
                        eprintln!(
                            "{}",
                            ndjson_line(
                                "setup_run",
                                "error",
                                &format!("error reading stdout: {}", e),
                                None
                            )
                        );
                        break;
                    }
                }
            }
        });
    }
    if let Some(stderr) = child.stderr.take() {
        let mut reader = BufReader::new(stderr);
        tokio::spawn(async move {
            loop {
                let mut buf = Vec::with_capacity(256);
                match reader.read_until(b'\n', &mut buf).await {
                    Ok(0) => break,
                    Ok(_) => {
                        let line = String::from_utf8_lossy(&buf)
                            .trim_end_matches(['\n', '\r'])
                            .to_string();
                        eprintln!("{}", ndjson_line("setup_run", "error", &line, None));
                    }
                    Err(e) => {
                        eprintln!(
                            "{}",
                            ndjson_line(
                                "setup_run",
                                "error",
                                &format!("error reading stderr: {}", e),
                                None
                            )
                        );
                        break;
                    }
                }
            }
        });
    }

    let status = child.wait().await.into_diagnostic()?;
    let code = status.code().unwrap_or(1);
    if code != 0 {
        eprintln!(
            "{}",
            ndjson_line(
                "setup",
                "error",
                &format!("setup script exited with code {}", code),
                Some(serde_json::json!({"path": setup_rel_or_abs, "exit_code": code}))
            )
        );
    } else {
        println!(
            "{}",
            ndjson_line(
                "setup",
                "info",
                &format!("completed setup: {}", setup_rel_or_abs),
                Some(serde_json::json!({"exit_code": code}))
            )
        );
    }
    Ok(code)
}
|