use clap::Parser; use miette::{IntoDiagnostic as _, Result}; use serde::Deserialize; use tokio::{fs, process::Command, io::{AsyncBufReadExt, BufReader}}; use std::process::Stdio; use tokio_stream::wrappers::ReceiverStream; use tracing::{error, info, warn}; use tokio::sync::mpsc; use common::runner::v1::{runner_client::RunnerClient, log_item::Event, LogItem, LogChunk, JobEnd}; #[derive(Parser, Debug)] #[command(name = "solstice-runner", version, about = "Solstice CI Workflow Runner (VM agent)")] struct Opts { /// Optional path to workflow KDL file (for local testing only) #[arg(long, env = "SOL_WORKFLOW_PATH")] workflow: Option, } #[derive(Debug, Deserialize)] struct JobFile { repo_url: String, commit_sha: String, } async fn read_job_file() -> Result { let path = std::env::var("SOLSTICE_JOB_FILE").unwrap_or_else(|_| "/etc/solstice/job.yaml".into()); let bytes = fs::read(&path).await.into_diagnostic()?; let jf: JobFile = serde_yaml::from_slice(&bytes).into_diagnostic()?; Ok(jf) } async fn run_shell(cmd: &str) -> Result { info!(%cmd, "exec"); let status = Command::new("/bin/sh").arg("-lc").arg(cmd).status().await.into_diagnostic()?; Ok(status.code().unwrap_or(1)) } async fn ensure_repo(repo: &str, sha: &str, workdir: &str) -> Result<()> { fs::create_dir_all(workdir).await.into_diagnostic()?; // Use system git to avoid libgit2 cross issues let cmds = vec![ format!("cd {workdir} && git init"), format!("cd {workdir} && git remote remove origin >/dev/null 2>&1 || true && git remote add origin {repo}"), format!("cd {workdir} && git fetch --depth=1 origin {sha}"), format!("cd {workdir} && git checkout -q FETCH_HEAD"), ]; for c in cmds { let _ = run_shell(&c).await?; } Ok(()) } async fn run_job_script_streamed(workdir: &str, tx: Option>, request_id: &str) -> Result { let script = format!("{}/.solstice/job.sh", workdir); if !fs::try_exists(&script).await.into_diagnostic()? { warn!(path = %script, "job script not found"); return Ok(0); } let _ = run_shell(&format!("chmod +x {} || true", script)).await?; let mut cmd = Command::new("/bin/sh"); cmd.arg("-lc").arg(format!("cd {workdir} && {}", script)) .stdout(Stdio::piped()) .stderr(Stdio::piped()); let mut child = cmd.spawn().into_diagnostic()?; if let Some(tx) = tx.clone() { if let Some(stdout) = child.stdout.take() { let mut lines = BufReader::new(stdout).lines(); let tx2 = tx.clone(); let req = request_id.to_string(); tokio::spawn(async move { while let Ok(Some(line)) = lines.next_line().await { let _ = tx2.send(LogItem { request_id: req.clone(), event: Some(Event::Log(LogChunk { line, stderr: false })) }).await; } }); } if let Some(stderr) = child.stderr.take() { let mut lines = BufReader::new(stderr).lines(); let tx2 = tx.clone(); let req = request_id.to_string(); tokio::spawn(async move { while let Ok(Some(line)) = lines.next_line().await { let _ = tx2.send(LogItem { request_id: req.clone(), event: Some(Event::Log(LogChunk { line, stderr: true })) }).await; } }); } } else { // If no streaming, still attach to child I/O to avoid blocking let _ = child.stdout.take(); let _ = child.stderr.take(); } let status = child.wait().await.into_diagnostic()?; Ok(status.code().unwrap_or(1)) } #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { let _t = common::init_tracing("solstice-workflow-runner")?; let _opts = Opts::parse(); // Try env overrides first for robustness let repo = std::env::var("SOLSTICE_REPO_URL").ok(); let sha = std::env::var("SOLSTICE_COMMIT_SHA").ok(); let (repo, sha) = match (repo, sha) { (Some(r), Some(s)) => (r, s), _ => { let jf = read_job_file().await?; (jf.repo_url, jf.commit_sha) } }; info!(%repo, %sha, "runner starting"); let workdir = std::env::var("SOLSTICE_WORKDIR").unwrap_or_else(|_| "/root/work".into()); // Setup gRPC streaming if orchestrator address and request id are provided let orch_addr = std::env::var("SOLSTICE_ORCH_ADDR").ok(); let request_id = std::env::var("SOLSTICE_REQUEST_ID").ok(); let mut tx_opt: Option> = None; if let (Some(addr), Some(req_id)) = (orch_addr.clone(), request_id.clone()) { let (tx, rx) = mpsc::channel::(256); let stream = ReceiverStream::new(rx); // Spawn client task tokio::spawn(async move { match RunnerClient::connect(format!("http://{addr}" )).await { Ok(mut client) => { let _ = client.stream_logs(stream).await; // ignore result } Err(e) => { warn!(error = %e, "failed to connect to orchestrator gRPC; logs will not be streamed"); } } }); tx_opt = Some(tx); // Send a first line if let Some(ref tx) = tx_opt { let _ = tx.send(LogItem { request_id: req_id.clone(), event: Some(Event::Log(LogChunk { line: format!("runner starting: repo={repo} sha={sha}"), stderr: false })) }).await; } } ensure_repo(&repo, &sha, &workdir).await?; let code = run_job_script_streamed(&workdir, tx_opt.clone(), request_id.as_deref().unwrap_or("")).await?; // Send JobEnd if streaming enabled if let (Some(tx), Some(req_id)) = (tx_opt.clone(), request_id.clone()) { let _ = tx.send(LogItem { request_id: req_id.clone(), event: Some(Event::End(JobEnd { exit_code: code, success: code == 0, repo_url: repo.clone(), commit_sha: sha.clone() })) }).await; // Give the client task a brief moment to flush tokio::time::sleep(std::time::Duration::from_millis(50)).await; } if code != 0 { error!(exit_code = code, "job script failed"); std::process::exit(code); } info!("job complete"); Ok(()) }