From 3a261b3f2e38e579682e5d0435a5e71f559a6be0e50705cd20c0c7d1cf8c8e54 Mon Sep 17 00:00:00 2001 From: Till Wegmueller Date: Tue, 7 Apr 2026 00:13:54 +0200 Subject: [PATCH] Add log streaming and fix Forgejo step mapping - Stream logs to Forgejo in real-time during job execution (polls logs-service every 3s) - Map setup logs (boot, env, tool_check) to "Set up job" step - Map KDL workflow step logs to the main Actions step - Add summary line to "Complete job" step - Use ::group::/::endgroup:: markers for log category sections --- crates/runner-integration/src/main.rs | 1 + crates/runner-integration/src/poller.rs | 21 ++ crates/runner-integration/src/reporter.rs | 370 ++++++++++++++-------- crates/runner-integration/src/state.rs | 2 + crates/runner-integration/src/streamer.rs | 187 +++++++++++ 5 files changed, 451 insertions(+), 130 deletions(-) create mode 100644 crates/runner-integration/src/streamer.rs diff --git a/crates/runner-integration/src/main.rs b/crates/runner-integration/src/main.rs index 1a52e8d..0b0c6ab 100644 --- a/crates/runner-integration/src/main.rs +++ b/crates/runner-integration/src/main.rs @@ -3,6 +3,7 @@ mod poller; mod registration; mod reporter; mod state; +mod streamer; mod translator; pub mod proto { diff --git a/crates/runner-integration/src/poller.rs b/crates/runner-integration/src/poller.rs index 98fabe6..f24e701 100644 --- a/crates/runner-integration/src/poller.rs +++ b/crates/runner-integration/src/poller.rs @@ -86,6 +86,10 @@ pub async fn run( Ok(TranslateResult::Jobs(jobs)) => { let mut published_any = false; for (jr, steps) in &jobs { + // Create a stop channel for the log streamer + let (stream_stop_tx, stream_stop_rx) = + tokio::sync::watch::channel(false); + state.in_flight.insert( jr.request_id, TaskMeta { @@ -94,6 +98,7 @@ pub async fn run( commit_sha: jr.commit_sha.clone(), started_at: Instant::now(), steps: steps.clone(), + stream_stop: Some(stream_stop_tx), }, ); @@ -108,6 +113,22 @@ pub async fn run( "published JobRequest" ); published_any = true; + + // Spawn log streamer if logs-service is configured + if let Some(logs_base) = &state.logs_base_url { + let s_client = client.clone(); + let s_state = state.clone(); + let s_request_id = jr.request_id; + let s_logs_base = logs_base.clone(); + tokio::spawn(crate::streamer::stream_logs( + s_client, + s_state, + s_request_id, + task_id, + s_logs_base, + stream_stop_rx, + )); + } } Err(e) => { error!(error = %e, request_id = %jr.request_id, "failed to publish JobRequest"); diff --git a/crates/runner-integration/src/reporter.rs b/crates/runner-integration/src/reporter.rs index 62f7f99..373d783 100644 --- a/crates/runner-integration/src/reporter.rs +++ b/crates/runner-integration/src/reporter.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use futures_util::StreamExt; use miette::{IntoDiagnostic, Result}; use tokio::sync::watch; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info, warn}; use crate::connect::ConnectClient; use crate::proto::runner::v1::{ @@ -11,6 +11,9 @@ use crate::proto::runner::v1::{ }; use crate::state::RunnerState; +/// Categories that belong to the "Set up job" phase. +const SETUP_CATEGORIES: &[&str] = &["boot", "default", "env", "env_setup", "tool_check"]; + /// Consume JobResults from RabbitMQ and report them back to Forgejo. pub async fn run( client: Arc, @@ -23,7 +26,6 @@ pub async fn run( .into_diagnostic()?; let channel = conn.create_channel().await.into_diagnostic()?; - // Ensure exchange exists channel .exchange_declare( &mq_cfg.exchange, @@ -40,7 +42,6 @@ pub async fn run( .await .into_diagnostic()?; - // Declare our own results queue (separate from forge-integration) let results_queue = &mq_cfg.results_queue; channel .queue_declare( @@ -102,10 +103,17 @@ pub async fn run( let tag = d.delivery_tag; match serde_json::from_slice::(&d.data) { Ok(jobres) => { - // Look up the in-flight task - if let Some((_, task_meta)) = + if let Some((_, mut task_meta)) = state.in_flight.remove(&jobres.request_id) { + // Stop the log streamer and wait briefly for it to flush + if let Some(stop_tx) = task_meta.stream_stop.take() { + let _ = stop_tx.send(true); + // Give the streamer a moment to do its final poll + tokio::time::sleep(std::time::Duration::from_millis(500)) + .await; + } + if let Err(e) = report_to_forgejo(&client, &state, &jobres, &task_meta).await { @@ -116,10 +124,8 @@ pub async fn run( "failed to report result to Forgejo" ); } - // Release the semaphore permit state.semaphore.add_permits(1); } - // If not in our in-flight map, it's for another consumer — ack anyway channel .basic_ack(tag, lapin::options::BasicAckOptions { multiple: false }) .await @@ -150,6 +156,15 @@ pub async fn run( Ok(()) } +/// Log category metadata from the logs-service. +#[derive(serde::Deserialize)] +struct LogCategory { + category: String, + #[allow(dead_code)] + count: i64, + has_errors: bool, +} + async fn report_to_forgejo( client: &ConnectClient, state: &RunnerState, @@ -167,136 +182,155 @@ async fn report_to_forgejo( v1::Result::Failure }; - // --- Fetch and upload logs --- let mut log_index: i64 = 0; let mut step_states: Vec = Vec::new(); - // Fetch all logs from logs-service and send to Forgejo if let Some(logs_base) = &state.logs_base_url { - // First get the log categories + let http = reqwest::Client::new(); + + // Fetch log categories let categories_url = format!( "{}/jobs/{}/logs", logs_base.trim_end_matches('/'), jobres.request_id ); - let http = reqwest::Client::new(); - match http.get(&categories_url).send().await { + let categories = match http.get(&categories_url).send().await { Ok(resp) if resp.status().is_success() => { - #[derive(serde::Deserialize)] - struct LogCategory { - category: String, - #[allow(dead_code)] - count: i64, - has_errors: bool, - } - - if let Ok(categories) = resp.json::>().await { - for (step_idx, cat) in categories.iter().enumerate() { - let step_log_start = log_index; - - // Fetch log lines for this category - let log_url = format!( - "{}/jobs/{}/logs/{}", - logs_base.trim_end_matches('/'), - jobres.request_id, - cat.category - ); - - match http.get(&log_url).send().await { - Ok(resp) if resp.status().is_success() => { - if let Ok(text) = resp.text().await { - let lines: Vec<&str> = text.lines().collect(); - let line_count = lines.len() as i64; - - if !lines.is_empty() { - // Build LogRow entries - let rows: Vec = lines - .iter() - .map(|line| LogRow { - time: Some(now.clone()), - content: line.to_string(), - }) - .collect(); - - // Send log chunk to Forgejo - let log_req = UpdateLogRequest { - task_id: task_meta.forgejo_task_id, - index: log_index, - rows, - no_more: false, - }; - - match client - .update_log( - &log_req, - &state.identity.uuid, - &state.identity.token, - ) - .await - { - Ok(resp) => { - debug!( - task_id = task_meta.forgejo_task_id, - category = %cat.category, - lines = line_count, - ack_index = resp.ack_index, - "uploaded logs" - ); - } - Err(e) => { - warn!( - error = %e, - category = %cat.category, - "failed to upload logs" - ); - } - } - - log_index += line_count; - } - - // Build step state for this category - let step_result = if cat.has_errors { - v1::Result::Failure - } else { - v1::Result::Success - }; - - step_states.push(StepState { - id: step_idx as i64, - result: step_result as i32, - started_at: Some(now.clone()), - stopped_at: Some(now.clone()), - log_index: step_log_start, - log_length: log_index - step_log_start, - }); - } - } - Ok(resp) => { - debug!( - status = %resp.status(), - category = %cat.category, - "failed to fetch log category" - ); - } - Err(e) => { - warn!(error = %e, category = %cat.category, "failed to fetch logs"); - } - } - } - } - } - Ok(resp) => { - debug!( - status = %resp.status(), - "failed to fetch log categories" - ); - } - Err(e) => { - warn!(error = %e, "failed to connect to logs-service"); + resp.json::>().await.unwrap_or_default() } + _ => vec![], + }; + + // Partition categories into setup vs step categories + let setup_cats: Vec<&LogCategory> = categories + .iter() + .filter(|c| SETUP_CATEGORIES.contains(&c.category.as_str())) + .collect(); + let step_cats: Vec<&LogCategory> = categories + .iter() + .filter(|c| c.category.starts_with("step:")) + .collect(); + let other_cats: Vec<&LogCategory> = categories + .iter() + .filter(|c| { + !SETUP_CATEGORIES.contains(&c.category.as_str()) && !c.category.starts_with("step:") + }) + .collect(); + + // --- Step 0: "Set up job" — setup categories + boot logs --- + let setup_start = log_index; + let mut setup_has_errors = false; + for cat in &setup_cats { + setup_has_errors |= cat.has_errors; + log_index = upload_category_logs( + client, + state, + &http, + logs_base, + jobres, + task_meta, + &cat.category, + log_index, + &now, + ) + .await; } + step_states.push(StepState { + id: 0, + result: if setup_has_errors { + v1::Result::Failure as i32 + } else { + v1::Result::Success as i32 + }, + started_at: Some(now.clone()), + stopped_at: Some(now.clone()), + log_index: setup_start, + log_length: log_index - setup_start, + }); + + // --- Step 1: The Actions YAML step — all KDL step logs + other categories --- + let work_start = log_index; + let mut work_has_errors = false; + for cat in &step_cats { + work_has_errors |= cat.has_errors; + log_index = upload_category_logs( + client, + state, + &http, + logs_base, + jobres, + task_meta, + &cat.category, + log_index, + &now, + ) + .await; + } + for cat in &other_cats { + work_has_errors |= cat.has_errors; + log_index = upload_category_logs( + client, + state, + &http, + logs_base, + jobres, + task_meta, + &cat.category, + log_index, + &now, + ) + .await; + } + step_states.push(StepState { + id: 1, + result: if work_has_errors { + v1::Result::Failure as i32 + } else { + result as i32 + }, + started_at: Some(now.clone()), + stopped_at: Some(now.clone()), + log_index: work_start, + log_length: log_index - work_start, + }); + + // --- Step 2: "Complete job" — summary line --- + let complete_start = log_index; + let summary = format!( + "Job {} with exit code {}", + if jobres.success { + "succeeded" + } else { + "failed" + }, + jobres.exit_code, + ); + let summary_req = UpdateLogRequest { + task_id: task_meta.forgejo_task_id, + index: log_index, + rows: vec![LogRow { + time: Some(now.clone()), + content: summary, + }], + no_more: false, + }; + if let Ok(resp) = client + .update_log(&summary_req, &state.identity.uuid, &state.identity.token) + .await + { + log_index = resp.ack_index; + } else { + log_index += 1; + } + step_states.push(StepState { + id: 2, + result: result as i32, + started_at: Some(now.clone()), + stopped_at: Some(now.clone()), + log_index: complete_start, + log_length: log_index - complete_start, + }); } // Send final "no more logs" marker @@ -306,19 +340,16 @@ async fn report_to_forgejo( rows: vec![], no_more: true, }; - if let Err(e) = client + let _ = client .update_log(&final_log, &state.identity.uuid, &state.identity.token) - .await - { - warn!(error = %e, "failed to send final log marker"); - } + .await; // --- Report task completion with step states --- let req = UpdateTaskRequest { state: Some(TaskState { id: task_meta.forgejo_task_id, result: result as i32, - started_at: None, // already reported when task started + started_at: None, stopped_at: Some(now), steps: step_states, }), @@ -340,3 +371,82 @@ async fn report_to_forgejo( Ok(()) } + +/// Fetch logs for a single category from logs-service and upload to Forgejo. +/// Returns the updated log_index. +async fn upload_category_logs( + client: &ConnectClient, + state: &RunnerState, + http: &reqwest::Client, + logs_base: &str, + jobres: &common::messages::JobResult, + task_meta: &crate::state::TaskMeta, + category: &str, + start_index: i64, + now: &prost_types::Timestamp, +) -> i64 { + let url = format!( + "{}/jobs/{}/logs/{}", + logs_base.trim_end_matches('/'), + jobres.request_id, + category + ); + + let text = match http.get(&url).send().await { + Ok(resp) if resp.status().is_success() => match resp.text().await { + Ok(t) => t, + Err(_) => return start_index, + }, + _ => return start_index, + }; + + let lines: Vec<&str> = text.lines().collect(); + if lines.is_empty() { + return start_index; + } + + // Add a header line for the category + let mut rows: Vec = Vec::with_capacity(lines.len() + 1); + rows.push(LogRow { + time: Some(now.clone()), + content: format!("::group::{}", category), + }); + for line in &lines { + rows.push(LogRow { + time: Some(now.clone()), + content: line.to_string(), + }); + } + rows.push(LogRow { + time: Some(now.clone()), + content: "::endgroup::".to_string(), + }); + + let line_count = rows.len() as i64; + + let log_req = UpdateLogRequest { + task_id: task_meta.forgejo_task_id, + index: start_index, + rows, + no_more: false, + }; + + match client + .update_log(&log_req, &state.identity.uuid, &state.identity.token) + .await + { + Ok(resp) => { + debug!( + category, + lines = line_count, + ack_index = resp.ack_index, + "uploaded logs" + ); + resp.ack_index + } + Err(e) => { + warn!(error = %e, category, "failed to upload logs"); + start_index + line_count + } + } +} diff --git a/crates/runner-integration/src/state.rs b/crates/runner-integration/src/state.rs index 47c0bb7..a9dc96f 100644 --- a/crates/runner-integration/src/state.rs +++ b/crates/runner-integration/src/state.rs @@ -33,6 +33,8 @@ pub struct TaskMeta { pub started_at: Instant, /// Known workflow steps (populated during translation for log/step reporting). pub steps: Vec, + /// Signal to stop the log streamer for this task. + pub stream_stop: Option>, } /// Shared state accessible by the poller and reporter tasks. diff --git a/crates/runner-integration/src/streamer.rs b/crates/runner-integration/src/streamer.rs new file mode 100644 index 0000000..753a023 --- /dev/null +++ b/crates/runner-integration/src/streamer.rs @@ -0,0 +1,187 @@ +use std::sync::Arc; +use std::time::Duration; + +use tracing::{debug, warn}; +use uuid::Uuid; + +use crate::connect::ConnectClient; +use crate::proto::runner::v1::{LogRow, UpdateLogRequest}; +use crate::state::RunnerState; + +const POLL_INTERVAL: Duration = Duration::from_secs(3); + +/// Streams logs from logs-service to Forgejo while a job is in-flight. +/// Runs until the stop signal is received (job completed). +/// Returns the final log index so the reporter can continue from there. +pub async fn stream_logs( + client: Arc, + state: Arc, + request_id: Uuid, + task_id: i64, + logs_base: String, + mut stop: tokio::sync::watch::Receiver, +) -> i64 { + let http = reqwest::Client::new(); + let mut log_index: i64 = 0; + let mut last_total: i64 = 0; + + loop { + // Check if we should stop + if *stop.borrow() { + // Do one final poll to catch any remaining logs + log_index = poll_and_send( + &client, + &state, + &http, + &logs_base, + request_id, + task_id, + log_index, + &mut last_total, + ) + .await; + break; + } + + log_index = poll_and_send( + &client, + &state, + &http, + &logs_base, + request_id, + task_id, + log_index, + &mut last_total, + ) + .await; + + // Wait for the next poll interval or stop signal + tokio::select! { + _ = tokio::time::sleep(POLL_INTERVAL) => {} + _ = stop.changed() => { + // Signal changed — do one more poll then exit + log_index = poll_and_send( + &client, &state, &http, &logs_base, + request_id, task_id, log_index, &mut last_total, + ).await; + break; + } + } + } + + debug!(task_id, log_index, "log streamer stopped"); + log_index +} + +/// Log category summary from logs-service. +#[derive(serde::Deserialize)] +struct LogCategorySummary { + category: String, + count: i64, +} + +async fn poll_and_send( + client: &ConnectClient, + state: &RunnerState, + http: &reqwest::Client, + logs_base: &str, + request_id: Uuid, + task_id: i64, + current_index: i64, + last_total: &mut i64, +) -> i64 { + // Get total log count across all categories + let categories_url = format!( + "{}/jobs/{}/logs", + logs_base.trim_end_matches('/'), + request_id + ); + + let categories = match http.get(&categories_url).send().await { + Ok(resp) if resp.status().is_success() => resp + .json::>() + .await + .unwrap_or_default(), + _ => return current_index, + }; + + let new_total: i64 = categories.iter().map(|c| c.count).sum(); + if new_total <= *last_total { + return current_index; // No new logs + } + *last_total = new_total; + + // Fetch all logs and send new ones + // We re-fetch everything but only send lines from current_index onward. + // This is simple but not optimal for large logs — good enough for streaming. + let mut all_lines: Vec = Vec::new(); + for cat in &categories { + let url = format!( + "{}/jobs/{}/logs/{}", + logs_base.trim_end_matches('/'), + request_id, + cat.category + ); + if let Ok(resp) = http.get(&url).send().await { + if resp.status().is_success() { + if let Ok(text) = resp.text().await { + all_lines.push(format!("::group::{}", cat.category)); + for line in text.lines() { + all_lines.push(line.to_string()); + } + all_lines.push("::endgroup::".to_string()); + } + } + } + } + + let total_lines = all_lines.len() as i64; + if total_lines <= current_index { + return current_index; + } + + // Send only new lines + let new_lines = &all_lines[current_index as usize..]; + if new_lines.is_empty() { + return current_index; + } + + let now = prost_types::Timestamp { + seconds: time::OffsetDateTime::now_utc().unix_timestamp(), + nanos: 0, + }; + + let rows: Vec = new_lines + .iter() + .map(|line| LogRow { + time: Some(now.clone()), + content: line.clone(), + }) + .collect(); + + let req = UpdateLogRequest { + task_id, + index: current_index, + rows, + no_more: false, + }; + + match client + .update_log(&req, &state.identity.uuid, &state.identity.token) + .await + { + Ok(resp) => { + debug!( + task_id, + new_lines = new_lines.len(), + ack_index = resp.ack_index, + "streamed logs" + ); + resp.ack_index + } + Err(e) => { + warn!(error = %e, task_id, "failed to stream logs"); + current_index + } + } +}