# Solstice CI — Engineering Guidelines
This document records project-specific build, test, and development conventions for the Solstice CI workspace. It is written for experienced Rust developers and focuses only on details that are unique to this repository.
- Workspace root: `solstice-ci/`
- Crates: `crates/agent`, `crates/orchestrator`, `crates/forge-integration`, `crates/github-integration`, `crates/common`
- Rust edition: `2024`
- Docs policy: Any AI-generated markdown summaries not explicitly requested in a prompt must live under `docs/ai/` with a timestamp prefix in the filename (e.g., `2025-10-25-some-topic.md`).
## 1. Build and Configuration
**BREAKING POLICY UPDATE — 2025-11-17**
We do not set environment variables from Rust code anywhere in this repository. Processes may read environment variables exactly once at startup to build their in-memory configuration, and thereafter must pass values through explicit configuration structs and function parameters. Subprocesses must receive required configuration via flags/args or inherited environment coming from the supervisor (systemd, container runtime), not via mutations performed by our code.
Implications:
- Remove/avoid any calls to `std::env::set_var`, `Command::env`, or ad-hoc `HOME`/`XDG_*` rewrites in code. If a child process (e.g., `virsh`) needs a URI, pass it explicitly via its CLI flags (e.g., `virsh -c <URI>`) or ensure the supervisor's `EnvironmentFile` provides it.
- Read environment only to construct config types (e.g., `AppConfig`, `Opts` via clap). Do not propagate config by mutating process env.
- Pass configuration through owned structs (e.g., `ExecConfig`) and function parameters. Do not fall back to the environment once config is built.
- This change is intentionally not backward compatible with any earlier behavior that relied on code setting env vars at runtime.
Operational guidance:
- Systemd units and container definitions remain the source of truth for environment. Ensure `EnvironmentFile` entries are correct. For local runs, pass flags or export vars before invoking binaries.
- For libvirt tools like `virsh`, we pass the URI using `-c` and do not manipulate `HOME`/`XDG_CACHE_HOME`.
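The "explicit flags, not env mutation" rule above can be sketched as follows. This is a minimal illustration, not the repository's real code: `LibvirtConfig` and `virsh_command` are hypothetical names, and only the `-c` flag usage reflects the stated policy.

```rust
use std::process::Command;

/// Hypothetical config struct; field names are illustrative, not the real `ExecConfig`.
pub struct LibvirtConfig {
    /// e.g. "qemu:///system", read once at startup from flags or inherited env.
    pub uri: String,
}

/// Build a `virsh` invocation with the URI passed explicitly via `-c`,
/// instead of mutating HOME/XDG_* or the child's environment.
pub fn virsh_command(cfg: &LibvirtConfig, subcommand: &str) -> Command {
    let mut cmd = Command::new("virsh");
    cmd.arg("-c").arg(&cfg.uri).arg(subcommand);
    cmd
}
```

Note that the child receives everything it needs on the command line; no `Command::env` call appears anywhere.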
- Use the stable toolchain unless explicitly noted. The workspace uses the 2024 edition; keep `rustup` and `cargo` updated.
- Top-level build:
  - Build everything: `cargo build --workspace`
  - Run individual binaries during development using `cargo run -p <crate>`.
- mise file tasks:
  - Tasks live under `.mise/tasks/` and are discovered automatically by mise.
  - List all available tasks: `mise tasks`
  - Run tasks with namespace-style names (directory separator becomes `:`). Examples:
    - Build all (debug): `mise run build:all`
    - Build all (release): `mise run build:release`
    - Test all: `mise run test:all`
    - Start local deps (RabbitMQ): `mise run dev:up`
    - Stop local deps: `mise run dev:down`
    - Run orchestrator with local defaults: `mise run run:orchestrator`
    - Enqueue a sample job for the current repo/commit: `mise run run:forge-enqueue`
    - Serve the workflow runner for VMs to download (local dev): `mise run run:runner-serve`
    - End-to-end local flow (bring up deps, start orchestrator, enqueue one job, tail logs): `mise run ci:local`
  - Notes for local VM downloads:
    - The orchestrator injects a `SOLSTICE_RUNNER_URL` into cloud-init; `ci:local` sets this automatically by serving the runner from your host.
    - You can set `ORCH_CONTACT_ADDR` to the host:port where the runner should stream logs back (defaults to `GRPC_ADDR`).
- Lints and formatting follow the default Rust style unless a crate specifies otherwise. Prefer `cargo fmt` and `cargo clippy --workspace --all-targets --all-features` before committing.
- Secrets and credentials are never committed. For local runs, use environment variables or a `.env` provider (do not add `.env` to VCS). In CI/deployments, use a secret store (e.g., Vault, KMS) — see the Integration layer notes.
Common configuration environment variables (pattern; per-service variables may diverge):
- Logging and tracing:
  - `RUST_LOG` (or use `tracing-subscriber` filters)
  - `OTEL_EXPORTER_OTLP_ENDPOINT` (e.g., `http://localhost:4317`)
  - `OTEL_SERVICE_NAME` (e.g., `solstice-orchestrator`)
- Database (Postgres via SeaORM):
  - `DATABASE_URL=postgres://user:pass@host:5432/solstice`
- Object storage (S3-compatible) and filesystem:
  - `S3_ENDPOINT`, `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION`, `S3_BUCKET`
  - `BLOB_FS_ROOT` (filesystem-backed blob store root)
## 2. Error Handling, Tracing, and Telemetry
We standardize on the `miette` + `thiserror` error pattern and `tracing` with OpenTelemetry.
- `thiserror` is used for domain error enums/structs; avoid stringly-typed errors.
- Wrap top-level errors with `miette::Report` for rich diagnostic output in CLIs and service logs. Prefer `eyre`-style ergonomics via `miette::Result<T>` where appropriate.
- Use `tracing` over `log`. Never use ad-hoc `println!` in services; `println!` remains acceptable in intentionally minimal binaries or for very early bootstrap before subscribers are set.
- Emit spans/resources with OpenTelemetry. The shared initialization lives in the `common` crate so all binaries get consistent wiring.
Recommended initialization (placed in `crates/common` as a single public function and called from each service `main`):
```rust
// common/src/telemetry.rs (example; keep in `common`)
pub struct TelemetryGuard {
    // Drop to flush and shut down the OTEL pipeline.
    _guard: Option<tracing_appender::non_blocking::WorkerGuard>,
}

pub fn init_tracing(service_name: &str) -> miette::Result<TelemetryGuard> {
    use std::io::IsTerminal as _;

    use miette::IntoDiagnostic;
    use opentelemetry::sdk::trace as sdktrace;
    use opentelemetry::sdk::{propagation::TraceContextPropagator, Resource};
    use opentelemetry::KeyValue;
    use opentelemetry_otlp::WithExportConfig;
    use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

    // Resource describing this service.
    let resource = Resource::new(vec![KeyValue::new("service.name", service_name.to_string())]);

    // OTLP exporter (gRPC) if an endpoint is present; otherwise, only console output.
    let tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_trace_config(sdktrace::Config::default().with_resource(resource))
        .with_exporter(opentelemetry_otlp::new_exporter().tonic())
        .install_batch(opentelemetry::runtime::Tokio)
        .into_diagnostic()?;

    // Optional non-blocking writer (example) — keep simple unless needed.
    let (nb_writer, guard) = tracing_appender::non_blocking(std::io::stderr());
    let fmt_layer = fmt::layer()
        .with_target(false)
        .with_writer(nb_writer)
        .with_ansi(std::io::stderr().is_terminal());
    let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));

    opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
    tracing_subscriber::registry()
        .with(filter)
        .with(fmt_layer)
        .with(otel_layer)
        .init();

    Ok(TelemetryGuard { _guard: Some(guard) })
}
```
In each service binary:
```rust
fn main() -> miette::Result<()> {
    let _telemetry = common::telemetry::init_tracing("solstice-orchestrator")?;
    // ...
    Ok(())
}
```
Guidance:
- Favor spans around orchestration lifecycles (VM provisioning, job execution, webhook processing). Include high-cardinality attributes judiciously; prefer stable keys (e.g., job_id) and sample IDs for verbose content.
- When returning errors from service entrypoints, prefer `miette::Report` with context to produce actionable diagnostics.
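The `thiserror` + `miette` split above boils down to: structured domain errors inside, contextualized reports at the boundary. A hand-rolled sketch of that shape (the `Display`/`Error` impls below are exactly what `thiserror` would derive; `ScheduleError` and its variants are hypothetical names, not types from this repo):

```rust
use std::fmt;

/// Domain error for job scheduling; with `thiserror` these impls collapse to
/// `#[derive(Debug, Error)]` plus `#[error("...")]` attributes. Variants are
/// illustrative.
#[derive(Debug)]
pub enum ScheduleError {
    JobNotFound { job_id: String },
    QueueFull { capacity: usize },
}

impl fmt::Display for ScheduleError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::JobNotFound { job_id } => write!(f, "job {job_id} not found"),
            Self::QueueFull { capacity } => write!(f, "queue full (capacity {capacity})"),
        }
    }
}

impl std::error::Error for ScheduleError {}

/// At the service boundary, attach context before surfacing the error; with
/// miette this would be `Err(e).into_diagnostic().wrap_err("scheduling failed")`.
pub fn describe_failure(e: &ScheduleError) -> String {
    format!("scheduling failed: {e}")
}
```

The key property: nothing inside the domain layer is stringly-typed; the human-readable context is added only at the entrypoint.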
## 3. Database Access — SeaORM + Postgres
- Use SeaORM for all relational access; keep raw SQL to migrations or perf-critical paths only.
- Model organization:
  - Entities live in a dedicated `entities` module/crate generated by SeaORM tooling or handwritten as needed.
  - Queries should be implemented in small, testable functions; avoid mixing business logic with query composition.
- Migrations:
  - Create a migrations crate (e.g., `crates/migration`) using SeaORM CLI and point it at Postgres via `DATABASE_URL`.
  - Typical workflow:
    - `sea-orm-cli migrate init`
    - `sea-orm-cli migrate generate <name>`
    - Edit up/down migrations.
    - `sea-orm-cli migrate up` (or `refresh` in dev)
  - In services, run migrations on boot behind a feature flag or dedicated admin command.
- Connection management:
  - Create a connection pool per service at startup (Tokio + async). Inject the pool into subsystems rather than using globals.
  - Use timeouts and statement logging (via `tracing`) for observability.
### 3.1 Connection Pooling — deadpool (Postgres)
We standardize on `deadpool` for async connection pooling to Postgres, using `deadpool-postgres` (+ `tokio-postgres`) alongside SeaORM. Services construct a single pool at startup and pass it into subsystems.
Guidelines:
- Configuration via env:
  - `DATABASE_URL` (Postgres connection string)
  - `DB_POOL_MAX_SIZE` (e.g., 16–64 depending on workload)
  - `DB_POOL_TIMEOUT_MS` (acquire timeout; default 5_000 ms)
- Enable TLS when required by your Postgres deployment (use `rustls` variants where possible). Avoid embedding credentials in code; prefer env/secret stores.
- Instrument queries with `tracing`. Prefer statement logging at `debug` with sampling in high-traffic paths.
Minimal setup sketch:
```rust
use std::time::Duration;

use deadpool_postgres::{Config as PgConfig, ManagerConfig, RecyclingMethod};
use miette::{IntoDiagnostic, Result};
use tokio_postgres::NoTls; // or a TLS connector

pub struct DbPool {
    pool: deadpool_postgres::Pool,
    acquire_timeout: Duration,
}

pub fn build_db_pool() -> Result<DbPool> {
    // Env is read exactly once, here at startup (see §1); afterwards all
    // configuration travels inside `DbPool`, never via the environment.
    let mut cfg = PgConfig::from_env("DATABASE").unwrap_or_else(|_| PgConfig::new());
    cfg.manager = Some(ManagerConfig { recycling_method: RecyclingMethod::Fast });
    let acquire_timeout = Duration::from_millis(
        std::env::var("DB_POOL_TIMEOUT_MS").ok().and_then(|s| s.parse().ok()).unwrap_or(5_000),
    );
    let pool = cfg
        .create_pool(Some(deadpool_postgres::Runtime::Tokio1), NoTls)
        .into_diagnostic()?;
    // The newtype wraps both the pool and its acquire timeout so callers
    // never reach back into the environment.
    Ok(DbPool { pool, acquire_timeout })
}

pub async fn with_client<F, Fut, T>(db: &DbPool, f: F) -> Result<T>
where
    F: FnOnce(deadpool_postgres::Client) -> Fut,
    Fut: std::future::Future<Output = Result<T>>,
{
    use tokio::time::timeout;
    let client = timeout(db.acquire_timeout, db.pool.get())
        .await
        .into_diagnostic()? // acquire timed out
        .into_diagnostic()?; // pool error
    f(client).await
}
```
Notes:
- Use one pool per service. Do not create pools per request.
- If you also use SeaORM, let it own its own (sqlx-based) connection handling via `Database::connect`, and keep the deadpool-backed `tokio-postgres` clients for admin/utility tasks; do not try to share a single pool across both stacks.
## 4. Blob Storage — S3 and Filesystem
We support both S3-compatible object storage and a local filesystem backend.
- Prefer a storage abstraction in `common` with a trait like `BlobStore` and two implementations:
  - `S3BlobStore` using the official `aws-sdk-s3` client (or a vetted S3-compatible client).
  - `FsBlobStore` rooted at `BLOB_FS_ROOT` for local/dev and tests.
- Selection is via configuration (env or CLI). Keep keys and endpoints out of the repo.
- For S3, follow best practices:
  - Use instance/role credentials where available; otherwise use env creds.
  - Set timeouts, retries, and backoff via the client config.
  - Keep bucket and path layout stable; include job IDs in keys for traceability.
- For filesystem, ensure directories are created atomically and validate paths to avoid traversal issues.
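A minimal sketch of the trait and the filesystem backend, including the path-traversal check mentioned above. This is synchronous and stdlib-only for brevity; the real `BlobStore` in `common` would be async and return richer errors, and the method names here are assumptions.

```rust
use std::fs;
use std::io;
use std::path::{Component, Path, PathBuf};

/// Minimal sync sketch; the production trait would be async.
pub trait BlobStore {
    fn put(&self, key: &str, data: &[u8]) -> io::Result<()>;
    fn get(&self, key: &str) -> io::Result<Vec<u8>>;
}

pub struct FsBlobStore {
    root: PathBuf,
}

impl FsBlobStore {
    pub fn new(root: impl Into<PathBuf>) -> Self {
        Self { root: root.into() }
    }

    /// Reject absolute keys and `..` components so a key can never escape root.
    fn resolve(&self, key: &str) -> io::Result<PathBuf> {
        let rel = Path::new(key);
        let safe = rel.components().all(|c| matches!(c, Component::Normal(_)));
        if !safe {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "unsafe blob key"));
        }
        Ok(self.root.join(rel))
    }
}

impl BlobStore for FsBlobStore {
    fn put(&self, key: &str, data: &[u8]) -> io::Result<()> {
        let path = self.resolve(key)?;
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent)?; // create intermediate dirs
        }
        fs::write(path, data)
    }

    fn get(&self, key: &str) -> io::Result<Vec<u8>> {
        fs::read(self.resolve(key)?)
    }
}
```

Keys like `jobs/<job_id>/artifact.bin` then map cleanly onto both this backend and S3 object keys, keeping job IDs in the layout for traceability.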
## 5. Argument Parsing — Clap
- All binaries use `clap` for flags/subcommands. Derive-based APIs (`#[derive(Parser)]`) are strongly preferred for consistency and help text quality.
- Align flags across services where semantics match (e.g., `--log-level`, `--database-url`, `--s3-endpoint`).
- Emit `--help` that documents env var fallbacks where appropriate.
Example skeleton:
```rust
use clap::Parser;

#[derive(Parser, Debug)]
#[command(name = "solstice-orchestrator", version, about)]
struct Opts {
    /// Postgres connection string
    #[arg(long, env = "DATABASE_URL")]
    database_url: String,
    /// OTLP endpoint (e.g., http://localhost:4317)
    #[arg(long, env = "OTEL_EXPORTER_OTLP_ENDPOINT")]
    otlp: Option<String>,
}

fn main() -> miette::Result<()> {
    let _t = common::telemetry::init_tracing("solstice-orchestrator")?;
    let opts = Opts::parse();
    // ...
    Ok(())
}
```
## 6. Testing — How We Configure and Run Tests
- Unit tests: colocated in module files under `#[cfg(test)]`.
- Integration tests: per-crate `tests/` directory. Each `*.rs` compiles as a separate test binary.
- Doc tests: keep examples correct or mark them `no_run` if they require external services.
- Workspace commands we actually verified:
  - Run all tests: `cargo test --workspace`
  - Run a single crate's integration test (example we executed): `cargo test -p agent --test smoke`
Adding a new integration test:
1. Create `crates/<crate>/tests/<name>.rs`.
2. Write tests using the public API of the crate; avoid `#[cfg(test)]` internals.
3. Use `#[tokio::test(flavor = "multi_thread")]` when async runtime is needed.
4. Gate external dependencies (DB, S3) behind env flags or mark tests `ignore` by default.
Example minimal integration test (we created and validated this during documentation):
```rust
#[test]
fn smoke_passes() {
    assert_eq!(2 + 2, 4);
}
```
Running subsets and filters:
- By crate: `cargo test -p orchestrator`
- By test target: `cargo test -p agent --test smoke`
- By name filter: `cargo test smoke`
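Step 4 above (gating external dependencies behind env flags) can be factored into a tiny helper. The gate variable name `SOLSTICE_TEST_EXTERNAL` is hypothetical; pick one per workspace and document it. Taking a lookup closure keeps the helper itself testable without mutating the process environment (see §1).

```rust
/// Decide whether external-dependency tests (DB, S3, AMQP) should run.
/// `SOLSTICE_TEST_EXTERNAL` is an illustrative gate variable name.
pub fn external_deps_enabled(get: impl Fn(&str) -> Option<String>) -> bool {
    get("SOLSTICE_TEST_EXTERNAL").map(|v| v == "1").unwrap_or(false)
}

// In an integration test:
// #[tokio::test(flavor = "multi_thread")]
// async fn talks_to_postgres() {
//     if !external_deps_enabled(|k| std::env::var(k).ok()) {
//         eprintln!("skipping: SOLSTICE_TEST_EXTERNAL != 1");
//         return;
//     }
//     // ... connect using DATABASE_URL ...
// }
```

Early-returning keeps the test green on machines without the dependency, while `#[ignore]` remains the alternative when you want the skip to be visible in test output.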
## 7. Code Style and Conventions
- Error design:
  - Domain errors via `thiserror`. Convert them to `miette::Report` at boundaries (CLI/service entry) with context.
  - Prefer `Result<T, E>` where `E: Into<miette::Report>` for top-level flows.
- Telemetry:
  - Every request/job gets a root span; child spans for significant phases. Include IDs in span fields, not only logs.
  - When logging sensitive data, strip or hash before emitting.
- Async and runtime:
  - Tokio everywhere for async services. Use a single runtime per process; don't nest runtimes.
  - Use `tracing` instrument macros (`#[instrument]`) for important async fns.
- Crate versions:
  - Always prefer the most recent compatible releases for `miette`, `thiserror`, `tracing`, `tracing-subscriber`, `opentelemetry`, `opentelemetry-otlp`, `tracing-opentelemetry`, `sea-orm`, `sea-orm-cli`, `aws-sdk-s3`, and `clap`.
  - Avoid pinning minor/patch versions unless required for reproducibility or to work around regressions.
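The "strip or hash before emitting" rule for sensitive log fields can be sketched with a stdlib fingerprint. This is an illustration only: `DefaultHasher` is not cryptographic and its algorithm may change across Rust releases, so use a keyed cryptographic hash (e.g. HMAC-SHA-256) where fingerprints must be stable across processes or unlinkable.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Emit a short fingerprint in log fields instead of the raw secret.
/// Not cryptographic — see the caveat above.
pub fn fingerprint(sensitive: &str) -> String {
    let mut h = DefaultHasher::new();
    sensitive.hash(&mut h);
    format!("{:016x}", h.finish())
}

// Usage in a span/log field:
// tracing::info!(token = %fingerprint(&raw_token), "registered runner");
```

The fingerprint lets you correlate log lines about the same credential without ever emitting the credential itself.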
## 8. Local Development Tips
- Reproducible tasks: For interactions with external systems, prefer containerized dependencies (e.g., Postgres, MinIO) or `devcontainer`/`nix` flows if provided in the future.
- Migration safety: Never auto-apply migrations in production without a maintenance window and backup. In dev, `refresh` is acceptable.
- Storage backends: Provide a no-op or filesystem fallback so developers can run without cloud credentials.
- Observability: Keep logs structured and at `info` by default. For deep debugging, use `RUST_LOG=trace` with sampling to avoid log storms.
## 9. Troubleshooting Quick Notes
- No logs emitted: Ensure the tracing subscriber is initialized exactly once; double-init will panic. Also check `RUST_LOG` filters.
- OTLP export fails: Verify `OTEL_EXPORTER_OTLP_ENDPOINT` and that an OTLP collector (e.g., `otelcol`) is reachable. Fall back to console-only tracing if needed.
- DB connection errors: Validate `DATABASE_URL` and SSL/TLS requirements. Confirm the service can resolve the host and reach the port.
- S3 errors: Check credentials and bucket permissions; verify endpoint (for MinIO use path-style or specific region settings).
## 10. Documentation Routing
- Architecture and AI-generated summaries: place in `docs/ai/` with a timestamp prefix.
- This guidelines file intentionally lives under `.junie/guidelines.md` for developer tooling.
## 11. Messaging — RabbitMQ (lapin)
We use `lapin` (AMQP 0-9-1 client) for RabbitMQ access with Tokio. Keep producers and consumers simple and observable; centralize connection setup in the `common` crate where practical and inject channels into subsystems. This channel is used for asynchronous communication. For direct, synchronous service-to-service RPC, use gRPC via `tonic` (see §12).
Configuration (env; per-service may extend):
- `AMQP_URL` (e.g., `amqp://user:pass@host:5672/%2f` or `amqps://...` for TLS)
- `AMQP_PREFETCH` (QoS prefetch; e.g., 32–256 depending on workload)
- `AMQP_EXCHANGE` (default exchange name; often empty-string for default direct exchange)
- `AMQP_QUEUE` (queue name)
- `AMQP_ROUTING_KEY` (routing key when publishing)
Guidelines:
- Establish one `Connection` per process with automatic heartbeats; create dedicated `Channel`s per producer/consumer task.
- Declare exchanges/queues idempotently at startup with `durable = true` and `auto_delete = false` unless explicitly ephemeral.
- Set channel QoS with `basic_qos(prefetch_count)` to control backpressure. Use ack/nack to preserve at-least-once delivery.
- Prefer publisher confirms (`confirm_select`) and handle `BasicReturn` for unroutable messages when `mandatory` is set.
- Instrument with `tracing`: tag spans with `exchange`, `queue`, `routing_key`, and message IDs; avoid logging bodies.
- Reconnection: on connection/channel error, back off with jitter and recreate connection/channels; ensure consumers re-declare topology.
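The reconnection rule above (back off with jitter) can be sketched as a pure delay schedule. This is "full jitter" exponential backoff; the base, cap, and injected `rand_unit` closure are assumptions for illustration, and in a service you would pass a closure over your RNG of choice.

```rust
/// Exponential backoff with full jitter for AMQP reconnects: the delay is a
/// random point in [0, min(cap, base * 2^attempt)]. `rand_unit` returns a
/// value in [0, 1] and is injected so the schedule is testable.
pub fn reconnect_delay(attempt: u32, rand_unit: impl Fn() -> f64) -> std::time::Duration {
    let base_ms = 200u64;   // illustrative base delay
    let cap_ms = 30_000u64; // illustrative cap
    let exp = base_ms.saturating_mul(1u64 << attempt.min(20));
    let ceiling = exp.min(cap_ms);
    std::time::Duration::from_millis((ceiling as f64 * rand_unit().clamp(0.0, 1.0)) as u64)
}
```

After each failed attempt, sleep for `reconnect_delay(attempt, ..)`, recreate the connection and channels, and re-declare topology before resuming consumption.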
Minimal producer example:
```rust
use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties};
use miette::{IntoDiagnostic as _, Result};
use tokio_amqp::*; // enables the Tokio reactor for lapin

pub async fn publish_one(msg: &[u8]) -> Result<()> {
    // Env reads kept inline for brevity; real services resolve these once at
    // startup into a config struct (see §1) and pass it in.
    let url = std::env::var("AMQP_URL").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
    let exchange = std::env::var("AMQP_EXCHANGE").unwrap_or_default();
    let routing_key = std::env::var("AMQP_ROUTING_KEY").unwrap_or_else(|_| "jobs".into());
    let conn = Connection::connect(&url, ConnectionProperties::default().with_tokio())
        .await
        .into_diagnostic()?;
    let channel = conn.create_channel().await.into_diagnostic()?;
    // Optional: declare the exchange if non-empty.
    if !exchange.is_empty() {
        channel
            .exchange_declare(
                &exchange,
                lapin::ExchangeKind::Direct,
                ExchangeDeclareOptions { durable: true, auto_delete: false, ..Default::default() },
                FieldTable::default(),
            )
            .await
            .into_diagnostic()?;
    }
    // Publisher confirms
    channel.confirm_select(ConfirmSelectOptions::default()).await.into_diagnostic()?;
    let confirm = channel
        .basic_publish(
            &exchange,
            &routing_key,
            BasicPublishOptions { mandatory: true, ..Default::default() },
            msg,
            BasicProperties::default().with_content_type("application/octet-stream".into()),
        )
        .await
        .into_diagnostic()?;
    confirm.await.into_diagnostic()?; // wait for broker confirm
    Ok(())
}
```
```
Minimal consumer example:
```rust
use futures_util::StreamExt as _; // for `consumer.next()`
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};
use miette::{IntoDiagnostic as _, Result};
use tokio_amqp::*;

pub async fn consume() -> Result<()> {
    // Env reads kept inline for brevity; real services resolve these once at
    // startup into a config struct (see §1) and pass it in.
    let url = std::env::var("AMQP_URL").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
    let queue = std::env::var("AMQP_QUEUE").unwrap_or_else(|_| "jobs".into());
    let prefetch: u16 = std::env::var("AMQP_PREFETCH").ok().and_then(|s| s.parse().ok()).unwrap_or(64);
    let conn = Connection::connect(&url, ConnectionProperties::default().with_tokio())
        .await
        .into_diagnostic()?;
    let channel = conn.create_channel().await.into_diagnostic()?;
    channel
        .queue_declare(
            &queue,
            QueueDeclareOptions { durable: true, auto_delete: false, ..Default::default() },
            FieldTable::default(),
        )
        .await
        .into_diagnostic()?;
    channel.basic_qos(prefetch, BasicQosOptions { global: false }).await.into_diagnostic()?;
    let mut consumer = channel
        .basic_consume(
            &queue,
            "worker",
            BasicConsumeOptions { no_ack: false, ..Default::default() },
            FieldTable::default(),
        )
        .await
        .into_diagnostic()?;
    while let Some(delivery) = consumer.next().await {
        let delivery = delivery.into_diagnostic()?;
        // process delivery.data ...
        channel.basic_ack(delivery.delivery_tag, BasicAckOptions::default()).await.into_diagnostic()?;
    }
    Ok(())
}
```
Notes:
- Use TLS (`amqps://`) where brokers require it; configure certificates via the underlying TLS connector if needed.
- Keep message payloads schematized (e.g., JSON/CBOR/Protobuf) and versioned; include an explicit `content_type` and version header where applicable.
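The "schematized and versioned" rule above implies every message carries schema metadata next to its body. A minimal sketch of that envelope metadata; the header names (`x-schema`, `x-schema-version`) and version scheme here are illustrative team choices, not fixed conventions of this repo.

```rust
/// Illustrative envelope metadata published alongside the body.
pub struct MessageMeta {
    pub content_type: &'static str, // e.g. "application/json"
    pub schema: &'static str,       // e.g. "solstice.job"
    pub version: u32,               // bump on breaking payload changes
}

impl MessageMeta {
    /// Render as header key/value pairs; with lapin these would go into
    /// `BasicProperties` (content type) and a `FieldTable` of headers.
    pub fn headers(&self) -> Vec<(String, String)> {
        vec![
            ("content_type".into(), self.content_type.into()),
            ("x-schema".into(), self.schema.into()),
            ("x-schema-version".into(), self.version.to_string()),
        ]
    }
}
```

Consumers can then dispatch on schema and version before deserializing, which is what makes rolling payload upgrades safe.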
## 12. RPC — gRPC with tonic
We use `gRPC` for direct, synchronous service-to-service communication and standardize on the Rust `tonic` stack with `prost` for code generation. Prefer RabbitMQ (see §11) for asynchronous workflows, fan-out, or buffering; prefer gRPC when a caller needs an immediate response, strong request/response semantics, deadlines, and backpressure at the transport layer.
Guidance:
- Crates: `tonic`, `prost`, `prost-types`, optionally `tonic-health` (liveness/readiness) and `tonic-reflection` (dev only).
- Service boundaries: Define protobuf packages per domain (`orchestrator.v1`, `agent.v1`). Version packages and keep changes backward compatible (add fields with new tags; never reuse or renumber existing tags).
- Errors: Map domain errors to `tonic::Status` with appropriate `Code` (e.g., `InvalidArgument`, `NotFound`, `FailedPrecondition`, `Unavailable`, `Internal`). Preserve rich diagnostics in logs via `miette` and `tracing`; avoid leaking internals to clients.
- Deadlines and cancellation: Require callers to set deadlines (tonic propagates them as `grpc-timeout` metadata); servers should enforce their own timeouts and stop work when the client disconnects (tonic drops the handler future on cancellation). Set sensible server timeouts.
- Observability: Propagate W3C TraceContext over gRPC metadata and create spans per RPC. Emit attributes for `rpc.system = "grpc"`, `rpc.service`, `rpc.method`, stable IDs (job_id) as fields.
- Security: Prefer TLS with `rustls`. Use mTLS where feasible, or bearer tokens in metadata (e.g., `authorization: Bearer ...`). Rotate certs without restarts where possible.
- Operations: Configure keepalive, max message sizes, and concurrency. Use streaming RPCs for log streaming and large artifact transfers when applicable.
Common env configuration (typical; per-service may extend):
- `GRPC_ADDR` or `GRPC_HOST`/`GRPC_PORT` (listen/connect endpoint, e.g., `0.0.0.0:50051`)
- `GRPC_TLS_CERT`, `GRPC_TLS_KEY`, `GRPC_TLS_CA` (PEM paths for TLS/mTLS)
- `GRPC_KEEPALIVE_MS` (e.g., 20_000), `GRPC_MAX_MESSAGE_MB` (e.g., 32), `GRPC_TIMEOUT_MS` (client default)
Minimal server skeleton (centralize wiring in `common`):
```rust
use miette::{IntoDiagnostic as _, Result};
use tonic::{transport::Server, Request, Response, Status};
use tracing::{info, instrument};

// Assume a prost-generated module:
// pub mod orchestrator { pub mod v1 { include!("orchestrator.v1.rs"); } }
use orchestrator::v1::{
    job_service_server::{JobService, JobServiceServer},
    StartJobRequest, StartJobResponse,
};

pub struct JobSvc;

#[tonic::async_trait]
impl JobService for JobSvc {
    #[instrument(name = "rpc_start_job", skip(self, req), fields(rpc.system = "grpc", rpc.service = "orchestrator.v1.JobService", rpc.method = "StartJob"))]
    async fn start_job(&self, req: Request<StartJobRequest>) -> Result<Response<StartJobResponse>, Status> {
        // The caller's deadline arrives as `grpc-timeout` metadata; tonic drops
        // this future if the client goes away, so long-running work should be
        // cancellation-safe and bounded by a server-side timeout.
        let r = req.into_inner();
        info!(job_id = %r.job_id, "start_job request");
        // ... do work, map domain errors to Status ...
        Ok(Response::new(StartJobResponse { accepted: true }))
    }
}

pub async fn serve(addr: std::net::SocketAddr) -> Result<()> {
    let _telemetry = common::telemetry::init_tracing("solstice-orchestrator")?;
    Server::builder()
        .add_service(JobServiceServer::new(JobSvc))
        .serve(addr)
        .await
        .into_diagnostic()
}
```
Minimal client sketch (with timeout and TLS optional):
```rust
use miette::{IntoDiagnostic as _, Result};
use tonic::{transport::{Channel, ClientTlsConfig, Endpoint}, Code, Request};
use orchestrator::v1::{job_service_client::JobServiceClient, StartJobRequest};

pub async fn start_job(addr: &str, job_id: String) -> Result<bool> {
    let ep = Endpoint::from_shared(format!("https://{addr}"))
        .into_diagnostic()?
        .tcp_keepalive(Some(std::time::Duration::from_secs(20)));
    // let ep = ep.tls_config(ClientTlsConfig::new()).into_diagnostic()?; // enable when TLS configured
    let channel: Channel = ep
        .connect_timeout(std::time::Duration::from_secs(3))
        .connect()
        .await
        .into_diagnostic()?;
    let mut client = JobServiceClient::new(channel);
    let mut req = Request::new(StartJobRequest { job_id });
    req.set_timeout(std::time::Duration::from_secs(10));
    match client.start_job(req).await {
        Ok(resp) => Ok(resp.into_inner().accepted),
        Err(status) if status.code() == Code::Unavailable => Ok(false), // map transient
        Err(e) => Err(miette::miette!("gRPC error: {e}")),
    }
}
```
Health and reflection:
- Expose `grpc.health.v1.Health` via `tonic-health` for k8s/consumers. Include a readiness check (DB pool, AMQP connection) before reporting `SERVING`.
- Enable `tonic-reflection` only in dev/test to assist tooling; disable in prod.
Testing notes:
- Use `#[tokio::test(flavor = "multi_thread")]` and bind the server to `127.0.0.1:0` (ephemeral port) for integration tests.
- Assert deadlines and cancellation by setting short timeouts on the client `Request` and verifying `Status::deadline_exceeded`.
Cross-reference:
- Asynchronous communication: RabbitMQ via `lapin` (§11).
- Direct synchronous RPC: gRPC via `tonic` (this section).