2025-11-01 12:14:50 +01:00
|
|
|
use futures_util::StreamExt;
|
|
|
|
|
use miette::{IntoDiagnostic as _, Result};
|
2025-11-01 14:56:46 +01:00
|
|
|
use std::net::SocketAddr;
|
2025-11-01 12:14:50 +01:00
|
|
|
use tonic::{Request, Response, Status};
|
|
|
|
|
use tracing::{error, info, warn};
|
|
|
|
|
|
2025-11-01 14:56:46 +01:00
|
|
|
use common::runner::v1::{
|
|
|
|
|
Ack, LogItem,
|
|
|
|
|
runner_server::{Runner, RunnerServer},
|
|
|
|
|
};
|
2025-11-01 12:14:50 +01:00
|
|
|
use common::{MqConfig, publish_job_result};
|
2025-11-02 23:37:11 +01:00
|
|
|
use std::sync::Arc;
|
|
|
|
|
|
|
|
|
|
use crate::persist::Persist;
|
2025-11-01 12:14:50 +01:00
|
|
|
|
|
|
|
|
pub struct RunnerSvc {
|
|
|
|
|
mq_cfg: MqConfig,
|
2025-11-02 23:37:11 +01:00
|
|
|
persist: Arc<Persist>,
|
2025-11-01 12:14:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl RunnerSvc {
|
2025-11-02 23:37:11 +01:00
|
|
|
pub fn new(mq_cfg: MqConfig, persist: Arc<Persist>) -> Self {
|
|
|
|
|
Self { mq_cfg, persist }
|
2025-11-01 14:56:46 +01:00
|
|
|
}
|
2025-11-01 12:14:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tonic::async_trait]
|
|
|
|
|
impl Runner for RunnerSvc {
|
|
|
|
|
async fn stream_logs(
|
|
|
|
|
&self,
|
|
|
|
|
request: Request<tonic::Streaming<LogItem>>,
|
|
|
|
|
) -> std::result::Result<Response<Ack>, Status> {
|
|
|
|
|
let mut stream = request.into_inner();
|
|
|
|
|
let mut req_id: Option<uuid::Uuid> = None;
|
|
|
|
|
let mut repo_url: Option<String> = None;
|
|
|
|
|
let mut commit_sha: Option<String> = None;
|
|
|
|
|
let mut exit_code: i32 = 0;
|
|
|
|
|
let mut success: bool = true;
|
2025-11-02 20:36:13 +01:00
|
|
|
let mut lines_stdout: usize = 0;
|
|
|
|
|
let mut lines_stderr: usize = 0;
|
|
|
|
|
let mut got_end: bool = false;
|
2025-11-02 23:37:11 +01:00
|
|
|
let mut seq: i64 = 0;
|
2025-11-01 12:14:50 +01:00
|
|
|
|
2025-11-02 20:36:13 +01:00
|
|
|
info!("runner log stream opened");
|
2025-11-01 14:56:46 +01:00
|
|
|
while let Some(item) = stream
|
|
|
|
|
.next()
|
|
|
|
|
.await
|
|
|
|
|
.transpose()
|
|
|
|
|
.map_err(|e| Status::internal(e.to_string()))?
|
|
|
|
|
{
|
2025-11-01 12:14:50 +01:00
|
|
|
// Correlate request id
|
|
|
|
|
if req_id.is_none() {
|
|
|
|
|
match uuid::Uuid::parse_str(&item.request_id) {
|
2025-11-02 20:36:13 +01:00
|
|
|
Ok(u) => {
|
|
|
|
|
info!(request_id = %u, "runner log stream identified");
|
|
|
|
|
req_id = Some(u)
|
|
|
|
|
}
|
2025-11-01 12:14:50 +01:00
|
|
|
Err(_) => {
|
|
|
|
|
warn!(request_id = %item.request_id, "invalid request_id from runner");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if let Some(ev) = item.event {
|
|
|
|
|
match ev {
|
|
|
|
|
common::runner::v1::log_item::Event::Log(chunk) => {
|
|
|
|
|
if chunk.stderr {
|
2025-11-02 20:36:13 +01:00
|
|
|
lines_stderr += 1;
|
2025-11-01 12:14:50 +01:00
|
|
|
info!(request_id = %item.request_id, line = %chunk.line, "runner:stderr");
|
|
|
|
|
} else {
|
2025-11-02 20:36:13 +01:00
|
|
|
lines_stdout += 1;
|
2025-11-01 12:14:50 +01:00
|
|
|
info!(request_id = %item.request_id, line = %chunk.line, "runner:stdout");
|
|
|
|
|
}
|
2025-11-02 23:37:11 +01:00
|
|
|
// Best effort: persist the line if we have a parsed UUID
|
|
|
|
|
if let Some(id) = req_id {
|
|
|
|
|
if let Err(e) = self.persist.record_log_line(id, seq, chunk.stderr, &chunk.line).await {
|
|
|
|
|
warn!(error = %e, request_id = %id, seq = seq, "failed to persist log line");
|
|
|
|
|
}
|
|
|
|
|
seq += 1;
|
|
|
|
|
}
|
2025-11-01 12:14:50 +01:00
|
|
|
}
|
|
|
|
|
common::runner::v1::log_item::Event::End(end) => {
|
|
|
|
|
exit_code = end.exit_code;
|
|
|
|
|
success = end.success;
|
|
|
|
|
repo_url = Some(end.repo_url);
|
|
|
|
|
commit_sha = Some(end.commit_sha);
|
2025-11-02 20:36:13 +01:00
|
|
|
got_end = true;
|
|
|
|
|
info!(exit_code, success, "runner log stream received End");
|
2025-11-01 12:14:50 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2025-11-02 20:36:13 +01:00
|
|
|
info!(lines_stdout, lines_stderr, got_end, "runner log stream closed");
|
2025-11-01 12:14:50 +01:00
|
|
|
// Publish final status if we have enough context
|
2025-11-01 14:56:46 +01:00
|
|
|
if let (Some(id), Some(repo), Some(sha)) =
|
|
|
|
|
(req_id.as_ref(), repo_url.as_ref(), commit_sha.as_ref())
|
|
|
|
|
{
|
|
|
|
|
let result = common::messages::JobResult::new(
|
|
|
|
|
id.clone(),
|
|
|
|
|
repo.clone(),
|
|
|
|
|
sha.clone(),
|
|
|
|
|
success,
|
|
|
|
|
exit_code,
|
|
|
|
|
None,
|
|
|
|
|
);
|
2025-11-01 12:14:50 +01:00
|
|
|
if let Err(e) = publish_job_result(&self.mq_cfg, &result).await {
|
|
|
|
|
error!(error = %e, request_id = %id, "failed to publish JobResult");
|
|
|
|
|
}
|
|
|
|
|
} else {
|
2025-11-01 14:56:46 +01:00
|
|
|
warn!(
|
|
|
|
|
have_req_id = req_id.is_some(),
|
|
|
|
|
have_repo = repo_url.is_some(),
|
|
|
|
|
have_sha = commit_sha.is_some(),
|
|
|
|
|
"missing context for JobResult; skipping publish"
|
|
|
|
|
);
|
2025-11-01 12:14:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(Response::new(Ack { ok: true }))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2025-11-01 14:56:46 +01:00
|
|
|
pub async fn serve_with_shutdown(
|
|
|
|
|
addr: SocketAddr,
|
|
|
|
|
mq_cfg: MqConfig,
|
2025-11-02 23:37:11 +01:00
|
|
|
persist: Arc<Persist>,
|
2025-11-01 14:56:46 +01:00
|
|
|
shutdown: impl std::future::Future<Output = ()>,
|
|
|
|
|
) -> Result<()> {
|
2025-11-01 12:14:50 +01:00
|
|
|
info!(%addr, "gRPC server starting");
|
|
|
|
|
tonic::transport::Server::builder()
|
2025-11-02 23:37:11 +01:00
|
|
|
.add_service(RunnerServer::new(RunnerSvc::new(mq_cfg, persist)))
|
2025-11-01 12:14:50 +01:00
|
|
|
.serve_with_shutdown(addr, shutdown)
|
|
|
|
|
.await
|
|
|
|
|
.into_diagnostic()
|
|
|
|
|
}
|