use futures_util::StreamExt; use miette::{IntoDiagnostic as _, Result}; use std::net::SocketAddr; use tonic::{Request, Response, Status}; use tracing::{debug, error, info, warn}; use common::runner::v1::{ Ack, LogItem, runner_server::{Runner, RunnerServer}, }; use common::{MqConfig, publish_job_result}; use std::sync::Arc; use crate::persist::Persist; fn parse_owner_repo(repo_url: &str) -> Option<(String, String)> { let url = repo_url.trim_end_matches(".git"); if let Some(rest) = url.strip_prefix("https://").or_else(|| url.strip_prefix("http://")) { let parts: Vec<&str> = rest.split('/').collect(); if parts.len() >= 3 { return Some((parts[1].to_string(), parts[2].to_string())); } } else if let Some(rest) = url.strip_prefix("ssh://") { // ssh://git@host/owner/repo let after_host = rest.splitn(2, '/').nth(1)?; let parts: Vec<&str> = after_host.split('/').collect(); if parts.len() >= 2 { return Some((parts[0].to_string(), parts[1].to_string())); } } else if let Some(idx) = url.find(':') { // git@host:owner/repo let after = &url[idx+1..]; let parts: Vec<&str> = after.split('/').collect(); if parts.len() >= 2 { return Some((parts[0].to_string(), parts[1].to_string())); } } None } pub struct RunnerSvc { mq_cfg: MqConfig, persist: Arc, } impl RunnerSvc { pub fn new(mq_cfg: MqConfig, persist: Arc) -> Self { Self { mq_cfg, persist } } } #[tonic::async_trait] impl Runner for RunnerSvc { async fn stream_logs( &self, request: Request>, ) -> std::result::Result, Status> { let mut stream = request.into_inner(); let mut req_id: Option = None; let mut repo_url: Option = None; let mut commit_sha: Option = None; let mut exit_code: i32 = 0; let mut success: bool = true; let mut lines_stdout: usize = 0; let mut lines_stderr: usize = 0; let mut got_end: bool = false; let mut seq: i64 = 0; debug!("runner log stream opened"); while let Some(item) = stream .next() .await .transpose() .map_err(|e| Status::internal(e.to_string()))? { // Correlate request id if req_id.is_none() { match uuid::Uuid::parse_str(&item.request_id) { Ok(u) => { debug!(request_id = %u, "runner log stream identified"); req_id = Some(u) } Err(_) => { warn!(request_id = %item.request_id, "invalid request_id from runner"); } } } if let Some(ev) = item.event { match ev { common::runner::v1::log_item::Event::Log(chunk) => { if chunk.stderr { lines_stderr += 1; debug!(request_id = %item.request_id, line = %chunk.line, "runner:stderr"); } else { lines_stdout += 1; debug!(request_id = %item.request_id, line = %chunk.line, "runner:stdout"); } // Best effort: persist the line if we have a parsed UUID if let Some(id) = req_id { if let Err(e) = self.persist.record_log_line(id, seq, chunk.stderr, &chunk.line).await { warn!(error = %e, request_id = %id, seq = seq, "failed to persist log line"); } seq += 1; } } common::runner::v1::log_item::Event::End(end) => { exit_code = end.exit_code; success = end.success; repo_url = Some(end.repo_url); commit_sha = Some(end.commit_sha); got_end = true; debug!(exit_code, success, "runner log stream received End"); } } } } debug!(lines_stdout, lines_stderr, got_end, "runner log stream closed"); // Publish final status if we have enough context if let (Some(id), Some(repo), Some(sha)) = (req_id.as_ref(), repo_url.as_ref(), commit_sha.as_ref()) { let mut result = common::messages::JobResult::new( id.clone(), repo.clone(), sha.clone(), success, exit_code, None, ); // Try to parse owner/repo for downstream integrations to avoid reparsing URLs if let Some((owner, name)) = parse_owner_repo(&repo) { result.repo_owner = Some(owner); result.repo_name = Some(name); } if let Err(e) = publish_job_result(&self.mq_cfg, &result).await { error!(error = %e, request_id = %id, "failed to publish JobResult"); } } else { warn!( have_req_id = req_id.is_some(), have_repo = repo_url.is_some(), have_sha = commit_sha.is_some(), "missing context for JobResult; skipping publish" ); } Ok(Response::new(Ack { ok: true })) } } pub async fn serve_with_shutdown( addr: SocketAddr, mq_cfg: MqConfig, persist: Arc, shutdown: impl std::future::Future, ) -> Result<()> { info!(%addr, "gRPC server starting"); tonic::transport::Server::builder() .add_service(RunnerServer::new(RunnerSvc::new(mq_cfg, persist))) .serve_with_shutdown(addr, shutdown) .await .into_diagnostic() }