Fix log streaming: no duplicates, proper step boundaries

- Streamer sends only new lines per category (tracks cursor per category)
- Reporter no longer re-uploads logs — only sets step state boundaries
  and sends the no_more marker
- Remove ::group:: markers that cluttered the Forgejo log viewer
- Step 0 (Set up job) gets setup categories (boot, env, tool_check)
- Step 1 (main step) gets workflow step output
This commit is contained in:
Till Wegmueller 2026-04-07 00:23:00 +02:00
parent 3a261b3f2e
commit 61fca2673d
2 changed files with 125 additions and 285 deletions

View file

@ -6,9 +6,7 @@ use tokio::sync::watch;
use tracing::{debug, info, warn};
use crate::connect::ConnectClient;
use crate::proto::runner::v1::{
self, LogRow, StepState, TaskState, UpdateLogRequest, UpdateTaskRequest,
};
use crate::proto::runner::v1::{self, StepState, TaskState, UpdateLogRequest, UpdateTaskRequest};
use crate::state::RunnerState;
/// Categories that belong to the "Set up job" phase.
@ -106,11 +104,10 @@ pub async fn run(
if let Some((_, mut task_meta)) =
state.in_flight.remove(&jobres.request_id)
{
// Stop the log streamer and wait briefly for it to flush
// Stop the log streamer and wait for final flush
if let Some(stop_tx) = task_meta.stream_stop.take() {
let _ = stop_tx.send(true);
// Give the streamer a moment to do its final poll
tokio::time::sleep(std::time::Duration::from_millis(500))
tokio::time::sleep(std::time::Duration::from_secs(1))
.await;
}
@ -160,8 +157,8 @@ pub async fn run(
/// Summary of one log category as returned by the logs-service
/// `/jobs/{id}/logs` listing endpoint (deserialized from its JSON body).
#[derive(serde::Deserialize)]
struct LogCategory {
    // Category name, e.g. "boot", "env", "tool_check", or "step:<name>".
    category: String,
    // Number of log lines stored under this category.
    // NOTE(review): marked dead_code, but line counts appear to be consumed
    // elsewhere when computing step boundaries — confirm the allow is needed.
    #[allow(dead_code)]
    count: i64,
    // Whether any line in this category was flagged as an error.
    // Deserialized for schema completeness; not read on current code paths.
    #[allow(dead_code)]
    has_errors: bool,
}
@ -182,13 +179,14 @@ async fn report_to_forgejo(
v1::Result::Failure
};
let mut log_index: i64 = 0;
// Build step states by querying log categories from logs-service
// to compute line counts per step. The streamer already uploaded the actual
// log content — we just need to tell Forgejo which log ranges map to which steps.
let mut step_states: Vec<StepState> = Vec::new();
let mut total_lines: i64 = 0;
if let Some(logs_base) = &state.logs_base_url {
let http = reqwest::Client::new();
// Fetch log categories
let categories_url = format!(
"{}/jobs/{}/logs",
logs_base.trim_end_matches('/'),
@ -202,141 +200,56 @@ async fn report_to_forgejo(
_ => vec![],
};
// Partition categories into setup vs step categories
let setup_cats: Vec<&LogCategory> = categories
// Count lines per phase to set step boundaries
let setup_lines: i64 = categories
.iter()
.filter(|c| SETUP_CATEGORIES.contains(&c.category.as_str()))
.collect();
let step_cats: Vec<&LogCategory> = categories
.iter()
.filter(|c| c.category.starts_with("step:"))
.collect();
let other_cats: Vec<&LogCategory> = categories
.iter()
.filter(|c| {
!SETUP_CATEGORIES.contains(&c.category.as_str()) && !c.category.starts_with("step:")
})
.collect();
.map(|c| c.count)
.sum();
// --- Step 0: "Set up job" — setup categories + boot logs ---
let setup_start = log_index;
let mut setup_has_errors = false;
for cat in &setup_cats {
setup_has_errors |= cat.has_errors;
log_index = upload_category_logs(
client,
state,
&http,
logs_base,
jobres,
task_meta,
&cat.category,
log_index,
&now,
)
.await;
}
let work_lines: i64 = categories
.iter()
.filter(|c| !SETUP_CATEGORIES.contains(&c.category.as_str()))
.map(|c| c.count)
.sum();
total_lines = setup_lines + work_lines;
// Step 0: "Set up job" — boot, env, tool_check etc.
step_states.push(StepState {
id: 0,
result: if setup_has_errors {
v1::Result::Failure as i32
} else {
v1::Result::Success as i32
},
result: v1::Result::Success as i32,
started_at: Some(now.clone()),
stopped_at: Some(now.clone()),
log_index: setup_start,
log_length: log_index - setup_start,
log_index: 0,
log_length: setup_lines,
});
// --- Step 1: The Actions YAML step — all KDL step logs + other categories ---
let work_start = log_index;
let mut work_has_errors = false;
for cat in &step_cats {
work_has_errors |= cat.has_errors;
log_index = upload_category_logs(
client,
state,
&http,
logs_base,
jobres,
task_meta,
&cat.category,
log_index,
&now,
)
.await;
}
for cat in &other_cats {
work_has_errors |= cat.has_errors;
log_index = upload_category_logs(
client,
state,
&http,
logs_base,
jobres,
task_meta,
&cat.category,
log_index,
&now,
)
.await;
}
// Step 1: The Actions YAML step — KDL workflow steps + other output
step_states.push(StepState {
id: 1,
result: if work_has_errors {
v1::Result::Failure as i32
} else {
result as i32
},
result: result as i32,
started_at: Some(now.clone()),
stopped_at: Some(now.clone()),
log_index: work_start,
log_length: log_index - work_start,
log_index: setup_lines,
log_length: work_lines,
});
// --- Step 2: "Complete job" — summary line ---
let complete_start = log_index;
let summary = format!(
"Job {} with exit code {}",
if jobres.success {
"succeeded"
} else {
"failed"
},
jobres.exit_code,
);
let summary_req = UpdateLogRequest {
task_id: task_meta.forgejo_task_id,
index: log_index,
rows: vec![LogRow {
time: Some(now.clone()),
content: summary,
}],
no_more: false,
};
if let Ok(resp) = client
.update_log(&summary_req, &state.identity.uuid, &state.identity.token)
.await
{
log_index = resp.ack_index;
} else {
log_index += 1;
}
// Step 2: "Complete job"
step_states.push(StepState {
id: 2,
result: result as i32,
started_at: Some(now.clone()),
stopped_at: Some(now.clone()),
log_index: complete_start,
log_length: log_index - complete_start,
log_index: total_lines,
log_length: 0,
});
}
// Send final "no more logs" marker
let final_log = UpdateLogRequest {
task_id: task_meta.forgejo_task_id,
index: log_index,
index: total_lines,
rows: vec![],
no_more: true,
};
@ -344,7 +257,7 @@ async fn report_to_forgejo(
.update_log(&final_log, &state.identity.uuid, &state.identity.token)
.await;
// --- Report task completion with step states ---
// Report task completion with step states
let req = UpdateTaskRequest {
state: Some(TaskState {
id: task_meta.forgejo_task_id,
@ -365,88 +278,9 @@ async fn report_to_forgejo(
task_id = task_meta.forgejo_task_id,
success = jobres.success,
exit_code = jobres.exit_code,
log_lines = log_index,
log_lines = total_lines,
"reported result to Forgejo"
);
Ok(())
}
/// Fetch all stored log lines for a single category from the logs-service
/// and upload them to Forgejo as one `UpdateLogRequest` batch, wrapped in
/// `::group::` / `::endgroup::` markers so the category folds in the viewer.
///
/// Returns the updated log index:
/// * `start_index` unchanged when the fetch fails or the category is empty
///   (nothing was uploaded);
/// * Forgejo's acknowledged `ack_index` on a successful upload;
/// * `start_index + line_count` when the upload RPC fails, so subsequent
///   categories keep a consistent (optimistic) index.
async fn upload_category_logs(
    client: &ConnectClient,
    state: &RunnerState,
    http: &reqwest::Client,
    logs_base: &str,
    jobres: &common::messages::JobResult,
    task_meta: &crate::state::TaskMeta,
    category: &str,
    start_index: i64,
    now: &prost_types::Timestamp,
) -> i64 {
    let url = format!(
        "{}/jobs/{}/logs/{}",
        logs_base.trim_end_matches('/'),
        jobres.request_id,
        category
    );
    // Best-effort fetch: any HTTP or body-read error leaves the index untouched.
    let text = match http.get(&url).send().await {
        Ok(resp) if resp.status().is_success() => match resp.text().await {
            Ok(t) => t,
            Err(_) => return start_index,
        },
        _ => return start_index,
    };
    let lines: Vec<&str> = text.lines().collect();
    if lines.is_empty() {
        return start_index;
    }
    // Header + content lines + footer: reserve for all rows up front.
    // (Was `lines.len() + 1`, which under-reserved by one — the final
    // `::endgroup::` push forced a guaranteed reallocation.)
    let mut rows: Vec<LogRow> = Vec::with_capacity(lines.len() + 2);
    rows.push(LogRow {
        time: Some(now.clone()),
        content: format!("::group::{}", category),
    });
    for line in &lines {
        rows.push(LogRow {
            time: Some(now.clone()),
            content: line.to_string(),
        });
    }
    rows.push(LogRow {
        time: Some(now.clone()),
        content: "::endgroup::".to_string(),
    });
    let line_count = rows.len() as i64;
    let log_req = UpdateLogRequest {
        task_id: task_meta.forgejo_task_id,
        index: start_index,
        rows,
        no_more: false,
    };
    match client
        .update_log(&log_req, &state.identity.uuid, &state.identity.token)
        .await
    {
        Ok(resp) => {
            debug!(
                category,
                lines = line_count,
                ack_index = resp.ack_index,
                "uploaded logs"
            );
            resp.ack_index
        }
        Err(e) => {
            // Advance the index optimistically so later categories don't try
            // to occupy the range this failed batch was meant to fill.
            warn!(error = %e, category, "failed to upload logs");
            start_index + line_count
        }
    }
}

View file

@ -10,9 +10,15 @@ use crate::state::RunnerState;
const POLL_INTERVAL: Duration = Duration::from_secs(3);
/// Log category summary from logs-service.
///
/// One entry per category in the JSON response of the
/// `/jobs/{id}/logs` listing endpoint; used to detect new lines to stream.
#[derive(serde::Deserialize)]
struct LogCategorySummary {
    // Category name, e.g. "boot" or "step:<name>".
    category: String,
    // Total number of log lines currently stored for this category.
    count: i64,
}
/// Streams logs from logs-service to Forgejo while a job is in-flight.
/// Runs until the stop signal is received (job completed).
/// Returns the final log index so the reporter can continue from there.
/// Returns the final log index so the reporter knows where we left off.
pub async fn stream_logs(
client: Arc<ConnectClient>,
state: Arc<RunnerState>,
@ -22,13 +28,13 @@ pub async fn stream_logs(
mut stop: tokio::sync::watch::Receiver<bool>,
) -> i64 {
let http = reqwest::Client::new();
// Track how many lines we've sent per category to only send new ones.
let mut sent_per_category: std::collections::HashMap<String, usize> = Default::default();
let mut log_index: i64 = 0;
let mut last_total: i64 = 0;
loop {
// Check if we should stop
if *stop.borrow() {
// Do one final poll to catch any remaining logs
// Final flush
log_index = poll_and_send(
&client,
&state,
@ -37,7 +43,7 @@ pub async fn stream_logs(
request_id,
task_id,
log_index,
&mut last_total,
&mut sent_per_category,
)
.await;
break;
@ -51,18 +57,18 @@ pub async fn stream_logs(
request_id,
task_id,
log_index,
&mut last_total,
&mut sent_per_category,
)
.await;
// Wait for the next poll interval or stop signal
tokio::select! {
_ = tokio::time::sleep(POLL_INTERVAL) => {}
_ = stop.changed() => {
// Signal changed — do one more poll then exit
// Final flush
log_index = poll_and_send(
&client, &state, &http, &logs_base,
request_id, task_id, log_index, &mut last_total,
request_id, task_id, log_index,
&mut sent_per_category,
).await;
break;
}
@ -73,13 +79,6 @@ pub async fn stream_logs(
log_index
}
/// Log category summary from logs-service.
#[derive(serde::Deserialize)]
struct LogCategorySummary {
category: String,
count: i64,
}
async fn poll_and_send(
client: &ConnectClient,
state: &RunnerState,
@ -88,9 +87,8 @@ async fn poll_and_send(
request_id: Uuid,
task_id: i64,
current_index: i64,
last_total: &mut i64,
sent_per_category: &mut std::collections::HashMap<String, usize>,
) -> i64 {
// Get total log count across all categories
let categories_url = format!(
"{}/jobs/{}/logs",
logs_base.trim_end_matches('/'),
@ -105,46 +103,46 @@ async fn poll_and_send(
_ => return current_index,
};
let new_total: i64 = categories.iter().map(|c| c.count).sum();
if new_total <= *last_total {
return current_index; // No new logs
// Check if there are any new lines at all
let has_new = categories.iter().any(|c| {
let prev = sent_per_category.get(&c.category).copied().unwrap_or(0);
c.count as usize > prev
});
if !has_new {
return current_index;
}
*last_total = new_total;
// Fetch all logs and send new ones
// We re-fetch everything but only send lines from current_index onward.
// This is simple but not optimal for large logs — good enough for streaming.
let mut all_lines: Vec<String> = Vec::new();
let mut log_index = current_index;
for cat in &categories {
let prev_sent = sent_per_category.get(&cat.category).copied().unwrap_or(0);
if (cat.count as usize) <= prev_sent {
continue; // No new lines in this category
}
// Fetch all lines for this category
let url = format!(
"{}/jobs/{}/logs/{}",
logs_base.trim_end_matches('/'),
request_id,
cat.category
);
if let Ok(resp) = http.get(&url).send().await {
if resp.status().is_success() {
if let Ok(text) = resp.text().await {
all_lines.push(format!("::group::{}", cat.category));
for line in text.lines() {
all_lines.push(line.to_string());
}
all_lines.push("::endgroup::".to_string());
}
}
}
let text = match http.get(&url).send().await {
Ok(resp) if resp.status().is_success() => match resp.text().await {
Ok(t) => t,
Err(_) => continue,
},
_ => continue,
};
let all_lines: Vec<&str> = text.lines().collect();
if all_lines.len() <= prev_sent {
continue;
}
let total_lines = all_lines.len() as i64;
if total_lines <= current_index {
return current_index;
}
// Send only new lines
let new_lines = &all_lines[current_index as usize..];
if new_lines.is_empty() {
return current_index;
}
// Only send lines we haven't sent yet
let new_lines = &all_lines[prev_sent..];
let now = prost_types::Timestamp {
seconds: time::OffsetDateTime::now_utc().unix_timestamp(),
@ -155,13 +153,15 @@ async fn poll_and_send(
.iter()
.map(|line| LogRow {
time: Some(now.clone()),
content: line.clone(),
content: line.to_string(),
})
.collect();
let count = rows.len() as i64;
let req = UpdateLogRequest {
task_id,
index: current_index,
index: log_index,
rows,
no_more: false,
};
@ -173,15 +173,21 @@ async fn poll_and_send(
Ok(resp) => {
debug!(
task_id,
new_lines = new_lines.len(),
category = %cat.category,
new_lines = count,
ack_index = resp.ack_index,
"streamed logs"
);
resp.ack_index
log_index = resp.ack_index;
}
Err(e) => {
warn!(error = %e, task_id, "failed to stream logs");
current_index
warn!(error = %e, task_id, category = %cat.category, "failed to stream logs");
log_index += count;
}
}
sent_per_category.insert(cat.category.clone(), all_lines.len());
}
log_index
}