Solstice CI — RabbitMQ Integration and Internal JobRequest Schema (v1)
This document describes the first iteration of our Integration Layer wiring to a real RabbitMQ broker, the internal message schema used between Integration and Orchestrator, and the common utilities that ensure consistent messaging topology across services.
Goals
- Decouple Integration and Orchestrator via a durable message queue.
- Use a versioned, forward-compatible message schema (`jobrequest.v1`).
- Centralize messaging code in `crates/common` so all services share the same setup and patterns.
- Follow RabbitMQ best practices: durable exchanges/queues, DLX/DLQ, publisher confirms, consumer prefetch, explicit ack/nack.
Topology (created automatically by services)
- Exchange (direct, durable): `solstice.jobs`
- Routing key: `jobrequest.v1` (aligns with the schema version)
- Queue (durable): `solstice.jobs.v1`
- Dead-letter exchange (fanout, durable): `solstice.dlx`
- Dead-letter queue (durable): `solstice.jobs.v1.dlq`
- Bindings: `solstice.jobs.v1` is bound to `solstice.jobs` with routing key `jobrequest.v1`; `solstice.jobs.v1.dlq` is bound to `solstice.dlx` (fanout, no routing key).
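The topology above can be sketched with the `lapin` AMQP client. This is an illustrative outline of what an idempotent declaration might look like, not the actual `common::mq::declare_topology` implementation; it hardcodes the documented default names and assumes a live broker connection.

```rust
use lapin::{
    options::{ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions},
    types::{AMQPValue, FieldTable},
    Channel, ExchangeKind,
};

/// Idempotently declare the exchange, queue, DLX, and DLQ (sketch).
pub async fn declare_topology(ch: &Channel) -> lapin::Result<()> {
    let durable = ExchangeDeclareOptions { durable: true, ..Default::default() };
    // Primary direct exchange and the dead-letter fanout exchange.
    ch.exchange_declare("solstice.jobs", ExchangeKind::Direct, durable, FieldTable::default())
        .await?;
    ch.exchange_declare("solstice.dlx", ExchangeKind::Fanout, durable, FieldTable::default())
        .await?;

    // The main queue dead-letters rejected messages into solstice.dlx.
    let mut args = FieldTable::default();
    args.insert(
        "x-dead-letter-exchange".into(),
        AMQPValue::LongString("solstice.dlx".into()),
    );
    let q = QueueDeclareOptions { durable: true, ..Default::default() };
    ch.queue_declare("solstice.jobs.v1", q, args).await?;
    ch.queue_declare("solstice.jobs.v1.dlq", q, FieldTable::default()).await?;

    ch.queue_bind("solstice.jobs.v1", "solstice.jobs", "jobrequest.v1",
                  QueueBindOptions::default(), FieldTable::default())
        .await?;
    // The fanout DLX ignores routing keys; bind with an empty key.
    ch.queue_bind("solstice.jobs.v1.dlq", "solstice.dlx", "",
                  QueueBindOptions::default(), FieldTable::default())
        .await?;
    Ok(())
}
```

Because every declaration is idempotent on the broker side (re-declaring with identical arguments is a no-op), both publishers and consumers can call this at startup in any order.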
Characteristics:
- Publishing uses `mandatory = true`, `delivery_mode = 2` (persistent), and publisher confirms.
- Consumers use `basic_qos(prefetch)` for backpressure and explicitly `ack` on success or `nack` (`requeue = false`) on error, which routes the message to the DLQ.
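As a sketch of the publishing side with `lapin` (not the real `common::mq::publish_job`, which also handles serialization and config): the channel is assumed to already be in confirm mode via `confirm_select`.

```rust
use lapin::{options::BasicPublishOptions, BasicProperties, Channel};

/// Publish a JSON body persistently, with mandatory routing and a
/// publisher confirm (sketch; assumes confirm_select was called).
pub async fn publish_json(ch: &Channel, body: &[u8]) -> lapin::Result<()> {
    let confirmation = ch
        .basic_publish(
            "solstice.jobs",  // exchange
            "jobrequest.v1",  // routing key matches the schema version
            BasicPublishOptions { mandatory: true, ..Default::default() },
            body,
            // delivery_mode = 2 marks the message persistent.
            BasicProperties::default()
                .with_content_type("application/json".into())
                .with_delivery_mode(2),
        )
        .await?   // publish request sent
        .await?;  // publisher confirm resolved
    // A broker Nack (or an unroutable mandatory message) should be
    // surfaced to the caller as an error rather than silently dropped.
    let _ = confirmation;
    Ok(())
}
```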
Shared Code in `crates/common`
- `common::messages::JobRequest` (serde JSON; versioned schema):
  - `schema_version`: `"jobrequest.v1"` (default)
  - `request_id`: `uuid::Uuid`
  - `source`: enum { `github`, `forgejo`, `manual` }
  - `repo_url`: `String`
  - `commit_sha`: `String`
  - `workflow_path`: `Option<String>` (optional KDL workflow path)
  - `workflow_job_id`: `Option<String>` (optional specific job id to run)
  - `runs_on`: `Option<String>` (scheduling hint)
  - `submitted_at`: `time::OffsetDateTime` (UTC)
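For illustration, a `jobrequest.v1` message on the wire might look like the following. The exact serialization (e.g. the timestamp format and the `request_id` value) depends on the serde attributes in `crates/common`, and the `workflow_path` value here is a hypothetical example.

```json
{
  "schema_version": "jobrequest.v1",
  "request_id": "0b8f8a7e-3c1d-4e5f-9a2b-6d7c8e9f0a1b",
  "source": "github",
  "repo_url": "https://github.com/example/repo.git",
  "commit_sha": "deadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
  "workflow_path": null,
  "workflow_job_id": null,
  "runs_on": "illumos-stable",
  "submitted_at": "2024-01-01T00:00:00Z"
}
```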
- `common::mq`:
  - `MqConfig { url, exchange, routing_key, queue, dlx, dlq, prefetch }` (env/CLI configurable)
  - `declare_topology(&Channel, &MqConfig)` (idempotent; called by both publishers and consumers)
  - `publish_job(&MqConfig, &JobRequest)` (JSON publish with confirms)
  - `consume_jobs(&MqConfig, handler)` (starts a consumer with QoS, deserializes `JobRequest`, and `ack`/`nack`s accordingly)
Service Wiring
- Forge Integration (`crates/forge-integration`):
  - New CLI subcommand `enqueue` for developer testing:
    - Flags: `--repo-url`, `--commit-sha`, `--runs-on` (optional)
    - Uses `common::publish_job` to send a `JobRequest` to the broker.
  - The service will later accept real webhooks and publish `JobRequest`s in response.
- Orchestrator (`crates/orchestrator`):
  - Starts a consumer via `common::consume_jobs` and logs each received `JobRequest`.
  - This is where capacity checks, scheduling, and job provisioning will be added next.
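The Orchestrator's consume loop can be sketched as follows with `lapin`. This is an outline of the pattern behind `common::consume_jobs`, not its actual implementation; it deserializes into a generic `serde_json::Value` where the real code uses `JobRequest`.

```rust
use futures_lite::StreamExt; // lapin's Consumer implements Stream
use lapin::{
    options::{BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicQosOptions},
    types::FieldTable,
    Channel,
};

/// Consume from the jobs queue with QoS, acking on success and
/// dead-lettering on failure (sketch).
pub async fn run_consumer(ch: &Channel, prefetch: u16) -> lapin::Result<()> {
    // At most `prefetch` unacked messages in flight per consumer.
    ch.basic_qos(prefetch, BasicQosOptions::default()).await?;
    let mut consumer = ch
        .basic_consume("solstice.jobs.v1", "orchestrator",
                       BasicConsumeOptions::default(), FieldTable::default())
        .await?;
    while let Some(delivery) = consumer.next().await {
        let delivery = delivery?;
        match serde_json::from_slice::<serde_json::Value>(&delivery.data) {
            Ok(job) => {
                println!("received job: {job}");
                delivery.ack(BasicAckOptions::default()).await?;
            }
            // requeue = false sends the poison message to the DLX/DLQ
            // instead of redelivering it in a tight loop.
            Err(_) => {
                delivery
                    .nack(BasicNackOptions { requeue: false, ..Default::default() })
                    .await?;
            }
        }
    }
    Ok(())
}
```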
Configuration (env or CLI)
- `AMQP_URL` (default `amqp://127.0.0.1:5672/%2f`)
- `AMQP_EXCHANGE` (default `solstice.jobs`)
- `AMQP_QUEUE` (default `solstice.jobs.v1`)
- `AMQP_ROUTING_KEY` (default `jobrequest.v1`)
- `AMQP_DLX` (default `solstice.dlx`)
- `AMQP_DLQ` (default `solstice.jobs.v1.dlq`)
- `AMQP_PREFETCH` (default `64`)
Each service exposes overrides via Clap flags that mirror the env vars.
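The env-with-defaults pattern can be sketched with only the standard library. This mirrors the shape of `common::mq::MqConfig` but is not its actual definition (the real struct is also populated from Clap flags):

```rust
use std::env;

/// Messaging configuration with the documented defaults (sketch).
#[derive(Debug)]
pub struct MqConfig {
    pub url: String,
    pub exchange: String,
    pub routing_key: String,
    pub queue: String,
    pub dlx: String,
    pub dlq: String,
    pub prefetch: u16,
}

/// Read an env var, falling back to the documented default.
fn env_or(key: &str, default: &str) -> String {
    env::var(key).unwrap_or_else(|_| default.to_string())
}

impl MqConfig {
    pub fn from_env() -> Self {
        Self {
            url: env_or("AMQP_URL", "amqp://127.0.0.1:5672/%2f"),
            exchange: env_or("AMQP_EXCHANGE", "solstice.jobs"),
            routing_key: env_or("AMQP_ROUTING_KEY", "jobrequest.v1"),
            queue: env_or("AMQP_QUEUE", "solstice.jobs.v1"),
            dlx: env_or("AMQP_DLX", "solstice.dlx"),
            dlq: env_or("AMQP_DLQ", "solstice.jobs.v1.dlq"),
            prefetch: env_or("AMQP_PREFETCH", "64").parse().unwrap_or(64),
        }
    }
}
```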
Local Development
- Start RabbitMQ with Docker Compose:

  ```sh
  docker compose up -d rabbitmq
  # AMQP: localhost:5672, Management UI: http://localhost:15672 (guest/guest)
  ```

- Run the Orchestrator consumer:

  ```sh
  cargo run -p orchestrator -- \
    --amqp-url amqp://127.0.0.1:5672/%2f \
    --amqp-exchange solstice.jobs \
    --amqp-queue solstice.jobs.v1 \
    --amqp-routing-key jobrequest.v1 \
    --amqp-prefetch 64
  ```

- Enqueue a sample `JobRequest` from Forge Integration:

  ```sh
  cargo run -p forge-integration -- enqueue \
    --repo-url https://github.com/example/repo.git \
    --commit-sha deadbeefdeadbeefdeadbeefdeadbeefdeadbeef \
    --runs-on illumos-stable
  ```
You should see the Orchestrator log receipt of the job and ack it. Any deserialization or handler error results in the message being dead-lettered to `solstice.jobs.v1.dlq`.
Rationale and Best Practices Used
- Durable exchange/queue and persistent messages to avoid data loss on broker restarts.
- Publisher confirms and the `mandatory` flag to ensure broker acceptance; failures can be surfaced to the publisher.
- DLX/DLQ for poison messages and non-transient failures, preventing consumer lockups.
- QoS prefetch to match consumer concurrency and protect Orchestrator capacity.
- Versioned routing key (`jobrequest.v1`) to allow schema evolution without breaking existing consumers.
- Centralized declaration logic (`common::mq`) to keep all services consistent.
Next Steps
- Map real webhooks in Integration to `JobRequest` creation.
- Implement Orchestrator scheduling and VM provisioning based on `runs_on` and workload capacity.
- Add observability fields (trace context, forge metadata) to `JobRequest` as needed (additive only; maintain compatibility).