solstice-ci/crates/common/src/mq.rs
Till Wegmueller bddd36b16f
Add cooperative shutdown support for Scheduler and AMQP consumer
This commit updates the `Scheduler` to support cooperative shutdown using `Notify`, allowing graceful termination of tasks and cleanup of placeholder VMs. Additionally, the AMQP consumer is enhanced with an explicit shutdown mechanism, ensuring proper resource cleanup, including closing channels and connections.
2025-10-26 21:13:56 +01:00

287 lines
9.9 KiB
Rust

use std::time::Duration;
use futures_util::StreamExt;
use lapin::{
options::{
BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicQosOptions,
ConfirmSelectOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions, BasicCancelOptions,
},
types::{AMQPValue, FieldTable, LongString, ShortString},
BasicProperties, Channel, Connection, ConnectionProperties, Consumer,
};
use miette::{IntoDiagnostic as _, Result};
use tracing::{debug, error, info, instrument, warn};
use tracing::Instrument;
use crate::messages::JobRequest;
/// Configuration for the AMQP (RabbitMQ) connection and job topology.
///
/// Every field has an environment-variable override; see the `Default`
/// implementation for the variable names and fallback values.
#[derive(Clone, Debug)]
pub struct MqConfig {
/// Broker URL, e.g. `amqp://127.0.0.1:5672/%2f`.
pub url: String,
/// Name of the main (direct) exchange jobs are published to.
pub exchange: String,
/// Routing key used both for publishing and for binding the main queue.
pub routing_key: String,
/// Main job queue name.
pub queue: String,
/// Dead-letter exchange (declared as fanout in `declare_topology`).
pub dlx: String,
/// Dead-letter queue bound to the DLX.
pub dlq: String,
/// Per-consumer prefetch (QoS) limit applied in `consume_jobs_until`.
pub prefetch: u16,
}
impl Default for MqConfig {
    /// Builds a config from `AMQP_*` environment variables, falling back to
    /// local-broker defaults when a variable is absent (or, for the prefetch,
    /// not parsable as a `u16`).
    fn default() -> Self {
        // Helper: read an env var or fall back to the given literal.
        let env_or = |key: &str, fallback: &str| {
            std::env::var(key).unwrap_or_else(|_| fallback.to_string())
        };
        // AMQP_PREFETCH must parse as u16; anything else yields 64.
        let prefetch = std::env::var("AMQP_PREFETCH")
            .ok()
            .and_then(|raw| raw.parse().ok())
            .unwrap_or(64);
        Self {
            url: env_or("AMQP_URL", "amqp://127.0.0.1:5672/%2f"),
            exchange: env_or("AMQP_EXCHANGE", "solstice.jobs"),
            routing_key: env_or("AMQP_ROUTING_KEY", "jobrequest.v1"),
            queue: env_or("AMQP_QUEUE", "solstice.jobs.v1"),
            dlx: env_or("AMQP_DLX", "solstice.dlx"),
            dlq: env_or("AMQP_DLQ", "solstice.jobs.v1.dlq"),
            prefetch,
        }
    }
}
/// Opens a fresh AMQP connection to the broker configured in `cfg`.
///
/// # Errors
/// Returns a diagnostic error when the connection attempt fails.
#[instrument(skip(cfg))]
pub async fn connect(cfg: &MqConfig) -> Result<Connection> {
    let props = ConnectionProperties::default();
    let attempt = Connection::connect(&cfg.url, props).await;
    attempt.into_diagnostic()
}
#[instrument(skip(channel, cfg))]
pub async fn declare_topology(channel: &Channel, cfg: &MqConfig) -> Result<()> {
// Declare main exchange (durable direct)
channel
.exchange_declare(
&cfg.exchange,
lapin::ExchangeKind::Direct,
ExchangeDeclareOptions {
durable: true,
auto_delete: false,
internal: false,
nowait: false,
passive: false,
},
FieldTable::default(),
)
.await
.into_diagnostic()?;
// Declare DLX
channel
.exchange_declare(
&cfg.dlx,
lapin::ExchangeKind::Fanout,
ExchangeDeclareOptions { durable: true, auto_delete: false, internal: false, nowait: false, passive: false },
FieldTable::default(),
)
.await
.into_diagnostic()?;
// Declare DLQ with dead-lettering from main queue
let mut dlq_args = FieldTable::default();
dlq_args.insert(
ShortString::from("x-dead-letter-exchange"),
AMQPValue::LongString(LongString::from(cfg.exchange.clone())),
);
channel
.queue_declare(
&cfg.dlq,
QueueDeclareOptions { durable: true, auto_delete: false, exclusive: false, nowait: false, passive: false },
dlq_args,
)
.await
.into_diagnostic()?;
// Bind DLQ to DLX (fanout)
channel
.queue_bind(
&cfg.dlq,
&cfg.dlx,
"",
QueueBindOptions { nowait: false },
FieldTable::default(),
)
.await
.into_diagnostic()?;
// Declare main queue with DLX
let mut q_args = FieldTable::default();
q_args.insert(
ShortString::from("x-dead-letter-exchange"),
AMQPValue::LongString(LongString::from(cfg.dlx.clone())),
);
channel
.queue_declare(
&cfg.queue,
QueueDeclareOptions { durable: true, auto_delete: false, exclusive: false, nowait: false, passive: false },
q_args,
)
.await
.into_diagnostic()?;
// Bind queue
channel
.queue_bind(
&cfg.queue,
&cfg.exchange,
&cfg.routing_key,
QueueBindOptions { nowait: false },
FieldTable::default(),
)
.await
.into_diagnostic()?;
Ok(())
}
#[instrument(skip(cfg, job))]
pub async fn publish_job(cfg: &MqConfig, job: &JobRequest) -> Result<()> {
let conn = connect(cfg).await?;
let channel = conn.create_channel().await.into_diagnostic()?;
declare_topology(&channel, cfg).await?;
// Enable publisher confirms
channel
.confirm_select(ConfirmSelectOptions::default())
.await
.into_diagnostic()?;
let payload = serde_json::to_vec(job).into_diagnostic()?;
let props = BasicProperties::default()
.with_content_type("application/json".into())
.with_content_encoding("utf-8".into())
.with_type(ShortString::from(job.schema_version.clone()))
.with_delivery_mode(2u8.into()); // persistent
let confirm = channel
.basic_publish(
&cfg.exchange,
&cfg.routing_key,
BasicPublishOptions { mandatory: true, ..Default::default() },
&payload,
props,
)
.await
.into_diagnostic()?;
// Wait for broker confirm
confirm.await.into_diagnostic()?;
Ok(())
}
/// Metadata about a received AMQP delivery.
///
/// NOTE(review): not referenced within this module; presumably consumed by
/// callers elsewhere — verify before removing.
pub struct DeliveryMeta {
/// Broker-assigned delivery tag identifying the message for ack/nack.
pub delivery_tag: u64,
}
/// Consumes jobs with no cooperative shutdown: behaves exactly like
/// [`consume_jobs_until`], except the shutdown signal never fires, so this
/// runs until the delivery stream ends or the process exits.
#[instrument(skip(cfg, handler))]
pub async fn consume_jobs<F, Fut>(cfg: &MqConfig, handler: F) -> Result<()>
where
    F: Fn(JobRequest) -> Fut + Send + Sync + 'static,
    Fut: std::future::Future<Output = Result<()>> + Send + 'static,
{
    // A future that never resolves: shutdown is never requested.
    let never_shutdown = futures_util::future::pending::<()>();
    consume_jobs_until(cfg, never_shutdown, handler).await
}
/// Consumes [`JobRequest`] messages from the configured queue until either
/// the `shutdown` future resolves or the delivery stream ends, invoking
/// `handler` for each successfully-decoded message.
///
/// Messages are acked on handler success and nacked without requeue (so they
/// dead-letter to the DLQ) on handler error or deserialization failure.
/// On exit — including when an ack/nack fails — the channel and connection
/// are closed best-effort with a 2 s timeout each so heartbeats and
/// background tasks stop. Previously an ack/nack error returned early via
/// `?` and skipped that cleanup, leaking the open connection.
///
/// # Errors
/// Returns an error if connecting, declaring topology, setting QoS, starting
/// the consumer, or acking/nacking a delivery fails. Close failures during
/// cleanup are logged as warnings, not returned.
#[instrument(skip(cfg, shutdown, handler))]
pub async fn consume_jobs_until<F, Fut, S>(cfg: &MqConfig, shutdown: S, handler: F) -> Result<()>
where
    F: Fn(JobRequest) -> Fut + Send + Sync + 'static,
    Fut: std::future::Future<Output = Result<()>> + Send + 'static,
    S: std::future::Future<Output = ()> + Send + 'static,
{
    let conn = connect(cfg).await?;
    let channel = conn.create_channel().await.into_diagnostic()?;
    declare_topology(&channel, cfg).await?;
    // Bound the number of unacked deliveries in flight on this channel.
    channel
        .basic_qos(cfg.prefetch, BasicQosOptions { global: false })
        .await
        .into_diagnostic()?;
    let consumer: Consumer = channel
        .basic_consume(
            &cfg.queue,
            "orchestrator",
            BasicConsumeOptions { no_ack: false, ..Default::default() },
            FieldTable::default(),
        )
        .await
        .into_diagnostic()?;
    info!(queue = %cfg.queue, prefetch = cfg.prefetch, "consumer started");
    tokio::pin!(consumer);
    let mut shutdown: core::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> = Box::pin(shutdown);
    // Track the loop outcome instead of `?`-returning from inside the loop so
    // that the channel/connection cleanup below always runs.
    let mut outcome: Result<()> = Ok(());
    'consume: loop {
        tokio::select! {
            _ = &mut shutdown => {
                info!("shutdown requested; canceling consumer");
                // NOTE(review): an explicit `basic_cancel` here was previously
                // disabled; we instead rely on dropping the consumer and
                // closing the channel below to stop deliveries.
                break 'consume;
            }
            maybe_delivery = consumer.next() => {
                match maybe_delivery {
                    Some(Ok(d)) => {
                        let tag = d.delivery_tag;
                        // Decode and dispatch, then ack/nack; collect the
                        // broker-call result so a failure still reaches cleanup.
                        let ack_result = match serde_json::from_slice::<JobRequest>(&d.data) {
                            Ok(job) => {
                                let span = tracing::info_span!("handle_job", request_id = %job.request_id, repo = %job.repo_url, sha = %job.commit_sha);
                                if let Err(err) = handler(job).instrument(span).await {
                                    error!(error = %err, "handler error; nacking to DLQ");
                                    channel
                                        .basic_nack(tag, BasicNackOptions { requeue: false, multiple: false })
                                        .await
                                        .into_diagnostic()
                                } else {
                                    channel
                                        .basic_ack(tag, BasicAckOptions { multiple: false })
                                        .await
                                        .into_diagnostic()
                                }
                            }
                            Err(e) => {
                                warn!(error = %e, "failed to deserialize JobRequest; dead-lettering");
                                channel
                                    .basic_nack(tag, BasicNackOptions { requeue: false, multiple: false })
                                    .await
                                    .into_diagnostic()
                            }
                        };
                        if let Err(e) = ack_result {
                            // Do not return here: fall through to cleanup.
                            outcome = Err(e);
                            break 'consume;
                        }
                    }
                    Some(Err(e)) => {
                        error!(error = %e, "consumer delivery error; continuing");
                        tokio::time::sleep(Duration::from_millis(200)).await;
                    }
                    None => {
                        // Delivery stream ended; exit loop and clean up.
                        break 'consume;
                    }
                }
            }
        }
    }
    // Release the pinned consumer handle before closing channel/connection.
    drop(consumer);
    // Close channel and connection to stop heartbeats and background tasks.
    match tokio::time::timeout(Duration::from_secs(2), channel.close(200, "shutdown")).await {
        Ok(Ok(_)) => {},
        Ok(Err(e)) => warn!(error = %e, "failed to close AMQP channel"),
        Err(_) => warn!("timeout while closing AMQP channel"),
    }
    match tokio::time::timeout(Duration::from_secs(2), conn.close(200, "shutdown")).await {
        Ok(Ok(_)) => {},
        Ok(Err(e)) => warn!(error = %e, "failed to close AMQP connection"),
        Err(_) => warn!("timeout while closing AMQP connection"),
    }
    info!("consume_jobs completed");
    outcome
}