From a1592cd6c99798d4f9bcb8913cb5e94db1c504af98d5a4110228344915d0c75b Mon Sep 17 00:00:00 2001 From: Till Wegmueller Date: Sun, 25 Jan 2026 16:50:52 +0100 Subject: [PATCH] Add GitHub App support, AMQP integration, and webhook enhancements - Extend GitHub webhook handler with signature validation, push, and pull request event handling. - Add GitHub App authentication via JWT and installation token retrieval. - Parse `.solstice/workflow.kdl` for job queuing with `runs_on`, `script`, and job grouping support. - Integrate AMQP consumer for orchestrator results and structured job enqueueing. - Add S3-compatible storage configuration for log uploads. - Refactor CLI options and internal state for improved configuration management. - Enhance dependencies for signature, JSON, and AMQP handling. - Document GitHub integration Signed-off-by: Till Wegmueller --- crates/github-integration/Cargo.toml | 22 +- crates/github-integration/src/main.rs | 1326 ++++++++++++++++- ...025-10-25-github-webhooks-to-jobrequest.md | 170 +++ 3 files changed, 1505 insertions(+), 13 deletions(-) create mode 100644 docs/ai/2025-10-25-github-webhooks-to-jobrequest.md diff --git a/crates/github-integration/Cargo.toml b/crates/github-integration/Cargo.toml index a3aa500..0c0851e 100644 --- a/crates/github-integration/Cargo.toml +++ b/crates/github-integration/Cargo.toml @@ -8,5 +8,25 @@ common = { path = "../common" } clap = { version = "4", features = ["derive", "env"] } miette = { version = "7", features = ["fancy"] } tracing = "0.1" -tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal", "fs", "io-util", "time"] } +# HTTP + Webhooks axum = { version = "0.8", features = ["macros"] } +reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls-native-roots"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +# Signature verification +hmac = "0.12" +sha2 = "0.10" +hex = "0.4" +# GitHub App auth +jsonwebtoken = "9" +time = { version = "0.3", features = ["formatting"] } +# AMQP consumer for results +lapin = { version = "2" } +futures-util = "0.3" +# S3/Garage upload +aws-config = { version = "1", default-features = false, features = ["behavior-version-latest", "rt-tokio"] } +aws-sdk-s3 = { version = "1", default-features = false, features = ["rt-tokio", "rustls"] } +# Workflow parsing helpers +base64 = "0.22" +uuid = { version = "1", features = ["v4"] } diff --git a/crates/github-integration/src/main.rs b/crates/github-integration/src/main.rs index 4f1e0f3..e70edd1 100644 --- a/crates/github-integration/src/main.rs +++ b/crates/github-integration/src/main.rs @@ -1,8 +1,40 @@ -use axum::{Router, http::StatusCode, response::IntoResponse, routing::post}; -use clap::Parser; -use miette::Result; +use aws_sdk_s3::primitives::ByteStream; use std::net::SocketAddr; -use tracing::{info, warn}; +use std::sync::Arc; + +use axum::{ + Router, + body::Bytes, + extract::State, + http::{HeaderMap, StatusCode}, + response::IntoResponse, + routing::post, +}; +use base64::Engine; +use clap::{Parser, Subcommand}; +use futures_util::StreamExt; +use hmac::{Hmac, Mac}; +use jsonwebtoken::{EncodingKey, Header}; +use miette::{IntoDiagnostic, Result}; +use serde::{Deserialize, Serialize}; +use sha2::Sha256; +use tracing::{error, info, warn}; + +#[derive(Subcommand, Debug)] +enum Cmd { + /// Enqueue a sample JobRequest (dev/test helper) + Enqueue { + /// Repository URL + #[arg(long)] + repo_url: String, + /// Commit SHA + #[arg(long)] + commit_sha: String, + /// Optional runs_on hint + #[arg(long)] + runs_on: Option, + }, +} #[derive(Parser, Debug)] #[command( @@ -19,6 +51,14 @@ struct Opts { #[arg(long, env = "WEBHOOK_PATH", default_value = "/webhooks/github")] webhook_path: String, + /// GitHub webhook secret + #[arg(long, env = "GITHUB_WEBHOOK_SECRET")] + webhook_secret: Option, + + /// GitHub API base (e.g., https://api.github.com) + #[arg(long, env = "GITHUB_API_BASE", default_value = "https://api.github.com")] + github_api_base: String, + /// GitHub App ID #[arg(long, env = "GITHUB_APP_ID")] app_id: Option, @@ -27,36 +67,1298 @@ struct Opts { #[arg(long, env = "GITHUB_APP_KEY_PATH")] app_key_path: Option, + /// GitHub App private key PEM (alternative to file) + #[arg(long, env = "GITHUB_APP_KEY")] + app_key_pem: Option, + + /// Check run display name + #[arg(long, env = "GITHUB_CHECK_NAME", default_value = "Solstice CI")] + check_name: String, + + /// RabbitMQ URL (AMQP) + #[arg(long, env = "AMQP_URL")] + amqp_url: Option, + + /// Exchange for job requests + #[arg(long, env = "AMQP_EXCHANGE")] + amqp_exchange: Option, + + /// Queue (declared by orchestrator too) + #[arg(long, env = "AMQP_QUEUE")] + amqp_queue: Option, + + /// Routing key for job requests + #[arg(long, env = "AMQP_ROUTING_KEY")] + amqp_routing_key: Option, + /// OTLP endpoint (e.g., http://localhost:4317) #[arg(long, env = "OTEL_EXPORTER_OTLP_ENDPOINT")] otlp: Option, + + /// Orchestrator HTTP base for logs (deprecated; use LOGS_BASE_URL) + #[arg(long, env = "ORCH_HTTP_BASE")] // Deprecated + orch_http_base: Option, + + /// Logs service base URL (e.g., http://logs.local:8082) + #[arg(long, env = "LOGS_BASE_URL")] + logs_base_url: Option, + + /// S3-compatible endpoint for Garage/MinIO (e.g., http://localhost:9000) + #[arg(long, env = "S3_ENDPOINT")] + s3_endpoint: Option, + /// Bucket to upload logs into + #[arg(long, env = "S3_BUCKET")] + s3_bucket: Option, + + /// Default runs_on label to use when not specified via labels or repo map + #[arg(long, env = "RUNS_ON_DEFAULT")] + runs_on_default: Option, + /// Per-repo runs_on overrides: comma-separated owner/repo=label pairs + #[arg(long, env = "RUNS_ON_MAP")] + runs_on_map: Option, + + #[command(subcommand)] + cmd: Option, } -async fn handle_github_webhook() -> impl IntoResponse { - // For now, accept and log. Implement signature verification and event handling later. - StatusCode::OK +#[derive(Clone)] +struct AppState { + mq_cfg: common::MqConfig, + webhook_secret: Option, + github_api_base: String, + app_id: Option, + app_key_pem: Option, + check_name: String, + orch_http_base: Option, // deprecated + logs_base_url: Option, + s3_endpoint: Option, + s3_bucket: Option, + runs_on_default: Option, + runs_on_map: std::collections::HashMap, // key: owner/repo } +type HmacSha256 = Hmac; + #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<()> { + // Load internal config (preloads KDL -> env, then reads env) + let app_cfg = common::AppConfig::load("github-integration")?; let _t = common::init_tracing("solstice-github-integration")?; let opts = Opts::parse(); info!(http_addr = %opts.http_addr, path = %opts.webhook_path, "github integration starting"); + // Apply AMQP overrides if provided, starting from AppConfig + let mut mq_cfg = app_cfg.mq.clone(); + if let Some(u) = opts.amqp_url { + mq_cfg.url = u; + } + if let Some(x) = opts.amqp_exchange { + mq_cfg.exchange = x; + } + if let Some(q) = opts.amqp_queue { + mq_cfg.queue = q; + } + if let Some(rk) = opts.amqp_routing_key { + mq_cfg.routing_key = rk; + } + + if let Some(Cmd::Enqueue { + repo_url, + commit_sha, + runs_on, + }) = opts.cmd + { + let mut jr = common::JobRequest::new(common::SourceSystem::Github, repo_url, commit_sha); + jr.runs_on = runs_on; + common::publish_job(&mq_cfg, &jr).await?; + // Print just the request_id on stdout so scripts can capture it reliably. + println!("{}", jr.request_id); + info!(request_id = %jr.request_id, "enqueued job request"); + return Ok(()); + } + + let webhook_secret = opts + .webhook_secret + .or_else(|| std::env::var("WEBHOOK_SECRET").ok()); + if webhook_secret.is_none() { + warn!( + "GITHUB_WEBHOOK_SECRET is not set — accepting webhooks without signature validation (dev mode)" + ); + } + + let app_key_pem = match (&opts.app_key_pem, &opts.app_key_path) { + (Some(pem), _) => Some(pem.clone()), + (None, Some(path)) => Some(std::fs::read_to_string(path).into_diagnostic()?), + (None, None) => None, + }; + + // Parse runs_on overrides map from CLI/env + let runs_on_map = opts + .runs_on_map + .as_deref() + .map(parse_runs_on_map) + .unwrap_or_default(); + + let state = Arc::new(AppState { + mq_cfg, + webhook_secret, + github_api_base: opts.github_api_base, + app_id: opts.app_id, + app_key_pem, + check_name: opts.check_name, + orch_http_base: opts.orch_http_base, + logs_base_url: opts.logs_base_url, + s3_endpoint: opts.s3_endpoint, + s3_bucket: opts.s3_bucket, + runs_on_default: opts.runs_on_default, + runs_on_map, + }); + + // Leak the path string to satisfy 'static requirement for axum route API let path: &'static str = Box::leak(opts.webhook_path.clone().into_boxed_str()); - let app = Router::new().route(path, post(handle_github_webhook)); + + let router = Router::new() + .route(path, post(handle_webhook)) + .with_state(state.clone()); let addr: SocketAddr = opts.http_addr.parse().expect("invalid HTTP_ADDR"); - warn!( - "github-integration webhook endpoint is active but handler is minimal; implement GitHub App flow" - ); + // Start JobResult consumer in background + let state_clone = state.clone(); + tokio::spawn(async move { + if let Err(e) = consume_job_results(state_clone).await { + tracing::error!(error = %e, "job result consumer exited"); + } + }); axum::serve( tokio::net::TcpListener::bind(addr).await.expect("bind"), - app, + router, ) .await .expect("server error"); Ok(()) } + +#[derive(Serialize)] +struct GitHubJwtClaims { + iat: usize, + exp: usize, + iss: String, +} + +fn build_app_jwt(app_id: u64, pem: &str) -> Result { + let now = time::OffsetDateTime::now_utc().unix_timestamp(); + let iat = (now - 60) as usize; + let exp = (now + 9 * 60) as usize; // 9 minutes to avoid 10-minute max + let claims = GitHubJwtClaims { + iat, + exp, + iss: app_id.to_string(), + }; + let key = EncodingKey::from_rsa_pem(pem.as_bytes()).into_diagnostic()?; + let header = Header::new(jsonwebtoken::Algorithm::RS256); + let token = jsonwebtoken::encode(&header, &claims, &key).into_diagnostic()?; + Ok(token) +} + +fn github_client() -> reqwest::Client { + reqwest::Client::new() +} + +async fn get_installation_token(state: &AppState, installation_id: u64) -> Result> { + let (Some(app_id), Some(pem)) = (state.app_id, state.app_key_pem.as_deref()) else { + return Ok(None); + }; + let jwt = build_app_jwt(app_id, pem)?; + let url = format!( + "{}/app/installations/{}/access_tokens", + state.github_api_base.trim_end_matches('/'), + installation_id + ); + let resp = github_client() + .post(&url) + .bearer_auth(jwt) + .header("Accept", "application/vnd.github+json") + .header("X-GitHub-Api-Version", "2022-11-28") + .header("User-Agent", "solstice-ci") + .send() + .await + .into_diagnostic()?; + if !resp.status().is_success() { + let status = resp.status(); + let text = resp.text().await.unwrap_or_default(); + warn!(status = ?status, body = %text, "failed to get installation token"); + return Ok(None); + } + let v: serde_json::Value = resp.json().await.into_diagnostic()?; + Ok(v.get("token").and_then(|t| t.as_str()).map(|s| s.to_string())) +} + +async fn get_installation_id_for_repo( + state: &AppState, + owner: &str, + repo: &str, +) -> Result> { + let (Some(app_id), Some(pem)) = (state.app_id, state.app_key_pem.as_deref()) else { + return Ok(None); + }; + let jwt = build_app_jwt(app_id, pem)?; + let url = format!( + "{}/repos/{}/{}/installation", + state.github_api_base.trim_end_matches('/'), + owner, + repo + ); + let resp = github_client() + .get(&url) + .bearer_auth(jwt) + .header("Accept", "application/vnd.github+json") + .header("X-GitHub-Api-Version", "2022-11-28") + .header("User-Agent", "solstice-ci") + .send() + .await + .into_diagnostic()?; + if resp.status() == StatusCode::NOT_FOUND { + return Ok(None); + } + if !resp.status().is_success() { + let status = resp.status(); + let text = resp.text().await.unwrap_or_default(); + warn!(status = ?status, body = %text, "failed to fetch repo installation"); + return Ok(None); + } + let v: serde_json::Value = resp.json().await.into_diagnostic()?; + Ok(v.get("id").and_then(|id| id.as_u64())) +} + +async fn fetch_workflow_kdl( + state: &AppState, + owner: &str, + repo: &str, + sha: &str, + installation_id: Option, +) -> Result> { + let Some(installation_id) = installation_id else { + return Ok(None); + }; + let Some(token) = get_installation_token(state, installation_id).await? else { + return Ok(None); + }; + let url = format!( + "{}/repos/{}/{}/contents/.solstice/workflow.kdl?ref={}", + state.github_api_base.trim_end_matches('/'), + owner, + repo, + sha + ); + let resp = github_client() + .get(&url) + .bearer_auth(token) + .header("Accept", "application/vnd.github+json") + .header("X-GitHub-Api-Version", "2022-11-28") + .header("User-Agent", "solstice-ci") + .send() + .await + .into_diagnostic()?; + if resp.status().is_success() { + let v: serde_json::Value = resp.json().await.into_diagnostic()?; + if let Some(enc) = v.get("encoding").and_then(|e| e.as_str()) { + if enc.eq_ignore_ascii_case("base64") { + if let Some(content) = v.get("content").and_then(|c| c.as_str()) { + let decoded = base64::engine::general_purpose::STANDARD + .decode(content.replace('\n', "")) + .into_diagnostic()?; + let s = String::from_utf8(decoded).into_diagnostic()?; + return Ok(Some(s)); + } + } + } + } + Ok(None) +} + +#[derive(Debug, Deserialize)] +struct RepoOwner { + #[serde(default)] + login: Option, + #[serde(default)] + name: Option, +} + +#[derive(Debug, Deserialize)] +struct RepoInfo { + #[serde(default)] + clone_url: Option, + #[serde(default)] + ssh_url: Option, + #[serde(default)] + name: Option, + #[serde(default)] + owner: Option, +} + +#[derive(Debug, Deserialize)] +struct InstallationInfo { + id: u64, +} + +#[derive(Debug, Deserialize)] +struct PushPayload { + after: String, + repository: RepoInfo, + #[serde(default)] + installation: Option, +} + +#[derive(Debug, Deserialize)] +struct PrRepoInfo { + #[serde(default)] + clone_url: Option, + #[serde(default)] + ssh_url: Option, + #[serde(default)] + name: Option, + #[serde(default)] + owner: Option, +} + +#[derive(Debug, Deserialize)] +struct PrHead { + sha: String, + repo: PrRepoInfo, +} + +#[derive(Debug, Deserialize)] +struct Label { + name: String, +} + +#[derive(Debug, Deserialize)] +struct PullRequest { + head: PrHead, + #[serde(default)] + labels: Vec