From 49c3ab03c483400a9d1963b47268bb709236831348b4930b773ce3adc16e53fe Mon Sep 17 00:00:00 2001 From: Till Wegmueller Date: Tue, 7 Apr 2026 00:41:26 +0200 Subject: [PATCH] Map per-step log ranges to YAML steps using KDL step order - Streamer sorts step categories in KDL workflow order (not alphabetical) - Reporter emits one StepState per KDL step, each mapped by position to the corresponding YAML step ID - Setup logs auto-map to "Set up job", per-step logs to their steps --- crates/runner-integration/src/poller.rs | 2 + crates/runner-integration/src/reporter.rs | 69 ++++++++++++++++------- crates/runner-integration/src/streamer.rs | 43 ++++++++------ 3 files changed, 76 insertions(+), 38 deletions(-) diff --git a/crates/runner-integration/src/poller.rs b/crates/runner-integration/src/poller.rs index f24e701..bf14816 100644 --- a/crates/runner-integration/src/poller.rs +++ b/crates/runner-integration/src/poller.rs @@ -120,12 +120,14 @@ pub async fn run( let s_state = state.clone(); let s_request_id = jr.request_id; let s_logs_base = logs_base.clone(); + let s_steps = steps.clone(); tokio::spawn(crate::streamer::stream_logs( s_client, s_state, s_request_id, task_id, s_logs_base, + s_steps, stream_stop_rx, )); } diff --git a/crates/runner-integration/src/reporter.rs b/crates/runner-integration/src/reporter.rs index e654ebf..559408d 100644 --- a/crates/runner-integration/src/reporter.rs +++ b/crates/runner-integration/src/reporter.rs @@ -200,36 +200,65 @@ async fn report_to_forgejo( _ => vec![], }; - // Count lines per phase to set step boundaries + // Count setup lines (goes to Forgejo's virtual "Set up job") let setup_lines: i64 = categories .iter() .filter(|c| SETUP_CATEGORIES.contains(&c.category.as_str())) .map(|c| c.count) .sum(); - let work_lines: i64 = categories + // Build per-step boundaries matching KDL step order. + // Each KDL step maps to a YAML step by position (id=0,1,2...). + // The streamer sorts categories in the same order, so indices align. + let mut cursor = setup_lines; + for (step_idx, step_info) in task_meta.steps.iter().enumerate() { + let step_lines = categories + .iter() + .find(|c| c.category == step_info.log_category) + .map(|c| c.count) + .unwrap_or(0); + + step_states.push(StepState { + id: step_idx as i64, + result: result as i32, + started_at: Some(now.clone()), + stopped_at: Some(now.clone()), + log_index: cursor, + log_length: step_lines, + }); + cursor += step_lines; + } + + // If no steps were tracked (e.g. tier 2), use a single step for all work + if task_meta.steps.is_empty() { + let work_lines: i64 = categories + .iter() + .filter(|c| !SETUP_CATEGORIES.contains(&c.category.as_str())) + .map(|c| c.count) + .sum(); + step_states.push(StepState { + id: 0, + result: result as i32, + started_at: Some(now.clone()), + stopped_at: Some(now.clone()), + log_index: setup_lines, + log_length: work_lines, + }); + cursor = setup_lines + work_lines; + } + + // Account for any remaining categories not mapped to steps + let remaining: i64 = categories .iter() - .filter(|c| !SETUP_CATEGORIES.contains(&c.category.as_str())) + .filter(|c| { + !SETUP_CATEGORIES.contains(&c.category.as_str()) + && !task_meta.steps.iter().any(|s| s.log_category == c.category) + }) .map(|c| c.count) .sum(); + cursor += remaining; - total_lines = setup_lines + work_lines; - - // Forgejo's "Set up job" and "Complete job" are virtual UI steps — - // they auto-collect logs outside any real step's range. Only actual - // YAML steps need StepState entries with matching IDs. - // - // The YAML trigger has 1 step (id=0). Logs before its log_index - // go to "Set up job", logs in its range go to the step, and - // any logs after go to "Complete job". - step_states.push(StepState { - id: 0, - result: result as i32, - started_at: Some(now.clone()), - stopped_at: Some(now.clone()), - log_index: setup_lines, - log_length: work_lines, - }); + total_lines = cursor; } // Send final "no more logs" marker diff --git a/crates/runner-integration/src/streamer.rs b/crates/runner-integration/src/streamer.rs index 7135858..1197b1e 100644 --- a/crates/runner-integration/src/streamer.rs +++ b/crates/runner-integration/src/streamer.rs @@ -6,7 +6,7 @@ use uuid::Uuid; use crate::connect::ConnectClient; use crate::proto::runner::v1::{LogRow, UpdateLogRequest}; -use crate::state::RunnerState; +use crate::state::{RunnerState, StepInfo}; const POLL_INTERVAL: Duration = Duration::from_secs(3); @@ -22,16 +22,15 @@ struct LogCategorySummary { /// Streams logs from logs-service to Forgejo while a job is in-flight. /// -/// On each poll, fetches ALL log lines (sorted: setup categories first, then -/// work categories) and only sends lines beyond what Forgejo already has. -/// This ensures log indices always align with the reporter's step boundaries -/// regardless of the order categories appear in the DB. +/// Categories are ordered: setup categories first, then step categories in +/// KDL workflow order (matching the YAML step order), then any remaining. pub async fn stream_logs( client: Arc, state: Arc, request_id: Uuid, task_id: i64, logs_base: String, + steps: Vec, mut stop: tokio::sync::watch::Receiver, ) -> i64 { let http = reqwest::Client::new(); @@ -40,14 +39,14 @@ pub async fn stream_logs( loop { if *stop.borrow() { log_index = poll_and_send( - &client, &state, &http, &logs_base, request_id, task_id, log_index, + &client, &state, &http, &logs_base, request_id, task_id, log_index, &steps, ) .await; break; } log_index = poll_and_send( - &client, &state, &http, &logs_base, request_id, task_id, log_index, + &client, &state, &http, &logs_base, request_id, task_id, log_index, &steps, ) .await; @@ -56,7 +55,7 @@ pub async fn stream_logs( _ = stop.changed() => { log_index = poll_and_send( &client, &state, &http, &logs_base, - request_id, task_id, log_index, + request_id, task_id, log_index, &steps, ).await; break; } @@ -67,6 +66,22 @@ pub async fn stream_logs( log_index } +/// Sort categories: setup first, then step categories in KDL order, then any remaining. +fn sort_categories(categories: &mut [LogCategorySummary], steps: &[StepInfo]) { + categories.sort_by_key(|c| { + if SETUP_CATEGORIES.contains(&c.category.as_str()) { + // Setup categories come first, sub-sorted alphabetically + (0, 0, c.category.clone()) + } else if let Some(pos) = steps.iter().position(|s| s.log_category == c.category) { + // Step categories in KDL workflow order + (1, pos, c.category.clone()) + } else { + // Any remaining categories at the end + (2, 0, c.category.clone()) + } + }); +} + async fn poll_and_send( client: &ConnectClient, state: &RunnerState, @@ -75,6 +90,7 @@ async fn poll_and_send( request_id: Uuid, task_id: i64, current_index: i64, + steps: &[StepInfo], ) -> i64 { let categories_url = format!( "{}/jobs/{}/logs", @@ -94,15 +110,7 @@ async fn poll_and_send( return current_index; } - // Sort: setup categories first, then work categories. - // This order must match the reporter's step boundary calculation. - categories.sort_by_key(|c| { - if SETUP_CATEGORIES.contains(&c.category.as_str()) { - (0, c.category.clone()) - } else { - (1, c.category.clone()) - } - }); + sort_categories(&mut categories, steps); // Build the full ordered log by fetching each category let mut all_lines: Vec = Vec::new(); @@ -132,7 +140,6 @@ async fn poll_and_send( return current_index; } - // Only send lines beyond what we've already sent let new_lines = &all_lines[current_index as usize..]; let now = prost_types::Timestamp {