From f61588e68bcc67b2797e28169cbcec3dec790c783d26ba44f6981f76afd078cb Mon Sep 17 00:00:00 2001 From: Till Wegmueller Date: Tue, 7 Apr 2026 00:32:54 +0200 Subject: [PATCH] Fix streamer category ordering to match step boundaries Streamer now rebuilds the full sorted log (setup categories first, then work categories) on each poll and only sends new lines. This ensures log indices align with the reporter's step boundary calculation regardless of when categories appear in the DB. --- crates/runner-integration/src/streamer.rs | 161 ++++++++++------------ 1 file changed, 73 insertions(+), 88 deletions(-) diff --git a/crates/runner-integration/src/streamer.rs b/crates/runner-integration/src/streamer.rs index 9fce4ed..7135858 100644 --- a/crates/runner-integration/src/streamer.rs +++ b/crates/runner-integration/src/streamer.rs @@ -10,6 +10,9 @@ use crate::state::RunnerState; const POLL_INTERVAL: Duration = Duration::from_secs(3); +/// Categories that belong to the "Set up job" phase — must match reporter.rs. +const SETUP_CATEGORIES: &[&str] = &["boot", "default", "env", "env_setup", "tool_check"]; + /// Log category summary from logs-service. #[derive(serde::Deserialize)] struct LogCategorySummary { @@ -18,7 +21,11 @@ struct LogCategorySummary { } /// Streams logs from logs-service to Forgejo while a job is in-flight. -/// Returns the final log index so the reporter knows where we left off. +/// +/// On each poll, fetches ALL log lines (sorted: setup categories first, then +/// work categories) and only sends lines beyond what Forgejo already has. +/// This ensures log indices always align with the reporter's step boundaries +/// regardless of the order categories appear in the DB. pub async fn stream_logs( client: Arc, state: Arc, @@ -28,47 +35,28 @@ pub async fn stream_logs( mut stop: tokio::sync::watch::Receiver, ) -> i64 { let http = reqwest::Client::new(); - // Track how many lines we've sent per category to only send new ones. - let mut sent_per_category: std::collections::HashMap = Default::default(); let mut log_index: i64 = 0; loop { if *stop.borrow() { - // Final flush log_index = poll_and_send( - &client, - &state, - &http, - &logs_base, - request_id, - task_id, - log_index, - &mut sent_per_category, + &client, &state, &http, &logs_base, request_id, task_id, log_index, ) .await; break; } log_index = poll_and_send( - &client, - &state, - &http, - &logs_base, - request_id, - task_id, - log_index, - &mut sent_per_category, + &client, &state, &http, &logs_base, request_id, task_id, log_index, ) .await; tokio::select! { _ = tokio::time::sleep(POLL_INTERVAL) => {} _ = stop.changed() => { - // Final flush log_index = poll_and_send( &client, &state, &http, &logs_base, request_id, task_id, log_index, - &mut sent_per_category, ).await; break; } @@ -87,7 +75,6 @@ async fn poll_and_send( request_id: Uuid, task_id: i64, current_index: i64, - sent_per_category: &mut std::collections::HashMap, ) -> i64 { let categories_url = format!( "{}/jobs/{}/logs", @@ -95,7 +82,7 @@ async fn poll_and_send( request_id ); - let categories = match http.get(&categories_url).send().await { + let mut categories = match http.get(&categories_url).send().await { Ok(resp) if resp.status().is_success() => resp .json::>() .await @@ -103,24 +90,23 @@ async fn poll_and_send( _ => return current_index, }; - // Check if there are any new lines at all - let has_new = categories.iter().any(|c| { - let prev = sent_per_category.get(&c.category).copied().unwrap_or(0); - c.count as usize > prev - }); - if !has_new { + if categories.is_empty() { return current_index; } - let mut log_index = current_index; - - for cat in &categories { - let prev_sent = sent_per_category.get(&cat.category).copied().unwrap_or(0); - if (cat.count as usize) <= prev_sent { - continue; // No new lines in this category + // Sort: setup categories first, then work categories. + // This order must match the reporter's step boundary calculation. + categories.sort_by_key(|c| { + if SETUP_CATEGORIES.contains(&c.category.as_str()) { + (0, c.category.clone()) + } else { + (1, c.category.clone()) } + }); - // Fetch all lines for this category + // Build the full ordered log by fetching each category + let mut all_lines: Vec = Vec::new(); + for cat in &categories { let url = format!( "{}/jobs/{}/logs/{}", logs_base.trim_end_matches('/'), @@ -136,58 +122,57 @@ async fn poll_and_send( _ => continue, }; - let all_lines: Vec<&str> = text.lines().collect(); - if all_lines.len() <= prev_sent { - continue; + for line in text.lines() { + all_lines.push(line.to_string()); } - - // Only send lines we haven't sent yet - let new_lines = &all_lines[prev_sent..]; - - let now = prost_types::Timestamp { - seconds: time::OffsetDateTime::now_utc().unix_timestamp(), - nanos: 0, - }; - - let rows: Vec = new_lines - .iter() - .map(|line| LogRow { - time: Some(now.clone()), - content: line.to_string(), - }) - .collect(); - - let count = rows.len() as i64; - - let req = UpdateLogRequest { - task_id, - index: log_index, - rows, - no_more: false, - }; - - match client - .update_log(&req, &state.identity.uuid, &state.identity.token) - .await - { - Ok(resp) => { - debug!( - task_id, - category = %cat.category, - new_lines = count, - ack_index = resp.ack_index, - "streamed logs" - ); - log_index = resp.ack_index; - } - Err(e) => { - warn!(error = %e, task_id, category = %cat.category, "failed to stream logs"); - log_index += count; - } - } - - sent_per_category.insert(cat.category.clone(), all_lines.len()); } - log_index + let total = all_lines.len() as i64; + if total <= current_index { + return current_index; + } + + // Only send lines beyond what we've already sent + let new_lines = &all_lines[current_index as usize..]; + + let now = prost_types::Timestamp { + seconds: time::OffsetDateTime::now_utc().unix_timestamp(), + nanos: 0, + }; + + let rows: Vec = new_lines + .iter() + .map(|line| LogRow { + time: Some(now.clone()), + content: line.clone(), + }) + .collect(); + + let count = rows.len(); + + let req = UpdateLogRequest { + task_id, + index: current_index, + rows, + no_more: false, + }; + + match client + .update_log(&req, &state.identity.uuid, &state.identity.token) + .await + { + Ok(resp) => { + debug!( + task_id, + new_lines = count, + ack_index = resp.ack_index, + "streamed logs" + ); + resp.ack_index + } + Err(e) => { + warn!(error = %e, task_id, "failed to stream logs"); + current_index + } + } }