Introduce centralized configuration handling via KDL and environment variables

This commit adds:
- A unified configuration system (`AppConfig`) that aggregates KDL files and environment variables with precedence handling.
- Example KDL configuration files for the orchestrator and forge-integration modules.
- Updates to orchestrator and forge-integration to load and apply configurations from `AppConfig`.
- Improved AMQP and database configuration with overlays from CLI, environment, or KDL.
- Deprecated `TODO.txt` as it's now represented in the configuration examples.
This commit is contained in:
Till Wegmueller 2025-11-06 23:48:03 +01:00
parent 0dabdf2bb2
commit 11ce9cc881
No known key found for this signature in database
7 changed files with 152 additions and 16 deletions

View file

@ -1,3 +0,0 @@
- Make VM reachable IP of the orchestrator configurable in case the setup on illumos gets more complicated (via config file)
- Make the forge-integration task use fnox secrets

View file

@ -0,0 +1,97 @@
use std::collections::HashMap;
use std::path::PathBuf;
use miette::{IntoDiagnostic as _, Result};
use kdl::{KdlDocument, KdlValue};
/// Internal application configuration aggregated from env and KDL.
#[derive(Clone, Debug)]
pub struct AppConfig {
pub grpc_addr: Option<String>,
pub http_addr: Option<String>,
pub database_url: Option<String>,
pub otlp_endpoint: Option<String>,
pub mq: crate::mq::MqConfig,
}
impl AppConfig {
/// Load config by reading env vars and KDL files without mutating the environment.
/// Precedence: KDL (lowest) < Environment < CLI (applied by callers).
pub fn load(service: &str) -> Result<Self> {
let kdl_map = load_kdl_kv(service)?;
let grpc_addr = std::env::var("GRPC_ADDR").ok().or_else(|| kdl_map.get("GRPC_ADDR").cloned());
let http_addr = std::env::var("HTTP_ADDR").ok().or_else(|| kdl_map.get("HTTP_ADDR").cloned());
let database_url = std::env::var("DATABASE_URL").ok().or_else(|| kdl_map.get("DATABASE_URL").cloned());
let otlp_endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok().or_else(|| kdl_map.get("OTEL_EXPORTER_OTLP_ENDPOINT").cloned());
// Build MQ config from env with KDL fallbacks, then defaults
let url = std::env::var("AMQP_URL")
.ok()
.or_else(|| kdl_map.get("AMQP_URL").cloned())
.unwrap_or_else(|| "amqp://127.0.0.1:5672/%2f".into());
let exchange = std::env::var("AMQP_EXCHANGE")
.ok()
.or_else(|| kdl_map.get("AMQP_EXCHANGE").cloned())
.unwrap_or_else(|| "solstice.jobs".into());
let routing_key = std::env::var("AMQP_ROUTING_KEY")
.ok()
.or_else(|| kdl_map.get("AMQP_ROUTING_KEY").cloned())
.unwrap_or_else(|| "jobrequest.v1".into());
let queue = std::env::var("AMQP_QUEUE")
.ok()
.or_else(|| kdl_map.get("AMQP_QUEUE").cloned())
.unwrap_or_else(|| "solstice.jobs.v1".into());
let dlx = std::env::var("AMQP_DLX")
.ok()
.or_else(|| kdl_map.get("AMQP_DLX").cloned())
.unwrap_or_else(|| "solstice.dlx".into());
let dlq = std::env::var("AMQP_DLQ")
.ok()
.or_else(|| kdl_map.get("AMQP_DLQ").cloned())
.unwrap_or_else(|| "solstice.jobs.v1.dlq".into());
let prefetch = std::env::var("AMQP_PREFETCH")
.ok()
.and_then(|s| s.parse().ok())
.or_else(|| kdl_map.get("AMQP_PREFETCH").and_then(|s| s.parse().ok()))
.unwrap_or(64u16);
let mq = crate::mq::MqConfig { url, exchange, routing_key, queue, dlx, dlq, prefetch };
Ok(Self { grpc_addr, http_addr, database_url, otlp_endpoint, mq })
}
}
/// Load KDL files into a simple key/value map of strings.
fn load_kdl_kv(service: &str) -> Result<HashMap<String, String>> {
let global = PathBuf::from("/etc/solstice/solstice.kdl");
let svc = PathBuf::from(format!("/etc/solstice/{}.kdl", service));
let mut map = HashMap::new();
for path in [global, svc] {
if !path.exists() { continue; }
let s = std::fs::read_to_string(&path).into_diagnostic()?;
let doc: KdlDocument = s.parse().into_diagnostic()?;
for node in doc.nodes() {
let key = node.name().value().to_string();
// Prefer first argument, otherwise `value` property; skip nulls
let value_str = if let Some(entry) = node.entries().first() {
kdl_value_to_string(entry.value())
} else if let Some(v) = node.get("value") {
kdl_value_to_string(v)
} else {
None
};
if let Some(v) = value_str {
// Only insert if not already set by a previous file (global lowest precedence)
map.entry(key).or_insert(v);
}
}
}
Ok(map)
}
fn kdl_value_to_string(v: &KdlValue) -> Option<String> {
match v {
KdlValue::Null => None,
_ => Some(v.to_string()),
}
}

View file

@ -2,11 +2,13 @@ pub mod job;
pub mod messages; pub mod messages;
pub mod mq; pub mod mq;
pub mod telemetry; pub mod telemetry;
pub mod config;
pub use job::{Job, Step, Workflow, parse_workflow_file, parse_workflow_str}; pub use job::{Job, Step, Workflow, parse_workflow_file, parse_workflow_str};
pub use messages::{JobRequest, JobResult, SourceSystem}; pub use messages::{JobRequest, JobResult, SourceSystem};
pub use mq::{MqConfig, consume_jobs, consume_jobs_until, publish_job, publish_job_result}; pub use mq::{MqConfig, consume_jobs, consume_jobs_until, publish_job, publish_job_result};
pub use telemetry::{TelemetryGuard, init_tracing}; pub use telemetry::{TelemetryGuard, init_tracing};
pub use config::AppConfig;
// Generated gRPC module for runner <-> orchestrator // Generated gRPC module for runner <-> orchestrator
pub mod runner { pub mod runner {

View file

@ -123,12 +123,14 @@ type HmacSha256 = Hmac<Sha256>;
#[tokio::main(flavor = "multi_thread")] #[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> { async fn main() -> Result<()> {
// Load internal config (preloads KDL -> env, then reads env)
let app_cfg = common::AppConfig::load("forge-integration")?;
let _t = common::init_tracing("solstice-forge-integration")?; let _t = common::init_tracing("solstice-forge-integration")?;
let opts = Opts::parse(); let opts = Opts::parse();
info!(http_addr = %opts.http_addr, path = %opts.webhook_path, "forge integration starting"); info!(http_addr = %opts.http_addr, path = %opts.webhook_path, "forge integration starting");
// Apply AMQP overrides if provided // Apply AMQP overrides if provided, starting from AppConfig
let mut mq_cfg = common::MqConfig::default(); let mut mq_cfg = app_cfg.mq.clone();
if let Some(u) = opts.amqp_url { if let Some(u) = opts.amqp_url {
mq_cfg.url = u; mq_cfg.url = u;
} }

View file

@ -101,6 +101,8 @@ struct Opts {
#[tokio::main(flavor = "multi_thread")] #[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> { async fn main() -> Result<()> {
// Load internal config (preloads KDL -> env, then reads env)
let app_cfg = common::AppConfig::load("orchestrator")?;
let _t = common::init_tracing("solstice-orchestrator")?; let _t = common::init_tracing("solstice-orchestrator")?;
let opts = Opts::parse(); let opts = Opts::parse();
info!(grpc_addr = %opts.grpc_addr, db = %opts.database_url, amqp = %opts.amqp_url, "orchestrator starting"); info!(grpc_addr = %opts.grpc_addr, db = %opts.database_url, amqp = %opts.amqp_url, "orchestrator starting");
@ -119,19 +121,24 @@ async fn main() -> Result<()> {
let persist = if opts.skip_persistence { let persist = if opts.skip_persistence {
Arc::new(Persist::new(None).await?) Arc::new(Persist::new(None).await?)
} else { } else {
Arc::new(Persist::new(Some(opts.database_url.clone())).await?) // Use CLI database_url if non-empty; otherwise fall back to AppConfig
let db_url_opt = if opts.database_url.is_empty() {
app_cfg.database_url.clone()
} else {
Some(opts.database_url.clone())
};
Arc::new(Persist::new(db_url_opt).await?)
}; };
// Build MQ config and start consumer // Build MQ config starting from AppConfig, then overlay CLI flags
let mq_cfg = common::MqConfig { let mut mq_cfg = app_cfg.mq.clone();
url: opts.amqp_url.clone(), mq_cfg.url = opts.amqp_url.clone();
exchange: opts.amqp_exchange.clone(), mq_cfg.exchange = opts.amqp_exchange.clone();
routing_key: opts.amqp_routing_key.clone(), mq_cfg.routing_key = opts.amqp_routing_key.clone();
queue: opts.amqp_queue.clone(), mq_cfg.queue = opts.amqp_queue.clone();
dlx: std::env::var("AMQP_DLX").unwrap_or_else(|_| "solstice.dlx".into()), // dlx/dlq can come from env/KDL via AppConfig (MqConfig::default), keep existing if not set in env
dlq: std::env::var("AMQP_DLQ").unwrap_or_else(|_| "solstice.jobs.v1.dlq".into()), // prefetch: if not provided, default to max_concurrency
prefetch: opts.amqp_prefetch.unwrap_or(opts.max_concurrency as u16), mq_cfg.prefetch = opts.amqp_prefetch.unwrap_or(opts.max_concurrency as u16);
};
// Start gRPC server for runner log streaming // Start gRPC server for runner log streaming
let grpc_addr: std::net::SocketAddr = opts.grpc_addr.parse().into_diagnostic()?; let grpc_addr: std::net::SocketAddr = opts.grpc_addr.parse().into_diagnostic()?;

View file

@ -0,0 +1,17 @@
// Example forge-integration config in KDL. Place at /etc/solstice/forge-integration.kdl
HTTP_ADDR "0.0.0.0:8080"
WEBHOOK_PATH "/webhooks/forgejo"
WEBHOOK_SECRET "replace-me"
AMQP_URL "amqp://127.0.0.1:5672/%2f"
AMQP_EXCHANGE "solstice.jobs"
AMQP_QUEUE "solstice.jobs.v1"
AMQP_ROUTING_KEY "jobrequest.v1"
AMQP_PREFETCH 64
FORGEJO_BASE_URL "https://codeberg.org/api/v1"
FORGEJO_TOKEN "token-here"
FORGE_CONTEXT "solstice/ci"
ORCH_HTTP_BASE "http://127.0.0.1:8081"
S3_ENDPOINT "http://127.0.0.1:9000"
S3_BUCKET "solstice-logs"
RUNS_ON_DEFAULT "illumos-latest"
RUNS_ON_MAP "owner1/repo1=ubuntu-22.04,owner2/repo2=illumos-latest"

View file

@ -0,0 +1,14 @@
// Example orchestrator config in KDL. Place at /etc/solstice/orchestrator.kdl
GRPC_ADDR "0.0.0.0:50051"
HTTP_ADDR "0.0.0.0:8081"
DATABASE_URL "postgres://user:pass@localhost:5432/solstice"
AMQP_URL "amqp://127.0.0.1:5672/%2f"
AMQP_EXCHANGE "solstice.jobs"
AMQP_QUEUE "solstice.jobs.v1"
AMQP_ROUTING_KEY "jobrequest.v1"
AMQP_PREFETCH 32
LIBVIRT_URI "qemu:///system"
LIBVIRT_NETWORK "default"
MAX_CONCURRENCY 2
VM_PLACEHOLDER_RUN_SECS 3600
RUNNER_DIR "/var/lib/solstice/runners"