solstice-ci/.junie/guidelines.md
Till Wegmueller 931e5ac81a
Add explicit libvirt configuration support; remove environment variable reliance; bump version to 0.1.9
- Introduce `libvirt_uri` and `libvirt_network` in configuration structs, replacing reliance on environment variables.
- Update all `virsh`-related logic to use explicit parameters for libvirt connection and network settings.
- Align codebase with new guidelines rejecting runtime environment variable mutations.
- Document breaking changes in `.junie/guidelines.md`.
- Increment orchestrator version to 0.1.9.

Signed-off-by: Till Wegmueller <toasterson@gmail.com>
2025-11-17 22:40:50 +01:00


Solstice CI — Engineering Guidelines

This document records project-specific build, test, and development conventions for the Solstice CI workspace. It is written for experienced Rust developers and focuses only on details that are unique to this repository.

  • Workspace root: solstice-ci/
  • Crates: crates/agent, crates/orchestrator, crates/forge-integration, crates/github-integration, crates/common
  • Rust edition: 2024
  • Docs policy: Any AI-generated markdown summaries not explicitly requested in a prompt must live under docs/ai/ with a timestamp prefix in the filename (e.g., 2025-10-25-some-topic.md).

1. Build and Configuration

BREAKING POLICY UPDATE — 2025-11-17

We do not set environment variables from Rust code anywhere in this repository. Processes may read environment variables exactly once at startup to build their in-memory configuration, and thereafter must pass values through explicit configuration structs and function parameters. Subprocesses must receive required configuration via flags/args or inherited environment coming from the supervisor (systemd, container runtime), not via mutations performed by our code.

Implications:

  • Remove/avoid any calls to std::env::set_var, Command::env, or ad-hoc HOME/XDG_* rewrites in code. If a child process (e.g., virsh) needs a URI, pass it explicitly via its CLI flags (e.g., virsh -c <uri>) or ensure the supervisor's EnvironmentFile provides it.
  • Read environment only to construct config types (e.g., AppConfig, Opts via clap). Do not propagate config by mutating process env.
  • Pass configuration through owned structs (e.g., ExecConfig) and function parameters. Do not fall back to environment once config is built.
  • This change is intentionally not backward compatible with any earlier behavior that relied on code setting env vars at runtime.
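
As an illustration of the rule above, here is a minimal sketch of building a virsh invocation from explicit configuration. The LibvirtConfig struct and the virsh_dhcp_leases helper are hypothetical names chosen for this example; only the libvirt_uri and libvirt_network settings and the -c flag come from this document.

```rust
use std::process::Command;

/// Hypothetical config struct; the field names mirror the `libvirt_uri` and
/// `libvirt_network` settings, the struct itself is an assumption.
#[derive(Debug, Clone)]
pub struct LibvirtConfig {
    pub libvirt_uri: String,
    pub libvirt_network: String,
}

/// Builds `virsh -c <uri> net-dhcp-leases <network>`.
/// The connection URI travels as a CLI flag; the process environment is
/// neither read nor modified (no `set_var`, no `Command::env`).
pub fn virsh_dhcp_leases(cfg: &LibvirtConfig) -> Command {
    let mut cmd = Command::new("virsh");
    cmd.arg("-c")
        .arg(&cfg.libvirt_uri)
        .arg("net-dhcp-leases")
        .arg(&cfg.libvirt_network);
    cmd
}
```

The caller decides whether to spawn the command or only inspect it (Command::get_args), which keeps the helper testable on machines without libvirt installed.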

Operational guidance:

  • Systemd units and container definitions remain the source of truth for environment. Ensure EnvironmentFile entries are correct. For local runs, pass flags or export vars before invoking binaries.

  • For libvirt tools like virsh, we pass the URI using -c and do not manipulate HOME/XDG_CACHE_HOME.

  • Use the stable toolchain unless explicitly noted. The workspace uses the 2024 edition; keep rustup and cargo updated.

  • Top-level build:

    • Build everything: cargo build --workspace
    • Run individual binaries during development using cargo run -p <crate>.
  • mise file tasks:

    • Tasks live under .mise/tasks/ and are discovered automatically by mise.
    • List all available tasks: mise tasks
    • Run tasks with namespace-style names (directory -> :). Examples:
      • Build all (debug): mise run build:all
      • Build all (release): mise run build:release
      • Test all: mise run test:all
      • Start local deps (RabbitMQ): mise run dev:up
      • Stop local deps: mise run dev:down
      • Run orchestrator with local defaults: mise run run:orchestrator
      • Enqueue a sample job for the current repo/commit: mise run run:forge-enqueue
      • Serve the workflow runner for VMs to download (local dev): mise run run:runner-serve
      • End-to-end local flow (bring up deps, start orchestrator, enqueue one job, tail logs): mise run ci:local
    • Notes for local VM downloads:
      • The orchestrator injects a SOLSTICE_RUNNER_URL into cloud-init; ci:local sets this automatically by serving the runner from your host.
      • You can set ORCH_CONTACT_ADDR to the host:port where the runner should stream logs back (defaults to GRPC_ADDR).
  • Lints and formatting follow the default Rust style unless a crate specifies otherwise. Prefer cargo fmt and cargo clippy --workspace --all-targets --all-features before committing.

  • Secrets and credentials are never committed. For local runs, use environment variables or a .env provider (do not add .env to VCS). In CI/deployments, use a secret store (e.g., Vault, KMS) — see the Integration layer notes.

Common configuration environment variables (pattern; per-service variables may diverge):

  • Logging and tracing:
    • RUST_LOG (or use tracing-subscriber filters)
    • OTEL_EXPORTER_OTLP_ENDPOINT (e.g., http://localhost:4317)
    • OTEL_SERVICE_NAME (e.g., solstice-orchestrator)
  • Database (Postgres via SeaORM):
    • DATABASE_URL=postgres://user:pass@host:5432/solstice
  • Object storage (S3-compatible) and filesystem:
    • S3_ENDPOINT, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION, S3_BUCKET
    • BLOB_FS_ROOT (filesystem-backed blob store root)

2. Error Handling, Tracing, and Telemetry

We standardize on the miette + thiserror error pattern and tracing with OpenTelemetry.

  • thiserror is used for domain error enums/structs; avoid stringly-typed errors.
  • Wrap top-level errors with miette::Report for rich diagnostic output in CLIs and service logs. Prefer eyre-style ergonomics via miette::Result<T> where appropriate.
  • Use tracing over log. Never mix ad-hoc println! in services; println! remains acceptable in intentionally minimal binaries or for very early bootstrap before subscribers are set.
  • Emit spans/resources with OpenTelemetry. The shared initialization lives in the common crate so all binaries get consistent wiring.

Recommended initialization (placed in crates/common as a single public function and called from each service main):

// common/src/telemetry.rs (example; keep in `common`)
pub struct TelemetryGuard {
    // Keeps the non-blocking log writer alive; dropping it flushes buffered lines.
    _guard: Option<tracing_appender::non_blocking::WorkerGuard>,
}

pub fn init_tracing(service_name: &str) -> miette::Result<TelemetryGuard> {
    use miette::IntoDiagnostic;
    use opentelemetry::sdk::{propagation::TraceContextPropagator, Resource};
    use opentelemetry::sdk::trace as sdktrace;
    use opentelemetry::KeyValue;
    use opentelemetry_otlp::WithExportConfig;
    use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};

    // Resource describing this service.
    let resource = Resource::new(vec![KeyValue::new("service.name", service_name.to_string())]);

    // OTLP exporter (gRPC). This sketch always installs it; gate this block on the
    // configured endpoint if console-only output is wanted when none is set.
    let tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_trace_config(sdktrace::Config::default().with_resource(resource))
        .with_exporter(opentelemetry_otlp::new_exporter().tonic())
        .install_batch(opentelemetry::runtime::Tokio)
        .into_diagnostic()?;

    // Non-blocking writer over stderr; swap in a file appender if needed.
    let (nb_writer, guard) = tracing_appender::non_blocking(std::io::stderr());

    let fmt_layer = fmt::layer()
        .with_target(false)
        .with_writer(nb_writer)
        // std's IsTerminal (Rust 1.70+) replaces the unmaintained `atty` crate.
        .with_ansi(std::io::IsTerminal::is_terminal(&std::io::stderr()));

    let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);

    // Fall back to `info` when RUST_LOG is unset or invalid.
    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));

    opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());

    tracing_subscriber::registry()
        .with(filter)
        .with(fmt_layer)
        .with(otel_layer)
        .init();

    Ok(TelemetryGuard { _guard: Some(guard) })
}

In each service binary:

#[tokio::main] // the Tokio batch exporter in init_tracing needs a running runtime
async fn main() -> miette::Result<()> {
    let _telemetry = common::telemetry::init_tracing("solstice-orchestrator")?;
    // ...
    Ok(())
}

Guidance:

  • Favor spans around orchestration lifecycles (VM provisioning, job execution, webhook processing). Include high-cardinality attributes judiciously; prefer stable keys (e.g., job_id) and sample IDs for verbose content.
  • When returning errors from service entrypoints, prefer miette::Report with context to produce actionable diagnostics.

3. Database Access — SeaORM + Postgres

  • Use SeaORM for all relational access; keep raw SQL to migrations or perf-critical paths only.
  • Model organization:
    • Entities live in a dedicated entities module/crate generated by SeaORM tooling or handwritten as needed.
    • Queries should be implemented in small, testable functions; avoid mixing business logic with query composition.
  • Migrations:
    • Create a migrations crate (e.g., crates/migration) using SeaORM CLI and point it at Postgres via DATABASE_URL.
    • Typical workflow:
      • sea-orm-cli migrate init
      • sea-orm-cli migrate generate <name>
      • Edit up/down migrations.
      • sea-orm-cli migrate up (or refresh in dev)
    • In services, run migrations on boot behind a feature flag or dedicated admin command.
  • Connection management:
    • Create a connection pool per service at startup (Tokio + async). Inject the pool into subsystems rather than using globals.
    • Use timeouts and statement logging (via tracing) for observability.

3.1 Connection Pooling — deadpool (Postgres)

We standardize on deadpool for async connection pooling to Postgres, using deadpool-postgres (+ tokio-postgres) alongside SeaORM. Services construct a single pool at startup and pass it into subsystems.

Guidelines:

  • Configuration via env:
    • DATABASE_URL (Postgres connection string)
    • DB_POOL_MAX_SIZE (e.g., 16-64 depending on workload)
    • DB_POOL_TIMEOUT_MS (acquire timeout; default 5_000ms)
  • Enable TLS when required by your Postgres deployment (use rustls variants where possible). Avoid embedding credentials in code; prefer env/secret stores.
  • Instrument queries with tracing. Prefer statement logging at debug with sampling in high-traffic paths.
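
A minimal sketch of turning these variables into an owned settings struct, read once at startup. DbSettings, from_lookup, and parse_or are hypothetical names; the default of 16 for DB_POOL_MAX_SIZE is an assumption (the low end of the range above), while the 5_000 ms timeout default comes from the list.

```rust
use std::time::Duration;

/// Hypothetical settings struct built once at startup and passed around by value.
#[derive(Debug, Clone, PartialEq)]
pub struct DbSettings {
    pub database_url: String,
    pub pool_max_size: usize,
    pub pool_timeout: Duration,
}

/// Parses an optional numeric variable, falling back to `default` when unset.
fn parse_or(
    lookup: &impl Fn(&str) -> Option<String>,
    key: &str,
    default: u64,
) -> Result<u64, String> {
    match lookup(key) {
        None => Ok(default),
        Some(raw) => raw
            .trim()
            .parse::<u64>()
            .map_err(|e| format!("{key}={raw:?} is not a valid number: {e}")),
    }
}

impl DbSettings {
    /// `lookup` abstracts the environment so tests can pass a closure.
    /// In `main`, call this once with `|k| std::env::var(k).ok()`.
    pub fn from_lookup(lookup: impl Fn(&str) -> Option<String>) -> Result<Self, String> {
        let database_url = lookup("DATABASE_URL").ok_or("DATABASE_URL is not set")?;
        // Assumption: 16 as the default pool size.
        let max = parse_or(&lookup, "DB_POOL_MAX_SIZE", 16)?;
        if max == 0 {
            return Err("DB_POOL_MAX_SIZE must be at least 1".to_string());
        }
        let timeout_ms = parse_or(&lookup, "DB_POOL_TIMEOUT_MS", 5_000)?;
        Ok(Self {
            database_url,
            pool_max_size: max as usize,
            pool_timeout: Duration::from_millis(timeout_ms),
        })
    }
}
```

Invalid values fail loudly at startup instead of silently falling back, so a typo in an EnvironmentFile surfaces before the service accepts work.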

Minimal setup sketch:

use std::time::Duration;
use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
use miette::{IntoDiagnostic, Result};
use tokio_postgres::NoTls; // or a TLS connector

/// Newtype passed into subsystems; carries the acquire timeout from config.
pub struct DbPool {
    pool: Pool,
    acquire_timeout: Duration,
}

/// Arguments come from the config struct built once at startup (DATABASE_URL,
/// DB_POOL_MAX_SIZE, DB_POOL_TIMEOUT_MS); nothing here reads the environment.
pub fn build_db_pool(database_url: &str, max_size: usize, acquire_timeout: Duration) -> Result<DbPool> {
    let pg_cfg: tokio_postgres::Config = database_url.parse().into_diagnostic()?;
    let mgr_cfg = ManagerConfig { recycling_method: RecyclingMethod::Fast };
    let manager = Manager::from_config(pg_cfg, NoTls, mgr_cfg);
    let pool = Pool::builder(manager)
        .max_size(max_size)
        .build()
        .into_diagnostic()?;

    Ok(DbPool { pool, acquire_timeout })
}

pub async fn with_client<F, Fut, T>(pool: &DbPool, f: F) -> Result<T>
where
    F: FnOnce(deadpool_postgres::Client) -> Fut,
    Fut: std::future::Future<Output = Result<T>>,
{
    // Outer error: the acquire timeout elapsed. Inner error: the pool could not
    // hand out a client. Neither implements Diagnostic, so convert both.
    let client = tokio::time::timeout(pool.acquire_timeout, pool.pool.get())
        .await
        .into_diagnostic()?
        .into_diagnostic()?;

    f(client).await
}

Notes:

  • Use one pool per service. Do not create pools per request.
  • SeaORM manages its own sqlx-backed pool through Database::connect and cannot be built from a tokio_postgres::Client. Keep SeaORM for ORM access and use raw pooled deadpool clients for admin/utility tasks.

4. Blob Storage — S3 and Filesystem

We support both S3-compatible object storage and a local filesystem backend.

  • Prefer a storage abstraction in common with a trait like BlobStore and two implementations:
    • S3BlobStore using the official aws-sdk-s3 client (or a vetted S3-compatible client).
    • FsBlobStore rooted at BLOB_FS_ROOT for local/dev and tests.
  • Selection is via configuration (env or CLI). Keep keys and endpoints out of the repo.
  • For S3, follow best practices:
    • Use instance/role credentials where available; otherwise use env creds.
    • Set timeouts, retries, and backoff via the client config.
    • Keep bucket and path layout stable; include job IDs in keys for traceability.
  • For filesystem, ensure directories are created atomically and validate paths to avoid traversal issues.
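
A minimal sketch of the filesystem side, assuming a synchronous trait for brevity. The trait shape, the method names, and the ".tmp" suffix are assumptions made for this example; the real BlobStore is likely async and richer. It shows key validation against traversal and a write-then-rename so readers never observe a partial blob.

```rust
use std::fs;
use std::io::{self, Write};
use std::path::{Component, Path, PathBuf};

/// Hypothetical trait shape for illustration only.
pub trait BlobStore {
    fn put(&self, key: &str, data: &[u8]) -> io::Result<()>;
    fn get(&self, key: &str) -> io::Result<Vec<u8>>;
}

/// Filesystem backend rooted at a directory (e.g., the value of BLOB_FS_ROOT).
pub struct FsBlobStore {
    root: PathBuf,
}

impl FsBlobStore {
    pub fn new(root: impl Into<PathBuf>) -> Self {
        Self { root: root.into() }
    }

    /// Maps a key to a path under `root`. Only plain segments are accepted:
    /// `..`, a leading `./`, absolute paths, and the empty key are rejected.
    fn resolve(&self, key: &str) -> io::Result<PathBuf> {
        let rel = Path::new(key);
        let plain = rel.components().all(|c| matches!(c, Component::Normal(_)));
        if key.is_empty() || !plain {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("invalid blob key: {key:?}"),
            ));
        }
        Ok(self.root.join(rel))
    }
}

impl BlobStore for FsBlobStore {
    fn put(&self, key: &str, data: &[u8]) -> io::Result<()> {
        let path = self.resolve(key)?;
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent)?; // succeeds if the directory already exists
        }
        // Write to a sibling temp file, then rename it into place.
        let name = path.file_name().map(|n| n.to_string_lossy()).unwrap_or_default();
        let tmp = path.with_file_name(format!("{name}.tmp"));
        let mut file = fs::File::create(&tmp)?;
        file.write_all(data)?;
        file.sync_all()?;
        fs::rename(&tmp, &path)
    }

    fn get(&self, key: &str) -> io::Result<Vec<u8>> {
        fs::read(self.resolve(key)?)
    }
}
```

Including the job ID in the key (for example jobs/<job_id>/log.txt) keeps the on-disk layout aligned with the S3 key layout described above.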

5. Argument Parsing — Clap

  • All binaries use clap for flags/subcommands. Derive-based APIs (#[derive(Parser)]) are strongly preferred for consistency and help text quality.
  • Align flags across services where semantics match (e.g., --log-level, --database-url, --s3-endpoint).
  • Emit --help that documents env var fallbacks where appropriate.

Example skeleton:

use clap::Parser;

#[derive(Parser, Debug)]
#[command(name = "solstice-orchestrator", version, about)]
struct Opts {
    /// Postgres connection string
    #[arg(long, env = "DATABASE_URL")]
    database_url: String,

    /// OTLP endpoint (e.g., http://localhost:4317)
    #[arg(long, env = "OTEL_EXPORTER_OTLP_ENDPOINT")]
    otlp: Option<String>,
}

#[tokio::main] // init_tracing installs a Tokio-based batch exporter
async fn main() -> miette::Result<()> {
    let _t = common::telemetry::init_tracing("solstice-orchestrator")?;
    let opts = Opts::parse();
    // ...
    Ok(())
}

6. Testing — How We Configure and Run Tests

  • Unit tests: colocated in module files under #[cfg(test)].
  • Integration tests: per-crate tests/ directory. Each *.rs compiles as a separate test binary.
  • Doc tests: keep examples correct or mark them no_run if they require external services.
  • Workspace commands we actually verified:
    • Run all tests: cargo test --workspace
    • Run a single crate's integration test (example we executed): cargo test -p agent --test smoke

Adding a new integration test:

  1. Create crates/<crate>/tests/<name>.rs.
  2. Write tests using the public API of the crate; avoid #[cfg(test)] internals.
  3. Use #[tokio::test(flavor = "multi_thread")] when async runtime is needed.
  4. Gate external dependencies (DB, S3) behind env flags or mark tests ignore by default.
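
One possible shape for step 4, as a sketch for a file under tests/. SOLSTICE_TEST_POSTGRES is a hypothetical flag name and external_tests_enabled is a hypothetical helper; neither exists in the repository as far as this document states.

```rust
/// Returns true when an opt-in flag carries a truthy value.
fn external_tests_enabled(flag_value: Option<&str>) -> bool {
    matches!(
        flag_value.map(str::trim),
        Some("1") | Some("true") | Some("yes")
    )
}

#[test]
fn talks_to_postgres() {
    // Hypothetical flag name; reading the environment in tests is fine,
    // mutating it is not.
    let flag = std::env::var("SOLSTICE_TEST_POSTGRES").ok();
    if !external_tests_enabled(flag.as_deref()) {
        eprintln!("skipping: set SOLSTICE_TEST_POSTGRES=1 to run");
        return;
    }
    // ... connect using DATABASE_URL and exercise the crate's public API ...
}

#[test]
#[ignore = "needs a running Postgres; run with `cargo test -- --ignored`"]
fn migrates_schema() {
    // ... body omitted in this sketch ...
}
```

The first test shows the env-flag gate, the second the ignore-by-default alternative. The gate reports a skipped test as passed, so the #[ignore] form is the more honest choice when the distinction matters in CI output.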

Example minimal integration test (we created and validated this during documentation):

#[test]
fn smoke_passes() {
    assert_eq!(2 + 2, 4);
}

Running subsets and filters:

  • By crate: cargo test -p orchestrator
  • By test target: cargo test -p agent --test smoke
  • By name filter: cargo test smoke

7. Code Style and Conventions

  • Error design:
    • Domain errors via thiserror. Convert them to miette::Report at boundaries (CLI/service entry) with context.
    • Prefer Result<T, E> where E: Into<miette::Report> for top-level flows.
  • Telemetry:
    • Every request/job gets a root span; child spans for significant phases. Include IDs in span fields, not only logs.
    • When logging sensitive data, strip or hash before emitting.
  • Async and runtime:
    • Tokio everywhere for async services. Use a single runtime per process; don't nest.
    • Use tracing instrument macros (#[instrument]) for important async fns.
  • Crate versions:
    • Always prefer the most recent compatible releases for miette, thiserror, tracing, tracing-subscriber, opentelemetry, opentelemetry-otlp, tracing-opentelemetry, sea-orm, sea-orm-cli, aws-sdk-s3, and clap.
    • Avoid pinning minor/patch versions unless required for reproducibility or to work around regressions.
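
For the telemetry rule about stripping sensitive data before logging, connection strings such as DATABASE_URL and AMQP_URL are a common leak. A minimal sketch of a redaction helper follows; redact_url_password is a hypothetical name, and the parsing is deliberately simple rather than a full URL parser.

```rust
/// Replaces the password in `scheme://user:password@host/...` with `***`.
/// Strings without a `user:password@` part are returned unchanged.
pub fn redact_url_password(url: &str) -> String {
    let Some(scheme_end) = url.find("://") else {
        return url.to_string();
    };
    let rest_start = scheme_end + 3;
    let rest = &url[rest_start..];
    // The authority ends at the first '/', '?' or '#'.
    let authority_end = rest
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(rest.len());
    let authority = &rest[..authority_end];
    // Userinfo is everything before the last '@' inside the authority.
    let Some(at) = authority.rfind('@') else {
        return url.to_string();
    };
    let userinfo = &authority[..at];
    let Some(colon) = userinfo.find(':') else {
        return url.to_string();
    };
    format!("{}{}:***{}", &url[..rest_start], &userinfo[..colon], &rest[at..])
}
```

Apply it at the point where a URL becomes a span field or log value, so the raw string is never handed to the subscriber.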

8. Local Development Tips

  • Reproducible tasks: For interactions with external systems, prefer containerized dependencies (e.g., Postgres, MinIO) or devcontainer/nix flows if provided in the future.
  • Migration safety: Never auto-apply migrations in production without a maintenance window and backup. In dev, refresh is acceptable.
  • Storage backends: Provide a no-op or filesystem fallback so developers can run without cloud credentials.
  • Observability: Keep logs structured and at info by default. For deep debugging, use RUST_LOG=trace with sampling to avoid log storms.

9. Troubleshooting Quick Notes

  • No logs emitted: Ensure the tracing subscriber is initialized exactly once; double-init will panic. Also check RUST_LOG filters.
  • OTLP export fails: Verify OTEL_EXPORTER_OTLP_ENDPOINT and that an OTLP collector (e.g., otelcol) is reachable. Fall back to console-only tracing if needed.
  • DB connection errors: Validate DATABASE_URL and SSL/TLS requirements. Confirm the service can resolve the host and reach the port.
  • S3 errors: Check credentials and bucket permissions; verify endpoint (for MinIO use path-style or specific region settings).

10. Documentation Routing

  • Architecture and AI-generated summaries: place in docs/ai/ with a timestamp prefix.
  • This guidelines file intentionally lives under .junie/guidelines.md for developer tooling.

11. Messaging — RabbitMQ (lapin)

We use lapin (AMQP 0-9-1 client) for RabbitMQ access with Tokio. Keep producers and consumers simple and observable; centralize connection setup in the common crate where practical and inject channels into subsystems. This channel is used for asynchronous communication. For direct, synchronous service-to-service RPC, use gRPC via tonic (see §12).

Configuration (env; per-service may extend):

  • AMQP_URL (e.g., amqp://user:pass@host:5672/%2f or amqps://... for TLS)
  • AMQP_PREFETCH (QoS prefetch; default 32-256 depending on workload)
  • AMQP_EXCHANGE (default exchange name; often empty-string for default direct exchange)
  • AMQP_QUEUE (queue name)
  • AMQP_ROUTING_KEY (routing key when publishing)

Guidelines:

  • Establish one Connection per process with automatic heartbeats; create dedicated Channels per producer/consumer task.
  • Declare exchanges/queues idempotently at startup with durable = true and auto_delete = false unless explicitly ephemeral.
  • Set channel QoS with basic_qos(prefetch_count) to control backpressure. Use ack/nack to preserve at-least-once delivery.
  • Prefer publisher confirms (confirm_select) and handle BasicReturn for unroutable messages when mandatory is set.
  • Instrument with tracing: tag spans with exchange, queue, routing_key, and message IDs; avoid logging bodies.
  • Reconnection: on connection/channel error, back off with jitter and recreate connection/channels; ensure consumers re-declare topology.
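
The reconnection guideline can be sketched as a pure delay function. This uses the "equal jitter" variant of exponential backoff, which is one of several reasonable choices and not necessarily what the orchestrator implements; backoff_delay is a hypothetical name. The random sample is passed in by the caller (for example from the rand crate) so the function stays deterministic under test.

```rust
use std::time::Duration;

/// Exponential backoff with equal jitter.
/// ceiling = min(cap, base * 2^attempt); the result lies between ceiling / 2
/// and ceiling. `unit_random` is expected in [0.0, 1.0] and is clamped.
pub fn backoff_delay(attempt: u32, base: Duration, cap: Duration, unit_random: f64) -> Duration {
    let factor = 2u32.saturating_pow(attempt);
    let ceiling = base.saturating_mul(factor).min(cap);
    let half = ceiling / 2;
    half + half.mul_f64(unit_random.clamp(0.0, 1.0))
}
```

A reconnect loop would sleep for backoff_delay(attempt, ...) after each failed Connection::connect, reset attempt to zero on success, and re-declare exchanges and queues before resuming consumption.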

Minimal producer example:

use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties};
use miette::{IntoDiagnostic as _, Result};
use tokio_amqp::*; // enables Tokio reactor for lapin

pub async fn publish_one(msg: &[u8]) -> Result<()> {
    let url = std::env::var("AMQP_URL").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
    let exchange = std::env::var("AMQP_EXCHANGE").unwrap_or_default();
    let routing_key = std::env::var("AMQP_ROUTING_KEY").unwrap_or_else(|_| "jobs".into());

    let conn = Connection::connect(&url, ConnectionProperties::default().with_tokio())
        .await
        .into_diagnostic()?;
    let channel = conn.create_channel().await.into_diagnostic()?;

    // Optional: declare exchange if non-empty.
    if !exchange.is_empty() {
        channel
            .exchange_declare(
                &exchange,
                lapin::ExchangeKind::Direct,
                ExchangeDeclareOptions { durable: true, auto_delete: false, ..Default::default() },
                FieldTable::default(),
            )
            .await
            .into_diagnostic()?;
    }

    // Publisher confirms
    channel.confirm_select(ConfirmSelectOptions::default()).await.into_diagnostic()?;

    let confirm = channel
        .basic_publish(
            &exchange,
            &routing_key,
            BasicPublishOptions { mandatory: true, ..Default::default() },
            msg,
            BasicProperties::default().with_content_type("application/octet-stream".into()),
        )
        .await
        .into_diagnostic()?;

    confirm.await.into_diagnostic()?; // wait for confirm
    Ok(())
}

Minimal consumer example:

use futures_util::StreamExt as _; // provides `consumer.next()`
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};
use miette::{IntoDiagnostic as _, Result};
use tokio_amqp::*;

pub async fn consume() -> Result<()> {
    let url = std::env::var("AMQP_URL").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
    let queue = std::env::var("AMQP_QUEUE").unwrap_or_else(|_| "jobs".into());
    let prefetch: u16 = std::env::var("AMQP_PREFETCH").ok().and_then(|s| s.parse().ok()).unwrap_or(64);

    let conn = Connection::connect(&url, ConnectionProperties::default().with_tokio())
        .await
        .into_diagnostic()?;
    let channel = conn.create_channel().await.into_diagnostic()?;

    channel
        .queue_declare(
            &queue,
            QueueDeclareOptions { durable: true, auto_delete: false, ..Default::default() },
            FieldTable::default(),
        )
        .await
        .into_diagnostic()?;

    channel.basic_qos(prefetch, BasicQosOptions { global: false }).await.into_diagnostic()?;

    let mut consumer = channel
        .basic_consume(
            &queue,
            "worker",
            BasicConsumeOptions { no_ack: false, ..Default::default() },
            FieldTable::default(),
        )
        .await
        .into_diagnostic()?;

    while let Some(delivery) = consumer.next().await {
        let delivery = delivery.into_diagnostic()?;
        // process delivery.data ...
        channel.basic_ack(delivery.delivery_tag, BasicAckOptions::default()).await.into_diagnostic()?;
    }
    Ok(())
}

Notes:

  • Use TLS (amqps://) where brokers require it; configure certificates via the underlying TLS connector if needed.
  • Keep message payloads schematized (e.g., JSON/CBOR/Protobuf) and versioned; include an explicit content_type and version header where applicable.

12. RPC — gRPC with tonic

We use gRPC for direct, synchronous service-to-service communication and standardize on the Rust tonic stack with prost for code generation. Prefer RabbitMQ (see §11) for asynchronous workflows, fan-out, or buffering; prefer gRPC when a caller needs an immediate response, strong request/response semantics, deadlines, and backpressure at the transport layer.

Guidance:

  • Crates: tonic, prost, prost-types, optionally tonic-health (liveness/readiness) and tonic-reflection (dev only).
  • Service boundaries: Define protobuf packages per domain (orchestrator.v1, agent.v1). Version packages; keep backward compatible changes (field additions with new tags, do not reuse/rename tags).
  • Errors: Map domain errors to tonic::Status with appropriate Code (e.g., InvalidArgument, NotFound, FailedPrecondition, Unavailable, Internal). Preserve rich diagnostics in logs via miette and tracing; avoid leaking internals to clients.
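  • Deadlines and cancellation: Require callers to set deadlines (sent as the grpc-timeout header, e.g., via Request::set_timeout on tonic clients). Servers must honor them: configure a server-side timeout (Server::builder().timeout(...)) as an upper bound and keep handlers cancel-safe, because the handler future is dropped when the deadline passes or the caller goes away.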
  • Observability: Propagate W3C TraceContext over gRPC metadata and create spans per RPC. Emit attributes for rpc.system = "grpc", rpc.service, rpc.method, stable IDs (job_id) as fields.
  • Security: Prefer TLS with rustls. Use mTLS where feasible, or bearer tokens in metadata (e.g., authorization: Bearer ...). Rotate certs without restarts where possible.
  • Operations: Configure keepalive, max message sizes, and concurrency. Use streaming RPCs for log streaming and large artifact transfers when applicable.
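
The error-mapping guidance can be sketched without dependencies as follows. The local Code enum is a stand-in for tonic::Code so the block compiles on its own, and JobError with its variants is a hypothetical domain error; in a real service this would be a thiserror enum converted via tonic::Status::new(code, message).

```rust
/// Stand-in for `tonic::Code`; only the variants named above are listed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Code {
    InvalidArgument,
    NotFound,
    FailedPrecondition,
    Unavailable,
    Internal,
}

/// Hypothetical domain error for illustration.
#[derive(Debug)]
pub enum JobError {
    EmptyJobId,
    UnknownJob(String),
    AlreadyRunning(String),
    BrokerDown,
    Storage(String),
}

/// Returns the status code plus a message that is safe to show to clients.
pub fn to_status(err: &JobError) -> (Code, String) {
    match err {
        JobError::EmptyJobId => (Code::InvalidArgument, "job_id must not be empty".into()),
        JobError::UnknownJob(id) => (Code::NotFound, format!("job {id} not found")),
        JobError::AlreadyRunning(id) => {
            (Code::FailedPrecondition, format!("job {id} is already running"))
        }
        JobError::BrokerDown => (Code::Unavailable, "temporarily unavailable, retry later".into()),
        // Internal details belong in logs (via tracing), not in the response.
        JobError::Storage(_) => (Code::Internal, "internal error".into()),
    }
}
```

Keeping the mapping in one exhaustive match means a new domain variant fails to compile until someone decides which code it maps to.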

Common env configuration (typical; per-service may extend):

  • GRPC_ADDR or GRPC_HOST/GRPC_PORT (listen/connect endpoint, e.g., 0.0.0.0:50051)
  • GRPC_TLS_CERT, GRPC_TLS_KEY, GRPC_TLS_CA (PEM paths for TLS/mTLS)
  • GRPC_KEEPALIVE_MS (e.g., 20_000), GRPC_MAX_MESSAGE_MB (e.g., 32), GRPC_TIMEOUT_MS (client default)

Minimal server skeleton (centralize wiring in common):

use miette::{IntoDiagnostic as _, Result};
use tonic::{transport::Server, Request, Response, Status};
use tracing::{info, instrument};

// Assume prost-generated module: pub mod orchestrator { pub mod v1 { include!("orchestrator.v1.rs"); }}
use orchestrator::v1::{job_service_server::{JobService, JobServiceServer}, StartJobRequest, StartJobResponse};

pub struct JobSvc;

#[tonic::async_trait]
impl JobService for JobSvc {
    #[instrument(name = "rpc_start_job", skip(self, req), fields(rpc.system = "grpc", rpc.service = "orchestrator.v1.JobService", rpc.method = "StartJob"))]
    async fn start_job(&self, req: Request<StartJobRequest>) -> Result<Response<StartJobResponse>, Status> {
        // tonic's Request has no deadline accessor: the transport enforces the caller's
        // grpc-timeout header, and this future is dropped on timeout or cancellation,
        // so keep the work below cancel-safe.
        let r = req.into_inner();
        info!(job_id = %r.job_id, "start_job request");
        // ... do work, map domain errors to Status ...
        Ok(Response::new(StartJobResponse { accepted: true }))
    }
}

pub async fn serve(addr: std::net::SocketAddr) -> Result<()> {
    let _t = common::telemetry::init_tracing("solstice-orchestrator")?;
    Server::builder()
        .add_service(JobServiceServer::new(JobSvc))
        .serve(addr)
        .await
        .into_diagnostic()
}

Minimal client sketch (with timeout and TLS optional):

use miette::{IntoDiagnostic as _, Result};
use tonic::{transport::{Channel, ClientTlsConfig, Endpoint}, Request, Code};
use orchestrator::v1::{job_service_client::JobServiceClient, StartJobRequest};

pub async fn start_job(addr: &str, job_id: String) -> Result<bool> {
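    // tonic's transport error does not implement Diagnostic, so convert before `?`.
    let mut ep = Endpoint::from_shared(format!("https://{addr}"))
        .into_diagnostic()?
        .tcp_keepalive(Some(std::time::Duration::from_secs(20)));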
    // ep = ep.tls_config(ClientTlsConfig::new())?; // enable when TLS configured
    let channel: Channel = ep.connect_timeout(std::time::Duration::from_secs(3)).connect().await.into_diagnostic()?;
    let mut client = JobServiceClient::new(channel);
    let mut req = Request::new(StartJobRequest { job_id });
    req.set_timeout(std::time::Duration::from_secs(10));
    match client.start_job(req).await { 
        Ok(resp) => Ok(resp.into_inner().accepted),
        Err(status) if status.code() == Code::Unavailable => Ok(false), // map transient
        Err(e) => Err(miette::miette!("gRPC error: {e}")),
    }
}

Health and reflection:

  • Expose grpc.health.v1.Health via tonic-health for k8s/consumers. Include a readiness check (DB pool, AMQP connection) before reporting SERVING.
  • Enable tonic-reflection only in dev/test to assist tooling; disable in prod.

Testing notes:

  • Use #[tokio::test(flavor = "multi_thread")] and bind the server to 127.0.0.1:0 (ephemeral port) for integration tests.
  • Assert deadlines and cancellation by setting short timeouts on the client Request and verifying Status::deadline_exceeded.
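
The ephemeral-port advice can be sketched with the standard library alone. bind_ephemeral is a hypothetical helper name. In a tonic test the listener would typically be switched to non-blocking, converted into a Tokio listener, and handed to the server, while the returned address is given to the client.

```rust
use std::net::{SocketAddr, TcpListener};

/// Binds to 127.0.0.1:0 and returns the listener together with the address
/// the OS actually assigned, so parallel tests never fight over a fixed port.
pub fn bind_ephemeral() -> std::io::Result<(TcpListener, SocketAddr)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    Ok((listener, addr))
}
```

Holding on to the listener, rather than binding, reading the port, and rebinding later, avoids a race in which another process claims the port in between.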

Cross-reference:

  • Asynchronous communication: RabbitMQ via lapin (§11).
  • Direct synchronous RPC: gRPC via tonic (this section).