solstice-ci/crates/common/src/mq.rs
Till Wegmueller a71f9cc7d1
Initial Commit
Signed-off-by: Till Wegmueller <toasterson@gmail.com>
2025-10-25 20:01:08 +02:00

246 lines
7.8 KiB
Rust

use std::time::Duration;
use futures_util::StreamExt;
use lapin::{
options::{
BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicQosOptions,
ConfirmSelectOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
},
types::{AMQPValue, FieldTable, LongString, ShortString},
BasicProperties, Channel, Connection, ConnectionProperties, Consumer,
};
use miette::{IntoDiagnostic as _, Result};
use tracing::{debug, error, info, instrument, warn};
use tracing::Instrument;
use crate::messages::JobRequest;
/// Connection and topology settings shared by the publisher and consumer
/// helpers in this module.
#[derive(Clone, Debug)]
pub struct MqConfig {
    /// AMQP URI handed to `Connection::connect`.
    pub url: String,
    /// Main exchange; declared as a durable direct exchange and used as the
    /// publish target.
    pub exchange: String,
    /// Routing key used both to bind `queue` to `exchange` and to publish jobs.
    pub routing_key: String,
    /// Main work queue that consumers read from.
    pub queue: String,
    /// Dead-letter exchange (declared as fanout) configured on `queue`.
    pub dlx: String,
    /// Dead-letter queue bound to `dlx`.
    pub dlq: String,
    /// Per-consumer prefetch count passed to `basic_qos`.
    pub prefetch: u16,
}
impl Default for MqConfig {
    /// Builds a configuration from `AMQP_*` environment variables, falling
    /// back to built-in values for anything that is unset or unreadable.
    ///
    /// `AMQP_PREFETCH` additionally falls back to `64` when its value does not
    /// parse as a `u16`.
    fn default() -> Self {
        // Reads `key` from the environment, using `fallback` when it cannot be read.
        fn env_or(key: &str, fallback: &str) -> String {
            match std::env::var(key) {
                Ok(value) => value,
                Err(_) => fallback.into(),
            }
        }

        Self {
            url: env_or("AMQP_URL", "amqp://127.0.0.1:5672/%2f"),
            exchange: env_or("AMQP_EXCHANGE", "solstice.jobs"),
            routing_key: env_or("AMQP_ROUTING_KEY", "jobrequest.v1"),
            queue: env_or("AMQP_QUEUE", "solstice.jobs.v1"),
            dlx: env_or("AMQP_DLX", "solstice.dlx"),
            dlq: env_or("AMQP_DLQ", "solstice.jobs.v1.dlq"),
            prefetch: std::env::var("AMQP_PREFETCH")
                .map_or(64, |raw| raw.parse().unwrap_or(64)),
        }
    }
}
/// Opens an AMQP connection to `cfg.url` using default connection properties.
///
/// # Errors
/// Returns the underlying lapin error, converted into a miette diagnostic,
/// when the connection cannot be established.
#[instrument(skip(cfg))]
pub async fn connect(cfg: &MqConfig) -> Result<Connection> {
    let properties = ConnectionProperties::default();
    let connection = Connection::connect(&cfg.url, properties)
        .await
        .into_diagnostic()?;
    Ok(connection)
}
#[instrument(skip(channel, cfg))]
pub async fn declare_topology(channel: &Channel, cfg: &MqConfig) -> Result<()> {
// Declare main exchange (durable direct)
channel
.exchange_declare(
&cfg.exchange,
lapin::ExchangeKind::Direct,
ExchangeDeclareOptions {
durable: true,
auto_delete: false,
internal: false,
nowait: false,
passive: false,
},
FieldTable::default(),
)
.await
.into_diagnostic()?;
// Declare DLX
channel
.exchange_declare(
&cfg.dlx,
lapin::ExchangeKind::Fanout,
ExchangeDeclareOptions { durable: true, auto_delete: false, internal: false, nowait: false, passive: false },
FieldTable::default(),
)
.await
.into_diagnostic()?;
// Declare DLQ with dead-lettering from main queue
let mut dlq_args = FieldTable::default();
dlq_args.insert(
ShortString::from("x-dead-letter-exchange"),
AMQPValue::LongString(LongString::from(cfg.exchange.clone())),
);
channel
.queue_declare(
&cfg.dlq,
QueueDeclareOptions { durable: true, auto_delete: false, exclusive: false, nowait: false, passive: false },
dlq_args,
)
.await
.into_diagnostic()?;
// Bind DLQ to DLX (fanout)
channel
.queue_bind(
&cfg.dlq,
&cfg.dlx,
"",
QueueBindOptions { nowait: false },
FieldTable::default(),
)
.await
.into_diagnostic()?;
// Declare main queue with DLX
let mut q_args = FieldTable::default();
q_args.insert(
ShortString::from("x-dead-letter-exchange"),
AMQPValue::LongString(LongString::from(cfg.dlx.clone())),
);
channel
.queue_declare(
&cfg.queue,
QueueDeclareOptions { durable: true, auto_delete: false, exclusive: false, nowait: false, passive: false },
q_args,
)
.await
.into_diagnostic()?;
// Bind queue
channel
.queue_bind(
&cfg.queue,
&cfg.exchange,
&cfg.routing_key,
QueueBindOptions { nowait: false },
FieldTable::default(),
)
.await
.into_diagnostic()?;
Ok(())
}
#[instrument(skip(cfg, job))]
pub async fn publish_job(cfg: &MqConfig, job: &JobRequest) -> Result<()> {
let conn = connect(cfg).await?;
let channel = conn.create_channel().await.into_diagnostic()?;
declare_topology(&channel, cfg).await?;
// Enable publisher confirms
channel
.confirm_select(ConfirmSelectOptions::default())
.await
.into_diagnostic()?;
let payload = serde_json::to_vec(job).into_diagnostic()?;
let props = BasicProperties::default()
.with_content_type("application/json".into())
.with_content_encoding("utf-8".into())
.with_type(ShortString::from(job.schema_version.clone()))
.with_delivery_mode(2u8.into()); // persistent
let confirm = channel
.basic_publish(
&cfg.exchange,
&cfg.routing_key,
BasicPublishOptions { mandatory: true, ..Default::default() },
&payload,
props,
)
.await
.into_diagnostic()?;
// Wait for broker confirm
confirm.await.into_diagnostic()?;
Ok(())
}
/// Metadata about a single delivery.
///
/// NOTE(review): nothing in this part of the file constructs or reads this
/// type; presumably `delivery_tag` mirrors the tag used for ack/nack — confirm
/// against callers.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct DeliveryMeta {
    /// Delivery tag value.
    pub delivery_tag: u64,
}
/// Consumes `JobRequest` messages from `cfg.queue` and feeds them to `handler`.
///
/// Opens a connection and channel, declares the topology, applies
/// `cfg.prefetch` as the per-consumer QoS, and starts a manually acknowledged
/// consumer tagged `"orchestrator"`. Each delivery is then settled as follows:
///
/// - payload deserializes and `handler` returns `Ok` → ack,
/// - payload deserializes and `handler` returns `Err` → nack without requeue,
/// - payload fails to deserialize → nack without requeue.
///
/// The handler is awaited inline, so deliveries are processed one at a time.
/// Errors yielded by the consumer stream are logged and followed by a 200 ms
/// pause before polling again.
///
/// # Errors
/// Returns an error if setup fails or if an ack/nack cannot be sent. Returns
/// `Ok(())` once the consumer stream ends.
#[instrument(skip(cfg, handler))]
pub async fn consume_jobs<F, Fut>(cfg: &MqConfig, handler: F) -> Result<()>
where
    F: Fn(JobRequest) -> Fut + Send + Sync + 'static,
    Fut: std::future::Future<Output = Result<()>> + Send + 'static,
{
    let conn = connect(cfg).await?;
    let channel = conn.create_channel().await.into_diagnostic()?;
    declare_topology(&channel, cfg).await?;

    // Limit the number of unacknowledged deliveries for this consumer.
    channel
        .basic_qos(cfg.prefetch, BasicQosOptions { global: false })
        .await
        .into_diagnostic()?;

    let consume_options = BasicConsumeOptions {
        no_ack: false,
        ..Default::default()
    };
    let consumer: Consumer = channel
        .basic_consume(
            &cfg.queue,
            "orchestrator",
            consume_options,
            FieldTable::default(),
        )
        .await
        .into_diagnostic()?;
    info!(queue = %cfg.queue, prefetch = cfg.prefetch, "consumer started");

    tokio::pin!(consumer);
    while let Some(next) = consumer.next().await {
        let delivery = match next {
            Ok(delivery) => delivery,
            Err(e) => {
                error!(error = %e, "consumer delivery error; continuing");
                // Pause briefly so a persistent stream error does not spin.
                tokio::time::sleep(Duration::from_millis(200)).await;
                continue;
            }
        };
        let tag = delivery.delivery_tag;

        // `true` means the job was decoded and handled successfully.
        let succeeded = match serde_json::from_slice::<JobRequest>(&delivery.data) {
            Ok(job) => {
                let span = tracing::info_span!("handle_job", request_id = %job.request_id, repo = %job.repo_url, sha = %job.commit_sha);
                match handler(job).instrument(span).await {
                    Ok(()) => true,
                    Err(err) => {
                        error!(error = %err, "handler error; nacking to DLQ");
                        false
                    }
                }
            }
            Err(e) => {
                warn!(error = %e, "failed to deserialize JobRequest; dead-lettering");
                false
            }
        };

        if succeeded {
            channel
                .basic_ack(tag, BasicAckOptions { multiple: false })
                .await
                .into_diagnostic()?;
        } else {
            // No requeue: the main queue's dead-letter exchange takes over.
            channel
                .basic_nack(
                    tag,
                    BasicNackOptions {
                        requeue: false,
                        multiple: false,
                    },
                )
                .await
                .into_diagnostic()?;
        }
    }
    Ok(())
}