// solstice-ci/crates/runner-integration/src/streamer.rs

use std::sync::Arc;
use std::time::Duration;

use tracing::{debug, warn};
use uuid::Uuid;

use crate::connect::ConnectClient;
use crate::proto::runner::v1::{LogRow, UpdateLogRequest};
use crate::state::RunnerState;

const POLL_INTERVAL: Duration = Duration::from_secs(3);

/// Categories that belong to the "Set up job" phase — must match reporter.rs.
const SETUP_CATEGORIES: &[&str] = &["boot", "default", "env", "env_setup", "tool_check"];

/// Log category summary from logs-service.
#[derive(serde::Deserialize)]
struct LogCategorySummary {
    category: String,
    /// Reported by logs-service; not read by the streamer itself.
    #[allow(dead_code)]
    count: i64,
}
/// Streams logs from logs-service to Forgejo while a job is in-flight.
///
/// On each poll, fetches ALL log lines (sorted: setup categories first, then
/// work categories) and only sends lines beyond what Forgejo already has.
/// This ensures log indices always align with the reporter's step boundaries
/// regardless of the order categories appear in the DB.
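///
/// A minimal usage sketch (illustrative; `client`, `state`, `request_id`,
/// `task_id`, and `logs_base` come from the runner's job setup):
///
/// ```ignore
/// let (stop_tx, stop_rx) = tokio::sync::watch::channel(false);
/// let streamer = tokio::spawn(stream_logs(
///     client.clone(), state.clone(), request_id, task_id, logs_base, stop_rx,
/// ));
/// // ... run the job ...
/// stop_tx.send(true).ok();
/// let final_index = streamer.await.unwrap();
/// ```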
pub async fn stream_logs(
    client: Arc<ConnectClient>,
    state: Arc<RunnerState>,
    request_id: Uuid,
    task_id: i64,
    logs_base: String,
    mut stop: tokio::sync::watch::Receiver<bool>,
) -> i64 {
    let http = reqwest::Client::new();
    let mut log_index: i64 = 0;
    loop {
        // Stop may already be set before the first iteration; do one final
        // drain poll so nothing buffered in logs-service is lost.
        if *stop.borrow() {
            log_index = poll_and_send(
                &client, &state, &http, &logs_base, request_id, task_id, log_index,
            )
            .await;
            break;
        }
        log_index = poll_and_send(
            &client, &state, &http, &logs_base, request_id, task_id, log_index,
        )
        .await;
        tokio::select! {
            _ = tokio::time::sleep(POLL_INTERVAL) => {}
            _ = stop.changed() => {
                // Stop signalled mid-sleep: drain once more, then shut down.
                log_index = poll_and_send(
                    &client, &state, &http, &logs_base, request_id, task_id, log_index,
                )
                .await;
                break;
            }
        }
    }
    debug!(task_id, log_index, "log streamer stopped");
    log_index
}
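
/// Fetches the full ordered log from logs-service and forwards any lines
/// beyond `current_index` to Forgejo. Returns the new high-water index:
/// Forgejo's acknowledged index on success, or `current_index` unchanged if
/// anything failed (the next poll simply retries).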
async fn poll_and_send(
    client: &ConnectClient,
    state: &RunnerState,
    http: &reqwest::Client,
    logs_base: &str,
    request_id: Uuid,
    task_id: i64,
    current_index: i64,
) -> i64 {
    // GET {logs_base}/jobs/{request_id}/logs: one summary per category.
    let categories_url = format!(
        "{}/jobs/{}/logs",
        logs_base.trim_end_matches('/'),
        request_id
    );
    let mut categories = match http.get(&categories_url).send().await {
        Ok(resp) if resp.status().is_success() => resp
            .json::<Vec<LogCategorySummary>>()
            .await
            .unwrap_or_default(),
        // Any transport or HTTP error: report nothing new and retry next poll.
        _ => return current_index,
    };
    if categories.is_empty() {
        return current_index;
    }
    // Sort: setup categories first, then work categories.
    // This order must match the reporter's step boundary calculation.
    categories.sort_by_key(|c| {
        if SETUP_CATEGORIES.contains(&c.category.as_str()) {
            (0, c.category.clone())
        } else {
            (1, c.category.clone())
        }
    });
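    // e.g. ["unit_tests", "boot", "env"] (with a hypothetical "unit_tests"
    // work category) sorts to: boot, env, unit_tests.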
    // Build the full ordered log by fetching each category. A failed fetch
    // aborts the whole poll: skipping one category would shift every later
    // line's index and desynchronize us from Forgejo.
    let mut all_lines: Vec<String> = Vec::new();
    for cat in &categories {
        let url = format!(
            "{}/jobs/{}/logs/{}",
            logs_base.trim_end_matches('/'),
            request_id,
            cat.category
        );
        let text = match http.get(&url).send().await {
            Ok(resp) if resp.status().is_success() => match resp.text().await {
                Ok(t) => t,
                Err(_) => return current_index,
            },
            _ => return current_index,
        };
        for line in text.lines() {
            all_lines.push(line.to_string());
        }
    }
    let total = all_lines.len() as i64;
    if total <= current_index {
        return current_index;
    }
    // Only send lines beyond what Forgejo already has.
    let new_lines = &all_lines[current_index as usize..];
    // The fetched text is plain lines with no per-line timestamps, so the
    // whole batch is stamped with a single "now".
    let now = prost_types::Timestamp {
        seconds: time::OffsetDateTime::now_utc().unix_timestamp(),
        nanos: 0,
    };
    let rows: Vec<LogRow> = new_lines
        .iter()
        .map(|line| LogRow {
            time: Some(now.clone()),
            content: line.clone(),
        })
        .collect();
    let count = rows.len();
    let req = UpdateLogRequest {
        task_id,
        index: current_index,
        rows,
        // Never the final chunk: the streamer only runs while the job is in-flight.
        no_more: false,
    };
    match client
        .update_log(&req, &state.identity.uuid, &state.identity.token)
        .await
    {
        Ok(resp) => {
            debug!(
                task_id,
                new_lines = count,
                ack_index = resp.ack_index,
                "streamed logs"
            );
            // Trust Forgejo's ack rather than our own count; it may have
            // persisted fewer rows than we sent.
            resp.ack_index
        }
        Err(e) => {
            warn!(error = %e, task_id, "failed to stream logs");
            current_index
        }
    }
}