### Solstice CI — RabbitMQ Integration and Internal JobRequest Schema (v1)

This document describes the first iteration of our Integration Layer wiring to a real RabbitMQ broker, the internal message schema used between Integration and Orchestrator, and the common utilities that ensure a consistent messaging topology across services.

#### Goals

- Decouple Integration and Orchestrator via a durable message queue.
- Use a versioned, forward-compatible message schema (`jobrequest.v1`).
- Centralize messaging code in `crates/common` so all services share the same setup and patterns.
- Follow RabbitMQ best practices: durable exchanges/queues, DLX/DLQ, publisher confirms, consumer prefetch, explicit ack/nack.

---

### Topology (created automatically by services)

- Exchange (direct, durable): `solstice.jobs`
  - Routing key: `jobrequest.v1` (aligns with the schema version)
- Queue (durable): `solstice.jobs.v1`
- Dead-letter exchange (fanout, durable): `solstice.dlx`
- Dead-letter queue (durable): `solstice.jobs.v1.dlq`
- Bindings:
  - `solstice.jobs.v1` is bound to `solstice.jobs` with routing key `jobrequest.v1`.
  - `solstice.jobs.v1.dlq` is bound to `solstice.dlx` (fanout, no routing key).

Characteristics:

- Publishing uses `mandatory = true`, `delivery_mode = 2` (persistent), and publisher confirms.
- Consumers use `basic_qos(prefetch)` for backpressure and explicitly `ack` on success or `nack(requeue=false)` on error → DLQ.

---

### Shared Code in `crates/common`

- `common::messages::JobRequest` (serde JSON; versioned schema):
  - `schema_version: "jobrequest.v1"` (default)
  - `request_id: uuid::Uuid`
  - `source: enum { github, forgejo, manual }`
  - `repo_url: String`
  - `commit_sha: String`
  - `workflow_path: Option<String>` — optional KDL workflow path
  - `workflow_job_id: Option<String>` — optional specific job id to run
  - `runs_on: Option<String>` — scheduling hint
  - `submitted_at: time::OffsetDateTime` (UTC)
- `common::mq`:
  - `MqConfig { url, exchange, routing_key, queue, dlx, dlq, prefetch }` — env/CLI configurable.
  - `declare_topology(&Channel, &MqConfig)` — idempotent; called by both publishers and consumers.
  - `publish_job(&MqConfig, &JobRequest)` — JSON publish with confirms.
  - `consume_jobs(&MqConfig, handler)` — starts a consumer with QoS, deserializes `JobRequest`, and `ack`s/`nack`s accordingly.

---

### Service Wiring

- Forge Integration (`crates/forge-integration`):
  - New CLI subcommand `enqueue` for developer testing:
    - Flags: `--repo-url`, `--commit-sha`, `--runs-on` (optional)
    - Uses `common::publish_job` to send a `JobRequest` to the broker.
  - The service will later accept real webhooks and publish `JobRequest`s in response.
- Orchestrator (`crates/orchestrator`):
  - Starts a consumer via `common::consume_jobs` and logs each received `JobRequest`.
  - This is where capacity checks, scheduling, and job provisioning will be added next.

---

### Configuration (env or CLI)

- `AMQP_URL` (default `amqp://127.0.0.1:5672/%2f`)
- `AMQP_EXCHANGE` (default `solstice.jobs`)
- `AMQP_QUEUE` (default `solstice.jobs.v1`)
- `AMQP_ROUTING_KEY` (default `jobrequest.v1`)
- `AMQP_DLX` (default `solstice.dlx`)
- `AMQP_DLQ` (default `solstice.jobs.v1.dlq`)
- `AMQP_PREFETCH` (default `64`)

Each service exposes overrides via Clap flags that mirror the env vars.

---

### Local Development

1. Start RabbitMQ with Docker Compose:

   ```bash
   docker compose up -d rabbitmq
   # AMQP: localhost:5672, Management UI: http://localhost:15672 (guest/guest)
   ```

2. Run the Orchestrator consumer:

   ```bash
   cargo run -p orchestrator -- \
     --amqp-url amqp://127.0.0.1:5672/%2f \
     --amqp-exchange solstice.jobs \
     --amqp-queue solstice.jobs.v1 \
     --amqp-routing-key jobrequest.v1 \
     --amqp-prefetch 64
   ```

3. Enqueue a sample `JobRequest` from Forge Integration:

   ```bash
   cargo run -p forge-integration -- enqueue \
     --repo-url https://github.com/example/repo.git \
     --commit-sha deadbeefdeadbeefdeadbeefdeadbeefdeadbeef \
     --runs-on illumos-stable
   ```

You should see the orchestrator log receipt of the job and ack it.
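For reference, the message body published by the `enqueue` command above would look roughly like this (a sketch only: the concrete `request_id`, the `source` value emitted by the CLI path, and the RFC 3339 timestamp format are assumptions about the serde configuration, not confirmed output):

```json
{
  "schema_version": "jobrequest.v1",
  "request_id": "8c1d4e2f-3a5b-4c6d-8e9f-0a1b2c3d4e5f",
  "source": "manual",
  "repo_url": "https://github.com/example/repo.git",
  "commit_sha": "deadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
  "workflow_path": null,
  "workflow_job_id": null,
  "runs_on": "illumos-stable",
  "submitted_at": "2024-01-01T00:00:00Z"
}
```

The payload is routed with routing key `jobrequest.v1` on the `solstice.jobs` exchange, so it lands in `solstice.jobs.v1` for the Orchestrator consumer.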
Any deserialization or handler error results in the message being dead-lettered to `solstice.jobs.v1.dlq`.

---

### Rationale and Best Practices Used

- Durable exchange/queue and persistent messages to avoid data loss on broker restarts.
- Publisher confirms and the `mandatory` flag to ensure broker acceptance; failures can be surfaced to the publisher.
- DLX/DLQ for poison messages and non-transient failures, preventing consumer lockups.
- QoS prefetch to match consumer concurrency and protect Orchestrator capacity.
- Versioned routing key (`jobrequest.v1`) to allow schema evolution without breaking existing consumers.
- Centralized declaration logic (`common::mq`) to keep all services consistent.

---

### Next Steps

- Map real webhooks in Integration to `JobRequest` creation.
- Implement Orchestrator scheduling and VM provisioning based on `runs_on` and workload capacity.
- Add observability fields (trace context, forge metadata) to `JobRequest` as needed (additive only, maintaining compatibility).
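To illustrate the additive-only evolution in the last bullet: new optional fields can ship under the same `jobrequest.v1` routing key, since serde's default behavior is to ignore unknown JSON keys (this holds as long as `deny_unknown_fields` is not enabled on the struct). The `trace_context` field and the `workflows/ci.kdl` path below are hypothetical names for illustration; the `traceparent` value follows the W3C Trace Context format:

```json
{
  "schema_version": "jobrequest.v1",
  "request_id": "8c1d4e2f-3a5b-4c6d-8e9f-0a1b2c3d4e5f",
  "source": "github",
  "repo_url": "https://github.com/example/repo.git",
  "commit_sha": "deadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
  "workflow_path": "workflows/ci.kdl",
  "workflow_job_id": null,
  "runs_on": "illumos-stable",
  "submitted_at": "2024-01-01T00:00:00Z",
  "trace_context": {
    "traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
  }
}
```

Existing v1 consumers would simply skip `trace_context`; only a breaking change (renaming or retyping a field) would warrant a `jobrequest.v2` routing key and queue.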