// solstice-ci/crates/orchestrator/src/grpc.rs

use futures_util::StreamExt;
use miette::{IntoDiagnostic as _, Result};
2025-11-01 14:56:46 +01:00
use std::net::SocketAddr;
use tonic::{Request, Response, Status};
use tracing::{error, info, warn};
2025-11-01 14:56:46 +01:00
use common::runner::v1::{
Ack, LogItem,
runner_server::{Runner, RunnerServer},
};
use common::{MqConfig, publish_job_result};
use std::sync::Arc;
use crate::persist::Persist;
/// Extracts the `(owner, repo)` pair from a Git remote URL.
///
/// Recognised forms:
/// - `https://host/owner/repo[.git]` and `http://host/owner/repo[.git]`
/// - `ssh://[user@]host[:port]/owner/repo[.git]`
/// - scp-like `[user@]host:owner/repo[.git]`
///
/// Surrounding whitespace and trailing slashes are ignored, and a single
/// trailing `.git` suffix is removed. Any path segments after the repository
/// name are ignored.
///
/// Returns `None` when the URL does not contain both an owner and a repository
/// segment, or when either segment is empty.
fn parse_owner_repo(repo_url: &str) -> Option<(String, String)> {
    let url = repo_url.trim().trim_end_matches('/');
    // Strip the suffix at most once so a repository literally named
    // `foo.git` (served as `foo.git.git`) keeps its name.
    let url = url.strip_suffix(".git").unwrap_or(url);

    let after_scheme = ["https://", "http://", "ssh://"]
        .iter()
        .find_map(|scheme| url.strip_prefix(*scheme));

    let path = match after_scheme {
        // scheme://[user@]host[:port]/owner/repo — drop everything up to the first '/'.
        Some(rest) => rest.split_once('/')?.1,
        // [user@]host:owner/repo — drop everything up to the first ':'.
        None => url.split_once(':')?.1,
    };

    let mut segments = path.split('/');
    let owner = segments.next()?;
    let name = segments.next()?;
    if owner.is_empty() || name.is_empty() {
        // e.g. `https://host//repo` or `git@host:/repo`: not a usable pair.
        return None;
    }
    Some((owner.to_string(), name.to_string()))
}
/// gRPC `Runner` service implementation that receives log streams from runners.
pub struct RunnerSvc {
    /// Message-queue configuration passed to `publish_job_result` when a stream completes.
    mq_cfg: MqConfig,
    /// Shared persistence handle used to record individual log lines.
    persist: Arc<Persist>,
}
impl RunnerSvc {
pub fn new(mq_cfg: MqConfig, persist: Arc<Persist>) -> Self {
Self { mq_cfg, persist }
2025-11-01 14:56:46 +01:00
}
}
#[tonic::async_trait]
impl Runner for RunnerSvc {
async fn stream_logs(
&self,
request: Request<tonic::Streaming<LogItem>>,
) -> std::result::Result<Response<Ack>, Status> {
let mut stream = request.into_inner();
let mut req_id: Option<uuid::Uuid> = None;
let mut repo_url: Option<String> = None;
let mut commit_sha: Option<String> = None;
let mut exit_code: i32 = 0;
let mut success: bool = true;
let mut lines_stdout: usize = 0;
let mut lines_stderr: usize = 0;
let mut got_end: bool = false;
let mut seq: i64 = 0;
info!("runner log stream opened");
2025-11-01 14:56:46 +01:00
while let Some(item) = stream
.next()
.await
.transpose()
.map_err(|e| Status::internal(e.to_string()))?
{
// Correlate request id
if req_id.is_none() {
match uuid::Uuid::parse_str(&item.request_id) {
Ok(u) => {
info!(request_id = %u, "runner log stream identified");
req_id = Some(u)
}
Err(_) => {
warn!(request_id = %item.request_id, "invalid request_id from runner");
}
}
}
if let Some(ev) = item.event {
match ev {
common::runner::v1::log_item::Event::Log(chunk) => {
if chunk.stderr {
lines_stderr += 1;
info!(request_id = %item.request_id, line = %chunk.line, "runner:stderr");
} else {
lines_stdout += 1;
info!(request_id = %item.request_id, line = %chunk.line, "runner:stdout");
}
// Best effort: persist the line if we have a parsed UUID
if let Some(id) = req_id {
if let Err(e) = self.persist.record_log_line(id, seq, chunk.stderr, &chunk.line).await {
warn!(error = %e, request_id = %id, seq = seq, "failed to persist log line");
}
seq += 1;
}
}
common::runner::v1::log_item::Event::End(end) => {
exit_code = end.exit_code;
success = end.success;
repo_url = Some(end.repo_url);
commit_sha = Some(end.commit_sha);
got_end = true;
info!(exit_code, success, "runner log stream received End");
}
}
}
}
info!(lines_stdout, lines_stderr, got_end, "runner log stream closed");
// Publish final status if we have enough context
2025-11-01 14:56:46 +01:00
if let (Some(id), Some(repo), Some(sha)) =
(req_id.as_ref(), repo_url.as_ref(), commit_sha.as_ref())
{
let mut result = common::messages::JobResult::new(
2025-11-01 14:56:46 +01:00
id.clone(),
repo.clone(),
sha.clone(),
success,
exit_code,
None,
);
// Try to parse owner/repo for downstream integrations to avoid reparsing URLs
if let Some((owner, name)) = parse_owner_repo(&repo) {
result.repo_owner = Some(owner);
result.repo_name = Some(name);
}
if let Err(e) = publish_job_result(&self.mq_cfg, &result).await {
error!(error = %e, request_id = %id, "failed to publish JobResult");
}
} else {
2025-11-01 14:56:46 +01:00
warn!(
have_req_id = req_id.is_some(),
have_repo = repo_url.is_some(),
have_sha = commit_sha.is_some(),
"missing context for JobResult; skipping publish"
);
}
Ok(Response::new(Ack { ok: true }))
}
}
2025-11-01 14:56:46 +01:00
/// Runs the gRPC server hosting [`RunnerSvc`] on `addr` until `shutdown` resolves.
///
/// `mq_cfg` and `persist` are handed to the service instance.
///
/// # Errors
///
/// Returns the transport error from the tonic server, converted into a
/// `miette` diagnostic.
pub async fn serve_with_shutdown(
    addr: SocketAddr,
    mq_cfg: MqConfig,
    persist: Arc<Persist>,
    shutdown: impl std::future::Future<Output = ()>,
) -> Result<()> {
    info!(%addr, "gRPC server starting");

    let service = RunnerServer::new(RunnerSvc::new(mq_cfg, persist));
    let outcome = tonic::transport::Server::builder()
        .add_service(service)
        .serve_with_shutdown(addr, shutdown)
        .await;

    outcome.into_diagnostic()
}