solstice-ci/crates/github-integration/src/main.rs
Till Wegmueller d3841462cf
chore(format): Format code
Signed-off-by: Till Wegmueller <toasterson@gmail.com>
2026-01-25 23:16:36 +01:00

1393 lines
42 KiB
Rust

use aws_sdk_s3::primitives::ByteStream;
use std::net::SocketAddr;
use std::sync::Arc;
use axum::{
Router,
body::Bytes,
extract::State,
http::{HeaderMap, StatusCode},
response::IntoResponse,
routing::post,
};
use base64::Engine;
use clap::{Parser, Subcommand};
use futures_util::StreamExt;
use jsonwebtoken::{EncodingKey, Header};
use miette::{IntoDiagnostic, Result};
use serde::{Deserialize, Serialize};
use tracing::{error, info, warn};
use webhook::{SignatureCheck, SignaturePolicy, WebhookError, WebhookInfo};
// CLI subcommands (dev/test helpers only; normal operation runs the webhook
// server). NOTE: the `///` doc comments below double as clap help text at
// runtime — they are not inert comments.
#[derive(Subcommand, Debug)]
enum Cmd {
    /// Enqueue a sample JobRequest (dev/test helper)
    Enqueue {
        /// Repository URL
        #[arg(long)]
        repo_url: String,
        /// Commit SHA
        #[arg(long)]
        commit_sha: String,
        /// Optional runs_on hint
        #[arg(long)]
        runs_on: Option<String>,
    },
}
// Command-line / environment configuration for the service. Every option can
// be supplied via the env var named in its `#[arg(env = ...)]` attribute.
// NOTE: the `///` doc comments are surfaced as clap --help text at runtime.
#[derive(Parser, Debug)]
#[command(
    name = "solstice-github",
    version,
    about = "Solstice CI — GitHub Integration (GitHub App)"
)]
struct Opts {
    /// HTTP bind address for GitHub webhooks (e.g., 0.0.0.0:8082)
    #[arg(long, env = "HTTP_ADDR", default_value = "0.0.0.0:8082")]
    http_addr: String,
    /// Webhook path (route)
    #[arg(long, env = "WEBHOOK_PATH", default_value = "/webhooks/github")]
    webhook_path: String,
    /// GitHub webhook secret
    #[arg(long, env = "GITHUB_WEBHOOK_SECRET")]
    webhook_secret: Option<String>,
    /// Hookdeck signing secret (proxy signature)
    #[arg(long, env = "HOOKDECK_SIGNING_SECRET")]
    hookdeck_signing_secret: Option<String>,
    /// GitHub API base (e.g., https://api.github.com)
    #[arg(
        long,
        env = "GITHUB_API_BASE",
        default_value = "https://api.github.com"
    )]
    github_api_base: String,
    /// GitHub App ID
    #[arg(long, env = "GITHUB_APP_ID")]
    app_id: Option<u64>,
    /// Path to GitHub App private key (PEM)
    #[arg(long, env = "GITHUB_APP_KEY_PATH")]
    app_key_path: Option<String>,
    /// GitHub App private key PEM (alternative to file)
    #[arg(long, env = "GITHUB_APP_KEY")]
    app_key_pem: Option<String>,
    /// Check run display name
    #[arg(long, env = "GITHUB_CHECK_NAME", default_value = "Solstice CI")]
    check_name: String,
    /// RabbitMQ URL (AMQP)
    #[arg(long, env = "AMQP_URL")]
    amqp_url: Option<String>,
    /// Exchange for job requests
    #[arg(long, env = "AMQP_EXCHANGE")]
    amqp_exchange: Option<String>,
    /// Queue (declared by orchestrator too)
    #[arg(long, env = "AMQP_QUEUE")]
    amqp_queue: Option<String>,
    /// Routing key for job requests
    #[arg(long, env = "AMQP_ROUTING_KEY")]
    amqp_routing_key: Option<String>,
    /// OTLP endpoint (e.g., http://localhost:4317)
    #[arg(long, env = "OTEL_EXPORTER_OTLP_ENDPOINT")]
    otlp: Option<String>,
    /// Orchestrator HTTP base for logs (deprecated; use LOGS_BASE_URL)
    #[arg(long, env = "ORCH_HTTP_BASE")] // Deprecated
    orch_http_base: Option<String>,
    /// Logs service base URL (e.g., http://logs.local:8082)
    #[arg(long, env = "LOGS_BASE_URL")]
    logs_base_url: Option<String>,
    /// S3-compatible endpoint for Garage/MinIO (e.g., http://localhost:9000)
    #[arg(long, env = "S3_ENDPOINT")]
    s3_endpoint: Option<String>,
    /// Bucket to upload logs into
    #[arg(long, env = "S3_BUCKET")]
    s3_bucket: Option<String>,
    /// Default runs_on label to use when not specified via labels or repo map
    #[arg(long, env = "RUNS_ON_DEFAULT")]
    runs_on_default: Option<String>,
    /// Per-repo runs_on overrides: comma-separated owner/repo=label pairs
    #[arg(long, env = "RUNS_ON_MAP")]
    runs_on_map: Option<String>,
    #[command(subcommand)]
    cmd: Option<Cmd>,
}
// Shared, immutable runtime state handed to the axum handlers and the AMQP
// consumer via `Arc<AppState>`. Built once in `main` from `Opts` + AppConfig.
#[derive(Clone)]
struct AppState {
    // AMQP connection/exchange/queue settings used to publish JobRequests.
    mq_cfg: common::MqConfig,
    // HMAC secret for GitHub webhook signatures (None => not enforced).
    webhook_secret: Option<String>,
    // HMAC secret for the Hookdeck proxy signature (None => not enforced).
    hookdeck_signing_secret: Option<String>,
    github_api_base: String,
    // GitHub App credentials; both must be set to mint installation tokens.
    app_id: Option<u64>,
    app_key_pem: Option<String>,
    // Display name used when creating check runs.
    check_name: String,
    orch_http_base: Option<String>, // deprecated fallback for logs_base_url
    logs_base_url: Option<String>,
    // Optional S3-compatible target for log uploads.
    s3_endpoint: Option<String>,
    s3_bucket: Option<String>,
    runs_on_default: Option<String>,
    runs_on_map: std::collections::HashMap<String, String>, // key: owner/repo
}
/// Entry point: loads configuration, optionally runs the `enqueue` dev helper,
/// then starts the background JobResult consumer and the webhook HTTP server.
///
/// Setup failures (bad bind address, bind error, server error) are propagated
/// as `miette` diagnostics instead of panicking via `expect`, so they go
/// through the same error reporting path as the rest of the function.
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> {
    // Load internal config (preloads KDL -> env, then reads env)
    let app_cfg = common::AppConfig::load("github-integration")?;
    let _t = common::init_tracing("solstice-github-integration")?;
    let opts = Opts::parse();
    info!(http_addr = %opts.http_addr, path = %opts.webhook_path, "github integration starting");
    // Apply AMQP overrides if provided, starting from AppConfig
    let mut mq_cfg = app_cfg.mq.clone();
    if let Some(u) = opts.amqp_url {
        mq_cfg.url = u;
    }
    if let Some(x) = opts.amqp_exchange {
        mq_cfg.exchange = x;
    }
    if let Some(q) = opts.amqp_queue {
        mq_cfg.queue = q;
    }
    if let Some(rk) = opts.amqp_routing_key {
        mq_cfg.routing_key = rk;
    }
    // Dev/test helper: publish one JobRequest, print its id, and exit.
    if let Some(Cmd::Enqueue {
        repo_url,
        commit_sha,
        runs_on,
    }) = opts.cmd
    {
        let mut jr = common::JobRequest::new(common::SourceSystem::Github, repo_url, commit_sha);
        jr.runs_on = runs_on;
        common::publish_job(&mq_cfg, &jr).await?;
        // Print just the request_id on stdout so scripts can capture it reliably.
        println!("{}", jr.request_id);
        info!(request_id = %jr.request_id, "enqueued job request");
        return Ok(());
    }
    // Secrets may also arrive under legacy env var names.
    let webhook_secret = opts
        .webhook_secret
        .or_else(|| std::env::var("WEBHOOK_SECRET").ok());
    let hookdeck_signing_secret = opts
        .hookdeck_signing_secret
        .or_else(|| std::env::var("HOOKDECK_SECRET").ok());
    if webhook_secret.is_none() && hookdeck_signing_secret.is_none() {
        warn!(
            "GITHUB_WEBHOOK_SECRET and HOOKDECK_SIGNING_SECRET are not set — accepting webhooks without signature validation (dev mode)"
        );
    }
    // Inline PEM takes precedence over a key file path.
    let app_key_pem = match (&opts.app_key_pem, &opts.app_key_path) {
        (Some(pem), _) => Some(pem.clone()),
        (None, Some(path)) => Some(std::fs::read_to_string(path).into_diagnostic()?),
        (None, None) => None,
    };
    // Parse runs_on overrides map from CLI/env
    let runs_on_map = opts
        .runs_on_map
        .as_deref()
        .map(parse_runs_on_map)
        .unwrap_or_default();
    let state = Arc::new(AppState {
        mq_cfg,
        webhook_secret,
        hookdeck_signing_secret,
        github_api_base: opts.github_api_base,
        app_id: opts.app_id,
        app_key_pem,
        check_name: opts.check_name,
        orch_http_base: opts.orch_http_base,
        logs_base_url: opts.logs_base_url,
        s3_endpoint: opts.s3_endpoint,
        s3_bucket: opts.s3_bucket,
        runs_on_default: opts.runs_on_default,
        runs_on_map,
    });
    // Leak the path string to satisfy 'static requirement for axum route API
    // (one-time leak at startup; bounded).
    let path: &'static str = Box::leak(opts.webhook_path.clone().into_boxed_str());
    let router = Router::new()
        .route(path, post(handle_webhook))
        .with_state(state.clone());
    // Was `.expect("invalid HTTP_ADDR")` — report through miette instead of panicking.
    let addr: SocketAddr = opts.http_addr.parse().into_diagnostic()?;
    // Start JobResult consumer in background
    let state_clone = state.clone();
    tokio::spawn(async move {
        if let Err(e) = consume_job_results(state_clone).await {
            tracing::error!(error = %e, "job result consumer exited");
        }
    });
    // Bind + serve errors likewise become diagnostics rather than panics.
    let listener = tokio::net::TcpListener::bind(addr).await.into_diagnostic()?;
    axum::serve(listener, router).await.into_diagnostic()?;
    Ok(())
}
// Claims for the GitHub App JWT used to authenticate as the App itself
// (not as an installation). Field names are the registered JWT claim names,
// so they must stay exactly `iat`/`exp`/`iss`.
#[derive(Serialize)]
struct GitHubJwtClaims {
    iat: usize, // issued-at (unix seconds, back-dated for clock skew)
    exp: usize, // expiry (unix seconds)
    iss: String, // issuer: the GitHub App ID as a string
}
/// Build the RS256-signed JWT that authenticates us as the GitHub App.
///
/// GitHub caps App JWT lifetime at 10 minutes, so the token is issued with a
/// 9-minute expiry and an `iat` back-dated 60 seconds to tolerate clock skew.
fn build_app_jwt(app_id: u64, pem: &str) -> Result<String> {
    let now = time::OffsetDateTime::now_utc().unix_timestamp();
    let claims = GitHubJwtClaims {
        iat: (now - 60) as usize,
        exp: (now + 9 * 60) as usize, // 9 minutes to avoid 10-minute max
        iss: app_id.to_string(),
    };
    let signing_key = EncodingKey::from_rsa_pem(pem.as_bytes()).into_diagnostic()?;
    jsonwebtoken::encode(
        &Header::new(jsonwebtoken::Algorithm::RS256),
        &claims,
        &signing_key,
    )
    .into_diagnostic()
}
/// Shared HTTP client for all GitHub API calls.
///
/// `reqwest::Client` holds a connection pool and is documented as cheap to
/// clone; the original built a brand-new client (and pool) on every call,
/// defeating connection reuse. Build it once and hand out clones.
fn github_client() -> reqwest::Client {
    static CLIENT: std::sync::OnceLock<reqwest::Client> = std::sync::OnceLock::new();
    CLIENT.get_or_init(reqwest::Client::new).clone()
}
/// Mint a short-lived installation access token for the GitHub App.
///
/// Returns `Ok(None)` — not an error — when App credentials are missing or
/// GitHub rejects the request (the failure is logged), so callers can
/// degrade to "no GitHub API access" gracefully.
async fn get_installation_token(state: &AppState, installation_id: u64) -> Result<Option<String>> {
    // Both the App ID and the private key are required to sign the App JWT.
    let (Some(app_id), Some(pem)) = (state.app_id, state.app_key_pem.as_deref()) else {
        return Ok(None);
    };
    let jwt = build_app_jwt(app_id, pem)?;
    let url = format!(
        "{}/app/installations/{}/access_tokens",
        state.github_api_base.trim_end_matches('/'),
        installation_id
    );
    let resp = github_client()
        .post(&url)
        .bearer_auth(jwt)
        .header("Accept", "application/vnd.github+json")
        .header("X-GitHub-Api-Version", "2022-11-28")
        .header("User-Agent", "solstice-ci")
        .send()
        .await
        .into_diagnostic()?;
    if !resp.status().is_success() {
        let status = resp.status();
        let text = resp.text().await.unwrap_or_default();
        warn!(status = ?status, body = %text, "failed to get installation token");
        return Ok(None);
    }
    // Only the `token` field of the response is used.
    let v: serde_json::Value = resp.json().await.into_diagnostic()?;
    Ok(v.get("token")
        .and_then(|t| t.as_str())
        .map(|s| s.to_string()))
}
/// Look up the App installation id covering `owner/repo`.
///
/// Returns `Ok(None)` when App credentials are missing, the App is not
/// installed on the repo (404), or the API call fails (logged, not fatal).
async fn get_installation_id_for_repo(
    state: &AppState,
    owner: &str,
    repo: &str,
) -> Result<Option<u64>> {
    let (Some(app_id), Some(pem)) = (state.app_id, state.app_key_pem.as_deref()) else {
        return Ok(None);
    };
    // This endpoint is authenticated with the App JWT, not an installation token.
    let jwt = build_app_jwt(app_id, pem)?;
    let url = format!(
        "{}/repos/{}/{}/installation",
        state.github_api_base.trim_end_matches('/'),
        owner,
        repo
    );
    let resp = github_client()
        .get(&url)
        .bearer_auth(jwt)
        .header("Accept", "application/vnd.github+json")
        .header("X-GitHub-Api-Version", "2022-11-28")
        .header("User-Agent", "solstice-ci")
        .send()
        .await
        .into_diagnostic()?;
    // 404 simply means the App is not installed on this repository.
    if resp.status() == StatusCode::NOT_FOUND {
        return Ok(None);
    }
    if !resp.status().is_success() {
        let status = resp.status();
        let text = resp.text().await.unwrap_or_default();
        warn!(status = ?status, body = %text, "failed to fetch repo installation");
        return Ok(None);
    }
    let v: serde_json::Value = resp.json().await.into_diagnostic()?;
    Ok(v.get("id").and_then(|id| id.as_u64()))
}
/// Fetch `.solstice/workflow.kdl` from the repository at `sha` via the
/// GitHub contents API.
///
/// Returns `Ok(None)` whenever the file cannot be obtained: no installation
/// id, no token, non-success response, or a response that is not
/// base64-encoded content.
async fn fetch_workflow_kdl(
    state: &AppState,
    owner: &str,
    repo: &str,
    sha: &str,
    installation_id: Option<u64>,
) -> Result<Option<String>> {
    let Some(installation_id) = installation_id else {
        return Ok(None);
    };
    let Some(token) = get_installation_token(state, installation_id).await? else {
        return Ok(None);
    };
    let url = format!(
        "{}/repos/{}/{}/contents/.solstice/workflow.kdl?ref={}",
        state.github_api_base.trim_end_matches('/'),
        owner,
        repo,
        sha
    );
    let resp = github_client()
        .get(&url)
        .bearer_auth(token)
        .header("Accept", "application/vnd.github+json")
        .header("X-GitHub-Api-Version", "2022-11-28")
        .header("User-Agent", "solstice-ci")
        .send()
        .await
        .into_diagnostic()?;
    if resp.status().is_success() {
        let v: serde_json::Value = resp.json().await.into_diagnostic()?;
        if let Some(enc) = v.get("encoding").and_then(|e| e.as_str()) {
            if enc.eq_ignore_ascii_case("base64") {
                if let Some(content) = v.get("content").and_then(|c| c.as_str()) {
                    // The contents API wraps base64 with newlines; strip them
                    // before decoding.
                    let decoded = base64::engine::general_purpose::STANDARD
                        .decode(content.replace('\n', ""))
                        .into_diagnostic()?;
                    let s = String::from_utf8(decoded).into_diagnostic()?;
                    return Ok(Some(s));
                }
            }
        }
    }
    Ok(None)
}
// ---- Webhook payload DTOs (push event) ----
// Only the fields this service reads are modeled; everything else in the
// webhook JSON is ignored by serde.

#[derive(Debug, Deserialize)]
struct RepoOwner {
    // `login` is set for users/orgs; `name` is a fallback seen in push payloads.
    #[serde(default)]
    login: Option<String>,
    #[serde(default)]
    name: Option<String>,
}
#[derive(Debug, Deserialize)]
struct RepoInfo {
    #[serde(default)]
    clone_url: Option<String>,
    #[serde(default)]
    ssh_url: Option<String>,
    #[serde(default)]
    name: Option<String>,
    #[serde(default)]
    owner: Option<RepoOwner>,
}
// Installation context attached by GitHub when the event comes via a
// GitHub App installation.
#[derive(Debug, Deserialize)]
struct InstallationInfo {
    id: u64,
}
#[derive(Debug, Deserialize)]
struct PushPayload {
    // Head commit SHA after the push; all-zeros for branch deletions.
    after: String,
    repository: RepoInfo,
    #[serde(default)]
    installation: Option<InstallationInfo>,
}
// ---- Webhook payload DTOs (pull_request event) ----

#[derive(Debug, Deserialize)]
struct PrRepoInfo {
    #[serde(default)]
    clone_url: Option<String>,
    #[serde(default)]
    ssh_url: Option<String>,
    #[serde(default)]
    name: Option<String>,
    #[serde(default)]
    owner: Option<RepoOwner>,
}
// Head ref of the PR: the commit to build and the repo it lives in
// (which may be a fork of the base repo).
#[derive(Debug, Deserialize)]
struct PrHead {
    sha: String,
    repo: PrRepoInfo,
}
#[derive(Debug, Deserialize)]
struct Label {
    name: String,
}
#[derive(Debug, Deserialize)]
struct PullRequest {
    head: PrHead,
    // PR labels; used by infer_runs_on for "runs-on..." hints.
    #[serde(default)]
    labels: Vec<Label>,
}
#[derive(Debug, Deserialize)]
struct PrPayload {
    // e.g. "opened", "synchronize", "reopened", "closed", ...
    action: String,
    #[serde(rename = "pull_request")]
    pull_request: PullRequest,
    #[serde(default)]
    installation: Option<InstallationInfo>,
}
/// Axum handler for the GitHub webhook route.
///
/// Dispatches on the GitHub event name taken from the request headers:
/// `push` and `pull_request` are signature-checked and parsed, `ping` is
/// signature-checked only, everything else is acknowledged with 204.
async fn handle_webhook(
    State(state): State<Arc<AppState>>,
    headers: HeaderMap,
    body: Bytes,
) -> impl IntoResponse {
    let info = WebhookInfo::from_headers(&headers);
    let checks = signature_checks(&state);
    // Missing event header degrades to "" and falls through to NO_CONTENT.
    let event = info
        .github
        .event
        .as_deref()
        .unwrap_or("")
        .to_ascii_lowercase();
    match event.as_str() {
        // SignaturePolicy::Any — with no secrets configured this service runs
        // in dev mode and accepts unsigned webhooks (warned at startup).
        "push" => match webhook::extract_json::<PushPayload>(
            &headers,
            body.as_ref(),
            &checks,
            SignaturePolicy::Any,
        ) {
            Ok(extract) => handle_push(state, extract.payload).await,
            Err(err) => map_webhook_error(err),
        },
        "pull_request" => match webhook::extract_json::<PrPayload>(
            &headers,
            body.as_ref(),
            &checks,
            SignaturePolicy::Any,
        ) {
            Ok(extract) => handle_pull_request(state, extract.payload).await,
            Err(err) => map_webhook_error(err),
        },
        // `ping` has no payload we care about; just verify the signature.
        "ping" => {
            match webhook::verify_signatures(&headers, body.as_ref(), &checks, SignaturePolicy::Any)
            {
                Ok(_) => StatusCode::OK,
                Err(err) => map_webhook_error(WebhookError::Signature(err)),
            }
        }
        _ => StatusCode::NO_CONTENT,
    }
}
/// Assemble the set of signature validators from the configured secrets.
/// GitHub's HMAC check comes first, then Hookdeck's; an unset secret simply
/// contributes no check (empty vec => dev mode, nothing to verify against).
fn signature_checks(state: &AppState) -> Vec<SignatureCheck> {
    state
        .webhook_secret
        .as_deref()
        .map(SignatureCheck::github)
        .into_iter()
        .chain(
            state
                .hookdeck_signing_secret
                .as_deref()
                .map(SignatureCheck::hookdeck),
        )
        .collect()
}
fn map_webhook_error(err: WebhookError) -> StatusCode {
let (status, message) = match &err {
WebhookError::Signature(_) => (StatusCode::UNAUTHORIZED, "webhook signature check failed"),
WebhookError::Json(_) => (StatusCode::BAD_REQUEST, "failed to parse webhook payload"),
};
let report = miette::Report::new(err);
warn!(error = %report, "{message}");
status
}
/// Handle a `push` event: enqueue job(s) for the pushed commit and, when an
/// installation id is present, create a "queued" check run per job.
async fn handle_push(state: Arc<AppState>, payload: PushPayload) -> StatusCode {
    // Ignore delete events (after = all zeros). Note `all` on an empty string
    // is vacuously true, so an empty `after` is also treated as a delete.
    let is_delete = payload.after.chars().all(|c| c == '0');
    if is_delete {
        return StatusCode::NO_CONTENT;
    }
    let repo_url = pick_repo_url(&payload.repository);
    let sha = payload.after;
    let (owner, repo_name) = extract_owner_repo(
        payload.repository.owner.as_ref(),
        payload.repository.name.as_deref(),
        &repo_url,
    );
    let installation_id = payload.installation.as_ref().map(|i| i.id);
    match enqueue_jobs(
        &state,
        repo_url.clone(),
        sha.clone(),
        None, // push events carry no PR labels
        owner.clone(),
        repo_name.clone(),
        installation_id,
    )
    .await
    {
        Ok(jobs) => {
            // Check runs need an installation token; without an installation
            // id we only enqueue and skip GitHub status reporting.
            if let Some(installation_id) = installation_id {
                for job in jobs.iter() {
                    let details_url = build_details_url(&state, job.request_id);
                    // Check-run creation is best-effort: a failure is logged
                    // but the webhook is still acknowledged.
                    if let Err(e) = create_check_run(
                        &state,
                        installation_id,
                        owner.as_deref(),
                        repo_name.as_deref(),
                        &job.commit_sha,
                        &job.request_id.to_string(),
                        job.workflow_job_id.as_deref(),
                        details_url.as_deref(),
                    )
                    .await
                    {
                        warn!(error = %e, request_id = %job.request_id, "failed to create check run");
                    }
                }
            }
            StatusCode::ACCEPTED
        }
        Err(e) => {
            error!(error = %e, "failed to publish jobs");
            StatusCode::INTERNAL_SERVER_ERROR
        }
    }
}
/// Handle a `pull_request` event: for opened/synchronize/reopened actions,
/// enqueue job(s) for the PR head commit (passing PR labels along as
/// runs_on hints) and create "queued" check runs when possible.
async fn handle_pull_request(state: Arc<AppState>, payload: PrPayload) -> StatusCode {
    // Only act on opened/synchronize/reopened
    match payload.action.as_str() {
        "opened" | "synchronize" | "reopened" => {}
        _ => return StatusCode::NO_CONTENT,
    }
    // Build against the PR head (which may live in a fork of the base repo).
    let repo_url = pick_repo_url_pr(&payload.pull_request.head.repo);
    let sha = payload.pull_request.head.sha;
    let label_names: Vec<String> = payload
        .pull_request
        .labels
        .into_iter()
        .map(|l| l.name)
        .collect();
    let (owner, repo_name) = extract_owner_repo(
        payload.pull_request.head.repo.owner.as_ref(),
        payload.pull_request.head.repo.name.as_deref(),
        &repo_url,
    );
    let installation_id = payload.installation.as_ref().map(|i| i.id);
    match enqueue_jobs(
        &state,
        repo_url,
        sha,
        Some(label_names),
        owner.clone(),
        repo_name.clone(),
        installation_id,
    )
    .await
    {
        Ok(jobs) => {
            // Same best-effort check-run reporting as handle_push.
            if let Some(installation_id) = installation_id {
                for job in jobs.iter() {
                    let details_url = build_details_url(&state, job.request_id);
                    if let Err(e) = create_check_run(
                        &state,
                        installation_id,
                        owner.as_deref(),
                        repo_name.as_deref(),
                        &job.commit_sha,
                        &job.request_id.to_string(),
                        job.workflow_job_id.as_deref(),
                        details_url.as_deref(),
                    )
                    .await
                    {
                        warn!(error = %e, request_id = %job.request_id, "failed to create check run");
                    }
                }
            }
            StatusCode::ACCEPTED
        }
        Err(e) => {
            error!(error = %e, "failed to publish jobs");
            StatusCode::INTERNAL_SERVER_ERROR
        }
    }
}
/// Choose the clone URL for a push-event repository: HTTPS first, SSH as a
/// fallback, empty string if neither is present (caller rejects empty).
fn pick_repo_url(repo: &RepoInfo) -> String {
    match (&repo.clone_url, &repo.ssh_url) {
        (Some(url), _) => url.clone(),
        (None, Some(url)) => url.clone(),
        (None, None) => String::new(),
    }
}
/// Choose the clone URL for a PR head repository: HTTPS first, SSH as a
/// fallback, empty string if neither is present (caller rejects empty).
fn pick_repo_url_pr(repo: &PrRepoInfo) -> String {
    match (&repo.clone_url, &repo.ssh_url) {
        (Some(url), _) => url.clone(),
        (None, Some(url)) => url.clone(),
        (None, None) => String::new(),
    }
}
/// Resolve (owner, repo) from webhook fields, falling back to parsing the
/// clone URL when either piece is missing from the payload.
fn extract_owner_repo(
    owner: Option<&RepoOwner>,
    repo_name: Option<&str>,
    repo_url: &str,
) -> (Option<String>, Option<String>) {
    // Owner comes from `login` (users/orgs), with `name` as a fallback.
    let owner_name: Option<String> = owner
        .and_then(|o| o.login.as_deref().or(o.name.as_deref()))
        .map(str::to_string);
    let repo_name: Option<String> = repo_name.map(str::to_string);
    match (owner_name, repo_name) {
        // Both present in the payload — use them as-is.
        (Some(o), Some(r)) => (Some(o), Some(r)),
        // Otherwise prefer a full parse of the URL; if that fails, keep
        // whatever partial information the payload gave us.
        (o, r) => match parse_owner_repo(repo_url) {
            Some((po, pr)) => (Some(po), Some(pr)),
            None => (o, r),
        },
    }
}
/// Build the check-run "details" link pointing at the job's log endpoint.
/// Prefers the logs service base URL, falling back to the deprecated
/// orchestrator base; returns None when neither is configured.
fn build_details_url(state: &AppState, request_id: uuid::Uuid) -> Option<String> {
    let base = state
        .logs_base_url
        .as_ref()
        .or(state.orch_http_base.as_ref())?;
    Some(format!(
        "{}/jobs/{}/logs",
        base.trim_end_matches('/'),
        request_id
    ))
}
// One job extracted from `.solstice/workflow.kdl` by the heuristic parser.
struct ParsedJob {
    // Job identifier (`id="..."` on the `job` node).
    id: String,
    // Optional runner label from `runs_on="..."` (node line or inside block).
    runs_on: Option<String>,
    // Optional script path from a `script path="..."` line inside the block.
    script: Option<String>,
}
/// Extract jobs from a workflow KDL document.
///
/// Minimal heuristic parser: finds lines starting with `job id="..."`,
/// then scans the brace-delimited block that follows for an optional
/// `script path="..."` and a `runs_on="..."` override. This is NOT a full
/// KDL parser — brace counting is line-based (`{` must end a line, `}` must
/// start one), which is sufficient for our simple workflows.
fn parse_workflow_jobs(kdl: &str) -> Vec<ParsedJob> {
    let mut out = Vec::new();
    let mut lines = kdl.lines().enumerate().peekable();
    while let Some((_i, line)) = lines.next() {
        let l = line.trim();
        if l.starts_with("job ") && l.contains("id=") {
            // capture id and runs_on on the same line if present
            let id = capture_attr(l, "id");
            let mut runs_on = capture_attr(l, "runs_on");
            let mut script: Option<String> = None;
            // Track brace depth relative to the job block; 1 means we are
            // inside it. Peek-then-next so the line after the closing brace
            // is left for the outer loop.
            let mut depth = if l.ends_with('{') { 1 } else { 0 };
            while let Some((_j, ln)) = lines.peek().cloned() {
                let t = ln.trim();
                if t.ends_with('{') {
                    depth += 1;
                }
                if t.starts_with('}') {
                    if depth == 0 {
                        // Closing brace belongs to an enclosing scope; leave
                        // it unconsumed.
                        break;
                    }
                    depth -= 1;
                    if depth == 0 {
                        // End of this job block; consume the brace line.
                        lines.next();
                        break;
                    }
                }
                // within job block: look for step or script lines; allow `script path="..."` or `step name="..." run="..."`
                if t.starts_with("script ") && t.contains("path=") {
                    if let Some(p) = capture_attr(t, "path") {
                        script = Some(p);
                    }
                }
                // Also allow runs_on within block as override (first one wins).
                if t.contains("runs_on=") && runs_on.is_none() {
                    runs_on = capture_attr(t, "runs_on");
                }
                lines.next();
            }
            // Jobs without a parsable id are silently dropped.
            if let Some(id_val) = id {
                out.push(ParsedJob {
                    id: id_val,
                    runs_on,
                    script,
                });
            }
        }
    }
    out
}
/// Extract the value of `key="value"` (or `key='value'`) from a single line.
///
/// The key must appear at the start of the line or be preceded by whitespace.
/// The original implementation matched the key anywhere in the line, so
/// looking up `id` on a line containing `build_id="x" id="a"` wrongly
/// returned `"x"`; requiring a word boundary fixes that while keeping all
/// well-formed lookups unchanged.
fn capture_attr(line: &str, key: &str) -> Option<String> {
    // Double quotes are tried first across the whole line, then single
    // quotes, matching the original precedence.
    for quote in ['"', '\''] {
        let pattern = format!("{key}={quote}");
        for (pos, _) in line.match_indices(&pattern) {
            // Word boundary: start of line or whitespace immediately before
            // the key, so `id` cannot match inside `build_id=...`.
            let at_boundary = pos == 0
                || line[..pos]
                    .chars()
                    .next_back()
                    .is_some_and(|c| c.is_whitespace());
            if !at_boundary {
                continue;
            }
            let rest = &line[pos + pattern.len()..];
            if let Some(end) = rest.find(quote) {
                return Some(rest[..end].to_string());
            }
        }
    }
    None
}
/// Publish one or more JobRequests to AMQP for a commit.
///
/// If the repo has a parseable `.solstice/workflow.kdl` at `commit_sha`, one
/// request per workflow job is published, all sharing a fresh `group_id`.
/// Otherwise a single request is published. Returns every request that was
/// published so the caller can create matching check runs.
async fn enqueue_jobs(
    state: &Arc<AppState>,
    repo_url: String,
    commit_sha: String,
    labels: Option<Vec<String>>,
    repo_owner: Option<String>,
    repo_name: Option<String>,
    installation_id: Option<u64>,
) -> Result<Vec<common::JobRequest>> {
    use uuid::Uuid;
    if repo_url.is_empty() {
        miette::bail!("missing repo_url in webhook payload");
    }
    // Base request (will be cloned per job when a workflow defines multiple jobs)
    let mut base = common::JobRequest::new(common::SourceSystem::Github, repo_url, commit_sha);
    base.repo_owner = repo_owner;
    base.repo_name = repo_name;
    // Backfill owner/name from the URL, without overwriting payload values.
    if base.repo_owner.is_none() || base.repo_name.is_none() {
        if let Some((owner, name)) = parse_owner_repo(&base.repo_url) {
            base.repo_owner = base.repo_owner.or(Some(owner));
            base.repo_name = base.repo_name.or(Some(name));
        }
    }
    let mut published: Vec<common::JobRequest> = Vec::new();
    if let (Some(owner), Some(repo)) = (base.repo_owner.clone(), base.repo_name.clone()) {
        // Workflow fetch errors are swallowed (if let Ok) — any failure here
        // degrades to the single-job fallback below.
        if let Ok(Some(kdl)) =
            fetch_workflow_kdl(state, &owner, &repo, &base.commit_sha, installation_id).await
        {
            let jobs = parse_workflow_jobs(&kdl);
            if !jobs.is_empty() {
                // One group id ties the per-job requests together.
                let gid = Uuid::new_v4();
                for pj in jobs {
                    let mut jr = base.clone();
                    jr.request_id = Uuid::new_v4();
                    jr.group_id = Some(gid);
                    jr.workflow_path = Some(".solstice/workflow.kdl".to_string());
                    jr.workflow_job_id = Some(pj.id);
                    // runs_on precedence: job-specific -> inferred (labels/map/default)
                    jr.runs_on = pj.runs_on.clone().or_else(|| {
                        infer_runs_on(state, &jr.repo_url, labels.as_ref().map(|v| v.as_slice()))
                    });
                    jr.script_path = pj.script.clone();
                    common::publish_job(&state.mq_cfg, &jr).await?;
                    info!(request_id = %jr.request_id, group_id = ?jr.group_id, repo = %jr.repo_url, sha = %jr.commit_sha, job = ?jr.workflow_job_id, runs_on = ?jr.runs_on, "enqueued workflow job");
                    published.push(jr);
                }
                return Ok(published);
            }
        }
    }
    // Fallback: no workflow or no jobs parsed — enqueue a single job
    base.runs_on = infer_runs_on(state, &base.repo_url, labels.as_ref().map(|v| v.as_slice()));
    common::publish_job(&state.mq_cfg, &base).await?;
    info!(request_id = %base.request_id, repo = %base.repo_url, sha = %base.commit_sha, runs_on = ?base.runs_on, "enqueued single job (no workflow)");
    published.push(base);
    Ok(published)
}
/// Derive `(owner, repo)` from a clone URL, accepting three shapes:
/// `http(s)://host/owner/repo`, `ssh://git@host/owner/repo`, and the
/// scp-style `git@host:owner/repo`. A trailing `.git` is stripped first.
/// Returns `None` when the URL does not yield both components.
fn parse_owner_repo(repo_url: &str) -> Option<(String, String)> {
    let url = repo_url.trim_end_matches(".git");
    if let Some(rest) = url
        .strip_prefix("https://")
        .or_else(|| url.strip_prefix("http://"))
    {
        // host/owner/repo[/...] — skip the host, take the next two segments.
        let mut segments = rest.split('/');
        let _host = segments.next()?;
        let owner = segments.next()?;
        let repo = segments.next()?;
        return Some((owner.to_string(), repo.to_string()));
    }
    if let Some(rest) = url.strip_prefix("ssh://") {
        // ssh://git@host/owner/repo — drop everything up to the first slash.
        let path = rest.splitn(2, '/').nth(1)?;
        let mut segments = path.split('/');
        let owner = segments.next()?;
        let repo = segments.next()?;
        return Some((owner.to_string(), repo.to_string()));
    }
    // git@host:owner/repo — the path follows the first colon.
    let (_, path) = url.split_once(':')?;
    let mut segments = path.split('/');
    let owner = segments.next()?;
    let repo = segments.next()?;
    Some((owner.to_string(), repo.to_string()))
}
/// Parse the RUNS_ON_MAP string ("owner/repo=label,other/repo=label2") into
/// a lookup map. Entries without '=' or with an empty key/value are skipped;
/// duplicate keys keep the last value, as with sequential inserts.
fn parse_runs_on_map(s: &str) -> std::collections::HashMap<String, String> {
    s.split(',')
        .filter_map(|entry| entry.trim().split_once('='))
        .filter_map(|(k, v)| {
            let key = k.trim();
            let val = v.trim();
            (!key.is_empty() && !val.is_empty()).then(|| (key.to_string(), val.to_string()))
        })
        .collect()
}
/// Decide the runs_on label for a job when the workflow does not set one.
/// Precedence: per-repo override map, then a PR-label hint, then the
/// configured default (which may itself be None).
fn infer_runs_on(state: &AppState, repo_url: &str, labels: Option<&[String]>) -> Option<String> {
    // 1) Per-repo override map, keyed by "owner/repo".
    if let Some((owner, repo)) = parse_owner_repo(repo_url) {
        if let Some(label) = state.runs_on_map.get(&format!("{}/{}", owner, repo)) {
            return Some(label.clone());
        }
    }
    // 2) PR labels may carry the hint as "runs-on: x", "runs-on=x" or
    //    "runs-on-x" (case-insensitive; the returned label is lowercased).
    for name in labels.into_iter().flatten() {
        let lower = name.trim().to_ascii_lowercase();
        let hit = ["runs-on:", "runs-on=", "runs-on-"]
            .iter()
            .find_map(|prefix| lower.strip_prefix(prefix))
            .map(str::trim)
            .filter(|label| !label.is_empty());
        if let Some(label) = hit {
            return Some(label.to_string());
        }
    }
    // 3) Global default.
    state.runs_on_default.clone()
}
/// Create a "queued" check run on GitHub for an enqueued job.
///
/// Best-effort: silently returns Ok(()) when owner/repo or a token are
/// unavailable, and only logs (does not fail) on an API error. When the job
/// comes from a workflow, its id is appended to the check name
/// ("<check_name> / <job_id>").
async fn create_check_run(
    state: &AppState,
    installation_id: u64,
    owner: Option<&str>,
    repo: Option<&str>,
    sha: &str,
    external_id: &str,
    job_id: Option<&str>,
    details_url: Option<&str>,
) -> Result<()> {
    let (Some(owner), Some(repo)) = (owner, repo) else {
        return Ok(());
    };
    let Some(token) = get_installation_token(state, installation_id).await? else {
        return Ok(());
    };
    let name = if let Some(job_id) = job_id {
        format!("{} / {}", state.check_name, job_id)
    } else {
        state.check_name.clone()
    };
    let url = format!(
        "{}/repos/{}/{}/check-runs",
        state.github_api_base.trim_end_matches('/'),
        owner,
        repo
    );
    // external_id (our request_id) is what find_check_run_id later matches on.
    let mut body = serde_json::json!({
        "name": name,
        "head_sha": sha,
        "status": "queued",
        "external_id": external_id,
        "output": {
            "title": "Solstice CI",
            "summary": "Job queued"
        }
    });
    if let Some(u) = details_url {
        body["details_url"] = serde_json::Value::String(u.to_string());
    }
    let resp = github_client()
        .post(&url)
        .bearer_auth(token)
        .header("Accept", "application/vnd.github+json")
        .header("X-GitHub-Api-Version", "2022-11-28")
        .header("User-Agent", "solstice-ci")
        .json(&body)
        .send()
        .await
        .into_diagnostic()?;
    if !resp.status().is_success() {
        let status = resp.status();
        let text = resp.text().await.unwrap_or_default();
        warn!(status = ?status, body = %text, "failed to create check run");
    }
    Ok(())
}
// Minimal projection of GitHub's "list check runs for a ref" response;
// only the fields needed to match a run by external_id are modeled.
#[derive(Debug, Deserialize)]
struct CheckRunItem {
    id: u64,
    // Our request_id, set when the run was created.
    #[serde(default)]
    external_id: Option<String>,
    // Deserialized but currently unused by find_check_run_id.
    #[serde(default)]
    name: Option<String>,
}
#[derive(Debug, Deserialize)]
struct CheckRunList {
    #[serde(default)]
    check_runs: Vec<CheckRunItem>,
}
/// Find the check run we previously created for a commit by matching its
/// `external_id` (our request_id).
///
/// Returns `Ok(None)` on API failure (logged) or when no run matches.
/// NOTE: only the first page (per_page=100) is scanned — no pagination —
/// so a commit with >100 check runs could miss the match.
async fn find_check_run_id(
    state: &AppState,
    token: &str,
    owner: &str,
    repo: &str,
    sha: &str,
    external_id: &str,
) -> Result<Option<u64>> {
    let url = format!(
        "{}/repos/{}/{}/commits/{}/check-runs?per_page=100",
        state.github_api_base.trim_end_matches('/'),
        owner,
        repo,
        sha
    );
    let resp = github_client()
        .get(&url)
        .bearer_auth(token)
        .header("Accept", "application/vnd.github+json")
        .header("X-GitHub-Api-Version", "2022-11-28")
        .header("User-Agent", "solstice-ci")
        .send()
        .await
        .into_diagnostic()?;
    if !resp.status().is_success() {
        let status = resp.status();
        let text = resp.text().await.unwrap_or_default();
        warn!(status = ?status, body = %text, "failed to list check runs");
        return Ok(None);
    }
    let list: CheckRunList = resp.json().await.into_diagnostic()?;
    Ok(list
        .check_runs
        .into_iter()
        .find(|c| c.external_id.as_deref() == Some(external_id))
        .map(|c| c.id))
}
/// Mark an existing check run as completed with the given conclusion
/// ("success"/"failure") and summary, stamping completed_at with the
/// current UTC time. API failures are logged, not propagated.
async fn update_check_run(
    state: &AppState,
    token: &str,
    owner: &str,
    repo: &str,
    check_run_id: u64,
    conclusion: &str,
    summary: &str,
    details_url: Option<&str>,
) -> Result<()> {
    let url = format!(
        "{}/repos/{}/{}/check-runs/{}",
        state.github_api_base.trim_end_matches('/'),
        owner,
        repo,
        check_run_id
    );
    let completed_at = time::OffsetDateTime::now_utc()
        .format(&time::format_description::well_known::Rfc3339)
        .into_diagnostic()?;
    let mut body = serde_json::json!({
        "status": "completed",
        "conclusion": conclusion,
        "completed_at": completed_at,
        "output": {
            "title": "Solstice CI",
            "summary": summary
        }
    });
    if let Some(u) = details_url {
        body["details_url"] = serde_json::Value::String(u.to_string());
    }
    let resp = github_client()
        .patch(&url)
        .bearer_auth(token)
        .header("Accept", "application/vnd.github+json")
        .header("X-GitHub-Api-Version", "2022-11-28")
        .header("User-Agent", "solstice-ci")
        .json(&body)
        .send()
        .await
        .into_diagnostic()?;
    if !resp.status().is_success() {
        let status = resp.status();
        let text = resp.text().await.unwrap_or_default();
        warn!(status = ?status, body = %text, "failed to update check run");
    }
    Ok(())
}
/// Create a check run directly in the "completed" state — used when no
/// queued run with our external_id exists for the commit.
///
/// NOTE: unlike create_check_run, the name here is always `state.check_name`
/// without a workflow job-id suffix. API failures are logged, not propagated.
async fn create_completed_check_run(
    state: &AppState,
    token: &str,
    owner: &str,
    repo: &str,
    sha: &str,
    external_id: &str,
    conclusion: &str,
    summary: &str,
    details_url: Option<&str>,
) -> Result<()> {
    let url = format!(
        "{}/repos/{}/{}/check-runs",
        state.github_api_base.trim_end_matches('/'),
        owner,
        repo
    );
    let completed_at = time::OffsetDateTime::now_utc()
        .format(&time::format_description::well_known::Rfc3339)
        .into_diagnostic()?;
    let mut body = serde_json::json!({
        "name": state.check_name,
        "head_sha": sha,
        "status": "completed",
        "conclusion": conclusion,
        "external_id": external_id,
        "completed_at": completed_at,
        "output": {
            "title": "Solstice CI",
            "summary": summary
        }
    });
    if let Some(u) = details_url {
        body["details_url"] = serde_json::Value::String(u.to_string());
    }
    let resp = github_client()
        .post(&url)
        .bearer_auth(token)
        .header("Accept", "application/vnd.github+json")
        .header("X-GitHub-Api-Version", "2022-11-28")
        .header("User-Agent", "solstice-ci")
        .json(&body)
        .send()
        .await
        .into_diagnostic()?;
    if !resp.status().is_success() {
        let status = resp.status();
        let text = resp.text().await.unwrap_or_default();
        warn!(status = ?status, body = %text, "failed to create completed check run");
    }
    Ok(())
}
/// Background AMQP consumer: declares/binds the results queue and processes
/// JobResult messages forever, reporting each back to GitHub as a check run.
///
/// Both handler failures and unparseable messages are acked anyway to avoid
/// redelivery loops. Returns only on a connection/channel-level error, which
/// the spawner in `main` logs (the consumer is not restarted).
async fn consume_job_results(state: Arc<AppState>) -> Result<()> {
    let url = state.mq_cfg.url.clone();
    let exchange = state.mq_cfg.exchange.clone();
    let conn = lapin::Connection::connect(&url, lapin::ConnectionProperties::default())
        .await
        .into_diagnostic()?;
    let channel = conn.create_channel().await.into_diagnostic()?;
    // Ensure exchange exists (direct)
    channel
        .exchange_declare(
            &exchange,
            lapin::ExchangeKind::Direct,
            lapin::options::ExchangeDeclareOptions {
                durable: true,
                auto_delete: false,
                internal: false,
                nowait: false,
                passive: false,
            },
            lapin::types::FieldTable::default(),
        )
        .await
        .into_diagnostic()?;
    // Declare results queue and bind to routing key jobresult.v1
    let results_queue =
        std::env::var("RESULTS_QUEUE").unwrap_or_else(|_| "solstice.results.v1".into());
    channel
        .queue_declare(
            &results_queue,
            lapin::options::QueueDeclareOptions {
                durable: true,
                auto_delete: false,
                exclusive: false,
                nowait: false,
                passive: false,
            },
            lapin::types::FieldTable::default(),
        )
        .await
        .into_diagnostic()?;
    channel
        .queue_bind(
            &results_queue,
            &exchange,
            "jobresult.v1",
            lapin::options::QueueBindOptions { nowait: false },
            lapin::types::FieldTable::default(),
        )
        .await
        .into_diagnostic()?;
    // Limit unacked deliveries on this channel to 16 in flight.
    channel
        .basic_qos(16, lapin::options::BasicQosOptions { global: false })
        .await
        .into_diagnostic()?;
    // Manual acks (no_ack: false) so each message is acked explicitly below.
    let mut consumer = channel
        .basic_consume(
            &results_queue,
            "github-integration",
            lapin::options::BasicConsumeOptions {
                no_ack: false,
                ..Default::default()
            },
            lapin::types::FieldTable::default(),
        )
        .await
        .into_diagnostic()?;
    info!(queue = %results_queue, "job results consumer started");
    while let Some(delivery) = consumer.next().await {
        match delivery {
            Ok(d) => {
                let tag = d.delivery_tag;
                let res: Result<common::messages::JobResult, _> =
                    serde_json::from_slice(&d.data).into_diagnostic();
                match res {
                    Ok(jobres) => {
                        // Handler errors are logged and the message is acked
                        // regardless — failed GitHub reporting must not cause
                        // endless redelivery.
                        if let Err(e) = handle_job_result(&state, &jobres).await {
                            warn!(error = %e, request_id = %jobres.request_id, "failed to handle JobResult; acking to avoid loops");
                        }
                        channel
                            .basic_ack(tag, lapin::options::BasicAckOptions { multiple: false })
                            .await
                            .into_diagnostic()?;
                    }
                    Err(e) => {
                        // Poison message: log its body and drop it via ack.
                        warn!(error = %e, body = %common::mq::pretty_amqp_body(&d.data), "failed to parse JobResult; acking");
                        channel
                            .basic_ack(tag, lapin::options::BasicAckOptions { multiple: false })
                            .await
                            .into_diagnostic()?;
                    }
                }
            }
            Err(e) => {
                // Transient consumer error; back off briefly and keep polling.
                warn!(error = %e, "consumer error; sleeping");
                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
            }
        }
    }
    Ok(())
}
/// Report a finished job back to GitHub: fetch its logs, optionally archive
/// them to S3, then complete (or create completed) the matching check run.
///
/// Quietly returns Ok(()) when owner/repo, installation, or token cannot be
/// resolved — there is nowhere to report to in that case.
async fn handle_job_result(state: &AppState, jobres: &common::messages::JobResult) -> Result<()> {
    // Logs service base, falling back to the deprecated orchestrator base.
    let logs_base = state
        .logs_base_url
        .as_ref()
        .or(state.orch_http_base.as_ref());
    // Fetch logs
    let mut log_text: Option<String> = None;
    if let Some(base) = logs_base {
        let url = format!(
            "{}/jobs/{}/logs",
            base.trim_end_matches('/'),
            jobres.request_id
        );
        let resp = reqwest::Client::new()
            .get(&url)
            .send()
            .await
            .into_diagnostic()?;
        if resp.status().is_success() {
            let txt = resp.text().await.into_diagnostic()?;
            log_text = Some(txt);
        } else {
            warn!(status = ?resp.status(), "failed to fetch logs");
        }
    }
    // Upload to S3 if configured and we have logs
    let mut target_url: Option<String> = None;
    if let (Some(endpoint), Some(bucket), Some(text)) = (
        state.s3_endpoint.as_ref(),
        state.s3_bucket.as_ref(),
        log_text.as_ref(),
    ) {
        // Key sanitization: ':' is first mapped to '/', then ALL '/' become
        // '_' — so the net effect is that both ':' and '/' turn into '_'
        // (the first replace is subsumed by the second).
        if let Ok(url) = upload_to_s3(
            endpoint,
            bucket,
            &format!(
                "logs/{}/{}.txt",
                jobres.repo_url.replace(':', "/").replace('/', "_"),
                jobres.request_id
            ),
            text.as_bytes(),
        )
        .await
        {
            target_url = Some(url);
        }
    }
    // Fallback to logs base URL if upload not done
    if target_url.is_none() {
        if let Some(base) = logs_base {
            target_url = Some(format!(
                "{}/jobs/{}/logs",
                base.trim_end_matches('/'),
                jobres.request_id
            ));
        }
    }
    // Resolve owner/repo from the result message, else from the clone URL.
    let (owner, repo) = match (jobres.repo_owner.as_ref(), jobres.repo_name.as_ref()) {
        (Some(o), Some(r)) => (Some(o.clone()), Some(r.clone())),
        _ => parse_owner_repo(&jobres.repo_url)
            .map(|(o, r)| (Some(o), Some(r)))
            .unwrap_or((None, None)),
    };
    let Some(owner) = owner else {
        return Ok(());
    };
    let Some(repo) = repo else {
        return Ok(());
    };
    let installation_id = match get_installation_id_for_repo(state, &owner, &repo).await? {
        Some(id) => id,
        None => return Ok(()),
    };
    let Some(token) = get_installation_token(state, installation_id).await? else {
        return Ok(());
    };
    let conclusion = if jobres.success { "success" } else { "failure" };
    let summary = jobres.summary.clone().unwrap_or_else(|| {
        if jobres.success {
            "Job succeeded"
        } else {
            "Job failed"
        }
        .to_string()
    });
    // Check runs are correlated by external_id = our request_id.
    let external_id = jobres.request_id.to_string();
    if let Some(check_run_id) = find_check_run_id(
        state,
        &token,
        &owner,
        &repo,
        &jobres.commit_sha,
        &external_id,
    )
    .await?
    {
        // Best-effort: errors from the GitHub update are deliberately ignored.
        let _ = update_check_run(
            state,
            &token,
            &owner,
            &repo,
            check_run_id,
            conclusion,
            &summary,
            target_url.as_deref(),
        )
        .await;
    } else {
        // No queued run found (e.g. created before this service ran, or the
        // run list was truncated) — create one already completed.
        let _ = create_completed_check_run(
            state,
            &token,
            &owner,
            &repo,
            &jobres.commit_sha,
            &external_id,
            conclusion,
            &summary,
            target_url.as_deref(),
        )
        .await;
    }
    Ok(())
}
/// Upload a byte blob to an S3-compatible endpoint (Garage/MinIO) and return
/// the public path-style URL of the object.
///
/// Credentials/region come from the ambient AWS config (env vars, profile);
/// only the endpoint is overridden, with path-style addressing forced since
/// Garage/MinIO typically do not serve virtual-hosted buckets.
async fn upload_to_s3(endpoint: &str, bucket: &str, key: &str, bytes: &[u8]) -> Result<String> {
    let loader = aws_config::defaults(aws_config::BehaviorVersion::latest())
        .load()
        .await;
    // Override endpoint and enforce path-style
    let conf = aws_sdk_s3::config::Builder::from(&loader)
        .endpoint_url(endpoint)
        .force_path_style(true)
        .build();
    let client = aws_sdk_s3::Client::from_conf(conf);
    client
        .put_object()
        .bucket(bucket)
        .key(key)
        .body(ByteStream::from(bytes.to_vec()))
        .content_type("text/plain; charset=utf-8")
        .send()
        .await
        .into_diagnostic()?;
    // Construct path-style URL. NOTE(review): assumes the object is publicly
    // readable at this URL — depends on bucket policy; confirm for deployment.
    let url = format!("{}/{}/{}", endpoint.trim_end_matches('/'), bucket, key);
    Ok(url)
}