`solstice-ci/docs/ai/2025-10-25-rabbitmq-integration.md`
Till Wegmueller · Initial Commit · 2025-10-25


### Solstice CI — RabbitMQ Integration and Internal JobRequest Schema (v1)
This document describes the first iteration of our Integration Layer wiring to a real RabbitMQ broker, the internal message schema used between Integration and Orchestrator, and the common utilities that ensure consistent messaging topology across services.
### Goals
- Decouple Integration and Orchestrator via a durable message queue.
- Use a versioned, forward-compatible message schema (`jobrequest.v1`).
- Centralize messaging code in `crates/common` so all services share the same setup and patterns.
- Follow RabbitMQ best practices: durable exchanges/queues, DLX/DLQ, publisher confirms, consumer prefetch, explicit ack/nack.
---
### Topology (created automatically by services)
- Exchange (direct, durable): `solstice.jobs`
  - Routing key: `jobrequest.v1` (aligns with the schema version)
- Queue (durable): `solstice.jobs.v1`
- Dead-letter exchange (fanout, durable): `solstice.dlx`
- Dead-letter queue (durable): `solstice.jobs.v1.dlq`
- Bindings:
  - `solstice.jobs.v1` is bound to `solstice.jobs` with routing key `jobrequest.v1`.
  - `solstice.jobs.v1.dlq` is bound to `solstice.dlx` (fanout, no routing key).
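For reference, the dead-letter link is carried as an argument on the job queue at declaration time. A sketch of the queue declaration in the JSON shape used by RabbitMQ's management HTTP API (illustrative; the services declare topology over AMQP, not HTTP):

```json
{
  "durable": true,
  "arguments": {
    "x-dead-letter-exchange": "solstice.dlx"
  }
}
```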
Characteristics:
- Publishing uses `mandatory = true`, `delivery_mode = 2` (persistent), and publisher confirms.
- Consumers use `basic_qos(prefetch)` for backpressure and explicitly `ack` on success or `nack(requeue=false)` on error, which dead-letters the message to the DLQ.
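The ack/nack contract can be sketched independently of any AMQP client library. Below is a minimal Python sketch (function and constant names are hypothetical, not part of the codebase) of the decision a consumer callback makes, assuming messages are JSON-encoded `JobRequest`s:

```python
import json

EXPECTED_SCHEMA = "jobrequest.v1"

def decide(body: bytes, handler) -> str:
    """Return "ack" on success, "nack" (no requeue) on any failure.

    A nack with requeue=False routes the message to the configured DLX,
    and from there to the DLQ.
    """
    try:
        msg = json.loads(body)
        if msg.get("schema_version") != EXPECTED_SCHEMA:
            return "nack"  # unknown schema version: dead-letter it
        handler(msg)       # application-level processing
        return "ack"
    except Exception:
        return "nack"      # poison message or handler error: dead-letter

ok = decide(b'{"schema_version": "jobrequest.v1", "repo_url": "x"}', lambda m: None)   # "ack"
bad = decide(b'not json', lambda m: None)                                              # "nack"
```

Note that a transient-failure retry policy (requeue or delayed redelivery) is deliberately out of scope here; every failure dead-letters.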
---
### Shared Code in `crates/common`
- `common::messages::JobRequest` (serde JSON; versioned schema):
  - `schema_version: "jobrequest.v1"` (default)
  - `request_id: uuid::Uuid`
  - `source: enum { github, forgejo, manual }`
  - `repo_url: String`
  - `commit_sha: String`
  - `workflow_path: Option<String>` — optional KDL workflow path
  - `workflow_job_id: Option<String>` — optional specific job id to run
  - `runs_on: Option<String>` — scheduling hint
  - `submitted_at: time::OffsetDateTime` (UTC)
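A serialized `JobRequest` might look like the following (field values are illustrative; the exact timestamp wire format depends on how serde is configured for `time::OffsetDateTime`, shown here as RFC 3339):

```json
{
  "schema_version": "jobrequest.v1",
  "request_id": "6f9619ff-8b86-4011-b42d-00cf4fc964ff",
  "source": "manual",
  "repo_url": "https://github.com/example/repo.git",
  "commit_sha": "deadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
  "workflow_path": null,
  "workflow_job_id": null,
  "runs_on": "illumos-stable",
  "submitted_at": "2025-10-25T18:01:08Z"
}
```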
- `common::mq`:
  - `MqConfig { url, exchange, routing_key, queue, dlx, dlq, prefetch }` — env/CLI configurable.
  - `declare_topology(&Channel, &MqConfig)` — idempotent; called by both publishers and consumers.
  - `publish_job(&MqConfig, &JobRequest)` — JSON publish with confirms.
  - `consume_jobs(&MqConfig, handler)` — starts a consumer with QoS, deserializes each `JobRequest`, and `ack`s or `nack`s accordingly.
---
### Service Wiring
- Forge Integration (`crates/forge-integration`):
  - New CLI subcommand `enqueue` for developer testing:
    - Flags: `--repo-url`, `--commit-sha`, `--runs-on` (optional)
    - Uses `common::publish_job` to send a `JobRequest` to the broker.
  - The service will later accept real webhooks and publish `JobRequest`s in response.
- Orchestrator (`crates/orchestrator`):
  - Starts a consumer via `common::consume_jobs` and logs each received `JobRequest`.
  - This is where capacity checks, scheduling, and job provisioning will be added next.
---
### Configuration (env or CLI)
- `AMQP_URL` (default `amqp://127.0.0.1:5672/%2f`)
- `AMQP_EXCHANGE` (default `solstice.jobs`)
- `AMQP_QUEUE` (default `solstice.jobs.v1`)
- `AMQP_ROUTING_KEY` (default `jobrequest.v1`)
- `AMQP_DLX` (default `solstice.dlx`)
- `AMQP_DLQ` (default `solstice.jobs.v1.dlq`)
- `AMQP_PREFETCH` (default `64`)
Each service exposes overrides via Clap flags that mirror the env vars.
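The resolution order (explicit value, else environment, else default) can be mirrored in any language. A minimal Python sketch of the env-with-defaults half (the real services use Rust with Clap; names here are illustrative):

```python
import os

DEFAULTS = {
    "AMQP_URL": "amqp://127.0.0.1:5672/%2f",
    "AMQP_EXCHANGE": "solstice.jobs",
    "AMQP_QUEUE": "solstice.jobs.v1",
    "AMQP_ROUTING_KEY": "jobrequest.v1",
    "AMQP_DLX": "solstice.dlx",
    "AMQP_DLQ": "solstice.jobs.v1.dlq",
    "AMQP_PREFETCH": "64",
}

def load_mq_config(env=os.environ) -> dict:
    """Resolve each AMQP_* setting from the environment, falling back to defaults."""
    cfg = {key: env.get(key, default) for key, default in DEFAULTS.items()}
    cfg["AMQP_PREFETCH"] = int(cfg["AMQP_PREFETCH"])  # prefetch is numeric
    return cfg

cfg = load_mq_config(env={})  # no overrides: all defaults apply
```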
---
### Local Development
1. Start RabbitMQ with Docker Compose:
```bash
docker compose up -d rabbitmq
# AMQP: localhost:5672, Management UI: http://localhost:15672 (guest/guest)
```
2. Run the Orchestrator consumer:
```bash
cargo run -p orchestrator -- \
  --amqp-url amqp://127.0.0.1:5672/%2f \
  --amqp-exchange solstice.jobs \
  --amqp-queue solstice.jobs.v1 \
  --amqp-routing-key jobrequest.v1 \
  --amqp-prefetch 64
```
3. Enqueue a sample `JobRequest` from Forge Integration:
```bash
cargo run -p forge-integration -- enqueue \
  --repo-url https://github.com/example/repo.git \
  --commit-sha deadbeefdeadbeefdeadbeefdeadbeefdeadbeef \
  --runs-on illumos-stable
```
You should see the orchestrator log receipt of the job and ack it. Any deserialization or handler error results in the message being dead-lettered to `solstice.jobs.v1.dlq`.
---
### Rationale and Best Practices Used
- Durable exchange/queue and persistent messages to avoid data loss on broker restarts.
- Publisher confirms and `mandatory` flag to ensure broker acceptance; failures can be surfaced to the publisher.
- DLX/DLQ for poison messages and non-transient failures, preventing consumer lockups.
- QoS prefetch to match consumer concurrency and protect Orchestrator capacity.
- Versioned routing key (`jobrequest.v1`) to allow schema evolution without breaking existing consumers.
- Centralized declaration logic (`common::mq`) to keep all services consistent.
---
### Next Steps
- Map real webhooks in Integration to `JobRequest` creation.
- Implement Orchestrator scheduling and VM provisioning based on `runs_on` and workload capacity.
- Add observability fields (trace context, forge metadata) to `JobRequest` as needed (additive only, maintain compatibility).