From d8ef6ef236797bb5745d70fcd80fcb6ad1bcc7769496b05ce2d3353b72f68e55 Mon Sep 17 00:00:00 2001 From: Till Wegmueller Date: Mon, 6 Apr 2026 23:59:26 +0200 Subject: [PATCH] Add log delivery and step state reporting to Forgejo runner Fetches logs from logs-service per category, uploads them to Forgejo via UpdateLog, and reports per-step StepState entries so the Forgejo UI shows individual step results and log output. --- crates/runner-integration/src/main.rs | 6 +- crates/runner-integration/src/poller.rs | 15 +- crates/runner-integration/src/reporter.rs | 160 +++++++++++++++++++- crates/runner-integration/src/state.rs | 21 ++- crates/runner-integration/src/translator.rs | 36 +++-- deploy/podman/compose.yml | 10 +- 6 files changed, 223 insertions(+), 25 deletions(-) diff --git a/crates/runner-integration/src/main.rs b/crates/runner-integration/src/main.rs index 39cbd83..1a52e8d 100644 --- a/crates/runner-integration/src/main.rs +++ b/crates/runner-integration/src/main.rs @@ -151,7 +151,11 @@ async fn main() -> Result<()> { .await?; // Build shared state - let state = Arc::new(RunnerState::new(identity, opts.max_concurrency)); + let state = Arc::new(RunnerState::new( + identity, + opts.max_concurrency, + opts.logs_base_url, + )); // Translation context let translate_ctx = Arc::new(TranslateCtx { diff --git a/crates/runner-integration/src/poller.rs b/crates/runner-integration/src/poller.rs index 00c6a04..98fabe6 100644 --- a/crates/runner-integration/src/poller.rs +++ b/crates/runner-integration/src/poller.rs @@ -85,7 +85,7 @@ pub async fn run( match translate_task(&task, &translate_ctx).await { Ok(TranslateResult::Jobs(jobs)) => { let mut published_any = false; - for jr in &jobs { + for (jr, steps) in &jobs { state.in_flight.insert( jr.request_id, TaskMeta { @@ -93,6 +93,7 @@ pub async fn run( repo_url: jr.repo_url.clone(), commit_sha: jr.commit_sha.clone(), started_at: Instant::now(), + steps: steps.clone(), }, ); @@ -189,7 +190,9 @@ async fn report_running(client: 
&ConnectClient, state: &RunnerState, task_id: i6 }), outputs: Default::default(), }; - client.update_task(&req, &state.identity.uuid, &state.identity.token).await?; + client + .update_task(&req, &state.identity.uuid, &state.identity.token) + .await?; Ok(()) } @@ -213,7 +216,9 @@ async fn report_failure( }), outputs: Default::default(), }; - client.update_task(&req, &state.identity.uuid, &state.identity.token).await?; + client + .update_task(&req, &state.identity.uuid, &state.identity.token) + .await?; // Also send the error message as a log line let log_req = crate::proto::runner::v1::UpdateLogRequest { @@ -228,7 +233,9 @@ async fn report_failure( }], no_more: true, }; - client.update_log(&log_req, &state.identity.uuid, &state.identity.token).await?; + client + .update_log(&log_req, &state.identity.uuid, &state.identity.token) + .await?; Ok(()) } diff --git a/crates/runner-integration/src/reporter.rs b/crates/runner-integration/src/reporter.rs index f6b9efc..62f7f99 100644 --- a/crates/runner-integration/src/reporter.rs +++ b/crates/runner-integration/src/reporter.rs @@ -3,10 +3,12 @@ use std::sync::Arc; use futures_util::StreamExt; use miette::{IntoDiagnostic, Result}; use tokio::sync::watch; -use tracing::{info, warn}; +use tracing::{debug, error, info, warn}; use crate::connect::ConnectClient; -use crate::proto::runner::v1::{self, TaskState, UpdateTaskRequest}; +use crate::proto::runner::v1::{ + self, LogRow, StepState, TaskState, UpdateLogRequest, UpdateTaskRequest, +}; use crate::state::RunnerState; /// Consume JobResults from RabbitMQ and report them back to Forgejo. 
@@ -165,24 +167,174 @@ async fn report_to_forgejo( v1::Result::Failure }; + // --- Fetch and upload logs --- + let mut log_index: i64 = 0; + let mut step_states: Vec<StepState> = Vec::new(); + + // Fetch all logs from logs-service and send to Forgejo + if let Some(logs_base) = &state.logs_base_url { + // First get the log categories + let categories_url = format!( + "{}/jobs/{}/logs", + logs_base.trim_end_matches('/'), + jobres.request_id + ); + + let http = reqwest::Client::new(); + match http.get(&categories_url).send().await { + Ok(resp) if resp.status().is_success() => { + #[derive(serde::Deserialize)] + struct LogCategory { + category: String, + #[allow(dead_code)] + count: i64, + has_errors: bool, + } + + if let Ok(categories) = resp.json::<Vec<LogCategory>>().await { + for (step_idx, cat) in categories.iter().enumerate() { + let step_log_start = log_index; + + // Fetch log lines for this category + let log_url = format!( + "{}/jobs/{}/logs/{}", + logs_base.trim_end_matches('/'), + jobres.request_id, + cat.category + ); + + match http.get(&log_url).send().await { + Ok(resp) if resp.status().is_success() => { + if let Ok(text) = resp.text().await { + let lines: Vec<&str> = text.lines().collect(); + let line_count = lines.len() as i64; + + if !lines.is_empty() { + // Build LogRow entries + let rows: Vec<LogRow> = lines + .iter() + .map(|line| LogRow { + time: Some(now.clone()), + content: line.to_string(), + }) + .collect(); + + // Send log chunk to Forgejo + let log_req = UpdateLogRequest { + task_id: task_meta.forgejo_task_id, + index: log_index, + rows, + no_more: false, + }; + + match client + .update_log( + &log_req, + &state.identity.uuid, + &state.identity.token, + ) + .await + { + Ok(resp) => { + debug!( + task_id = task_meta.forgejo_task_id, + category = %cat.category, + lines = line_count, + ack_index = resp.ack_index, + "uploaded logs" + ); + } + Err(e) => { + warn!( + error = %e, + category = %cat.category, + "failed to upload logs" + ); + } + } + + log_index += line_count; + } + + // 
Build step state for this category + let step_result = if cat.has_errors { + v1::Result::Failure + } else { + v1::Result::Success + }; + + step_states.push(StepState { + id: step_idx as i64, + result: step_result as i32, + started_at: Some(now.clone()), + stopped_at: Some(now.clone()), + log_index: step_log_start, + log_length: log_index - step_log_start, + }); + } + } + Ok(resp) => { + debug!( + status = %resp.status(), + category = %cat.category, + "failed to fetch log category" + ); + } + Err(e) => { + warn!(error = %e, category = %cat.category, "failed to fetch logs"); + } + } + } + } + } + Ok(resp) => { + debug!( + status = %resp.status(), + "failed to fetch log categories" + ); + } + Err(e) => { + warn!(error = %e, "failed to connect to logs-service"); + } + } + } + + // Send final "no more logs" marker + let final_log = UpdateLogRequest { + task_id: task_meta.forgejo_task_id, + index: log_index, + rows: vec![], + no_more: true, + }; + if let Err(e) = client + .update_log(&final_log, &state.identity.uuid, &state.identity.token) + .await + { + warn!(error = %e, "failed to send final log marker"); + } + + // --- Report task completion with step states --- let req = UpdateTaskRequest { state: Some(TaskState { id: task_meta.forgejo_task_id, result: result as i32, started_at: None, // already reported when task started stopped_at: Some(now), - steps: vec![], + steps: step_states, }), outputs: Default::default(), }; - client.update_task(&req, &state.identity.uuid, &state.identity.token).await?; + client + .update_task(&req, &state.identity.uuid, &state.identity.token) + .await?; info!( request_id = %jobres.request_id, task_id = task_meta.forgejo_task_id, success = jobres.success, exit_code = jobres.exit_code, + log_lines = log_index, "reported result to Forgejo" ); diff --git a/crates/runner-integration/src/state.rs b/crates/runner-integration/src/state.rs index 8bc7538..47c0bb7 100644 --- a/crates/runner-integration/src/state.rs +++ 
b/crates/runner-integration/src/state.rs @@ -15,14 +15,24 @@ pub struct RunnerIdentity { pub registered_at: String, } +/// Info about a workflow step, used for reporting step states to Forgejo. +#[derive(Debug, Clone)] +pub struct StepInfo { + /// Step name as defined in the KDL workflow (e.g., "System info"). + pub name: String, + /// Log category in the logs-service (e.g., "step:system-info"). + pub log_category: String, +} + /// Metadata for a Forgejo task that is currently in-flight within Solstice. #[derive(Debug)] pub struct TaskMeta { pub forgejo_task_id: i64, pub repo_url: String, pub commit_sha: String, - #[allow(dead_code)] pub started_at: Instant, + /// Known workflow steps (populated during translation for log/step reporting). + pub steps: Vec<StepInfo>, } /// Shared state accessible by the poller and reporter tasks. @@ -32,14 +42,21 @@ pub struct RunnerState { pub in_flight: DashMap<i64, TaskMeta>, /// Controls how many tasks can be in-flight simultaneously. pub semaphore: Arc<Semaphore>, + /// Logs service base URL for fetching job logs. + pub logs_base_url: Option<String>, } impl RunnerState { - pub fn new(identity: RunnerIdentity, max_concurrency: usize) -> Self { + pub fn new( + identity: RunnerIdentity, + max_concurrency: usize, + logs_base_url: Option<String>, + ) -> Self { Self { identity, in_flight: DashMap::new(), semaphore: Arc::new(Semaphore::new(max_concurrency)), + logs_base_url, } } } diff --git a/crates/runner-integration/src/translator.rs b/crates/runner-integration/src/translator.rs index 851405f..fab3bac 100644 --- a/crates/runner-integration/src/translator.rs +++ b/crates/runner-integration/src/translator.rs @@ -3,11 +3,12 @@ use tracing::{debug, info, warn}; use uuid::Uuid; use crate::proto::runner::v1::Task; +use crate::state::StepInfo; /// The result of translating a Forgejo task. pub enum TranslateResult { - /// Successfully translated into one or more JobRequests. - Jobs(Vec<common::JobRequest>), + /// Successfully translated into one or more JobRequests, with step info for reporting. 
+ Jobs(Vec<(common::JobRequest, Vec<StepInfo>)>), /// The workflow is not supported — return this message to Forgejo as a failure. Unsupported(String), } @@ -141,7 +142,7 @@ async fn try_kdl_workflow( repo_url: &str, sha: &str, group_id: Uuid, -) -> Result<Option<Vec<common::JobRequest>>> { +) -> Result<Option<Vec<(common::JobRequest, Vec<StepInfo>)>>> { let base = match ctx.forgejo_base.as_deref() { Some(b) => b, None => return Ok(None), @@ -206,7 +207,7 @@ fn parse_kdl_jobs( repo: &str, sha: &str, group_id: Uuid, -) -> Vec<common::JobRequest> { +) -> Vec<(common::JobRequest, Vec<StepInfo>)> { let mut out = Vec::new(); let mut lines = kdl.lines().peekable(); @@ -216,6 +217,7 @@ fn parse_kdl_jobs( let id = capture_attr(l, "id"); let mut runs_on = capture_attr(l, "runs_on"); let mut script: Option<String> = None; + let mut steps = Vec::new(); let mut depth = if l.ends_with('{') { 1i32 } else { 0 }; while let Some(ln) = lines.peek().copied() { @@ -238,6 +240,15 @@ fn parse_kdl_jobs( script = Some(p); } } + if t.starts_with("step ") { + if let Some(name) = capture_attr(t, "name") { + let slug = name.to_lowercase().replace(' ', "-"); + steps.push(StepInfo { + name, + log_category: format!("step:{}", slug), + }); + } + } if t.contains("runs_on=") && runs_on.is_none() { runs_on = capture_attr(t, "runs_on"); } @@ -254,7 +265,7 @@ fn parse_kdl_jobs( jr.workflow_job_id = Some(id_val); jr.runs_on = runs_on; jr.script_path = script; - out.push(jr); + out.push((jr, steps)); } } } @@ -266,7 +277,7 @@ fn parse_kdl_jobs( jr.repo_owner = Some(owner.to_string()); jr.repo_name = Some(repo.to_string()); jr.workflow_path = Some(".solstice/workflow.kdl".to_string()); - out.push(jr); + out.push((jr, vec![])); } out @@ -296,7 +307,7 @@ fn try_actions_yaml( repo: &str, sha: &str, group_id: Uuid, -) -> Result<Option<Vec<common::JobRequest>>> { +) -> Result<Option<Vec<(common::JobRequest, Vec<StepInfo>)>>> { let yaml_str = std::str::from_utf8(payload_bytes).into_diagnostic()?; let doc: serde_yaml::Value = serde_yaml::from_str(yaml_str).into_diagnostic()?; @@ -387,10 +398,13 @@ fn try_actions_yaml( return Ok(None); } - // For tier 2, we just create the JobRequests — the script content will 
need - // to be handled by the orchestrator. For now, embed a hint in the script_path. - // TODO: Consider passing the script content via a sidecar mechanism. - Ok(Some(results.into_iter().map(|(jr, _)| jr).collect())) + // For tier 2, we don't have per-step log categories (orchestrator runs as one block) + Ok(Some( + results + .into_iter() + .map(|(jr, _script)| (jr, vec![])) + .collect(), + )) } fn has_unsupported_features(job_map: &serde_yaml::Mapping) -> bool { diff --git a/deploy/podman/compose.yml b/deploy/podman/compose.yml index 5ada5ba..d5c6f52 100644 --- a/deploy/podman/compose.yml +++ b/deploy/podman/compose.yml @@ -363,17 +363,21 @@ services: AMQP_RESULTS_QUEUE: solstice.runner-results.v1 # Forgejo runner configuration FORGEJO_URL: ${FORGEJO_URL} - FORGEJO_BASE_URL: ${FORGEJO_BASE_URL} - FORGEJO_TOKEN: ${FORGEJO_TOKEN} + # API base for fetching workflow files from repos (self-hosted Forgejo) + FORGEJO_BASE_URL: ${FORGEJO_URL}/api/v1 + FORGEJO_TOKEN: ${RUNNER_FORGEJO_TOKEN} RUNNER_REGISTRATION_TOKEN: ${RUNNER_REGISTRATION_TOKEN} RUNNER_NAME: solstice-runner-${ENV} RUNNER_LABELS: ${RUNNER_LABELS:-self-hosted} RUNNER_STATE_PATH: /data/runner-state.json MAX_CONCURRENCY: ${RUNNER_MAX_CONCURRENCY:-4} - LOGS_BASE_URL: https://logs.${ENV}.${DOMAIN} + # Internal URL for fetching logs (same Docker network) + LOGS_BASE_URL: http://solstice-logs-service:8082 depends_on: rabbitmq: condition: service_healthy + logs-service: + condition: service_started volumes: - runner-state:/data:Z networks: