solstice-ci/crates/runner-integration/src/streamer.rs
Till Wegmueller ceaac25a7e Send incremental UpdateTask with step states during log streaming
Streamer now sends UpdateTask alongside UpdateLog on each poll so
Forgejo maps log lines to steps in real time, not just at completion.
This prevents "Set up job" from accumulating all streamed logs.
2026-04-07 00:44:02 +02:00

242 lines
6.8 KiB
Rust

use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, warn};
use uuid::Uuid;
use crate::connect::ConnectClient;
use crate::proto::runner::v1::{
self, LogRow, StepState, TaskState, UpdateLogRequest, UpdateTaskRequest,
};
use crate::state::{RunnerState, StepInfo};
// How often the streamer polls logs-service for new lines.
const POLL_INTERVAL: Duration = Duration::from_secs(3);
/// Categories that belong to the "Set up job" phase — must match reporter.rs.
const SETUP_CATEGORIES: &[&str] = &["boot", "default", "env", "env_setup", "tool_check"];
/// Log category summary from logs-service.
///
/// One entry per category returned by `GET {logs_base}/jobs/{id}/logs`.
#[derive(serde::Deserialize)]
struct LogCategorySummary {
    // Category name (e.g. "boot", or a step's log category).
    category: String,
    // Line count as reported by logs-service. Deserialized but not read in
    // this file — counts are recomputed from the fetched text instead.
    count: i64,
}
/// Streams logs from logs-service to Forgejo while a job is in-flight.
/// Also sends incremental `UpdateTask` with step states so Forgejo assigns
/// log lines to the correct steps in real time.
/// Streams logs from logs-service to Forgejo while a job is in-flight.
/// Also sends incremental `UpdateTask` with step states so Forgejo assigns
/// log lines to the correct steps in real time.
///
/// Polls every [`POLL_INTERVAL`]; when `stop` flips to `true` (or its sender
/// is dropped) a final flush runs before returning so tail lines are not
/// lost. Returns the last acknowledged log index.
pub async fn stream_logs(
    client: Arc<ConnectClient>,
    state: Arc<RunnerState>,
    request_id: Uuid,
    task_id: i64,
    logs_base: String,
    steps: Vec<StepInfo>,
    mut stop: tokio::sync::watch::Receiver<bool>,
) -> i64 {
    let http = reqwest::Client::new();
    let mut log_index: i64 = 0;
    loop {
        // Flush whatever is available now. On the iteration where `stop` is
        // observed as `true` below, this doubles as the final flush.
        log_index = poll_and_send(
            &client, &state, &http, &logs_base, request_id, task_id, log_index, &steps,
        )
        .await;
        if *stop.borrow() {
            break;
        }
        tokio::select! {
            _ = tokio::time::sleep(POLL_INTERVAL) => {}
            changed = stop.changed() => {
                if changed.is_err() {
                    // Sender dropped without signalling stop: flush one last
                    // time and exit rather than spinning on a closed channel.
                    log_index = poll_and_send(
                        &client, &state, &http, &logs_base, request_id, task_id, log_index, &steps,
                    )
                    .await;
                    break;
                }
                // A value arrived: loop around — the top-of-loop poll
                // flushes, and the borrow check exits only if the new value
                // is actually `true` (previously any notification, even
                // `false`, ended streaming).
            }
        }
    }
    debug!(task_id, log_index, "log streamer stopped");
    log_index
}
/// Sort categories: setup first, then step categories in KDL order, then remaining.
///
/// Ties within the setup and "remaining" groups are broken alphabetically by
/// category name; step categories follow the order of `steps`.
fn sort_categories(categories: &mut [LogCategorySummary], steps: &[StepInfo]) {
    // `sort_by_cached_key` computes (and clones) each key exactly once per
    // element, instead of once per comparison as `sort_by_key` does — the key
    // here owns a cloned `String`, so that saves O(n log n) allocations.
    categories.sort_by_cached_key(|c| {
        if SETUP_CATEGORIES.contains(&c.category.as_str()) {
            (0, 0, c.category.clone())
        } else if let Some(pos) = steps.iter().position(|s| s.log_category == c.category) {
            (1, pos, c.category.clone())
        } else {
            (2, 0, c.category.clone())
        }
    });
}
/// Fetch the complete, ordered log for a job: all lines across all
/// categories in display order, plus per-category line counts in that same
/// order. Returns `None` when the category list is unreachable or empty.
async fn fetch_ordered_lines(
    http: &reqwest::Client,
    logs_base: &str,
    request_id: Uuid,
    steps: &[StepInfo],
) -> Option<(Vec<String>, Vec<(String, i64)>)> {
    let categories_url = format!(
        "{}/jobs/{}/logs",
        logs_base.trim_end_matches('/'),
        request_id
    );
    let mut categories = match http.get(&categories_url).send().await {
        Ok(resp) if resp.status().is_success() => resp
            .json::<Vec<LogCategorySummary>>()
            .await
            .unwrap_or_default(),
        _ => return None,
    };
    if categories.is_empty() {
        return None;
    }
    sort_categories(&mut categories, steps);
    let mut all_lines: Vec<String> = Vec::new();
    let mut category_counts: Vec<(String, i64)> = Vec::new();
    for cat in &categories {
        let url = format!(
            "{}/jobs/{}/logs/{}",
            logs_base.trim_end_matches('/'),
            request_id,
            cat.category
        );
        // Best-effort: a category whose download fails is skipped this poll
        // and retried on the next one.
        let text = match http.get(&url).send().await {
            Ok(resp) if resp.status().is_success() => match resp.text().await {
                Ok(t) => t,
                Err(_) => continue,
            },
            _ => continue,
        };
        let lines: Vec<&str> = text.lines().collect();
        category_counts.push((cat.category.clone(), lines.len() as i64));
        all_lines.extend(lines.into_iter().map(str::to_owned));
    }
    Some((all_lines, category_counts))
}
/// Build per-step states that map contiguous ranges of the flat log to
/// steps: setup categories occupy the head, then each step's category in
/// order. Steps with no lines yet are omitted.
fn build_step_states(
    category_counts: &[(String, i64)],
    steps: &[StepInfo],
    now: &prost_types::Timestamp,
) -> Vec<StepState> {
    // Lines belonging to the "Set up job" phase sit before all step lines.
    let setup_lines: i64 = category_counts
        .iter()
        .filter(|(cat, _)| SETUP_CATEGORIES.contains(&cat.as_str()))
        .map(|(_, count)| count)
        .sum();
    let mut step_states: Vec<StepState> = Vec::new();
    let mut cursor = setup_lines;
    for (step_idx, step_info) in steps.iter().enumerate() {
        let step_lines = category_counts
            .iter()
            .find(|(cat, _)| *cat == step_info.log_category)
            .map(|(_, count)| *count)
            .unwrap_or(0);
        if step_lines > 0 {
            step_states.push(StepState {
                id: step_idx as i64,
                result: v1::Result::Unspecified as i32, // still running
                // NOTE(review): `started_at` is re-stamped with "now" on
                // every poll — confirm Forgejo ignores it for running steps.
                started_at: Some(now.clone()),
                stopped_at: None,
                log_index: cursor,
                log_length: step_lines,
            });
        }
        cursor += step_lines;
    }
    step_states
}
/// One streaming iteration: fetch the full ordered log, send any lines past
/// `current_index` via `UpdateLog`, then send an incremental `UpdateTask`
/// with step states so Forgejo maps the lines to steps in real time.
///
/// Returns the new log index (the server's ack), or `current_index`
/// unchanged when there is nothing new or a request failed.
async fn poll_and_send(
    client: &ConnectClient,
    state: &RunnerState,
    http: &reqwest::Client,
    logs_base: &str,
    request_id: Uuid,
    task_id: i64,
    current_index: i64,
    steps: &[StepInfo],
) -> i64 {
    let (all_lines, category_counts) =
        match fetch_ordered_lines(http, logs_base, request_id, steps).await {
            Some(v) => v,
            None => return current_index,
        };
    let total = all_lines.len() as i64;
    if total <= current_index {
        return current_index;
    }
    // Defensive: `current_index` ultimately comes from the server's ack, so
    // clamp it into range instead of letting a bogus value panic the slice.
    let start = usize::try_from(current_index)
        .unwrap_or(0)
        .min(all_lines.len());
    let new_lines = &all_lines[start..];
    let now = prost_types::Timestamp {
        seconds: time::OffsetDateTime::now_utc().unix_timestamp(),
        nanos: 0,
    };
    let rows: Vec<LogRow> = new_lines
        .iter()
        .map(|line| LogRow {
            time: Some(now.clone()),
            content: line.clone(),
        })
        .collect();
    let new_count = rows.len();
    let log_req = UpdateLogRequest {
        task_id,
        index: current_index,
        rows,
        no_more: false,
    };
    let new_index = match client
        .update_log(&log_req, &state.identity.uuid, &state.identity.token)
        .await
    {
        Ok(resp) => {
            debug!(
                task_id,
                new_lines = new_count,
                ack_index = resp.ack_index,
                "streamed logs"
            );
            resp.ack_index
        }
        Err(e) => {
            warn!(error = %e, task_id, "failed to stream logs");
            return current_index;
        }
    };
    // Send step states so Forgejo maps lines to steps in real time.
    let step_states = build_step_states(&category_counts, steps, &now);
    if !step_states.is_empty() {
        let task_req = UpdateTaskRequest {
            state: Some(TaskState {
                id: task_id,
                result: v1::Result::Unspecified as i32, // still running
                started_at: None,
                stopped_at: None,
                steps: step_states,
            }),
            outputs: Default::default(),
        };
        if let Err(e) = client
            .update_task(&task_req, &state.identity.uuid, &state.identity.token)
            .await
        {
            warn!(error = %e, task_id, "failed to update step states");
        }
    }
    new_index
}