solstice-ci/docs/ai/2025-10-25-rabbitmq-integration.md
Till Wegmueller <toasterson@gmail.com> (commit a71f9cc7d1), 2025-10-25


Solstice CI — RabbitMQ Integration and Internal JobRequest Schema (v1)

This document describes the first iteration of wiring our Integration layer to a real RabbitMQ broker, the internal message schema used between Integration and Orchestrator, and the common utilities that ensure a consistent messaging topology across services.

Goals

  • Decouple Integration and Orchestrator via a durable message queue.
  • Use a versioned, forward-compatible message schema (jobrequest.v1).
  • Centralize messaging code in crates/common so all services share the same setup and patterns.
  • Follow RabbitMQ best practices: durable exchanges/queues, DLX/DLQ, publisher confirms, consumer prefetch, explicit ack/nack.

Topology (created automatically by services)

  • Exchange (direct, durable): solstice.jobs
  • Routing key: jobrequest.v1 (aligns with schema version)
  • Queue (durable): solstice.jobs.v1
  • Dead-letter exchange (fanout, durable): solstice.dlx
  • Dead-letter queue (durable): solstice.jobs.v1.dlq
  • Bindings:
    • solstice.jobs.v1 is bound to solstice.jobs with routing key jobrequest.v1.
    • solstice.jobs.v1.dlq is bound to solstice.dlx (fanout, no routing key).

Characteristics:

  • Publishing uses mandatory = true, delivery_mode = 2 (persistent), and publisher confirms.
  • Consumers use basic_qos(prefetch) for backpressure and explicitly ack on success or nack(requeue=false) on error, dead-lettering the message to the DLQ.
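The topology declaration and confirmed publish described above might look like the following, assuming the lapin AMQP client. This is an illustrative sketch, not the actual contents of common::mq; it needs a live broker to run.

```rust
use lapin::{
    options::*,
    types::{AMQPValue, FieldTable},
    BasicProperties, Channel, ExchangeKind,
};

/// Idempotently declare the exchange/queue/DLX/DLQ topology (sketch).
async fn declare_topology(ch: &Channel) -> Result<(), lapin::Error> {
    // Durable direct exchange for job requests.
    ch.exchange_declare(
        "solstice.jobs",
        ExchangeKind::Direct,
        ExchangeDeclareOptions { durable: true, ..Default::default() },
        FieldTable::default(),
    ).await?;

    // Durable fanout dead-letter exchange.
    ch.exchange_declare(
        "solstice.dlx",
        ExchangeKind::Fanout,
        ExchangeDeclareOptions { durable: true, ..Default::default() },
        FieldTable::default(),
    ).await?;

    // Main queue dead-letters rejected messages to solstice.dlx.
    let mut args = FieldTable::default();
    args.insert(
        "x-dead-letter-exchange".into(),
        AMQPValue::LongString("solstice.dlx".into()),
    );
    ch.queue_declare(
        "solstice.jobs.v1",
        QueueDeclareOptions { durable: true, ..Default::default() },
        args,
    ).await?;
    ch.queue_bind(
        "solstice.jobs.v1", "solstice.jobs", "jobrequest.v1",
        QueueBindOptions::default(), FieldTable::default(),
    ).await?;

    // DLQ bound to the fanout DLX; the routing key is ignored by fanout.
    ch.queue_declare(
        "solstice.jobs.v1.dlq",
        QueueDeclareOptions { durable: true, ..Default::default() },
        FieldTable::default(),
    ).await?;
    ch.queue_bind(
        "solstice.jobs.v1.dlq", "solstice.dlx", "",
        QueueBindOptions::default(), FieldTable::default(),
    ).await?;
    Ok(())
}

/// Publish a persistent message with mandatory routing and publisher confirms (sketch).
async fn publish_confirmed(ch: &Channel, payload: &[u8]) -> Result<(), lapin::Error> {
    // Confirm mode is typically enabled once per channel.
    ch.confirm_select(ConfirmSelectOptions::default()).await?;
    let _confirmation = ch
        .basic_publish(
            "solstice.jobs",
            "jobrequest.v1",
            BasicPublishOptions { mandatory: true, ..Default::default() },
            payload,
            BasicProperties::default().with_delivery_mode(2), // persistent
        )
        .await?  // message handed to the broker
        .await?; // broker confirmation (ack/nack)
    Ok(())
}
```

Declaring the topology in every service, rather than only in one, is what makes startup order irrelevant: whichever process connects first creates the objects, and subsequent declarations are no-ops because the parameters match.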

Shared Code in crates/common

  • common::messages::JobRequest (serde-JSON; versioned schema):

    • schema_version: "jobrequest.v1" (default)
    • request_id: uuid::Uuid
    • source: enum { github, forgejo, manual }
    • repo_url: String
    • commit_sha: String
    • workflow_path: Option<String> — optional KDL workflow path
    • workflow_job_id: Option<String> — optional specific job id to run
    • runs_on: Option<String> — scheduling hint
    • submitted_at: time::OffsetDateTime (UTC)
  • common::mq:

    • MqConfig { url, exchange, routing_key, queue, dlx, dlq, prefetch } — env/CLI configurable.
    • declare_topology(&Channel, &MqConfig) — idempotent, called by both publishers and consumers.
    • publish_job(&MqConfig, &JobRequest) — JSON publish with confirms.
    • consume_jobs(&MqConfig, handler) — starts a consumer with QoS, deserializes JobRequest, and ack/nacks accordingly.
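On the wire, a jobrequest.v1 message serialized from the fields above would look roughly like this (all values illustrative; the exact enum and timestamp encodings depend on the serde attributes in common::messages):

```json
{
  "schema_version": "jobrequest.v1",
  "request_id": "f1c7a2d4-0d3b-4e9a-9b2f-6c1d2e3f4a5b",
  "source": "manual",
  "repo_url": "https://github.com/example/repo.git",
  "commit_sha": "deadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
  "workflow_path": null,
  "workflow_job_id": null,
  "runs_on": "illumos-stable",
  "submitted_at": "2025-10-25T18:01:08Z"
}
```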

Service Wiring

  • Forge Integration (crates/forge-integration):

    • New CLI subcommand enqueue for developer testing:
      • Flags: --repo-url, --commit-sha, --runs-on (optional)
      • Uses common::publish_job to send a JobRequest to the broker.
    • The service will later accept real webhooks and publish JobRequests in response.
  • Orchestrator (crates/orchestrator):

    • Starts a consumer via common::consume_jobs and logs each received JobRequest.
    • This is where capacity checks, scheduling, and job provisioning will be added next.
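A consumer loop matching this description could be sketched with lapin and a stream combinator as below. Again illustrative, not the actual common::consume_jobs; the handler here just logs the decoded payload.

```rust
use futures_lite::stream::StreamExt;
use lapin::{options::*, types::FieldTable, Channel};

/// Consume job messages with QoS, acking on success and dead-lettering failures (sketch).
async fn consume_jobs(ch: &Channel, prefetch: u16) -> Result<(), lapin::Error> {
    // Backpressure: at most `prefetch` unacked messages in flight.
    ch.basic_qos(prefetch, BasicQosOptions::default()).await?;

    let mut consumer = ch
        .basic_consume(
            "solstice.jobs.v1",
            "orchestrator", // consumer tag
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await?;

    while let Some(delivery) = consumer.next().await {
        let delivery = delivery?;
        // Stand-in for JobRequest deserialization and the real handler.
        match serde_json::from_slice::<serde_json::Value>(&delivery.data) {
            Ok(job) => {
                tracing::info!(?job, "received job request");
                delivery.ack(BasicAckOptions::default()).await?;
            }
            Err(err) => {
                tracing::warn!(%err, "poison message, dead-lettering");
                // requeue=false routes the message to the DLX and thus the DLQ.
                delivery
                    .nack(BasicNackOptions { requeue: false, ..Default::default() })
                    .await?;
            }
        }
    }
    Ok(())
}
```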

Configuration (env or CLI)

  • AMQP_URL (default amqp://127.0.0.1:5672/%2f)
  • AMQP_EXCHANGE (default solstice.jobs)
  • AMQP_QUEUE (default solstice.jobs.v1)
  • AMQP_ROUTING_KEY (default jobrequest.v1)
  • AMQP_DLX (default solstice.dlx)
  • AMQP_DLQ (default solstice.jobs.v1.dlq)
  • AMQP_PREFETCH (default 64)

Each service exposes overrides via Clap flags that mirror the env vars.
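With clap's env feature, the flag/env-var mirroring could be wired as follows. This is a sketch; the actual field names and defaults live in common::mq.

```rust
use clap::Parser;

/// Messaging configuration, settable via CLI flag or environment variable;
/// the flag takes precedence over the env var, which takes precedence over the default.
#[derive(Parser, Debug)]
pub struct MqConfig {
    #[arg(long, env = "AMQP_URL", default_value = "amqp://127.0.0.1:5672/%2f")]
    pub url: String,
    #[arg(long, env = "AMQP_EXCHANGE", default_value = "solstice.jobs")]
    pub exchange: String,
    #[arg(long, env = "AMQP_QUEUE", default_value = "solstice.jobs.v1")]
    pub queue: String,
    #[arg(long, env = "AMQP_ROUTING_KEY", default_value = "jobrequest.v1")]
    pub routing_key: String,
    #[arg(long, env = "AMQP_DLX", default_value = "solstice.dlx")]
    pub dlx: String,
    #[arg(long, env = "AMQP_DLQ", default_value = "solstice.jobs.v1.dlq")]
    pub dlq: String,
    #[arg(long, env = "AMQP_PREFETCH", default_value_t = 64)]
    pub prefetch: u16,
}
```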


Local Development

  1. Start RabbitMQ with Docker Compose:

    docker compose up -d rabbitmq
    # AMQP: localhost:5672, Management UI: http://localhost:15672 (guest/guest)
    
  2. Run the Orchestrator consumer:

    cargo run -p orchestrator -- \
      --amqp-url amqp://127.0.0.1:5672/%2f \
      --amqp-exchange solstice.jobs \
      --amqp-queue solstice.jobs.v1 \
      --amqp-routing-key jobrequest.v1 \
      --amqp-prefetch 64
    
  3. Enqueue a sample JobRequest from Forge Integration:

    cargo run -p forge-integration -- enqueue \
      --repo-url https://github.com/example/repo.git \
      --commit-sha deadbeefdeadbeefdeadbeefdeadbeefdeadbeef \
      --runs-on illumos-stable
    

You should see the orchestrator log receipt of the job and ack it. Any deserialization or handler error results in the message being dead-lettered to solstice.jobs.v1.dlq.
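To verify dead-lettering during local development, you can peek at the DLQ with rabbitmqadmin (shipped with the management plugin); the message count here is arbitrary:

```shell
# Peek at up to 10 messages on the dead-letter queue without consuming them
rabbitmqadmin get queue=solstice.jobs.v1.dlq ackmode=reject_requeue_true count=10
```

The management UI's Queues tab shows the same information, including per-queue message counts and the x-dead-letter-exchange argument on solstice.jobs.v1.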


Rationale and Best Practices Used

  • Durable exchange/queue and persistent messages to avoid data loss on broker restarts.
  • Publisher confirms and mandatory flag to ensure broker acceptance; failures can be surfaced to the publisher.
  • DLX/DLQ for poison messages and non-transient failures, preventing consumer lockups.
  • QoS prefetch to match consumer concurrency and protect Orchestrator capacity.
  • Versioned routing key (jobrequest.v1) to allow schema evolution without breaking existing consumers.
  • Centralized declaration logic (common::mq) to keep all services consistent.

Next Steps

  • Map real webhooks in Integration to JobRequest creation.
  • Implement Orchestrator scheduling and VM provisioning based on runs_on and workload capacity.
  • Add observability fields (trace context, forge metadata) to JobRequest as needed (additive only, maintain compatibility).