Send incremental UpdateTask with step states during log streaming

Streamer now sends UpdateTask alongside UpdateLog on each poll so
Forgejo maps log lines to steps in real time, not just at completion.
This prevents "Set up job" from accumulating all streamed logs.
This commit is contained in:
Till Wegmueller 2026-04-07 00:44:02 +02:00
parent 49c3ab03c4
commit ceaac25a7e

View file

@ -5,7 +5,9 @@ use tracing::{debug, warn};
use uuid::Uuid;
use crate::connect::ConnectClient;
use crate::proto::runner::v1::{LogRow, UpdateLogRequest};
use crate::proto::runner::v1::{
self, LogRow, StepState, TaskState, UpdateLogRequest, UpdateTaskRequest,
};
use crate::state::{RunnerState, StepInfo};
const POLL_INTERVAL: Duration = Duration::from_secs(3);
@ -21,9 +23,8 @@ struct LogCategorySummary {
}
/// Streams logs from logs-service to Forgejo while a job is in-flight.
///
/// Categories are ordered: setup categories first, then step categories in
/// KDL workflow order (matching the YAML step order), then any remaining.
/// Also sends incremental `UpdateTask` with step states so Forgejo assigns
/// log lines to the correct steps in real time.
pub async fn stream_logs(
client: Arc<ConnectClient>,
state: Arc<RunnerState>,
@ -66,17 +67,14 @@ pub async fn stream_logs(
log_index
}
/// Sort categories: setup first, then step categories in KDL order, then any remaining.
/// Sort categories: setup first, then step categories in KDL order, then remaining.
fn sort_categories(categories: &mut [LogCategorySummary], steps: &[StepInfo]) {
categories.sort_by_key(|c| {
if SETUP_CATEGORIES.contains(&c.category.as_str()) {
// Setup categories come first, sub-sorted alphabetically
(0, 0, c.category.clone())
} else if let Some(pos) = steps.iter().position(|s| s.log_category == c.category) {
// Step categories in KDL workflow order
(1, pos, c.category.clone())
} else {
// Any remaining categories at the end
(2, 0, c.category.clone())
}
});
@ -112,8 +110,10 @@ async fn poll_and_send(
sort_categories(&mut categories, steps);
// Build the full ordered log by fetching each category
// Build the full ordered log and track per-category line counts
let mut all_lines: Vec<String> = Vec::new();
let mut category_counts: Vec<(String, i64)> = Vec::new();
for cat in &categories {
let url = format!(
"{}/jobs/{}/logs/{}",
@ -130,7 +130,10 @@ async fn poll_and_send(
_ => continue,
};
for line in text.lines() {
let lines: Vec<&str> = text.lines().collect();
let count = lines.len() as i64;
category_counts.push((cat.category.clone(), count));
for line in lines {
all_lines.push(line.to_string());
}
}
@ -140,6 +143,7 @@ async fn poll_and_send(
return current_index;
}
// Send new log lines
let new_lines = &all_lines[current_index as usize..];
let now = prost_types::Timestamp {
@ -155,23 +159,23 @@ async fn poll_and_send(
})
.collect();
let count = rows.len();
let new_count = rows.len();
let req = UpdateLogRequest {
let log_req = UpdateLogRequest {
task_id,
index: current_index,
rows,
no_more: false,
};
match client
.update_log(&req, &state.identity.uuid, &state.identity.token)
let new_index = match client
.update_log(&log_req, &state.identity.uuid, &state.identity.token)
.await
{
Ok(resp) => {
debug!(
task_id,
new_lines = count,
new_lines = new_count,
ack_index = resp.ack_index,
"streamed logs"
);
@ -179,7 +183,60 @@ async fn poll_and_send(
}
Err(e) => {
warn!(error = %e, task_id, "failed to stream logs");
current_index
return current_index;
}
};
// Build and send step states so Forgejo maps lines to steps in real time
let setup_lines: i64 = category_counts
.iter()
.filter(|(cat, _)| SETUP_CATEGORIES.contains(&cat.as_str()))
.map(|(_, count)| count)
.sum();
let mut step_states: Vec<StepState> = Vec::new();
let mut cursor = setup_lines;
for (step_idx, step_info) in steps.iter().enumerate() {
let step_lines = category_counts
.iter()
.find(|(cat, _)| *cat == step_info.log_category)
.map(|(_, count)| *count)
.unwrap_or(0);
if step_lines > 0 {
step_states.push(StepState {
id: step_idx as i64,
result: v1::Result::Unspecified as i32, // still running
started_at: Some(now.clone()),
stopped_at: None,
log_index: cursor,
log_length: step_lines,
});
}
cursor += step_lines;
}
// Send UpdateTask with current step states (incremental update)
if !step_states.is_empty() {
let task_req = UpdateTaskRequest {
state: Some(TaskState {
id: task_id,
result: v1::Result::Unspecified as i32, // still running
started_at: None,
stopped_at: None,
steps: step_states,
}),
outputs: Default::default(),
};
if let Err(e) = client
.update_task(&task_req, &state.identity.uuid, &state.identity.token)
.await
{
warn!(error = %e, task_id, "failed to update step states");
}
}
new_index
}