diff --git a/crates/common/src/config.rs b/crates/common/src/config.rs index c67d566..4acab58 100644 --- a/crates/common/src/config.rs +++ b/crates/common/src/config.rs @@ -55,7 +55,18 @@ impl AppConfig { .or_else(|| kdl_map.get("AMQP_PREFETCH").and_then(|s| s.parse().ok())) .unwrap_or(64u16); - let mq = crate::mq::MqConfig { url, exchange, routing_key, queue, dlx, dlq, prefetch }; + let results_routing_key = std::env::var("AMQP_RESULTS_ROUTING_KEY") + .ok() + .or_else(|| kdl_map.get("AMQP_RESULTS_ROUTING_KEY").cloned()) + .or_else(|| std::env::var("RESULTS_ROUTING_KEY").ok()) + .unwrap_or_else(|| "jobresult.v1".into()); + let results_queue = std::env::var("AMQP_RESULTS_QUEUE") + .ok() + .or_else(|| kdl_map.get("AMQP_RESULTS_QUEUE").cloned()) + .or_else(|| std::env::var("RESULTS_QUEUE").ok()) + .unwrap_or_else(|| "solstice.results.v1".into()); + + let mq = crate::mq::MqConfig { url, exchange, routing_key, queue, results_routing_key, results_queue, dlx, dlq, prefetch }; Ok(Self { grpc_addr, http_addr, database_url, otlp_endpoint, mq }) } diff --git a/crates/common/src/mq.rs b/crates/common/src/mq.rs index f04c7a2..211d2cf 100644 --- a/crates/common/src/mq.rs +++ b/crates/common/src/mq.rs @@ -57,6 +57,8 @@ pub struct MqConfig { pub exchange: String, pub routing_key: String, pub queue: String, + pub results_routing_key: String, + pub results_queue: String, pub dlx: String, pub dlq: String, pub prefetch: u16, @@ -67,15 +69,17 @@ impl Default for MqConfig { Self { url: std::env::var("AMQP_URL").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()), exchange: std::env::var("AMQP_EXCHANGE").unwrap_or_else(|_| "solstice.jobs".into()), - routing_key: std::env::var("AMQP_ROUTING_KEY") - .unwrap_or_else(|_| "jobrequest.v1".into()), + routing_key: std::env::var("AMQP_ROUTING_KEY").unwrap_or_else(|_| "jobrequest.v1".into()), queue: std::env::var("AMQP_QUEUE").unwrap_or_else(|_| "solstice.jobs.v1".into()), + results_routing_key: std::env::var("AMQP_RESULTS_ROUTING_KEY") + .or_else(|_| std::env::var("RESULTS_ROUTING_KEY")) + .unwrap_or_else(|_| "jobresult.v1".into()), + results_queue: std::env::var("AMQP_RESULTS_QUEUE") + .or_else(|_| std::env::var("RESULTS_QUEUE")) + .unwrap_or_else(|_| "solstice.results.v1".into()), dlx: std::env::var("AMQP_DLX").unwrap_or_else(|_| "solstice.dlx".into()), dlq: std::env::var("AMQP_DLQ").unwrap_or_else(|_| "solstice.jobs.v1.dlq".into()), - prefetch: std::env::var("AMQP_PREFETCH") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(64), + prefetch: std::env::var("AMQP_PREFETCH").ok().and_then(|s| s.parse().ok()).unwrap_or(64), } } } @@ -189,6 +193,33 @@ pub async fn declare_topology(channel: &Channel, cfg: &MqConfig) -> Result<()> { .await .into_diagnostic()?; + // Declare results queue (no DLX by default) and bind to results routing key + channel + .queue_declare( + &cfg.results_queue, + QueueDeclareOptions { + durable: true, + auto_delete: false, + exclusive: false, + nowait: false, + passive: false, + }, + FieldTable::default(), + ) + .await + .into_diagnostic()?; + + channel + .queue_bind( + &cfg.results_queue, + &cfg.exchange, + &cfg.results_routing_key, + QueueBindOptions { nowait: false }, + FieldTable::default(), + ) + .await + .into_diagnostic()?; + Ok(()) } @@ -401,7 +432,7 @@ pub async fn publish_job_result(cfg: &MqConfig, result: &JobResult) -> Result<() let confirm = channel .basic_publish( &cfg.exchange, - "jobresult.v1", + &cfg.results_routing_key, BasicPublishOptions { mandatory: true, immediate: false, diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index 7a71cd6..00afb39 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "orchestrator" -version = "0.1.3" +version = "0.1.4" edition = "2024" build = "build.rs"