use std::time::Duration; use futures_util::StreamExt; use lapin::{ options::{ BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicQosOptions, ConfirmSelectOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions, BasicCancelOptions, }, types::{AMQPValue, FieldTable, LongString, ShortString}, BasicProperties, Channel, Connection, ConnectionProperties, Consumer, }; use miette::{IntoDiagnostic as _, Result}; use tracing::{debug, error, info, instrument, warn}; use tracing::Instrument; use crate::messages::JobRequest; #[derive(Clone, Debug)] pub struct MqConfig { pub url: String, pub exchange: String, pub routing_key: String, pub queue: String, pub dlx: String, pub dlq: String, pub prefetch: u16, } impl Default for MqConfig { fn default() -> Self { Self { url: std::env::var("AMQP_URL").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()), exchange: std::env::var("AMQP_EXCHANGE").unwrap_or_else(|_| "solstice.jobs".into()), routing_key: std::env::var("AMQP_ROUTING_KEY").unwrap_or_else(|_| "jobrequest.v1".into()), queue: std::env::var("AMQP_QUEUE").unwrap_or_else(|_| "solstice.jobs.v1".into()), dlx: std::env::var("AMQP_DLX").unwrap_or_else(|_| "solstice.dlx".into()), dlq: std::env::var("AMQP_DLQ").unwrap_or_else(|_| "solstice.jobs.v1.dlq".into()), prefetch: std::env::var("AMQP_PREFETCH").ok().and_then(|s| s.parse().ok()).unwrap_or(64), } } } #[instrument(skip(cfg))] pub async fn connect(cfg: &MqConfig) -> Result { Connection::connect(&cfg.url, ConnectionProperties::default()) .await .into_diagnostic() } #[instrument(skip(channel, cfg))] pub async fn declare_topology(channel: &Channel, cfg: &MqConfig) -> Result<()> { // Declare main exchange (durable direct) channel .exchange_declare( &cfg.exchange, lapin::ExchangeKind::Direct, ExchangeDeclareOptions { durable: true, auto_delete: false, internal: false, nowait: false, passive: false, }, FieldTable::default(), ) .await .into_diagnostic()?; // Declare DLX channel .exchange_declare( &cfg.dlx, lapin::ExchangeKind::Fanout, ExchangeDeclareOptions { durable: true, auto_delete: false, internal: false, nowait: false, passive: false }, FieldTable::default(), ) .await .into_diagnostic()?; // Declare DLQ with dead-lettering from main queue let mut dlq_args = FieldTable::default(); dlq_args.insert( ShortString::from("x-dead-letter-exchange"), AMQPValue::LongString(LongString::from(cfg.exchange.clone())), ); channel .queue_declare( &cfg.dlq, QueueDeclareOptions { durable: true, auto_delete: false, exclusive: false, nowait: false, passive: false }, dlq_args, ) .await .into_diagnostic()?; // Bind DLQ to DLX (fanout) channel .queue_bind( &cfg.dlq, &cfg.dlx, "", QueueBindOptions { nowait: false }, FieldTable::default(), ) .await .into_diagnostic()?; // Declare main queue with DLX let mut q_args = FieldTable::default(); q_args.insert( ShortString::from("x-dead-letter-exchange"), AMQPValue::LongString(LongString::from(cfg.dlx.clone())), ); channel .queue_declare( &cfg.queue, QueueDeclareOptions { durable: true, auto_delete: false, exclusive: false, nowait: false, passive: false }, q_args, ) .await .into_diagnostic()?; // Bind queue channel .queue_bind( &cfg.queue, &cfg.exchange, &cfg.routing_key, QueueBindOptions { nowait: false }, FieldTable::default(), ) .await .into_diagnostic()?; Ok(()) } #[instrument(skip(cfg, job))] pub async fn publish_job(cfg: &MqConfig, job: &JobRequest) -> Result<()> { let conn = connect(cfg).await?; let channel = conn.create_channel().await.into_diagnostic()?; declare_topology(&channel, cfg).await?; // Enable publisher confirms channel .confirm_select(ConfirmSelectOptions::default()) .await .into_diagnostic()?; let payload = serde_json::to_vec(job).into_diagnostic()?; let props = BasicProperties::default() .with_content_type("application/json".into()) .with_content_encoding("utf-8".into()) .with_type(ShortString::from(job.schema_version.clone())) .with_delivery_mode(2u8.into()); // persistent let confirm = channel .basic_publish( &cfg.exchange, &cfg.routing_key, BasicPublishOptions { mandatory: true, ..Default::default() }, &payload, props, ) .await .into_diagnostic()?; // Wait for broker confirm confirm.await.into_diagnostic()?; Ok(()) } pub struct DeliveryMeta { pub delivery_tag: u64, } #[instrument(skip(cfg, handler))] pub async fn consume_jobs(cfg: &MqConfig, handler: F) -> Result<()> where F: Fn(JobRequest) -> Fut + Send + Sync + 'static, Fut: std::future::Future> + Send + 'static, { // Backward-compatible wrapper that runs until process exit (no cooperative shutdown) consume_jobs_until(cfg, futures_util::future::pending(), handler).await } #[instrument(skip(cfg, shutdown, handler))] pub async fn consume_jobs_until(cfg: &MqConfig, shutdown: S, handler: F) -> Result<()> where F: Fn(JobRequest) -> Fut + Send + Sync + 'static, Fut: std::future::Future> + Send + 'static, S: std::future::Future + Send + 'static, { let conn = connect(cfg).await?; let channel = conn.create_channel().await.into_diagnostic()?; declare_topology(&channel, cfg).await?; // Set QoS channel .basic_qos(cfg.prefetch, BasicQosOptions { global: false }) .await .into_diagnostic()?; let consumer: Consumer = channel .basic_consume( &cfg.queue, "orchestrator", BasicConsumeOptions { no_ack: false, ..Default::default() }, FieldTable::default(), ) .await .into_diagnostic()?; info!(queue = %cfg.queue, prefetch = cfg.prefetch, "consumer started"); tokio::pin!(consumer); let mut shutdown = Box::pin(shutdown); loop { tokio::select! { _ = &mut shutdown => { info!("shutdown requested; canceling consumer"); // Best-effort cancel; ignore errors during shutdown. let _ = channel.basic_cancel("orchestrator", BasicCancelOptions { nowait: false }).await; break; } maybe_delivery = consumer.next() => { match maybe_delivery { Some(Ok(d)) => { let tag = d.delivery_tag; match serde_json::from_slice::(&d.data) { Ok(job) => { let span = tracing::info_span!("handle_job", request_id = %job.request_id, repo = %job.repo_url, sha = %job.commit_sha); if let Err(err) = handler(job).instrument(span).await { error!(error = %err, "handler error; nacking to DLQ"); channel .basic_nack(tag, BasicNackOptions { requeue: false, multiple: false }) .await .into_diagnostic()?; } else { channel .basic_ack(tag, BasicAckOptions { multiple: false }) .await .into_diagnostic()?; } } Err(e) => { warn!(error = %e, "failed to deserialize JobRequest; dead-lettering"); channel .basic_nack(tag, BasicNackOptions { requeue: false, multiple: false }) .await .into_diagnostic()?; } } } Some(Err(e)) => { error!(error = %e, "consumer delivery error; continuing"); tokio::time::sleep(Duration::from_millis(200)).await; } None => { // Stream ended; exit loop. break; } } } } } Ok(()) }