2025-10-25 20:00:32 +02:00
|
|
|
use clap::Parser;
|
2025-11-01 14:56:46 +01:00
|
|
|
use common::runner::v1::{JobEnd, LogChunk, LogItem, log_item::Event, runner_client::RunnerClient};
|
2025-11-01 12:14:50 +01:00
|
|
|
use miette::{IntoDiagnostic as _, Result};
|
|
|
|
|
use serde::Deserialize;
|
|
|
|
|
use std::process::Stdio;
|
2025-11-01 14:56:46 +01:00
|
|
|
use tokio::sync::mpsc;
|
|
|
|
|
use tokio::{
|
|
|
|
|
fs,
|
|
|
|
|
io::{AsyncBufReadExt, BufReader},
|
|
|
|
|
process::Command,
|
|
|
|
|
};
|
2025-11-01 12:14:50 +01:00
|
|
|
use tokio_stream::wrappers::ReceiverStream;
|
|
|
|
|
use tracing::{error, info, warn};
|
2025-10-25 20:00:32 +02:00
|
|
|
|
|
|
|
|
/// Command-line options for the runner binary (parsed with clap).
#[derive(Parser, Debug)]
#[command(
    name = "solstice-runner",
    version,
    about = "Solstice CI Workflow Runner (VM agent)"
)]
struct Opts {
    /// Optional path to workflow KDL file (for local testing only)
    // NOTE(review): parsed but currently unused in `main` (bound as `_opts`);
    // kept so `--workflow` / SOL_WORKFLOW_PATH remain accepted.
    #[arg(long, env = "SOL_WORKFLOW_PATH")]
    workflow: Option<String>,
}
|
|
|
|
|
|
|
|
|
|
/// Job description deserialized from the YAML job file injected into the VM
/// (path resolution happens in `read_job_file`).
#[derive(Debug, Deserialize)]
struct JobFile {
    /// Clone/archive source URL of the repository under test.
    repo_url: String,
    /// Exact commit to fetch and check out.
    commit_sha: String,
}
|
|
|
|
|
|
|
|
|
|
async fn read_job_file() -> Result<JobFile> {
|
2025-11-01 14:56:46 +01:00
|
|
|
let path =
|
|
|
|
|
std::env::var("SOLSTICE_JOB_FILE").unwrap_or_else(|_| "/etc/solstice/job.yaml".into());
|
2025-11-01 12:14:50 +01:00
|
|
|
let bytes = fs::read(&path).await.into_diagnostic()?;
|
|
|
|
|
let jf: JobFile = serde_yaml::from_slice(&bytes).into_diagnostic()?;
|
|
|
|
|
Ok(jf)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn run_shell(cmd: &str) -> Result<i32> {
|
|
|
|
|
info!(%cmd, "exec");
|
2025-11-01 14:56:46 +01:00
|
|
|
let status = Command::new("/bin/sh")
|
|
|
|
|
.arg("-lc")
|
|
|
|
|
.arg(cmd)
|
|
|
|
|
.status()
|
|
|
|
|
.await
|
|
|
|
|
.into_diagnostic()?;
|
2025-11-02 20:36:13 +01:00
|
|
|
let code = status.code().unwrap_or(1);
|
|
|
|
|
if code != 0 {
|
|
|
|
|
return Err(miette::miette!("command failed ({code}): {cmd}"));
|
|
|
|
|
}
|
|
|
|
|
Ok(code)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Rewrite a Codeberg SSH remote into its HTTPS equivalent so the repository
/// can be fetched anonymously (the VM has no deploy keys).
///
/// Handles both SSH remote forms git accepts:
/// - URL style: `ssh://git@codeberg.org/owner/repo(.git)`
/// - scp style: `git@codeberg.org:owner/repo(.git)`
///
/// Any other URL is returned unchanged.
fn to_https_if_codeberg(repo: &str) -> String {
    if let Some(rest) = repo.strip_prefix("ssh://git@codeberg.org/") {
        return format!("https://codeberg.org/{rest}");
    }
    // scp-style shorthand used by `git clone git@host:path` — previously
    // passed through untouched, which made the anonymous HTTPS fetch fail.
    if let Some(rest) = repo.strip_prefix("git@codeberg.org:") {
        return format!("https://codeberg.org/{rest}");
    }
    repo.to_string()
}
|
|
|
|
|
|
|
|
|
|
async fn has_cmd(name: &str) -> bool {
|
|
|
|
|
tokio::process::Command::new("/bin/sh")
|
|
|
|
|
.arg("-lc")
|
|
|
|
|
.arg(format!("command -v {} >/dev/null 2>&1", name))
|
|
|
|
|
.status()
|
|
|
|
|
.await
|
|
|
|
|
.ok()
|
|
|
|
|
.and_then(|s| s.code())
|
|
|
|
|
.map(|c| c == 0)
|
|
|
|
|
.unwrap_or(false)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn fetch_repo_via_archive(repo_https: &str, sha: &str, workdir: &str) -> Result<()> {
|
|
|
|
|
// Gitea/Codeberg archive URL pattern: https://codeberg.org/<owner>/<repo>/archive/<sha>.tar.gz
|
|
|
|
|
let base = repo_https.trim_end_matches('.').trim_end_matches(".git");
|
|
|
|
|
let url = format!("{}/archive/{}.tar.gz", base, sha);
|
|
|
|
|
|
2025-11-02 20:48:05 +01:00
|
|
|
// Check if we should allow insecure TLS (last resort)
|
|
|
|
|
let insecure = std::env::var("SOLSTICE_ALLOW_INSECURE").ok().map(|v| v == "1" || v.eq_ignore_ascii_case("true")).unwrap_or(false);
|
|
|
|
|
let curl_flags = if insecure { "-fSLk" } else { "-fSL" };
|
|
|
|
|
|
2025-11-02 20:36:13 +01:00
|
|
|
// Try curl | tar, then wget | tar
|
|
|
|
|
let cmd_curl = format!(
|
2025-11-02 20:48:05 +01:00
|
|
|
"mkdir -p {workdir} && curl {curl_flags} {url} | tar -xz -C {workdir} --strip-components=1"
|
2025-11-02 20:36:13 +01:00
|
|
|
);
|
|
|
|
|
if run_shell(&cmd_curl).await.is_ok() {
|
|
|
|
|
return Ok(());
|
|
|
|
|
}
|
|
|
|
|
let cmd_wget = format!(
|
|
|
|
|
"mkdir -p {workdir} && wget -qO- {url} | tar -xz -C {workdir} --strip-components=1"
|
|
|
|
|
);
|
|
|
|
|
if run_shell(&cmd_wget).await.is_ok() {
|
|
|
|
|
return Ok(());
|
|
|
|
|
}
|
|
|
|
|
|
2025-11-02 20:48:05 +01:00
|
|
|
// On illumos/SunOS images, curl/wget may be missing or CA bundle absent. Try to install tools and CA certs, then retry.
|
2025-11-02 20:36:13 +01:00
|
|
|
let os = std::env::var("SOLSTICE_OS_OVERRIDE").ok().unwrap_or_else(|| {
|
|
|
|
|
// Best-effort OS detection
|
|
|
|
|
std::env::consts::OS.to_string()
|
|
|
|
|
});
|
|
|
|
|
// Prefer uname if available
|
|
|
|
|
let uname = Command::new("/bin/sh").arg("-lc").arg("uname -s 2>/dev/null || echo unknown").output().await.ok()
|
|
|
|
|
.and_then(|o| String::from_utf8(o.stdout).ok()).unwrap_or_default();
|
|
|
|
|
let is_sunos = uname.trim() == "SunOS" || os == "solaris";
|
|
|
|
|
if is_sunos {
|
|
|
|
|
// Try IPS (pkg) first, then pkgin
|
|
|
|
|
let _ = run_shell("sudo pkg refresh || true").await;
|
2025-11-02 20:48:05 +01:00
|
|
|
// curl
|
2025-11-02 20:36:13 +01:00
|
|
|
if run_shell("sudo pkg install -v web/curl").await.is_err() {
|
|
|
|
|
let _ = run_shell("sudo pkgin -y install curl").await;
|
|
|
|
|
}
|
2025-11-02 20:48:05 +01:00
|
|
|
// CA certificates (package name may differ per distro)
|
|
|
|
|
let _ = run_shell("sudo pkg install -v web/ca-certificates || sudo pkg install -v library/security/ca-certificates || true").await;
|
|
|
|
|
let _ = run_shell("sudo pkgin -y install mozilla-rootcerts || true").await;
|
|
|
|
|
let _ = run_shell("sudo mozilla-rootcerts install || true").await;
|
|
|
|
|
|
|
|
|
|
// Retry with curl and wget
|
2025-11-02 20:36:13 +01:00
|
|
|
if run_shell(&cmd_curl).await.is_ok() {
|
|
|
|
|
return Ok(());
|
|
|
|
|
}
|
|
|
|
|
if run_shell(&cmd_wget).await.is_ok() {
|
|
|
|
|
return Ok(());
|
|
|
|
|
}
|
2025-11-02 20:48:05 +01:00
|
|
|
|
|
|
|
|
// As a last resort with explicit opt-in, try curl --insecure
|
|
|
|
|
if insecure {
|
|
|
|
|
let cmd_curl_insecure = format!(
|
|
|
|
|
"mkdir -p {workdir} && curl -fSLk {url} | tar -xz -C {workdir} --strip-components=1"
|
|
|
|
|
);
|
|
|
|
|
if run_shell(&cmd_curl_insecure).await.is_ok() {
|
|
|
|
|
warn!("used curl --insecure to fetch repo archive on SunOS");
|
|
|
|
|
return Ok(());
|
|
|
|
|
}
|
|
|
|
|
}
|
2025-11-02 20:36:13 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Err(miette::miette!("failed to fetch repo archive via HTTP for {url}"))
|
2025-11-01 12:14:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn ensure_repo(repo: &str, sha: &str, workdir: &str) -> Result<()> {
|
|
|
|
|
fs::create_dir_all(workdir).await.into_diagnostic()?;
|
2025-11-02 20:36:13 +01:00
|
|
|
let repo_eff = to_https_if_codeberg(repo);
|
|
|
|
|
|
|
|
|
|
// Prefer git when available; fall back to archive download when git is missing or fetch fails.
|
|
|
|
|
if has_cmd("git").await {
|
|
|
|
|
let cmds = vec![
|
|
|
|
|
format!("cd {workdir} && git init"),
|
|
|
|
|
format!(
|
|
|
|
|
"cd {workdir} && git remote remove origin >/dev/null 2>&1 || true && git remote add origin {repo_eff}"
|
|
|
|
|
),
|
|
|
|
|
format!("cd {workdir} && git fetch --depth=1 origin {sha}"),
|
|
|
|
|
format!("cd {workdir} && git checkout -q FETCH_HEAD"),
|
|
|
|
|
];
|
|
|
|
|
for c in cmds {
|
|
|
|
|
match run_shell(&c).await {
|
|
|
|
|
Ok(_) => {}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
// Try archive fallback once on any git failure
|
|
|
|
|
warn!(error = %e, "git path failed; attempting archive fallback");
|
|
|
|
|
return fetch_repo_via_archive(&repo_eff, sha, workdir).await;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Ok(())
|
|
|
|
|
} else {
|
|
|
|
|
fetch_repo_via_archive(&repo_eff, sha, workdir).await
|
2025-11-01 14:56:46 +01:00
|
|
|
}
|
2025-11-01 12:14:50 +01:00
|
|
|
}
|
|
|
|
|
|
2025-11-01 14:56:46 +01:00
|
|
|
async fn run_job_script_streamed(
|
|
|
|
|
workdir: &str,
|
|
|
|
|
tx: Option<mpsc::Sender<LogItem>>,
|
|
|
|
|
request_id: &str,
|
|
|
|
|
) -> Result<i32> {
|
2025-11-01 12:14:50 +01:00
|
|
|
let script = format!("{}/.solstice/job.sh", workdir);
|
|
|
|
|
if !fs::try_exists(&script).await.into_diagnostic()? {
|
|
|
|
|
warn!(path = %script, "job script not found");
|
2025-11-02 20:36:13 +01:00
|
|
|
if let Some(tx0) = tx.as_ref() {
|
|
|
|
|
let _ = tx0
|
|
|
|
|
.send(LogItem {
|
|
|
|
|
request_id: request_id.to_string(),
|
|
|
|
|
event: Some(Event::Log(LogChunk {
|
|
|
|
|
line: format!("[runner] job script not found at {}", script),
|
|
|
|
|
stderr: true,
|
|
|
|
|
})),
|
|
|
|
|
})
|
|
|
|
|
.await;
|
|
|
|
|
}
|
|
|
|
|
return Ok(1);
|
|
|
|
|
}
|
|
|
|
|
// Emit explicit pre-exec line to aid diagnostics
|
|
|
|
|
if let Some(tx0) = tx.as_ref() {
|
|
|
|
|
let _ = tx0
|
|
|
|
|
.send(LogItem {
|
|
|
|
|
request_id: request_id.to_string(),
|
|
|
|
|
event: Some(Event::Log(LogChunk {
|
|
|
|
|
line: format!("[runner] executing {}", script),
|
|
|
|
|
stderr: false,
|
|
|
|
|
})),
|
|
|
|
|
})
|
|
|
|
|
.await;
|
2025-11-01 12:14:50 +01:00
|
|
|
}
|
|
|
|
|
let _ = run_shell(&format!("chmod +x {} || true", script)).await?;
|
|
|
|
|
|
|
|
|
|
let mut cmd = Command::new("/bin/sh");
|
2025-11-01 14:56:46 +01:00
|
|
|
cmd.arg("-lc")
|
|
|
|
|
.arg(format!("cd {workdir} && {}", script))
|
2025-11-01 12:14:50 +01:00
|
|
|
.stdout(Stdio::piped())
|
|
|
|
|
.stderr(Stdio::piped());
|
|
|
|
|
let mut child = cmd.spawn().into_diagnostic()?;
|
|
|
|
|
|
|
|
|
|
if let Some(tx) = tx.clone() {
|
|
|
|
|
if let Some(stdout) = child.stdout.take() {
|
2025-11-02 20:36:13 +01:00
|
|
|
let mut reader = BufReader::new(stdout);
|
2025-11-01 12:14:50 +01:00
|
|
|
let tx2 = tx.clone();
|
|
|
|
|
let req = request_id.to_string();
|
|
|
|
|
tokio::spawn(async move {
|
2025-11-02 20:36:13 +01:00
|
|
|
loop {
|
|
|
|
|
let mut buf = Vec::with_capacity(256);
|
|
|
|
|
match reader.read_until(b'\n', &mut buf).await {
|
|
|
|
|
Ok(0) => break,
|
|
|
|
|
Ok(_) => {
|
|
|
|
|
let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string();
|
|
|
|
|
let _ = tx2
|
|
|
|
|
.send(LogItem {
|
|
|
|
|
request_id: req.clone(),
|
|
|
|
|
event: Some(Event::Log(LogChunk {
|
|
|
|
|
line,
|
|
|
|
|
stderr: false,
|
|
|
|
|
})),
|
|
|
|
|
})
|
|
|
|
|
.await;
|
|
|
|
|
}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
let _ = tx2
|
|
|
|
|
.send(LogItem {
|
|
|
|
|
request_id: req.clone(),
|
|
|
|
|
event: Some(Event::Log(LogChunk {
|
|
|
|
|
line: format!("[runner] error reading stdout: {e}"),
|
|
|
|
|
stderr: true,
|
|
|
|
|
})),
|
|
|
|
|
})
|
|
|
|
|
.await;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
2025-11-01 12:14:50 +01:00
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
if let Some(stderr) = child.stderr.take() {
|
2025-11-02 20:36:13 +01:00
|
|
|
let mut reader = BufReader::new(stderr);
|
2025-11-01 12:14:50 +01:00
|
|
|
let tx2 = tx.clone();
|
|
|
|
|
let req = request_id.to_string();
|
|
|
|
|
tokio::spawn(async move {
|
2025-11-02 20:36:13 +01:00
|
|
|
loop {
|
|
|
|
|
let mut buf = Vec::with_capacity(256);
|
|
|
|
|
match reader.read_until(b'\n', &mut buf).await {
|
|
|
|
|
Ok(0) => break,
|
|
|
|
|
Ok(_) => {
|
|
|
|
|
let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string();
|
|
|
|
|
let _ = tx2
|
|
|
|
|
.send(LogItem {
|
|
|
|
|
request_id: req.clone(),
|
|
|
|
|
event: Some(Event::Log(LogChunk { line, stderr: true })),
|
|
|
|
|
})
|
|
|
|
|
.await;
|
|
|
|
|
}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
let _ = tx2
|
|
|
|
|
.send(LogItem {
|
|
|
|
|
request_id: req.clone(),
|
|
|
|
|
event: Some(Event::Log(LogChunk {
|
|
|
|
|
line: format!("[runner] error reading stderr: {e}"),
|
|
|
|
|
stderr: true,
|
|
|
|
|
})),
|
|
|
|
|
})
|
|
|
|
|
.await;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
2025-11-01 12:14:50 +01:00
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// If no streaming, still attach to child I/O to avoid blocking
|
|
|
|
|
let _ = child.stdout.take();
|
|
|
|
|
let _ = child.stderr.take();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let status = child.wait().await.into_diagnostic()?;
|
|
|
|
|
Ok(status.code().unwrap_or(1))
|
2025-10-25 20:00:32 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Runner entry point: resolve the job (env overrides, then the job file),
/// optionally open a gRPC log stream to the orchestrator, prepare the repo,
/// run the job script, report the result, and exit with the script's code.
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> {
    let _t = common::init_tracing("solstice-workflow-runner")?;
    // NOTE(review): Opts are parsed (so --help/--version work) but otherwise
    // unused; the `workflow` flag is currently ignored.
    let _opts = Opts::parse();

    // Try env overrides first for robustness
    let repo = std::env::var("SOLSTICE_REPO_URL").ok();
    let sha = std::env::var("SOLSTICE_COMMIT_SHA").ok();

    // Fall back to the injected job file unless BOTH env vars are present.
    let (repo, sha) = match (repo, sha) {
        (Some(r), Some(s)) => (r, s),
        _ => {
            let jf = read_job_file().await?;
            (jf.repo_url, jf.commit_sha)
        }
    };

    info!(%repo, %sha, "runner starting");
    let workdir = std::env::var("SOLSTICE_WORKDIR").unwrap_or_else(|_| "/root/work".into());

    // Setup gRPC streaming if orchestrator address and request id are provided
    let orch_addr = std::env::var("SOLSTICE_ORCH_ADDR").ok();
    let request_id_env = std::env::var("SOLSTICE_REQUEST_ID").ok();
    // Use provided request id or empty string (server will warn if invalid)
    let request_id_effective = request_id_env.unwrap_or_default();
    let mut tx_opt: Option<mpsc::Sender<LogItem>> = None;
    let mut stream_handle: Option<tokio::task::JoinHandle<()>> = None;
    if let Some(addr) = orch_addr.clone() {
        // Bounded channel: log producers back-pressure at 512 in-flight items.
        let (tx, rx) = mpsc::channel::<LogItem>(512);
        let stream = ReceiverStream::new(rx);
        // Spawn client task and keep a handle so we can await graceful flush on shutdown.
        let handle = tokio::spawn(async move {
            match RunnerClient::connect(format!("http://{addr}")).await {
                Ok(mut client) => {
                    if let Err(e) = client.stream_logs(stream).await {
                        warn!(error = %e, "log stream to orchestrator terminated with error");
                    }
                }
                Err(e) => {
                    // Connection failure is non-fatal: the job still runs,
                    // only log streaming is lost.
                    warn!(error = %e, "failed to connect to orchestrator gRPC; logs will not be streamed");
                }
            }
        });
        stream_handle = Some(handle);
        tx_opt = Some(tx);
        // Send a first line
        if let Some(ref tx) = tx_opt {
            let _ = tx
                .send(LogItem {
                    request_id: request_id_effective.clone(),
                    event: Some(Event::Log(LogChunk {
                        line: format!("runner starting: repo={repo} sha={sha}"),
                        stderr: false,
                    })),
                })
                .await;
        }
    }

    // NOTE(review): a `?` failure inside run_job_script_streamed returns
    // before the JobEnd message below is sent — confirm the orchestrator
    // handles a stream that closes without a JobEnd.
    let code = match ensure_repo(&repo, &sha, &workdir).await {
        Ok(_) => {
            // proceed to run job script
            run_job_script_streamed(&workdir, tx_opt.clone(), &request_id_effective).await?
        }
        Err(e) => {
            // Report checkout failure to orchestrator and return non-zero
            if let Some(tx) = tx_opt.clone() {
                let _ = tx
                    .send(LogItem {
                        request_id: request_id_effective.clone(),
                        event: Some(Event::Log(LogChunk {
                            line: format!("[runner] failed to prepare repo: {e}"),
                            stderr: true,
                        })),
                    })
                    .await;
            }
            1
        }
    };

    // Send JobEnd if streaming enabled
    if let Some(tx) = tx_opt.clone() {
        let _ = tx
            .send(LogItem {
                request_id: request_id_effective.clone(),
                event: Some(Event::End(JobEnd {
                    exit_code: code,
                    success: code == 0,
                    repo_url: repo.clone(),
                    commit_sha: sha.clone(),
                })),
            })
            .await;
        // Drop the last sender to close the client stream and allow server to flush
        drop(tx);
        tx_opt = None;
        // Give the client task a brief moment to flush and then await it
        tokio::time::sleep(std::time::Duration::from_millis(150)).await;
        if let Some(h) = stream_handle {
            let _ = h.await;
        }
    }

    if code != 0 {
        error!(exit_code = code, "job script failed");
        // Propagate the script's exit code to the VM supervisor.
        std::process::exit(code);
    }

    info!("job complete");
    Ok(())
}
|