From 855aecbb1041fdc742e30f8df899d55b79480eb26ee25e64088b11907136eb25 Mon Sep 17 00:00:00 2001 From: Till Wegmueller Date: Sat, 1 Nov 2025 12:14:50 +0100 Subject: [PATCH] Add gRPC support for VM runner log streaming and orchestrator integration This commit introduces gRPC-based log streaming between the VM runner (`solstice-runner`) and orchestrator. Key updates include: - Implemented gRPC server in the orchestrator for receiving and processing runner logs. - Added log streaming and job result reporting in the `solstice-runner` client. - Defined `runner.proto` with messages (`LogItem`, `JobEnd`) and the `Runner` service. - Updated orchestrator to accept gRPC settings and start the server. - Modified cloud-init user data to include gRPC endpoint and request ID for runners. - Enhanced message queue logic to handle job results via `publish_job_result`. - Configured `Cross.toml` for cross-compilation of the runner. --- Cross.toml | 14 ++ crates/common/Cargo.toml | 9 +- crates/common/build.rs | 8 + crates/common/proto/runner.proto | 30 ++++ crates/common/src/lib.rs | 11 +- crates/common/src/messages.rs | 45 ++++- crates/common/src/mq.rs | 51 +++++- crates/orchestrator/Cargo.toml | 3 + crates/orchestrator/src/grpc.rs | 81 +++++++++ crates/orchestrator/src/main.rs | 43 +++-- crates/workflow-runner/Cargo.toml | 11 +- crates/workflow-runner/src/main.rs | 156 ++++++++++++++++-- .../2025-10-26-workflow-runner-and-cross.md | 43 +++++ 13 files changed, 467 insertions(+), 38 deletions(-) create mode 100644 Cross.toml create mode 100644 crates/common/build.rs create mode 100644 crates/common/proto/runner.proto create mode 100644 crates/orchestrator/src/grpc.rs create mode 100644 docs/ai/2025-10-26-workflow-runner-and-cross.md diff --git a/Cross.toml b/Cross.toml new file mode 100644 index 0000000..e7e8bd2 --- /dev/null +++ b/Cross.toml @@ -0,0 +1,14 @@ +# Cross configuration for building the workflow runner for VM targets +# Reference: https://github.com/cross-rs/cross + 
+[target.x86_64-unknown-linux-gnu] +image = "ghcr.io/cross-rs/x86_64-unknown-linux-gnu:main" + +[target.x86_64-unknown-illumos] +image = "ghcr.io/cross-rs/x86_64-unknown-illumos:main" + +[build] +pre-build = [ + "dpkg --add-architecture $CROSS_DEB_ARCH", + "apt-get update && apt-get install --assume-yes protobuf-compiler" +] \ No newline at end of file diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index a0c9c8e..2e46cf9 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -15,12 +15,17 @@ tracing-opentelemetry = "0.27" tracing-appender = "0.2" atty = "0.2" kdl = "4" +# gRPC/protobuf runtime for Runner API +tonic = { version = "0.12", features = ["transport"] } +prost = "0.13" # messaging + serialization -lapin = "2" -tokio-amqp = "1" +lapin = { version = "2", default-features = false, features = ["rustls"] } serde = { version = "1", features = ["derive"] } serde_json = "1" uuid = { version = "1", features = ["serde", "v4"] } time = { version = "0.3", features = ["serde", "macros"] } futures-util = "0.3" tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] } + +[build-dependencies] +tonic-build = "0.12" diff --git a/crates/common/build.rs b/crates/common/build.rs new file mode 100644 index 0000000..5ebe68a --- /dev/null +++ b/crates/common/build.rs @@ -0,0 +1,8 @@ +fn main() { + // Compile gRPC protobufs for Runner <-> Orchestrator + println!("cargo:rerun-if-changed=proto/runner.proto"); + tonic_build::configure() + .build_server(true) + .compile_protos(&["proto/runner.proto"], &["proto"]) + .expect("failed to compile runner proto"); +} diff --git a/crates/common/proto/runner.proto b/crates/common/proto/runner.proto new file mode 100644 index 0000000..d218643 --- /dev/null +++ b/crates/common/proto/runner.proto @@ -0,0 +1,30 @@ +syntax = "proto3"; +package runner.v1; + +// Messages sent from the VM runner agent to the orchestrator. 
+message LogChunk { + string line = 1; // One line of log output (stdout/stderr) + bool stderr = 2; // True if this line came from stderr +} + +message JobEnd { + int32 exit_code = 1; // Exit code of the job script/process + bool success = 2; // Convenience flag + string repo_url = 3; // Convenience context for Integration layer + string commit_sha = 4; // Convenience context for Integration layer +} + +message LogItem { + string request_id = 1; // UUID string to correlate the job + oneof event { + LogChunk log = 2; + JobEnd end = 3; + } +} + +message Ack { bool ok = 1; } + +service Runner { + // Client-streaming RPC: runner sends a stream of LogItem; orchestrator returns Ack + rpc StreamLogs (stream LogItem) returns (Ack); +} diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 311c814..4344f8c 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -5,5 +5,12 @@ pub mod mq; pub use telemetry::{init_tracing, TelemetryGuard}; pub use job::{Workflow, Job, Step, parse_workflow_str, parse_workflow_file}; -pub use messages::{JobRequest, SourceSystem}; -pub use mq::{MqConfig, publish_job, consume_jobs, consume_jobs_until}; +pub use messages::{JobRequest, JobResult, SourceSystem}; +pub use mq::{MqConfig, publish_job, publish_job_result, consume_jobs, consume_jobs_until}; + +// Generated gRPC module for runner <-> orchestrator +pub mod runner { + pub mod v1 { + include!(concat!(env!("OUT_DIR"), "/runner.v1.rs")); + } +} diff --git a/crates/common/src/messages.rs b/crates/common/src/messages.rs index f835c8a..0193866 100644 --- a/crates/common/src/messages.rs +++ b/crates/common/src/messages.rs @@ -1,5 +1,5 @@ use serde::{Deserialize, Serialize}; -use time::{OffsetDateTime}; +use time::OffsetDateTime; use uuid::Uuid; /// Versioned internal job request schema published to the message bus. @@ -7,7 +7,7 @@ use uuid::Uuid; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct JobRequest { /// Schema identifier for routing and evolution. 
- #[serde(default = "default_schema_version")] + #[serde(default = "default_jobrequest_schema")] pub schema_version: String, // e.g., "jobrequest.v1" /// Unique request identifier for idempotency and tracing correlation. pub request_id: Uuid, @@ -27,7 +27,7 @@ pub struct JobRequest { pub submitted_at: OffsetDateTime, } -fn default_schema_version() -> String { "jobrequest.v1".to_string() } +fn default_jobrequest_schema() -> String { "jobrequest.v1".to_string() } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -40,7 +40,7 @@ pub enum SourceSystem { impl JobRequest { pub fn new(source: SourceSystem, repo_url: impl Into, commit_sha: impl Into) -> Self { Self { - schema_version: default_schema_version(), + schema_version: default_jobrequest_schema(), request_id: Uuid::new_v4(), source, repo_url: repo_url.into(), @@ -52,3 +52,40 @@ impl JobRequest { } } } + +/// Final job result reported by the orchestrator back to the Integration layer over MQ. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JobResult { + /// Schema identifier used as routing key/type. 
e.g., "jobresult.v1" + #[serde(default = "default_jobresult_schema")] + pub schema_version: String, + /// Correlates to the original JobRequest.request_id + pub request_id: Uuid, + /// Repository and commit info (for convenience in consumers) + pub repo_url: String, + pub commit_sha: String, + /// Outcome info + pub success: bool, + pub exit_code: i32, + /// Optional human summary + pub summary: Option, + /// Completion timestamp + pub completed_at: OffsetDateTime, +} + +fn default_jobresult_schema() -> String { "jobresult.v1".to_string() } + +impl JobResult { + pub fn new(request_id: Uuid, repo_url: String, commit_sha: String, success: bool, exit_code: i32, summary: Option) -> Self { + Self { + schema_version: default_jobresult_schema(), + request_id, + repo_url, + commit_sha, + success, + exit_code, + summary, + completed_at: OffsetDateTime::now_utc(), + } + } +} diff --git a/crates/common/src/mq.rs b/crates/common/src/mq.rs index ca5318f..8c1d7be 100644 --- a/crates/common/src/mq.rs +++ b/crates/common/src/mq.rs @@ -13,7 +13,7 @@ use miette::{IntoDiagnostic as _, Result}; use tracing::{debug, error, info, instrument, warn}; use tracing::Instrument; -use crate::messages::JobRequest; +use crate::messages::{JobRequest, JobResult}; #[derive(Clone, Debug)] pub struct MqConfig { @@ -285,3 +285,52 @@ where info!("consume_jobs completed"); Ok(()) } + + +#[instrument(skip(cfg, result))] +pub async fn publish_job_result(cfg: &MqConfig, result: &JobResult) -> Result<()> { + let conn = connect(cfg).await?; + let channel = conn.create_channel().await.into_diagnostic()?; + + // Ensure main exchange exists + channel + .exchange_declare( + &cfg.exchange, + lapin::ExchangeKind::Direct, + ExchangeDeclareOptions { durable: true, auto_delete: false, internal: false, nowait: false, passive: false }, + FieldTable::default(), + ) + .await + .into_diagnostic()?; + + // Enable publisher confirms + channel + .confirm_select(ConfirmSelectOptions::default()) + .await + .into_diagnostic()?; 
+ + let payload = serde_json::to_vec(result).into_diagnostic()?; + + let props = BasicProperties::default() + .with_content_type("application/json".into()) + .with_content_encoding("utf-8".into()) + .with_type(ShortString::from(result.schema_version.clone())) + .with_delivery_mode(2u8.into()); + + // Route by schema version; default routing key for results + let routing_key = "jobresult.v1"; + + let confirm = channel + .basic_publish( + &cfg.exchange, + routing_key, + BasicPublishOptions { mandatory: true, ..Default::default() }, + &payload, + props, + ) + .await + .into_diagnostic()?; + + confirm.await.into_diagnostic()?; + Ok(()) +} diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index 02b400c..dbf1371 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -21,6 +21,8 @@ config = { version = "0.14", default-features = false, features = ["yaml"] } reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2", "gzip", "brotli", "zstd"] } bytes = "1" path-absolutize = "3" +# gRPC server +tonic = { version = "0.12", features = ["transport"] } # Compression/decompression zstd = "0.13" # DB (optional basic persistence) @@ -33,6 +35,7 @@ once_cell = "1" dashmap = "6" async-trait = "0.1" uuid = { version = "1", features = ["v4", "serde"] } +futures-util = "0.3.31" [target.'cfg(target_os = "linux")'.dependencies] virt = { version = "0.3" } diff --git a/crates/orchestrator/src/grpc.rs b/crates/orchestrator/src/grpc.rs new file mode 100644 index 0000000..7eb9328 --- /dev/null +++ b/crates/orchestrator/src/grpc.rs @@ -0,0 +1,81 @@ +use std::net::SocketAddr; +use futures_util::StreamExt; +use miette::{IntoDiagnostic as _, Result}; +use tonic::{Request, Response, Status}; +use tracing::{error, info, warn}; + +use common::{MqConfig, publish_job_result}; +use common::runner::v1::{runner_server::{Runner, RunnerServer}, LogItem, Ack}; + +pub struct RunnerSvc { + mq_cfg: MqConfig, +} 
+ +impl RunnerSvc { + pub fn new(mq_cfg: MqConfig) -> Self { Self { mq_cfg } } +} + +#[tonic::async_trait] +impl Runner for RunnerSvc { + async fn stream_logs( + &self, + request: Request>, + ) -> std::result::Result, Status> { + let mut stream = request.into_inner(); + let mut req_id: Option = None; + let mut repo_url: Option = None; + let mut commit_sha: Option = None; + let mut exit_code: i32 = 0; + let mut success: bool = true; + + while let Some(item) = stream.next().await.transpose().map_err(|e| Status::internal(e.to_string()))? { + // Correlate request id + if req_id.is_none() { + match uuid::Uuid::parse_str(&item.request_id) { + Ok(u) => req_id = Some(u), + Err(_) => { + warn!(request_id = %item.request_id, "invalid request_id from runner"); + } + } + } + if let Some(ev) = item.event { + match ev { + common::runner::v1::log_item::Event::Log(chunk) => { + if chunk.stderr { + info!(request_id = %item.request_id, line = %chunk.line, "runner:stderr"); + } else { + info!(request_id = %item.request_id, line = %chunk.line, "runner:stdout"); + } + } + common::runner::v1::log_item::Event::End(end) => { + exit_code = end.exit_code; + success = end.success; + repo_url = Some(end.repo_url); + commit_sha = Some(end.commit_sha); + } + } + } + } + + // Publish final status if we have enough context + if let (Some(id), Some(repo), Some(sha)) = (req_id, repo_url, commit_sha) { + let result = common::messages::JobResult::new(id, repo, sha, success, exit_code, None); + if let Err(e) = publish_job_result(&self.mq_cfg, &result).await { + error!(error = %e, request_id = %id, "failed to publish JobResult"); + } + } else { + warn!(have_req_id = req_id.is_some(), have_repo = repo_url.is_some(), have_sha = commit_sha.is_some(), "missing context for JobResult; skipping publish"); + } + + Ok(Response::new(Ack { ok: true })) + } +} + +pub async fn serve_with_shutdown(addr: SocketAddr, mq_cfg: MqConfig, shutdown: impl std::future::Future) -> Result<()> { + info!(%addr, "gRPC server 
starting"); + tonic::transport::Server::builder() + .add_service(RunnerServer::new(RunnerSvc::new(mq_cfg))) + .serve_with_shutdown(addr, shutdown) + .await + .into_diagnostic() +} diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index 5325e5c..823b3e7 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -2,6 +2,7 @@ mod config; mod hypervisor; mod scheduler; mod persist; +mod grpc; use std::{collections::HashMap, path::PathBuf, time::Duration}; @@ -97,15 +98,28 @@ async fn main() -> Result<()> { // Build MQ config and start consumer let mq_cfg = common::MqConfig { - url: opts.amqp_url, - exchange: opts.amqp_exchange, - routing_key: opts.amqp_routing_key, - queue: opts.amqp_queue, + url: opts.amqp_url.clone(), + exchange: opts.amqp_exchange.clone(), + routing_key: opts.amqp_routing_key.clone(), + queue: opts.amqp_queue.clone(), dlx: std::env::var("AMQP_DLX").unwrap_or_else(|_| "solstice.dlx".into()), dlq: std::env::var("AMQP_DLQ").unwrap_or_else(|_| "solstice.jobs.v1.dlq".into()), prefetch: opts.amqp_prefetch.unwrap_or(opts.max_concurrency as u16), }; + // Start gRPC server for runner log streaming + let grpc_addr: std::net::SocketAddr = opts.grpc_addr.parse().into_diagnostic()?; + let mq_cfg_for_grpc = mq_cfg.clone(); + let (grpc_shutdown_tx, grpc_shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let grpc_task = tokio::spawn(async move { + let _ = crate::grpc::serve_with_shutdown(grpc_addr, mq_cfg_for_grpc, async move { + let _ = grpc_shutdown_rx.await; + }).await; + }); + + // Orchestrator contact address for runner to dial back (can override via ORCH_CONTACT_ADDR) + let orch_contact = std::env::var("ORCH_CONTACT_ADDR").unwrap_or_else(|_| opts.grpc_addr.clone()); + // Scheduler let sched = Scheduler::new( router, @@ -137,6 +151,7 @@ async fn main() -> Result<()> { let sched_tx = tx_for_consumer.clone(); let cfg = cfg_clone.clone(); let persist = persist_for_consumer.clone(); + let 
orch_contact_val = orch_contact.clone(); async move { let label_resolved = cfg.resolve_label(job.runs_on.as_deref()).unwrap_or(&cfg.default_label).to_string(); let image = match cfg.image_for(&label_resolved) { @@ -162,7 +177,7 @@ async fn main() -> Result<()> { disk_gb, network: None, // libvirt network handled in backend nocloud: image.nocloud, - user_data: Some(make_cloud_init_userdata(&job.repo_url, &job.commit_sha)), + user_data: Some(make_cloud_init_userdata(&job.repo_url, &job.commit_sha, job.request_id, &orch_contact_val)), }; if !spec.nocloud { warn!(label = %label_resolved, "image is not marked nocloud=true; cloud-init may not work"); @@ -188,11 +203,14 @@ async fn main() -> Result<()> { // Ask scheduler to shut down cooperatively (interrupt placeholders) sched_shutdown.notify_waiters(); + // Stop gRPC server + let _ = grpc_shutdown_tx.send(()); + // Drop sender to let scheduler drain and exit drop(sched_tx); - // Wait for consumer and scheduler to finish concurrently - let (_c_res, _s_res) = tokio::join!(consumer_task, scheduler_task); + // Wait for consumer, scheduler and grpc to finish concurrently + let (_c_res, _s_res, _g_res) = tokio::join!(consumer_task, scheduler_task, grpc_task); Ok(()) } @@ -212,7 +230,7 @@ fn parse_capacity_map(s: Option<&str>) -> HashMap { m } -fn make_cloud_init_userdata(repo_url: &str, commit_sha: &str) -> Vec { +fn make_cloud_init_userdata(repo_url: &str, commit_sha: &str, request_id: uuid::Uuid, orch_addr: &str) -> Vec { let s = format!(r#"#cloud-config write_files: - path: /etc/solstice/job.yaml @@ -243,6 +261,8 @@ write_files: export SOLSTICE_REPO_URL='{repo}' export SOLSTICE_COMMIT_SHA='{sha}' export SOLSTICE_JOB_FILE='/etc/solstice/job.yaml' + export SOLSTICE_ORCH_ADDR='{orch_addr}' + export SOLSTICE_REQUEST_ID='{req_id}' if [ -x "$RUNNER" ]; then "$RUNNER" || true else @@ -252,7 +272,7 @@ write_files: (command -v poweroff >/dev/null 2>&1 && poweroff) || (command -v shutdown >/dev/null 2>&1 && shutdown -y -i5 -g0) || 
true runcmd: - [ /usr/local/bin/solstice-bootstrap.sh ] -"#, repo = repo_url, sha = commit_sha); +"#, repo = repo_url, sha = commit_sha, req_id = request_id, orch_addr = orch_addr); s.into_bytes() } @@ -274,7 +294,8 @@ mod tests { #[test] fn test_make_cloud_init_userdata_includes_fields() { - let data = make_cloud_init_userdata("https://example.com/repo.git", "deadbeef"); + let req_id = uuid::Uuid::new_v4(); + let data = make_cloud_init_userdata("https://example.com/repo.git", "deadbeef", req_id, "127.0.0.1:50051"); let s = String::from_utf8(data).unwrap(); assert!(s.contains("#cloud-config")); assert!(s.contains("repo_url: https://example.com/repo.git")); @@ -283,5 +304,7 @@ mod tests { assert!(s.contains("/etc/solstice/job.yaml")); assert!(s.contains("runcmd:")); assert!(s.contains("powering off")); + assert!(s.contains("SOLSTICE_ORCH_ADDR")); + assert!(s.contains(&req_id.to_string())); } } diff --git a/crates/workflow-runner/Cargo.toml b/crates/workflow-runner/Cargo.toml index 69b633e..dc6f657 100644 --- a/crates/workflow-runner/Cargo.toml +++ b/crates/workflow-runner/Cargo.toml @@ -3,9 +3,18 @@ name = "workflow-runner" version = "0.1.0" edition = "2024" +[[bin]] +name = "solstice-runner" +path = "src/main.rs" + [dependencies] common = { path = "../common" } clap = { version = "4", features = ["derive", "env"] } miette = { version = "7", features = ["fancy"] } tracing = "0.1" -tokio = { version = "1", features = ["rt-multi-thread", "macros", "process"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros", "process", "fs", "io-util"] } +serde = { version = "1", features = ["derive"] } +serde_yaml = "0.9" +# gRPC client +tonic = { version = "0.12", features = ["transport"] } +tokio-stream = "0.1" diff --git a/crates/workflow-runner/src/main.rs b/crates/workflow-runner/src/main.rs index 3320c15..6e44566 100644 --- a/crates/workflow-runner/src/main.rs +++ b/crates/workflow-runner/src/main.rs @@ -1,39 +1,159 @@ use clap::Parser; -use 
miette::{IntoDiagnostic, Result}; -use tracing::{info, error}; +use miette::{IntoDiagnostic as _, Result}; +use serde::Deserialize; +use tokio::{fs, process::Command, io::{AsyncBufReadExt, BufReader}}; +use std::process::Stdio; +use tokio_stream::wrappers::ReceiverStream; +use tracing::{error, info, warn}; +use tokio::sync::mpsc; +use common::runner::v1::{runner_client::RunnerClient, log_item::Event, LogItem, LogChunk, JobEnd}; +use tonic::transport::Channel; #[derive(Parser, Debug)] #[command(name = "solstice-runner", version, about = "Solstice CI Workflow Runner (VM agent)")] struct Opts { - /// Path to workflow KDL file + /// Optional path to workflow KDL file (for local testing only) #[arg(long, env = "SOL_WORKFLOW_PATH")] - workflow: String, + workflow: Option, +} + +#[derive(Debug, Deserialize)] +struct JobFile { + repo_url: String, + commit_sha: String, +} + +async fn read_job_file() -> Result { + let path = std::env::var("SOLSTICE_JOB_FILE").unwrap_or_else(|_| "/etc/solstice/job.yaml".into()); + let bytes = fs::read(&path).await.into_diagnostic()?; + let jf: JobFile = serde_yaml::from_slice(&bytes).into_diagnostic()?; + Ok(jf) +} + +async fn run_shell(cmd: &str) -> Result { + info!(%cmd, "exec"); + let status = Command::new("/bin/sh").arg("-lc").arg(cmd).status().await.into_diagnostic()?; + Ok(status.code().unwrap_or(1)) +} + +async fn ensure_repo(repo: &str, sha: &str, workdir: &str) -> Result<()> { + fs::create_dir_all(workdir).await.into_diagnostic()?; + // Use system git to avoid libgit2 cross issues + let cmds = vec![ + format!("cd {workdir} && git init"), + format!("cd {workdir} && git remote remove origin >/dev/null 2>&1 || true && git remote add origin {repo}"), + format!("cd {workdir} && git fetch --depth=1 origin {sha}"), + format!("cd {workdir} && git checkout -q FETCH_HEAD"), + ]; + for c in cmds { let _ = run_shell(&c).await?; } + Ok(()) +} + +async fn run_job_script_streamed(workdir: &str, tx: Option>, request_id: &str) -> Result { + let 
script = format!("{}/.solstice/job.sh", workdir); + if !fs::try_exists(&script).await.into_diagnostic()? { + warn!(path = %script, "job script not found"); + return Ok(0); + } + let _ = run_shell(&format!("chmod +x {} || true", script)).await?; + + let mut cmd = Command::new("/bin/sh"); + cmd.arg("-lc").arg(format!("cd {workdir} && {}", script)) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + let mut child = cmd.spawn().into_diagnostic()?; + + if let Some(tx) = tx.clone() { + if let Some(stdout) = child.stdout.take() { + let mut lines = BufReader::new(stdout).lines(); + let tx2 = tx.clone(); + let req = request_id.to_string(); + tokio::spawn(async move { + while let Ok(Some(line)) = lines.next_line().await { + let _ = tx2.send(LogItem { request_id: req.clone(), event: Some(Event::Log(LogChunk { line, stderr: false })) }).await; + } + }); + } + if let Some(stderr) = child.stderr.take() { + let mut lines = BufReader::new(stderr).lines(); + let tx2 = tx.clone(); + let req = request_id.to_string(); + tokio::spawn(async move { + while let Ok(Some(line)) = lines.next_line().await { + let _ = tx2.send(LogItem { request_id: req.clone(), event: Some(Event::Log(LogChunk { line, stderr: true })) }).await; + } + }); + } + } else { + // If no streaming, still attach to child I/O to avoid blocking + let _ = child.stdout.take(); + let _ = child.stderr.take(); + } + + let status = child.wait().await.into_diagnostic()?; + Ok(status.code().unwrap_or(1)) } #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { let _t = common::init_tracing("solstice-workflow-runner")?; - let opts = Opts::parse(); + let _opts = Opts::parse(); - let wf = match common::parse_workflow_file(&opts.workflow) { - Ok(wf) => wf, - Err(e) => { - error!(error = %e, "failed to parse workflow KDL"); - return Err(e); + // Try env overrides first for robustness + let repo = std::env::var("SOLSTICE_REPO_URL").ok(); + let sha = std::env::var("SOLSTICE_COMMIT_SHA").ok(); + + let (repo, sha) = 
match (repo, sha) { + (Some(r), Some(s)) => (r, s), + _ => { + let jf = read_job_file().await?; + (jf.repo_url, jf.commit_sha) } }; - info!(name = ?wf.name, jobs = wf.jobs.len(), "loaded workflow"); + info!(%repo, %sha, "runner starting"); + let workdir = std::env::var("SOLSTICE_WORKDIR").unwrap_or_else(|_| "/root/work".into()); - for (id, job) in &wf.jobs { - println!("Job: {id}"); - if let Some(ro) = &job.runs_on { println!(" runs_on: {ro}"); } - for (idx, step) in job.steps.iter().enumerate() { - let n = step.name.as_deref().unwrap_or("(unnamed)"); - println!(" Step {}/{}: {}", idx + 1, job.steps.len(), n); - println!(" run: {}", step.run); + // Setup gRPC streaming if orchestrator address and request id are provided + let orch_addr = std::env::var("SOLSTICE_ORCH_ADDR").ok(); + let request_id = std::env::var("SOLSTICE_REQUEST_ID").ok(); + let mut tx_opt: Option> = None; + if let (Some(addr), Some(req_id)) = (orch_addr.clone(), request_id.clone()) { + let (tx, rx) = mpsc::channel::(256); + let stream = ReceiverStream::new(rx); + // Spawn client task + tokio::spawn(async move { + match RunnerClient::connect(format!("http://{addr}" )).await { + Ok(mut client) => { + let _ = client.stream_logs(stream).await; // ignore result + } + Err(e) => { + warn!(error = %e, "failed to connect to orchestrator gRPC; logs will not be streamed"); + } + } + }); + tx_opt = Some(tx); + // Send a first line + if let Some(ref tx) = tx_opt { + let _ = tx.send(LogItem { request_id: req_id.clone(), event: Some(Event::Log(LogChunk { line: format!("runner starting: repo={repo} sha={sha}"), stderr: false })) }).await; } } + ensure_repo(&repo, &sha, &workdir).await?; + let code = run_job_script_streamed(&workdir, tx_opt.clone(), request_id.as_deref().unwrap_or("")).await?; + + // Send JobEnd if streaming enabled + if let (Some(tx), Some(req_id)) = (tx_opt.clone(), request_id.clone()) { + let _ = tx.send(LogItem { request_id: req_id.clone(), event: Some(Event::End(JobEnd { exit_code: code, 
success: code == 0, repo_url: repo.clone(), commit_sha: sha.clone() })) }).await; + // Give the client task a brief moment to flush + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + } + + if code != 0 { + error!(exit_code = code, "job script failed"); + std::process::exit(code); + } + + info!("job complete"); Ok(()) } diff --git a/docs/ai/2025-10-26-workflow-runner-and-cross.md b/docs/ai/2025-10-26-workflow-runner-and-cross.md new file mode 100644 index 0000000..a67e8c6 --- /dev/null +++ b/docs/ai/2025-10-26-workflow-runner-and-cross.md @@ -0,0 +1,43 @@ +### Solstice CI — Workflow Runner bootstrap and cross builds (MVP) + +Summary +- Implemented a minimal VM workflow runner binary (solstice-runner) that the orchestrator’s cloud-init bootstraps and executes inside the guest. +- The runner is cross-compilable with cross for convenient deployment to local dev VMs. + +What the runner does (today) +- Reads job context from env or a small YAML file: + - Env: SOLSTICE_REPO_URL, SOLSTICE_COMMIT_SHA + - File: SOLSTICE_JOB_FILE (defaults to /etc/solstice/job.yaml) with keys repo_url and commit_sha +- Prepares a workspace (default /root/work; overridable with SOLSTICE_WORKDIR). +- Uses system git to fetch the repository at the exact commit (avoids libgit2 to make cross builds simpler). +- Executes .solstice/job.sh when present, streaming stdout/stderr. Exits with the script’s exit code. +- Logs via tracing (stderr), compatible with the serial console setup added to libvirt. + +Why this design +- Keeps the guest-side binary very small, with minimal dependencies, easing cross builds. +- Shelling out to git leverages whatever the base image provides and avoids cross-compiling libgit2. +- Aligns with the orchestrator’s cloud-init that writes /etc/solstice/job.yaml and exports the same env var names. 
+ +Build and usage +- Build all: cargo build --workspace +- Build only the runner: cargo build -p workflow-runner +- Binary name in target directory: solstice-runner + +Cross compiling +- A Cross.toml is provided at the workspace root. Example targets: + - x86_64-unknown-linux-gnu + - x86_64-unknown-illumos (requires a recent toolchain with illumos std) + +Examples: +- cross build -p workflow-runner --target x86_64-unknown-linux-gnu --release +- cross build -p workflow-runner --target x86_64-unknown-illumos --release + +Notes: +- Ensure the base VM image has /bin/sh and git installed (runner relies on both). +- On OpenIndiana/illumos images, prefer installing git via pkg or image packaging beforehand; the runner will not attempt to install packages. +- The orchestrator cloud-init already bootstraps /usr/local/bin/solstice-runner and calls it; set SOLSTICE_RUNNER_URL to point to an HTTP(S) URL hosting the cross-built artifact for quick iteration. + +Next steps +- Add optional KDL workflow execution when .solstice/job.sh is absent (parse .solstice/workflow.kdl and run steps). +- Stream logs back to the orchestrator over gRPC and report final status to the Integration layer (initial, best-effort support exists: when SOLSTICE_ORCH_ADDR and SOLSTICE_REQUEST_ID are set, the runner sends log lines and a final JobEnd over the Runner.StreamLogs RPC; connection and delivery failures are only logged, so reliable delivery remains open). +- Secrets injection and masking in logs.