Enable job log persistence, HTTP server, and extend CI/packaging support

This commit introduces:
- Log persistence feature with a new `job_logs` table and related APIs for recording and retrieving job logs.
- An HTTP server for serving log endpoints and job results.
- Updates to the CI pipeline to enable persistence by default and ensure PostgreSQL readiness.
- Docker Compose updates with a Postgres service and MinIO integration for object storage.
- Packaging scripts for Arch Linux, including systemd service units for deployment.
This commit is contained in:
Till Wegmueller 2025-11-02 23:37:11 +01:00
parent 6631ce4d6e
commit 81a93ef1a7
No known key found for this signature in database
22 changed files with 937 additions and 30 deletions

17
.idea/dataSources.xml generated Normal file
View file

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="DataSourceManagerImpl" format="xml" multifile-model="true">
<data-source source="LOCAL" name="solstice@localhost" uuid="8be7b89e-6fc5-4e5d-a84a-57129d3a04d4">
<driver-ref>postgresql</driver-ref>
<synchronize>true</synchronize>
<jdbc-driver>org.postgresql.Driver</jdbc-driver>
<jdbc-url>jdbc:postgresql://localhost:5432/solstice</jdbc-url>
<jdbc-additional-properties>
<property name="com.intellij.clouds.kubernetes.db.host.port" />
<property name="com.intellij.clouds.kubernetes.db.enabled" value="false" />
<property name="com.intellij.clouds.kubernetes.db.container.port" />
</jdbc-additional-properties>
<working-dir>$ProjectFileDir$</working-dir>
</data-source>
</component>
</project>

View file

@ -68,6 +68,9 @@ export SOLSTICE_RUNNER_URL=${SOLSTICE_RUNNER_URL:-http://$HOST_IP:$SOL_RUNNER_PO
SERVE_PID=$!
# Start orchestrator in background
# Enable persistence by default in CI: use local Postgres from docker-compose
export ORCH_SKIP_PERSIST=${ORCH_SKIP_PERSIST:-false}
export DATABASE_URL=${DATABASE_URL:-postgres://solstice:solstice@127.0.0.1:5432/solstice}
LOGFILE=${SOL_ORCH_LOG:-"$ROOT_DIR/target/orchestrator.local.log"}
echo "Starting orchestrator... (logs: $LOGFILE)" >&2
(

View file

@ -72,8 +72,9 @@ ILLUMOS_URL="http://$HOST_IP:$SOL_RUNNER_PORT_ILLUMOS/solstice-runner-illumos"
export SOLSTICE_RUNNER_URLS="$LINUX_URL $ILLUMOS_URL"
# Start orchestrator in background (inherits env including SOLSTICE_RUNNER_URLS/ORCH_CONTACT_ADDR)
# Speed up startup by skipping persistence unless explicitly disabled
export ORCH_SKIP_PERSIST=${ORCH_SKIP_PERSIST:-true}
# Enable persistence by default in CI: use local Postgres from docker-compose
export ORCH_SKIP_PERSIST=${ORCH_SKIP_PERSIST:-false}
export DATABASE_URL=${DATABASE_URL:-postgres://solstice:solstice@127.0.0.1:5432/solstice}
LOGFILE=${SOL_ORCH_LOG:-"$ROOT_DIR/target/orchestrator.vm-build.log"}
echo "Starting orchestrator... (logs: $LOGFILE)" >&2
(

View file

@ -1,11 +1,11 @@
#!/usr/bin/env bash
set -euo pipefail
# Start local development dependencies (RabbitMQ) via docker compose
# Start local development dependencies (RabbitMQ + Postgres) via docker compose
if command -v docker >/dev/null 2>&1; then
if command -v docker-compose >/dev/null 2>&1; then
exec docker-compose up -d rabbitmq
exec docker-compose up -d rabbitmq postgres
else
exec docker compose up -d rabbitmq
exec docker compose up -d rabbitmq postgres
fi
elif command -v podman >/dev/null 2>&1; then
echo "Podman detected but this project uses docker-compose file; please use Docker or translate to podman-compose" >&2

24
.mise/tasks/pkg/build Normal file
View file

@ -0,0 +1,24 @@
#!/usr/bin/env bash
set -euo pipefail
# Build Arch packages for Solstice CI components.
# Requires: Arch Linux with base-devel, rust, cargo, makepkg.
# Outputs: pkg files under packaging/arch/*/*.pkg.tar.*
ROOT_DIR=$(cd "$(dirname "$0")/../../../" && pwd)
cd "$ROOT_DIR"
# Create a clean source tarball of the repository
# Only git-tracked files are included (git ls-files); NUL delimiters keep
# unusual filenames (spaces, etc.) intact. Requires GNU tar for --null/-T -.
TARBALL="solstice-ci.tar.gz"
TMPDIR=$(mktemp -d)
# Always remove the scratch dir, on success or failure.
trap 'rm -rf "$TMPDIR"' EXIT
git ls-files -z | tar --null -czf "$TMPDIR/$TARBALL" -T -
# Build each package in its packaging dir: -f forces a rebuild, -C cleans a
# leftover srcdir first, --noconfirm keeps makepkg non-interactive.
for pkg in solstice-orchestrator solstice-forge-integration; do
PKG_DIR="$ROOT_DIR/packaging/arch/$pkg"
mkdir -p "$PKG_DIR"
# The PKGBUILD expects the source tarball to sit next to it.
cp "$TMPDIR/$TARBALL" "$PKG_DIR/$TARBALL"
( cd "$PKG_DIR" && makepkg -fC --noconfirm )
echo "Built package(s) in $PKG_DIR:" >&2
# Best-effort listing; suppressed when the glob matches nothing.
ls -1 "$PKG_DIR"/*.pkg.tar.* 2>/dev/null || true
done

43
.mise/tasks/pkg/install Normal file
View file

@ -0,0 +1,43 @@
#!/usr/bin/env bash
set -euo pipefail
# Install built Arch packages and enable services
# Requires: sudo privileges for pacman and systemctl
ROOT_DIR=$(cd "$(dirname "$0")/../../../" && pwd)
cd "$ROOT_DIR"
# nullglob: unmatched package globs expand to nothing instead of a literal
# pattern, so the "no packages" check below works.
shopt -s nullglob
PKGS=()
for p in packaging/arch/solstice-orchestrator/*.pkg.tar.* packaging/arch/solstice-forge-integration/*.pkg.tar.*; do
PKGS+=("$p")
done
if [[ ${#PKGS[@]} -eq 0 ]]; then
echo "No packages found. Build first: mise run pkg:build" >&2
exit 1
fi
echo "Installing: ${PKGS[*]}" >&2
# -U installs local package files; installs all built packages in one transaction.
sudo pacman -U --noconfirm "${PKGS[@]}"
# Place example env files if not present
# Existing configuration is never overwritten: each example is only copied
# when the target file is missing.
sudo install -d -m 755 /etc/solstice
if [[ ! -f /etc/solstice/orchestrator.env ]]; then
sudo install -m 644 packaging/arch/solstice-orchestrator/orchestrator.env.example /etc/solstice/orchestrator.env
fi
if [[ ! -f /etc/solstice/forge-integration.env ]]; then
sudo install -m 644 packaging/arch/solstice-forge-integration/forge-integration.env.example /etc/solstice/forge-integration.env
fi
# Ship example orchestrator image map if none present
if [[ ! -f /etc/solstice/orchestrator-image-map.yaml ]]; then
sudo install -m 644 examples/orchestrator-image-map.yaml /etc/solstice/orchestrator-image-map.yaml
fi
# Reload systemd and optionally enable services
# SOL_ENABLE_SERVICES=0 skips enabling; enable failures are tolerated (|| true)
# so installation still completes on systems without the units active.
sudo systemctl daemon-reload
if [[ "${SOL_ENABLE_SERVICES:-1}" == "1" ]]; then
sudo systemctl enable --now solstice-orchestrator.service || true
sudo systemctl enable --now solstice-forge-integration.service || true
fi
echo "Done. Adjust /etc/solstice/*.env and /etc/solstice/orchestrator-image-map.yaml as needed." >&2

12
.mise/tasks/setup/install Normal file
View file

@ -0,0 +1,12 @@
#!/usr/bin/env bash
set -euo pipefail
# Build the Solstice CI Arch packages, then install them with systemd units.
#
# Usage:
#   mise run setup:install                         # build, install, enable services
#   SOL_ENABLE_SERVICES=0 mise run setup:install   # build and install only
root_dir=$(cd "$(dirname "$0")/../../../" && pwd)
cd "$root_dir"
# Run the two packaging tasks in order; set -e aborts if the build fails.
for task in pkg/build pkg/install; do
  "$root_dir/.mise/tasks/$task"
done

View file

@ -8,12 +8,19 @@ common = { path = "../common" }
clap = { version = "4", features = ["derive", "env"] }
miette = { version = "7", features = ["fancy"] }
tracing = "0.1"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal", "fs", "io-util", "time"] }
# HTTP + Webhooks
axum = { version = "0.8", features = ["macros"] }
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls-native-roots"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
# Signature verification
hmac = "0.12"
sha2 = "0.10"
hex = "0.4"
# AMQP consumer for results
lapin = { version = "2" }
futures-util = "0.3"
# S3/Garage upload
aws-config = { version = "1", default-features = false, features = ["behavior-version-latest", "rt-tokio"] }
aws-sdk-s3 = { version = "1", default-features = false, features = ["rt-tokio", "rustls"] }

View file

@ -1,5 +1,6 @@
use std::net::SocketAddr;
use std::sync::Arc;
use aws_sdk_s3::primitives::ByteStream;
use axum::{
Router,
@ -11,10 +12,11 @@ use axum::{
};
use clap::{Parser, Subcommand};
use hmac::{Hmac, Mac};
use miette::Result;
use miette::{Result, IntoDiagnostic};
use serde::Deserialize;
use sha2::Sha256;
use tracing::{error, info, warn};
use futures_util::StreamExt;
#[derive(Subcommand, Debug)]
enum Cmd {
@ -71,6 +73,34 @@ struct Opts {
#[arg(long, env = "OTEL_EXPORTER_OTLP_ENDPOINT")]
otlp: Option<String>,
/// Forgejo API base (e.g., https://codeberg.org/api/v1)
#[arg(long, env = "FORGEJO_BASE_URL")]
forgejo_base: Option<String>,
/// Forgejo token (PAT or app token)
#[arg(long, env = "FORGEJO_TOKEN")]
forgejo_token: Option<String>,
/// Commit status context
#[arg(long, env = "FORGE_CONTEXT", default_value = "solstice/ci")]
forge_context: String,
/// Orchestrator HTTP base for logs (e.g., http://localhost:8081)
#[arg(long, env = "ORCH_HTTP_BASE")]
orch_http_base: Option<String>,
/// S3-compatible endpoint for Garage/MinIO (e.g., http://localhost:9000)
#[arg(long, env = "S3_ENDPOINT")]
s3_endpoint: Option<String>,
/// Bucket to upload logs into
#[arg(long, env = "S3_BUCKET")]
s3_bucket: Option<String>,
/// Default runs_on label to use when not specified via labels or repo map
#[arg(long, env = "RUNS_ON_DEFAULT")]
runs_on_default: Option<String>,
/// Per-repo runs_on overrides: comma-separated owner/repo=label pairs
#[arg(long, env = "RUNS_ON_MAP")]
runs_on_map: Option<String>,
#[command(subcommand)]
cmd: Option<Cmd>,
}
@ -79,6 +109,14 @@ struct Opts {
struct AppState {
mq_cfg: common::MqConfig,
webhook_secret: Option<String>,
forgejo_base: Option<String>,
forgejo_token: Option<String>,
forge_context: String,
orch_http_base: Option<String>,
s3_endpoint: Option<String>,
s3_bucket: Option<String>,
runs_on_default: Option<String>,
runs_on_map: std::collections::HashMap<String, String>, // key: owner/repo
}
type HmacSha256 = Hmac<Sha256>;
@ -125,9 +163,24 @@ async fn main() -> Result<()> {
);
}
// Parse runs_on overrides map from CLI/env
let runs_on_map = opts
.runs_on_map
.as_deref()
.map(parse_runs_on_map)
.unwrap_or_default();
let state = Arc::new(AppState {
mq_cfg,
webhook_secret: opts.webhook_secret,
forgejo_base: opts.forgejo_base,
forgejo_token: opts.forgejo_token,
forge_context: opts.forge_context,
orch_http_base: opts.orch_http_base,
s3_endpoint: opts.s3_endpoint,
s3_bucket: opts.s3_bucket,
runs_on_default: opts.runs_on_default,
runs_on_map,
});
// Leak the path string to satisfy 'static requirement for axum route API
@ -138,6 +191,14 @@ async fn main() -> Result<()> {
.with_state(state.clone());
let addr: SocketAddr = opts.http_addr.parse().expect("invalid HTTP_ADDR");
// Start JobResult consumer in background
let state_clone = state.clone();
tokio::spawn(async move {
if let Err(e) = consume_job_results(state_clone).await {
tracing::error!(error = %e, "job result consumer exited");
}
});
axum::serve(
tokio::net::TcpListener::bind(addr).await.expect("bind"),
router,
@ -148,6 +209,278 @@ async fn main() -> Result<()> {
Ok(())
}
/// Post a commit status to the Forgejo API for the given repo/sha.
///
/// `repo_url` may be an https, ssh, or scp-style clone URL; owner and repo
/// are extracted from it. A non-2xx API response is logged as a warning but
/// not treated as an error (best effort).
async fn post_commit_status(
    base: &str,
    token: &str,
    repo_url: &str,
    sha: &str,
    context: &str,
    state: &str,
    target_url: Option<&str>,
    description: Option<&str>,
) -> Result<()> {
    // Owner/repo come from the clone URL (https://.../owner/repo.git or ssh/scp forms).
    let (owner, repo) = parse_owner_repo(repo_url)
        .ok_or_else(|| miette::miette!("cannot parse owner/repo from repo_url: {repo_url}"))?;
    let endpoint = format!(
        "{}/repos/{}/{}/statuses/{}",
        base.trim_end_matches('/'),
        owner,
        repo,
        sha
    );
    let mut payload = serde_json::json!({
        "state": state,
        "context": context,
    });
    if let Some(u) = target_url {
        payload["target_url"] = serde_json::Value::String(u.to_string());
    }
    if let Some(d) = description {
        payload["description"] = serde_json::Value::String(d.to_string());
    }
    let response = reqwest::Client::new()
        .post(&endpoint)
        .bearer_auth(token)
        .json(&payload)
        .send()
        .await
        .into_diagnostic()?;
    if !response.status().is_success() {
        let status = response.status();
        let text = response.text().await.unwrap_or_default();
        tracing::warn!(status = ?status, body = %text, "forgejo status post failed");
    }
    Ok(())
}
/// Consume JobResult messages from RabbitMQ and drive final status posting.
///
/// Declares the (durable, direct) exchange and the results queue, binds the
/// queue to routing key `jobresult.v1`, then processes deliveries until the
/// consumer stream ends. Every delivery is acked — including unparseable
/// payloads and handler failures — to avoid redelivery loops.
async fn consume_job_results(state: Arc<AppState>) -> Result<()> {
// Only start if we have at least Forgejo base+token to post statuses
if state.forgejo_base.is_none() || state.forgejo_token.is_none() {
warn!("FORGEJO_* not set; job result consumer disabled");
return Ok(());
}
let url = state.mq_cfg.url.clone();
let exchange = state.mq_cfg.exchange.clone();
let conn = lapin::Connection::connect(&url, lapin::ConnectionProperties::default())
.await
.into_diagnostic()?;
let channel = conn.create_channel().await.into_diagnostic()?;
// Ensure exchange exists (direct)
channel
.exchange_declare(
&exchange,
lapin::ExchangeKind::Direct,
lapin::options::ExchangeDeclareOptions {
durable: true,
auto_delete: false,
internal: false,
nowait: false,
passive: false,
},
lapin::types::FieldTable::default(),
)
.await
.into_diagnostic()?;
// Declare results queue and bind to routing key jobresult.v1
// Queue name can be overridden via the RESULTS_QUEUE env var.
let results_queue = std::env::var("RESULTS_QUEUE").unwrap_or_else(|_| "solstice.results.v1".into());
channel
.queue_declare(
&results_queue,
lapin::options::QueueDeclareOptions { durable: true, auto_delete: false, exclusive: false, nowait: false, passive: false },
lapin::types::FieldTable::default(),
)
.await
.into_diagnostic()?;
channel
.queue_bind(
&results_queue,
&exchange,
"jobresult.v1",
lapin::options::QueueBindOptions { nowait: false },
lapin::types::FieldTable::default(),
)
.await
.into_diagnostic()?;
// Cap unacked deliveries at 16 on this channel to bound in-flight work.
channel
.basic_qos(16, lapin::options::BasicQosOptions { global: false })
.await
.into_diagnostic()?;
// Manual acknowledgements (no_ack: false) so we control when a message is done.
let mut consumer = channel
.basic_consume(
&results_queue,
"forge-integration",
lapin::options::BasicConsumeOptions { no_ack: false, ..Default::default() },
lapin::types::FieldTable::default(),
)
.await
.into_diagnostic()?;
info!(queue = %results_queue, "job results consumer started");
while let Some(delivery) = consumer.next().await {
match delivery {
Ok(d) => {
let tag = d.delivery_tag;
let res: Result<common::messages::JobResult, _> = serde_json::from_slice(&d.data).into_diagnostic();
match res {
Ok(jobres) => {
// Handler failures are logged but the message is still acked:
// retrying would likely fail the same way and loop forever.
if let Err(e) = handle_job_result(&state, &jobres).await {
warn!(error = %e, request_id = %jobres.request_id, "failed to handle JobResult; acking to avoid loops");
}
channel
.basic_ack(tag, lapin::options::BasicAckOptions { multiple: false })
.await
.into_diagnostic()?;
}
Err(e) => {
// Malformed payloads can never succeed on redelivery; ack and drop.
warn!(error = %e, "failed to parse JobResult; acking");
channel
.basic_ack(tag, lapin::options::BasicAckOptions { multiple: false })
.await
.into_diagnostic()?;
}
}
}
Err(e) => {
// Transient stream error: back off briefly and keep consuming.
warn!(error = %e, "consumer error; sleeping");
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
}
}
Ok(())
}
async fn handle_job_result(state: &AppState, jobres: &common::messages::JobResult) -> Result<()> {
// Fetch logs
let mut log_text: Option<String> = None;
if let Some(base) = state.orch_http_base.as_ref() {
let url = format!("{}/jobs/{}/logs", base.trim_end_matches('/'), jobres.request_id);
let resp = reqwest::Client::new().get(&url).send().await.into_diagnostic()?;
if resp.status().is_success() {
let txt = resp.text().await.into_diagnostic()?;
log_text = Some(txt);
} else {
warn!(status = ?resp.status(), "failed to fetch logs from orchestrator HTTP");
}
}
// Upload to S3 if configured and we have logs
let mut target_url: Option<String> = None;
if let (Some(endpoint), Some(bucket), Some(text)) = (state.s3_endpoint.as_ref(), state.s3_bucket.as_ref(), log_text.as_ref()) {
if let Ok(url) = upload_to_s3(endpoint, bucket, &format!("logs/{}/{}.txt", jobres.repo_url.replace(':', "/").replace('/', "_"), jobres.request_id), text.as_bytes()).await {
target_url = Some(url);
}
}
// Fallback to orchestrator log URL if upload not done
if target_url.is_none() {
if let Some(base) = state.orch_http_base.as_ref() {
target_url = Some(format!("{}/jobs/{}/logs", base.trim_end_matches('/'), jobres.request_id));
}
}
// Post final status to Forgejo
let state_str = if jobres.success { "success" } else { "failure" };
if let (Some(base), Some(token)) = (state.forgejo_base.as_ref(), state.forgejo_token.as_ref()) {
let desc = if jobres.success { Some("Job succeeded") } else { Some("Job failed") };
let _ = post_commit_status(
base,
token,
&jobres.repo_url,
&jobres.commit_sha,
&state.forge_context,
state_str,
target_url.as_deref(),
desc,
)
.await;
}
Ok(())
}
/// Upload `bytes` as a text object to an S3-compatible endpoint (MinIO/Garage)
/// and return a path-style URL pointing at the uploaded object.
async fn upload_to_s3(endpoint: &str, bucket: &str, key: &str, bytes: &[u8]) -> Result<String> {
    // Credentials/region come from the ambient AWS config (env vars, profiles).
    let base_cfg = aws_config::defaults(aws_config::BehaviorVersion::latest()).load().await;
    // Point the SDK at the custom endpoint; path-style addressing is what
    // MinIO/Garage expect.
    let s3_cfg = aws_sdk_s3::config::Builder::from(&base_cfg)
        .endpoint_url(endpoint)
        .force_path_style(true)
        .build();
    let client = aws_sdk_s3::Client::from_conf(s3_cfg);
    client
        .put_object()
        .bucket(bucket)
        .key(key)
        .body(ByteStream::from(bytes.to_vec()))
        .content_type("text/plain; charset=utf-8")
        .send()
        .await
        .into_diagnostic()?;
    // Path-style URL: endpoint/bucket/key.
    Ok(format!("{}/{}/{}", endpoint.trim_end_matches('/'), bucket, key))
}
/// Extract `(owner, repo)` from a git clone URL.
///
/// Supported forms: `http(s)://host/owner/repo(.git)`,
/// `ssh://user@host/owner/repo(.git)`, and scp-style
/// `user@host:owner/repo(.git)`. Returns `None` when both path segments
/// cannot be found.
fn parse_owner_repo(repo_url: &str) -> Option<(String, String)> {
    // Drop any trailing ".git" suffix first.
    let trimmed = repo_url.trim_end_matches(".git");
    if let Some(tail) = trimmed
        .strip_prefix("https://")
        .or_else(|| trimmed.strip_prefix("http://"))
    {
        // tail = host/owner/repo[/...]
        let mut segs = tail.split('/');
        let _host = segs.next()?;
        let owner = segs.next()?;
        let repo = segs.next()?;
        return Some((owner.to_string(), repo.to_string()));
    }
    if let Some(tail) = trimmed.strip_prefix("ssh://") {
        // tail = user@host/owner/repo[/...]; skip the authority part.
        let path = tail.splitn(2, '/').nth(1)?;
        let mut segs = path.split('/');
        let owner = segs.next()?;
        let repo = segs.next()?;
        return Some((owner.to_string(), repo.to_string()));
    }
    if let Some((_authority, path)) = trimmed.split_once(':') {
        // scp style: user@host:owner/repo[/...]
        let mut segs = path.split('/');
        let owner = segs.next()?;
        let repo = segs.next()?;
        return Some((owner.to_string(), repo.to_string()));
    }
    None
}
/// Parse a comma-separated list of `owner/repo=label` pairs into a map.
///
/// Whitespace around entries, keys and values is trimmed. Entries without a
/// `=`, and entries with an empty key or value, are silently skipped. When a
/// key repeats, the last occurrence wins.
fn parse_runs_on_map(s: &str) -> std::collections::HashMap<String, String> {
    s.split(',')
        .filter_map(|entry| {
            let (raw_key, raw_val) = entry.trim().split_once('=')?;
            let key = raw_key.trim();
            let val = raw_val.trim();
            if key.is_empty() || val.is_empty() {
                None
            } else {
                Some((key.to_string(), val.to_string()))
            }
        })
        .collect()
}
/// Decide the `runs_on` label for a job, in priority order:
/// 1. the per-repo override map (keyed by `owner/repo`),
/// 2. the first PR label of the form `runs-on: X`, `runs-on=X`, or
///    `runs-on-X` (matched case-insensitively; the label value is returned
///    lowercased as a side effect of the comparison),
/// 3. the configured default, which may be `None`.
fn infer_runs_on(state: &AppState, repo_url: &str, labels: Option<&[String]>) -> Option<String> {
    // 1) Explicit per-repo override wins.
    if let Some((owner, repo)) = parse_owner_repo(repo_url) {
        let key = format!("{}/{}", owner, repo);
        if let Some(label) = state.runs_on_map.get(&key) {
            return Some(label.clone());
        }
    }
    // 2) First matching PR label wins. The three separators are mutually
    // exclusive, so checking them in sequence matches at most one.
    for raw in labels.unwrap_or(&[]) {
        let lower = raw.trim().to_ascii_lowercase();
        for prefix in ["runs-on:", "runs-on=", "runs-on-"] {
            if let Some(rest) = lower.strip_prefix(prefix) {
                let value = rest.trim();
                if !value.is_empty() {
                    return Some(value.to_string());
                }
            }
        }
    }
    // 3) Fall back to the configured default.
    state.runs_on_default.clone()
}
async fn handle_webhook(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
@ -233,8 +566,27 @@ async fn handle_push(state: Arc<AppState>, body: Bytes) -> StatusCode {
let repo_url = pick_repo_url(&payload.repository);
let sha = payload.after;
match enqueue_job(&state, repo_url, sha).await {
Ok(_) => StatusCode::ACCEPTED,
match enqueue_job(&state, repo_url.clone(), sha.clone(), None).await {
Ok(jr) => {
if let (Some(base), Some(token), Some(orch)) = (
state.forgejo_base.as_ref(),
state.forgejo_token.as_ref(),
state.orch_http_base.as_ref(),
) {
let _ = post_commit_status(
base,
token,
&jr.repo_url,
&jr.commit_sha,
&state.forge_context,
"pending",
Some(&format!("{}/jobs/{}/logs", orch.trim_end_matches('/'), jr.request_id)),
Some("Solstice job queued"),
)
.await;
}
StatusCode::ACCEPTED
}
Err(e) => {
error!(error = %e, "failed to publish job");
StatusCode::INTERNAL_SERVER_ERROR
@ -256,9 +608,14 @@ struct PrHead {
repo: PrRepoInfo,
}
#[derive(Debug, Deserialize)]
struct Label { name: String }
#[derive(Debug, Deserialize)]
struct PullRequest {
head: PrHead,
#[serde(default)]
labels: Vec<Label>,
}
#[derive(Debug, Deserialize)]
@ -285,8 +642,14 @@ async fn handle_pull_request(state: Arc<AppState>, body: Bytes) -> StatusCode {
let repo_url = pick_repo_url_pr(&payload.pull_request.head.repo);
let sha = payload.pull_request.head.sha;
let label_names: Vec<String> = payload
.pull_request
.labels
.into_iter()
.map(|l| l.name)
.collect();
match enqueue_job(&state, repo_url, sha).await {
match enqueue_job(&state, repo_url, sha, Some(label_names)).await {
Ok(_) => StatusCode::ACCEPTED,
Err(e) => {
error!(error = %e, "failed to publish job");
@ -311,13 +674,14 @@ fn pick_repo_url_pr(repo: &PrRepoInfo) -> String {
.to_string()
}
async fn enqueue_job(state: &Arc<AppState>, repo_url: String, commit_sha: String) -> Result<()> {
async fn enqueue_job(state: &Arc<AppState>, repo_url: String, commit_sha: String, labels: Option<Vec<String>>) -> Result<common::JobRequest> {
if repo_url.is_empty() {
miette::bail!("missing repo_url in webhook payload");
}
let jr = common::JobRequest::new(common::SourceSystem::Forgejo, repo_url, commit_sha);
// TODO: infer runs_on from repo defaults or labels
let mut jr = common::JobRequest::new(common::SourceSystem::Forgejo, repo_url, commit_sha);
// Infer runs_on from repo map, labels, or default
jr.runs_on = infer_runs_on(state, &jr.repo_url, labels.as_ref().map(|v| v.as_slice()));
common::publish_job(&state.mq_cfg, &jr).await?;
info!(request_id = %jr.request_id, repo = %jr.repo_url, sha = %jr.commit_sha, "enqueued job from webhook");
Ok(())
info!(request_id = %jr.request_id, repo = %jr.repo_url, sha = %jr.commit_sha, runs_on = ?jr.runs_on, "enqueued job from webhook");
Ok(jr)
}

View file

@ -8,6 +8,7 @@ impl MigratorTrait for Migrator {
vec![
Box::new(m2025_10_25_000001_create_jobs::Migration),
Box::new(m2025_10_25_000002_create_vms::Migration),
Box::new(m2025_11_02_000003_create_job_logs::Migration),
]
}
}
@ -15,9 +16,14 @@ impl MigratorTrait for Migrator {
mod m2025_10_25_000001_create_jobs {
use super::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
impl sea_orm_migration::prelude::MigrationName for Migration {
fn name(&self) -> &str {
"m2025_10_25_000001_create_jobs"
}
}
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
@ -74,9 +80,14 @@ mod m2025_10_25_000001_create_jobs {
mod m2025_10_25_000002_create_vms {
use super::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
impl sea_orm_migration::prelude::MigrationName for Migration {
fn name(&self) -> &str {
"m2025_10_25_000002_create_vms"
}
}
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
@ -126,3 +137,72 @@ mod m2025_10_25_000002_create_vms {
UpdatedAt,
}
}
// Migration: create the `job_logs` table used for per-line job log
// persistence. Each row is one log line, keyed by (request_id, seq) so lines
// can be replayed in order.
mod m2025_11_02_000003_create_job_logs {
use super::*;
pub struct Migration;
// Name is implemented manually (no DeriveMigrationName) to pin the exact
// identifier recorded in the migrations table.
impl sea_orm_migration::prelude::MigrationName for Migration {
fn name(&self) -> &str {
"m2025_11_02_000003_create_job_logs"
}
}
// Column identifiers for the job_logs table.
#[derive(Iden)]
enum JobLogs {
Table,
RequestId,
Seq,
Ts,
Stderr,
Line,
}
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(JobLogs::Table)
.if_not_exists()
.col(ColumnDef::new(JobLogs::RequestId).uuid().not_null())
// Per-job monotonic sequence number; part of the composite PK.
.col(ColumnDef::new(JobLogs::Seq).big_integer().not_null())
.col(
ColumnDef::new(JobLogs::Ts)
.timestamp_with_time_zone()
.not_null()
.default(Expr::current_timestamp()),
)
.col(ColumnDef::new(JobLogs::Stderr).boolean().not_null())
.col(ColumnDef::new(JobLogs::Line).text().not_null())
// Composite primary key: one row per (job, line number).
.primary_key(
Index::create()
.name("pk_job_logs")
.col(JobLogs::RequestId)
.col(JobLogs::Seq),
)
.to_owned(),
)
.await?;
// helpful index for retrieval by request
// NOTE(review): request_id is the leftmost column of the composite PK, so
// Postgres can already serve request_id lookups from the PK index — this
// extra index is likely redundant; confirm before relying on it.
manager
.create_index(
Index::create()
.name("idx_job_logs_request")
.table(JobLogs::Table)
.col(JobLogs::RequestId)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(JobLogs::Table).to_owned())
.await
}
}
}

View file

@ -11,7 +11,6 @@ libvirt = []
common = { path = "../common" }
clap = { version = "4", features = ["derive", "env"] }
miette = { version = "7", features = ["fancy"] }
thiserror = "2"
tracing = "0.1"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal", "fs", "io-util"] }
serde = { version = "1", features = ["derive"] }
@ -19,8 +18,8 @@ serde_json = "1"
serde_yaml = "0.9"
config = { version = "0.15", default-features = false, features = ["yaml"] }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2", "gzip", "brotli", "zstd"] }
bytes = "1"
path-absolutize = "3"
# HTTP server for logs
axum = { version = "0.8", features = ["macros"] }
# gRPC server
tonic = { version = "0.14", features = ["transport"] }
# Compression/decompression
@ -31,7 +30,6 @@ migration = { path = "../migration" }
sea-orm-migration = { version = "1.1.17" }
chrono = { version = "0.4", default-features = false, features = ["clock", "std", "serde"] }
# Utilities
once_cell = "1"
dashmap = "6"
async-trait = "0.1"
uuid = { version = "1", features = ["v4", "serde"] }

View file

@ -9,14 +9,18 @@ use common::runner::v1::{
runner_server::{Runner, RunnerServer},
};
use common::{MqConfig, publish_job_result};
use std::sync::Arc;
use crate::persist::Persist;
/// gRPC Runner service: receives streamed job log lines from runners,
/// persists them (best effort), and publishes job results over AMQP.
pub struct RunnerSvc {
// AMQP configuration used when publishing JobResult messages.
mq_cfg: MqConfig,
// Shared persistence handle for recording streamed log lines.
persist: Arc<Persist>,
}
impl RunnerSvc {
/// Build the service from its AMQP config and a shared persistence handle.
pub fn new(mq_cfg: MqConfig, persist: Arc<Persist>) -> Self {
Self { mq_cfg, persist }
}
}
@ -35,6 +39,7 @@ impl Runner for RunnerSvc {
let mut lines_stdout: usize = 0;
let mut lines_stderr: usize = 0;
let mut got_end: bool = false;
let mut seq: i64 = 0;
info!("runner log stream opened");
while let Some(item) = stream
@ -65,6 +70,13 @@ impl Runner for RunnerSvc {
lines_stdout += 1;
info!(request_id = %item.request_id, line = %chunk.line, "runner:stdout");
}
// Best effort: persist the line if we have a parsed UUID
if let Some(id) = req_id {
if let Err(e) = self.persist.record_log_line(id, seq, chunk.stderr, &chunk.line).await {
warn!(error = %e, request_id = %id, seq = seq, "failed to persist log line");
}
seq += 1;
}
}
common::runner::v1::log_item::Event::End(end) => {
exit_code = end.exit_code;
@ -110,11 +122,12 @@ impl Runner for RunnerSvc {
/// Start the tonic gRPC server on `addr`, serving the Runner service until
/// the provided `shutdown` future resolves.
///
/// The persistence handle is threaded into the service so streamed log lines
/// can be recorded.
pub async fn serve_with_shutdown(
addr: SocketAddr,
mq_cfg: MqConfig,
persist: Arc<Persist>,
shutdown: impl std::future::Future<Output = ()>,
) -> Result<()> {
info!(%addr, "gRPC server starting");
tonic::transport::Server::builder()
.add_service(RunnerServer::new(RunnerSvc::new(mq_cfg, persist)))
.serve_with_shutdown(addr, shutdown)
.await
.into_diagnostic()
}
View file

@ -0,0 +1,55 @@
use axum::{extract::Path, http::StatusCode, response::{IntoResponse, Response}, routing::get, Router};
use std::net::SocketAddr;
use std::sync::Arc;
use tracing::{info, warn};
use uuid::Uuid;
use crate::persist::Persist;
/// Shared state for the HTTP endpoints: access to the persistence layer.
#[derive(Clone)]
pub struct HttpState {
    persist: Arc<Persist>,
}

/// Build the router exposing `GET /jobs/{request_id}/logs`.
pub fn build_router(persist: Arc<Persist>) -> Router {
    let state = HttpState { persist };
    Router::new()
        // Fix: axum 0.8 (pinned in Cargo.toml) uses `{param}` capture syntax;
        // the old `/:param` form is rejected and panics when the router is built.
        .route("/jobs/{request_id}/logs", get(get_logs))
        .with_state(state)
}
/// Handler for `GET /jobs/{request_id}/logs`.
///
/// Responses: 400 for a malformed UUID, 503 when persistence is disabled,
/// 404 when no log lines exist for the job, 500 on a storage error, and
/// 200 with `text/plain` body on success.
async fn get_logs(
    Path(request_id): Path<String>,
    axum::extract::State(state): axum::extract::State<HttpState>,
) -> Response {
    let id = match Uuid::parse_str(&request_id) {
        Ok(parsed) => parsed,
        Err(_) => return StatusCode::BAD_REQUEST.into_response(),
    };
    if !state.persist.is_enabled() {
        return (StatusCode::SERVICE_UNAVAILABLE, "persistence disabled").into_response();
    }
    match state.persist.get_logs_text(id).await {
        Ok(Some(text)) => {
            let content_type = [(axum::http::header::CONTENT_TYPE, "text/plain; charset=utf-8")];
            (StatusCode::OK, content_type, text).into_response()
        }
        Ok(None) => StatusCode::NOT_FOUND.into_response(),
        Err(e) => {
            warn!(error = %e, request_id = %id, "failed to read logs");
            StatusCode::INTERNAL_SERVER_ERROR.into_response()
        }
    }
}
/// Serve the log router on `addr` until the `shutdown` future resolves.
///
/// NOTE(review): shutdown races the accept loop via select!, so in-flight
/// requests are dropped rather than drained — confirm that is acceptable.
pub async fn serve(addr: SocketAddr, persist: Arc<Persist>, shutdown: impl std::future::Future<Output = ()>) {
let app = build_router(persist);
info!(%addr, "http server starting");
// A bind failure is fatal for this task (expect panics the spawned future).
let listener = tokio::net::TcpListener::bind(addr).await.expect("bind http");
let server = axum::serve(listener, app);
// Run until either the server exits on its own or shutdown fires.
let _ = tokio::select! {
_ = server => {},
_ = shutdown => {},
};
}

View file

@ -3,6 +3,7 @@ mod grpc;
mod hypervisor;
mod persist;
mod scheduler;
mod http;
use std::{collections::HashMap, path::PathBuf, time::Duration};
@ -87,6 +88,10 @@ struct Opts {
/// Placeholder VM run time in seconds (temporary until agent wiring)
#[arg(long, env = "VM_PLACEHOLDER_RUN_SECS", default_value_t = 3600)]
vm_placeholder_run_secs: u64,
/// HTTP listen address for exposing basic endpoints (logs)
#[arg(long, env = "HTTP_ADDR", default_value = "0.0.0.0:8081")]
http_addr: String,
}
#[tokio::main(flavor = "multi_thread")]
@ -126,9 +131,10 @@ async fn main() -> Result<()> {
// Start gRPC server for runner log streaming
let grpc_addr: std::net::SocketAddr = opts.grpc_addr.parse().into_diagnostic()?;
let mq_cfg_for_grpc = mq_cfg.clone();
let persist_for_grpc = persist.clone();
let (grpc_shutdown_tx, grpc_shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let grpc_task = tokio::spawn(async move {
let _ = crate::grpc::serve_with_shutdown(grpc_addr, mq_cfg_for_grpc, async move {
let _ = crate::grpc::serve_with_shutdown(grpc_addr, mq_cfg_for_grpc, persist_for_grpc, async move {
let _ = grpc_shutdown_rx.await;
})
.await;
@ -212,6 +218,16 @@ async fn main() -> Result<()> {
}).await
});
// Start HTTP server
let http_addr: std::net::SocketAddr = opts.http_addr.parse().into_diagnostic()?;
let persist_for_http = persist.clone();
let (http_shutdown_tx, http_shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let http_task = tokio::spawn(async move {
crate::http::serve(http_addr, persist_for_http, async move {
let _ = http_shutdown_rx.await;
}).await;
});
// Wait for ctrl-c
tokio::signal::ctrl_c().await.into_diagnostic()?;
@ -224,11 +240,14 @@ async fn main() -> Result<()> {
// Stop gRPC server
let _ = grpc_shutdown_tx.send(());
// Stop HTTP server
let _ = http_shutdown_tx.send(());
// Drop sender to let scheduler drain and exit
drop(sched_tx);
// Wait for consumer, scheduler and grpc to finish concurrently
let (_c_res, _s_res, _g_res) = tokio::join!(consumer_task, scheduler_task, grpc_task);
let (_c_res, _s_res, _g_res, _h_res) = tokio::join!(consumer_task, scheduler_task, grpc_task, http_task);
Ok(())
}

View file

@ -5,6 +5,7 @@ use sea_orm::{
entity::prelude::*, ActiveModelTrait, ColumnTrait, Database, DatabaseConnection, QueryFilter,
Set,
};
use sea_orm::QueryOrder;
use sea_orm_migration::MigratorTrait;
use tracing::{debug, info, warn};
use uuid::Uuid;
@ -103,6 +104,27 @@ mod vms {
impl ActiveModelBehavior for ActiveModel {}
}
// SeaORM entity for the `job_logs` table: one row per streamed log line,
// keyed by (request_id, seq) so lines can be read back in order.
mod job_logs {
use super::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "job_logs")]
pub struct Model {
// Job this line belongs to; first half of the composite primary key.
#[sea_orm(primary_key, auto_increment = false)]
pub request_id: Uuid,
// Per-job sequence number; second half of the composite primary key.
#[sea_orm(primary_key, auto_increment = false)]
pub seq: i64,
// Timestamp recorded when the line was inserted.
pub ts: chrono::DateTime<chrono::Utc>,
// True when the line came from the job's stderr stream.
pub stderr: bool,
// Raw log line text.
pub line: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
}
impl Persist {
/// Initialize persistence.
/// - If `database_url` is Some(non-empty), attempt to connect and run migrations.
@ -134,6 +156,57 @@ impl Persist {
self.db.is_some()
}
/// Return aggregated logs for a job as a single text blob. Newline-terminated.
/// When persistence is disabled or no logs are found, returns Ok(None).
///
/// Lines are ordered by `seq`; stderr lines are prefixed with `[stderr] `.
/// NOTE(review): all rows are loaded into memory at once — fine for typical
/// CI logs, but a very chatty job could make this allocation large.
pub async fn get_logs_text(&self, request_id: Uuid) -> Result<Option<String>> {
// Persistence disabled: report "no logs" rather than an error.
let Some(db) = self.db.as_ref() else {
return Ok(None);
};
let mut out = String::new();
let rows = job_logs::Entity::find()
.filter(job_logs::Column::RequestId.eq(request_id))
.order_by_asc(job_logs::Column::Seq)
.all(db)
.await
.into_diagnostic()?;
if rows.is_empty() {
return Ok(None);
}
for r in rows {
if r.stderr {
out.push_str("[stderr] ");
}
out.push_str(&r.line);
// Ensure each appended line contributes exactly one trailing newline.
if !out.ends_with('\n') {
out.push('\n');
}
}
Ok(Some(out))
}
/// Record a single log line for a job. No-op when persistence is disabled.
pub async fn record_log_line(
    &self,
    request_id: Uuid,
    seq: i64,
    stderr: bool,
    line: &str,
) -> Result<()> {
    let db = match self.db.as_ref() {
        Some(conn) => conn,
        None => {
            // Persistence disabled: note the drop at debug level and succeed.
            debug!(%request_id, %seq, stderr, "record_log_line (noop)");
            return Ok(());
        }
    };
    // One row per (request_id, seq); the timestamp is assigned at insert time.
    let row = job_logs::ActiveModel {
        request_id: Set(request_id),
        seq: Set(seq),
        ts: Set(Utc::now()),
        stderr: Set(stderr),
        line: Set(line.to_string()),
    };
    job_logs::Entity::insert(row).exec(db).await.into_diagnostic()?;
    Ok(())
}
/// Upsert a job row by request_id.
pub async fn record_job_state(
&self,

View file

@ -16,8 +16,62 @@ services:
timeout: 5s
retries: 5
start_period: 5s
# volumes:
# - rabbitmq-data:/var/lib/rabbitmq
volumes:
- rabbitmq-data:/var/lib/rabbitmq
#volumes:
# rabbitmq-data:
postgres:
image: postgres:16-alpine
container_name: solstice-postgres
restart: unless-stopped
environment:
POSTGRES_USER: solstice
POSTGRES_PASSWORD: solstice
POSTGRES_DB: solstice
ports:
- "5432:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U $$POSTGRES_USER"]
interval: 10s
timeout: 5s
retries: 5
start_period: 10s
volumes:
- postgres-data:/var/lib/postgresql/data
# S3-compatible object storage (MinIO) for Garage-like testing
minio:
image: minio/minio:latest
container_name: solstice-minio
restart: unless-stopped
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
command: server /data --console-address ":9001"
ports:
- "9000:9000" # S3 API
- "9001:9001" # Console
healthcheck:
# NOTE(review): recent minio/minio images no longer bundle curl; if this probe
# starts failing after an image update, switch to `mc ready local` in a sidecar
# or a plain TCP check — confirm against the pinned image tag.
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 10s
timeout: 5s
retries: 5
start_period: 10s
volumes:
- minio-data:/data
# One-shot setup to create a default bucket for logs
minio-setup:
image: minio/mc:latest
container_name: solstice-minio-setup
depends_on:
minio:
condition: service_healthy
entrypoint: ["/bin/sh", "-c"]
command: >-
"mc alias set local http://minio:9000 minioadmin minioadmin &&
mc mb -p local/solstice-logs || true"
volumes:
rabbitmq-data:
postgres-data:
minio-data:

View file

@ -0,0 +1,28 @@
# Maintainer: Solstice CI Team
pkgname=solstice-forge-integration
pkgver=0.1.0
pkgrel=1
pkgdesc="Solstice CI Forge Integration (webhook receiver and enqueuer)"
arch=('x86_64')
url="https://codeberg.org/your-namespace/solstice-ci"
# SPDX identifier per current Arch packaging guidelines (was: MPL2)
license=('MPL-2.0')
depends=('glibc')
makedepends=('rust' 'cargo')
# Tarball of the workspace plus the service unit and example env file.
source=("solstice-ci.tar.gz"
        "solstice-forge-integration.service"
        "forge-integration.env.example")
sha256sums=('SKIP'
            'SKIP'
            'SKIP')
# Compile only the forge-integration crate in release mode.
build() {
  cd "$srcdir/solstice-ci"
  cargo build --release --package forge-integration
}
# Install the binary, systemd unit, example env file, and config directory.
package() {
  local tree="$srcdir/solstice-ci"
  install -D -m755 "$tree/target/release/forge-integration" \
    "$pkgdir/usr/bin/forge-integration"
  install -D -m644 "$srcdir/solstice-forge-integration.service" \
    "$pkgdir/usr/lib/systemd/system/solstice-forge-integration.service"
  install -D -m644 "$srcdir/forge-integration.env.example" \
    "$pkgdir/usr/share/solstice/examples/forge-integration.env"
  install -d -m755 "$pkgdir/etc/solstice"
}

View file

@ -0,0 +1,32 @@
# Solstice Forge Integration environment
RUST_LOG=info
# HTTP server address for webhooks
HTTP_ADDR=0.0.0.0:8080
# RabbitMQ to publish job requests
AMQP_URL=amqp://127.0.0.1:5672/%2f
AMQP_EXCHANGE=solstice.jobs
AMQP_ROUTING_KEY=jobrequest.v1
# Results queue to consume job results
RESULTS_QUEUE=solstice.results.v1
# Optional: webhook path/secret
# WEBHOOK_PATH=/webhooks/forgejo
# WEBHOOK_SECRET=change-me
# Forgejo commit status posting
# FORGEJO_BASE_URL=https://codeberg.org/api/v1
# FORGEJO_TOKEN=your_token_here
FORGE_CONTEXT=solstice/ci
# Orchestrator HTTP base to fetch logs
ORCH_HTTP_BASE=http://127.0.0.1:8081
# S3-compatible (Garage/MinIO) for uploading logs
S3_ENDPOINT=http://127.0.0.1:9000
S3_BUCKET=solstice-logs
# AWS-style credentials for S3-compatible storage
AWS_ACCESS_KEY_ID=minioadmin
AWS_SECRET_ACCESS_KEY=minioadmin
AWS_REGION=us-east-1
# runs_on inference
# Use this default label when not specified via PR labels or repo map
# RUNS_ON_DEFAULT=ubuntu-22.04
# Per-repo overrides: comma-separated owner/repo=label pairs
# Example: RUNS_ON_MAP=illumos/solstice-ci=illumos-latest,otherorg/another-repo=ubuntu-22.04

View file

@ -0,0 +1,18 @@
[Unit]
Description=Solstice CI Forge Integration (webhook receiver / enqueuer)
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
# systemd does not perform shell-style ${VAR:-default} expansion; supply the
# default via Environment= — values from the EnvironmentFile take precedence.
Environment=HTTP_ADDR=0.0.0.0:8080
EnvironmentFile=-/etc/solstice/forge-integration.env
ExecStart=/usr/bin/forge-integration --http-addr ${HTTP_ADDR}
Restart=on-failure
RestartSec=3s
NoNewPrivileges=true
ProtectSystem=full
ProtectHome=true
PrivateTmp=true
[Install]
WantedBy=multi-user.target

View file

@ -0,0 +1,31 @@
# Maintainer: Solstice CI Team
pkgname=solstice-orchestrator
pkgver=0.1.0
pkgrel=1
pkgdesc="Solstice CI Orchestrator service"
arch=('x86_64')
url="https://codeberg.org/your-namespace/solstice-ci"
# SPDX identifier per current Arch packaging guidelines (was: MPL2)
license=('MPL-2.0')
depends=('glibc' 'libvirt')
makedepends=('rust' 'cargo')
# Tarball of the workspace plus the service unit and example env file.
source=("solstice-ci.tar.gz"
        "solstice-orchestrator.service"
        "orchestrator.env.example")
sha256sums=('SKIP'
            'SKIP'
            'SKIP')
# Compile the orchestrator crate in release mode with libvirt support.
build() {
  cd "$srcdir/solstice-ci"
  cargo build --release --features libvirt --package orchestrator
}
# Install the binary, systemd unit, example configs, and config directory.
package() {
  local tree="$srcdir/solstice-ci"
  install -D -m755 "$tree/target/release/orchestrator" \
    "$pkgdir/usr/bin/orchestrator"
  install -D -m644 "$srcdir/solstice-orchestrator.service" \
    "$pkgdir/usr/lib/systemd/system/solstice-orchestrator.service"
  install -D -m644 "$srcdir/orchestrator.env.example" \
    "$pkgdir/usr/share/solstice/examples/orchestrator.env"
  # Ship the example orchestrator image map alongside the env example.
  install -D -m644 "$tree/examples/orchestrator-image-map.yaml" \
    "$pkgdir/usr/share/solstice/examples/orchestrator-image-map.yaml"
  # Default config directory for the EnvironmentFile and image map.
  install -d -m755 "$pkgdir/etc/solstice"
}

View file

@ -0,0 +1,16 @@
# Solstice Orchestrator environment
RUST_LOG=info
# RabbitMQ
AMQP_URL=amqp://127.0.0.1:5672/%2f
AMQP_EXCHANGE=solstice.jobs
AMQP_QUEUE=solstice.jobs.v1
AMQP_ROUTING_KEY=jobrequest.v1
AMQP_PREFETCH=2
# gRPC listen address for runner connections
GRPC_ADDR=0.0.0.0:50051
# Postgres persistence
DATABASE_URL=postgres://solstice:solstice@127.0.0.1:5432/solstice
# Orchestrator image map
ORCH_CONFIG=/etc/solstice/orchestrator-image-map.yaml
# Optional: contact address injected into cloud-init (host:port runners dial back)
# ORCH_CONTACT_ADDR=127.0.0.1:50051

View file

@ -0,0 +1,19 @@
[Unit]
Description=Solstice CI Orchestrator
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
EnvironmentFile=-/etc/solstice/orchestrator.env
ExecStart=/usr/bin/orchestrator
Restart=on-failure
RestartSec=3s
# Hardening (adjust as needed for libvirt access etc.)
NoNewPrivileges=true
ProtectSystem=full
ProtectHome=true
PrivateTmp=true
[Install]
WantedBy=multi-user.target