solstice-ci/crates/orchestrator/src/grpc.rs

82 lines
3.2 KiB
Rust
Raw Normal View History

use std::net::SocketAddr;
use futures_util::StreamExt;
use miette::{IntoDiagnostic as _, Result};
use tonic::{Request, Response, Status};
use tracing::{error, info, warn};
use common::{MqConfig, publish_job_result};
use common::runner::v1::{runner_server::{Runner, RunnerServer}, LogItem, Ack};
pub struct RunnerSvc {
mq_cfg: MqConfig,
}
impl RunnerSvc {
pub fn new(mq_cfg: MqConfig) -> Self { Self { mq_cfg } }
}
#[tonic::async_trait]
impl Runner for RunnerSvc {
async fn stream_logs(
&self,
request: Request<tonic::Streaming<LogItem>>,
) -> std::result::Result<Response<Ack>, Status> {
let mut stream = request.into_inner();
let mut req_id: Option<uuid::Uuid> = None;
let mut repo_url: Option<String> = None;
let mut commit_sha: Option<String> = None;
let mut exit_code: i32 = 0;
let mut success: bool = true;
while let Some(item) = stream.next().await.transpose().map_err(|e| Status::internal(e.to_string()))? {
// Correlate request id
if req_id.is_none() {
match uuid::Uuid::parse_str(&item.request_id) {
Ok(u) => req_id = Some(u),
Err(_) => {
warn!(request_id = %item.request_id, "invalid request_id from runner");
}
}
}
if let Some(ev) = item.event {
match ev {
common::runner::v1::log_item::Event::Log(chunk) => {
if chunk.stderr {
info!(request_id = %item.request_id, line = %chunk.line, "runner:stderr");
} else {
info!(request_id = %item.request_id, line = %chunk.line, "runner:stdout");
}
}
common::runner::v1::log_item::Event::End(end) => {
exit_code = end.exit_code;
success = end.success;
repo_url = Some(end.repo_url);
commit_sha = Some(end.commit_sha);
}
}
}
}
// Publish final status if we have enough context
if let (Some(id), Some(repo), Some(sha)) = (req_id, repo_url, commit_sha) {
let result = common::messages::JobResult::new(id, repo, sha, success, exit_code, None);
if let Err(e) = publish_job_result(&self.mq_cfg, &result).await {
error!(error = %e, request_id = %id, "failed to publish JobResult");
}
} else {
warn!(have_req_id = req_id.is_some(), have_repo = repo_url.is_some(), have_sha = commit_sha.is_some(), "missing context for JobResult; skipping publish");
}
Ok(Response::new(Ack { ok: true }))
}
}
pub async fn serve_with_shutdown(addr: SocketAddr, mq_cfg: MqConfig, shutdown: impl std::future::Future<Output = ()>) -> Result<()> {
info!(%addr, "gRPC server starting");
tonic::transport::Server::builder()
.add_service(RunnerServer::new(RunnerSvc::new(mq_cfg)))
.serve_with_shutdown(addr, shutdown)
.await
.into_diagnostic()
}