mirror of
https://codeberg.org/Toasterson/solstice-ci.git
synced 2026-04-10 21:30:41 +00:00
118 lines
5 KiB
Markdown
### Solstice CI — RabbitMQ Integration and Internal JobRequest Schema (v1)

This document describes the first iteration of wiring our Integration layer to a real RabbitMQ broker, the internal message schema exchanged between Integration and Orchestrator, and the shared utilities that keep the messaging topology consistent across services.

#### Goals

- Decouple Integration and Orchestrator via a durable message queue.
- Use a versioned, forward-compatible message schema (`jobrequest.v1`).
- Centralize messaging code in `crates/common` so all services share the same setup and patterns.
- Follow RabbitMQ best practices: durable exchanges/queues, DLX/DLQ, publisher confirms, consumer prefetch, explicit ack/nack.

---
### Topology (created automatically by services)

- Exchange (direct, durable): `solstice.jobs`
- Routing key: `jobrequest.v1` (aligns with schema version)
- Queue (durable): `solstice.jobs.v1`
- Dead-letter exchange (fanout, durable): `solstice.dlx`
- Dead-letter queue (durable): `solstice.jobs.v1.dlq`
- Bindings:
  - `solstice.jobs.v1` is bound to `solstice.jobs` with `jobrequest.v1`.
  - `solstice.jobs.v1.dlq` is bound to `solstice.dlx` (fanout, no routing key).

Characteristics:

- Publishing uses `mandatory = true`, `delivery_mode = 2` (persistent), and publisher confirms.
- Consumers use `basic_qos(prefetch)` for backpressure and explicitly `ack` on success or `nack(requeue=false)` on error → DLQ.
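The routing rules above can be checked against a small std-only Rust model. This is a hypothetical in-memory simulation: `Broker`, `declare_topology`, `publish`, and `nack_next` here are illustrative names, not the `common::mq` API, and nothing talks to RabbitMQ. It assumes the work queue `solstice.jobs.v1` is declared with the `x-dead-letter-exchange` argument set to `solstice.dlx`, because RabbitMQ only dead-letters rejected messages from queues that carry that argument (or an equivalent policy). The model shows that a direct exchange delivers only on an exact routing-key match, so a `mandatory` publish with `jobrequest.v2` comes back unroutable; that `nack(requeue=false)` moves a message through the fanout DLX into the DLQ; and that declaring the topology twice changes nothing.

```rust
use std::collections::{HashMap, VecDeque};

#[derive(Clone, Copy, PartialEq)]
enum ExchangeKind {
    Direct, // delivers only on an exact routing-key match
    Fanout, // delivers to every bound queue, ignoring the routing key
}

/// Hypothetical in-memory broker: models routing and dead-lettering only
/// (no durability, confirms, or network I/O).
struct Broker {
    exchanges: HashMap<String, ExchangeKind>,
    bindings: Vec<(String, String, String)>, // (exchange, binding key, queue)
    dead_letter: HashMap<String, Option<String>>, // queue -> x-dead-letter-exchange
    queues: HashMap<String, VecDeque<(String, String)>>, // queue -> (routing key, body)
}

impl Broker {
    fn new() -> Self {
        Broker {
            exchanges: HashMap::new(),
            bindings: Vec::new(),
            dead_letter: HashMap::new(),
            queues: HashMap::new(),
        }
    }

    fn bind(&mut self, queue: &str, exchange: &str, key: &str) {
        let binding = (exchange.to_string(), key.to_string(), queue.to_string());
        if !self.bindings.contains(&binding) {
            self.bindings.push(binding); // re-binding is a no-op, as in AMQP
        }
    }

    /// Declares the v1 topology; safe to call from every publisher and consumer.
    fn declare_topology(&mut self) {
        self.exchanges.insert("solstice.jobs".into(), ExchangeKind::Direct);
        self.exchanges.insert("solstice.dlx".into(), ExchangeKind::Fanout);
        for (queue, dlx) in [("solstice.jobs.v1", Some("solstice.dlx")), ("solstice.jobs.v1.dlq", None)] {
            self.queues.entry(queue.into()).or_default(); // keeps existing messages
            self.dead_letter.insert(queue.into(), dlx.map(String::from));
        }
        self.bind("solstice.jobs.v1", "solstice.jobs", "jobrequest.v1");
        self.bind("solstice.jobs.v1.dlq", "solstice.dlx", "");
    }

    /// Routes one message. With `mandatory`, an unroutable message comes back as Err
    /// (RabbitMQ returns it via basic.return). An unknown exchange is also Err here,
    /// whereas RabbitMQ would close the channel with 404 NOT_FOUND.
    fn publish(&mut self, exchange: &str, key: &str, body: &str, mandatory: bool) -> Result<usize, String> {
        let kind = *self
            .exchanges
            .get(exchange)
            .ok_or_else(|| format!("unknown exchange {exchange}"))?;
        let targets: Vec<String> = self
            .bindings
            .iter()
            .filter(|(ex, k, _)| ex == exchange && (kind == ExchangeKind::Fanout || k == key))
            .map(|(_, _, q)| q.clone())
            .collect();
        if targets.is_empty() && mandatory {
            return Err(format!("unroutable: {exchange} / {key}"));
        }
        for queue in &targets {
            if let Some(buf) = self.queues.get_mut(queue) {
                buf.push_back((key.to_string(), body.to_string()));
            }
        }
        Ok(targets.len())
    }

    /// Pops the next message and rejects it with requeue=false: it is republished to the
    /// queue's dead-letter exchange (keeping its routing key) if one is set, else dropped.
    fn nack_next(&mut self, queue: &str) -> Option<String> {
        let (key, body) = self.queues.get_mut(queue)?.pop_front()?;
        if let Some(Some(dlx)) = self.dead_letter.get(queue).cloned() {
            let _ = self.publish(&dlx, &key, &body, false);
        }
        Some(body)
    }

    fn depth(&self, queue: &str) -> usize {
        self.queues.get(queue).map_or(0, |q| q.len())
    }
}
```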
---

### Shared Code in `crates/common`

- `common::messages::JobRequest` (serde JSON, versioned schema):
  - `schema_version: "jobrequest.v1"` (default)
  - `request_id: uuid::Uuid`
  - `source: enum { github, forgejo, manual }`
  - `repo_url: String`
  - `commit_sha: String`
  - `workflow_path: Option<String>` (optional KDL workflow path)
  - `workflow_job_id: Option<String>` (optional ID of a specific job to run)
  - `runs_on: Option<String>` (scheduling hint)
  - `submitted_at: time::OffsetDateTime` (UTC)
- `common::mq`:
  - `MqConfig { url, exchange, routing_key, queue, dlx, dlq, prefetch }`: configurable via env or CLI.
  - `declare_topology(&Channel, &MqConfig)`: idempotent; called by both publishers and consumers.
  - `publish_job(&MqConfig, &JobRequest)`: publishes JSON with publisher confirms.
  - `consume_jobs(&MqConfig, handler)`: starts a consumer with QoS, deserializes each `JobRequest`, and `ack`s or `nack`s accordingly.
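The settlement rule for `consume_jobs` (ack on success, `nack(requeue=false)` otherwise) can be sketched in std-only Rust. Everything here is hypothetical: the struct mirrors the documented `JobRequest` fields but replaces `uuid::Uuid` and `time::OffsetDateTime` with plain integers and leaves out serde, and `settle`, `Settle`, and the injected `decode` function are illustrative names rather than items in `common::mq`. Passing the decoder in keeps the sketch free of `serde_json` while preserving the control flow: only a successful decode followed by a successful handler call yields an ack, and both failure paths end in a dead-letter.

```rust
/// Std-only mirror of `common::messages::JobRequest` (hypothetical: the real type
/// derives serde and uses `uuid::Uuid` and `time::OffsetDateTime`).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Source {
    Github,
    Forgejo,
    Manual,
}

#[derive(Debug, Clone)]
struct JobRequest {
    schema_version: String,
    request_id: u128, // stand-in for uuid::Uuid (a 128-bit value)
    source: Source,
    repo_url: String,
    commit_sha: String,
    workflow_path: Option<String>,
    workflow_job_id: Option<String>,
    runs_on: Option<String>,
    submitted_at_unix: u64, // stand-in for time::OffsetDateTime (UTC)
}

impl JobRequest {
    /// Hypothetical constructor: fills in the default schema version and
    /// leaves all optional fields empty.
    fn new(request_id: u128, source: Source, repo_url: &str, commit_sha: &str, submitted_at_unix: u64) -> Self {
        JobRequest {
            schema_version: "jobrequest.v1".to_string(),
            request_id,
            source,
            repo_url: repo_url.to_string(),
            commit_sha: commit_sha.to_string(),
            workflow_path: None,
            workflow_job_id: None,
            runs_on: None,
            submitted_at_unix,
        }
    }
}

/// How the consumer settles one delivery.
#[derive(Debug, PartialEq)]
enum Settle {
    Ack,
    NackNoRequeue, // routed to the DLX, then the DLQ
}

/// Decodes the payload, runs the handler, and decides ack vs nack(requeue=false).
fn settle<D, H>(payload: &[u8], decode: D, mut handler: H) -> Settle
where
    D: Fn(&[u8]) -> Result<JobRequest, String>,
    H: FnMut(&JobRequest) -> Result<(), String>,
{
    match decode(payload) {
        Err(_) => Settle::NackNoRequeue, // poison message: retrying cannot help
        Ok(job) => match handler(&job) {
            Ok(()) => Settle::Ack,
            Err(_) => Settle::NackNoRequeue, // handler failure is also dead-lettered
        },
    }
}
```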
---

### Service Wiring

- Forge Integration (`crates/forge-integration`):
  - New CLI subcommand `enqueue` for developer testing:
    - Flags: `--repo-url`, `--commit-sha`, `--runs-on` (optional)
    - Uses `common::publish_job` to send a `JobRequest` to the broker.
  - The service will later accept real webhooks and publish `JobRequest`s in response.
- Orchestrator (`crates/orchestrator`):
  - Starts a consumer via `common::consume_jobs` and logs each received `JobRequest`.
  - This is where capacity checks, scheduling, and job provisioning will be added next.

---
### Configuration (env or CLI)

Each service reads the following environment variables, and each can be overridden by a Clap flag that mirrors its name (for example, `AMQP_URL` by `--amqp-url`):

- `AMQP_URL` (default `amqp://127.0.0.1:5672/%2f`)
- `AMQP_EXCHANGE` (default `solstice.jobs`)
- `AMQP_QUEUE` (default `solstice.jobs.v1`)
- `AMQP_ROUTING_KEY` (default `jobrequest.v1`)
- `AMQP_DLX` (default `solstice.dlx`)
- `AMQP_DLQ` (default `solstice.jobs.v1.dlq`)
- `AMQP_PREFETCH` (default `64`)
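The precedence described above (a CLI flag overrides the environment variable, which overrides the built-in default) can be sketched without Clap. In Clap's derive API the same behaviour is usually declared per field with `#[arg(long, env = "AMQP_URL", default_value = "...")]`, which requires Clap's `env` feature; whether Solstice uses that attribute is an assumption. The hand-written version below takes flags and environment as maps so the precedence is easy to test (a real service would read `std::env::var`). The `--amqp-dlx` and `--amqp-dlq` flag names are assumed from the `--amqp-*` pattern used under Local Development, and `prefetch` is a `u16` because AMQP 0-9-1 carries the prefetch count as a 16-bit field.

```rust
use std::collections::HashMap;

/// Field names follow the documented `MqConfig`; the resolution logic is a sketch.
#[derive(Debug, PartialEq)]
struct MqConfig {
    url: String,
    exchange: String,
    routing_key: String,
    queue: String,
    dlx: String,
    dlq: String,
    prefetch: u16,
}

/// Resolves one setting: CLI flag, else environment variable, else default.
fn pick(flags: &HashMap<&str, String>, env: &HashMap<&str, String>, flag: &str, var: &str, default: &str) -> String {
    flags
        .get(flag)
        .or_else(|| env.get(var))
        .cloned()
        .unwrap_or_else(|| default.to_string())
}

impl MqConfig {
    fn resolve(flags: &HashMap<&str, String>, env: &HashMap<&str, String>) -> Result<Self, String> {
        let raw = pick(flags, env, "--amqp-prefetch", "AMQP_PREFETCH", "64");
        let prefetch = raw
            .parse::<u16>()
            .map_err(|e| format!("invalid prefetch {raw:?}: {e}"))?;
        Ok(MqConfig {
            url: pick(flags, env, "--amqp-url", "AMQP_URL", "amqp://127.0.0.1:5672/%2f"),
            exchange: pick(flags, env, "--amqp-exchange", "AMQP_EXCHANGE", "solstice.jobs"),
            routing_key: pick(flags, env, "--amqp-routing-key", "AMQP_ROUTING_KEY", "jobrequest.v1"),
            queue: pick(flags, env, "--amqp-queue", "AMQP_QUEUE", "solstice.jobs.v1"),
            // Assumed flag names, following the --amqp-* pattern.
            dlx: pick(flags, env, "--amqp-dlx", "AMQP_DLX", "solstice.dlx"),
            dlq: pick(flags, env, "--amqp-dlq", "AMQP_DLQ", "solstice.jobs.v1.dlq"),
            prefetch,
        })
    }
}
```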
---

### Local Development

1. Start RabbitMQ with Docker Compose:

   ```bash
   docker compose up -d rabbitmq
   # AMQP: localhost:5672, Management UI: http://localhost:15672 (guest/guest)
   ```

2. Run the Orchestrator consumer:

   ```bash
   cargo run -p orchestrator -- \
     --amqp-url amqp://127.0.0.1:5672/%2f \
     --amqp-exchange solstice.jobs \
     --amqp-queue solstice.jobs.v1 \
     --amqp-routing-key jobrequest.v1 \
     --amqp-prefetch 64
   ```

   The flag values shown repeat the defaults from the Configuration section.

3. Enqueue a sample `JobRequest` from Forge Integration:

   ```bash
   cargo run -p forge-integration -- enqueue \
     --repo-url https://github.com/example/repo.git \
     --commit-sha deadbeefdeadbeefdeadbeefdeadbeefdeadbeef \
     --runs-on illumos-stable
   ```

The orchestrator should log the received job and ack it. Any deserialization or handler error causes the message to be dead-lettered to `solstice.jobs.v1.dlq`.

---
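### Rationale and Best Practices Used

- Durable exchanges/queues and persistent messages, to avoid losing jobs when the broker restarts.
- Publisher confirms plus the `mandatory` flag: confirms tell the publisher that the broker has accepted a message, and `mandatory` makes the broker return unroutable messages instead of silently dropping them, so both failure modes surface to the publisher.
- DLX/DLQ for poison messages and non-transient failures: a message that keeps failing is parked in the DLQ instead of being redelivered in a loop and blocking the consumer.
- QoS prefetch to cap the number of unacknowledged deliveries per consumer, matching consumer concurrency and protecting Orchestrator capacity.
- Versioned routing key (`jobrequest.v1`) to allow schema evolution without breaking existing consumers: a future schema version can use its own routing key and queue while `solstice.jobs.v1` keeps receiving only v1 messages.
- Centralized declaration logic (`common::mq`) to keep all services consistent.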
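The prefetch point is easiest to see as a window of unacknowledged deliveries. The toy std-only model below (`PrefetchWindow` is hypothetical, not part of `common::mq` or any AMQP client) mimics the broker rule behind `basic_qos(prefetch)`, assuming per-consumer (non-global) QoS: a consumer never holds more than `prefetch` unacked messages, every ack or nack frees exactly one slot, and a prefetch of 0 means unlimited, as in RabbitMQ. With the default `AMQP_PREFETCH=64`, the Orchestrator would therefore have at most 64 jobs in flight per consumer.

```rust
use std::collections::VecDeque;

/// Toy model of one consumer's prefetch window, seen from the broker side.
struct PrefetchWindow {
    prefetch: usize,      // 0 means "no limit", matching RabbitMQ's basic.qos
    ready: VecDeque<u32>, // messages still waiting in the queue
    unacked: Vec<u32>,    // delivered to the consumer but not yet settled
}

impl PrefetchWindow {
    fn new(prefetch: usize, messages: impl IntoIterator<Item = u32>) -> Self {
        PrefetchWindow {
            prefetch,
            ready: messages.into_iter().collect(),
            unacked: Vec::new(),
        }
    }

    /// Pushes messages until the window is full or the queue is empty;
    /// returns how many were delivered in this round.
    fn deliver(&mut self) -> usize {
        let mut delivered = 0;
        while self.prefetch == 0 || self.unacked.len() < self.prefetch {
            match self.ready.pop_front() {
                Some(msg) => {
                    self.unacked.push(msg);
                    delivered += 1;
                }
                None => break,
            }
        }
        delivered
    }

    /// Acks or nacks one in-flight message, freeing one slot; false if unknown.
    fn settle(&mut self, msg: u32) -> bool {
        match self.unacked.iter().position(|&m| m == msg) {
            Some(i) => {
                self.unacked.remove(i);
                true
            }
            None => false,
        }
    }
}
```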
---

### Next Steps

- Map real webhooks in Integration to `JobRequest` creation.
- Implement Orchestrator scheduling and VM provisioning based on `runs_on` and workload capacity.
- Add observability fields (trace context, forge metadata) to `JobRequest` as needed (additive only, to maintain compatibility).