mirror of
https://codeberg.org/Toasterson/solstice-ci.git
synced 2026-04-10 13:20:41 +00:00
Add Forgejo Runner integration service
New crate that registers as a Forgejo Actions Runner, polls for tasks via connect-rpc, translates them into Solstice JobRequests (with 3-tier fallback: KDL workflow → Actions YAML run steps → unsupported error), and reports results back to Forgejo. Includes Containerfile and compose.yml service definition.
This commit is contained in:
parent
ac81dedf82
commit
70605a3c3a
13 changed files with 1676 additions and 0 deletions
35
crates/runner-integration/Cargo.toml
Normal file
35
crates/runner-integration/Cargo.toml
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
[package]
|
||||
name = "runner-integration"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
build = "build.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "solstice-runner-integration"
|
||||
path = "src/main.rs"
|
||||
|
||||
[dependencies]
|
||||
common = { path = "../common" }
|
||||
clap = { version = "4", features = ["derive", "env"] }
|
||||
miette = { version = "7", features = ["fancy"] }
|
||||
tracing = "0.1"
|
||||
tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal", "fs", "io-util", "time"] }
|
||||
# Connect-RPC transport (HTTP + protobuf)
|
||||
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls-native-roots"] }
|
||||
base64 = "0.22"
|
||||
prost = "0.14"
|
||||
prost-types = "0.14"
|
||||
# AMQP consumer for results
|
||||
lapin = { version = "2" }
|
||||
futures-util = "0.3"
|
||||
# Serialization
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
serde_yaml = "0.9"
|
||||
# Utilities
|
||||
uuid = { version = "1", features = ["v4", "serde"] }
|
||||
dashmap = "6"
|
||||
time = { version = "0.3", features = ["serde", "macros"] }
|
||||
|
||||
[build-dependencies]
|
||||
tonic-prost-build = "0.14"
|
||||
29
crates/runner-integration/build.rs
Normal file
29
crates/runner-integration/build.rs
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
fn main() {
|
||||
println!("cargo:rerun-if-changed=proto/runner/v1/messages.proto");
|
||||
println!("cargo:rerun-if-changed=proto/runner/v1/services.proto");
|
||||
|
||||
// Include system protobuf path for google/protobuf well-known types
|
||||
// (e.g. /usr/include from protobuf-compiler in container builds).
|
||||
let mut include_dirs = vec!["proto".to_string()];
|
||||
for candidate in ["/usr/include", "/usr/local/include"] {
|
||||
let p = std::path::Path::new(candidate).join("google/protobuf/timestamp.proto");
|
||||
if p.exists() {
|
||||
include_dirs.push(candidate.to_string());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let include_refs: Vec<&str> = include_dirs.iter().map(|s| s.as_str()).collect();
|
||||
|
||||
tonic_prost_build::configure()
|
||||
.build_server(false)
|
||||
.build_client(false) // We implement connect-rpc transport manually
|
||||
.compile_protos(
|
||||
&[
|
||||
"proto/runner/v1/messages.proto",
|
||||
"proto/runner/v1/services.proto",
|
||||
],
|
||||
&include_refs,
|
||||
)
|
||||
.expect("failed to compile Forgejo actions proto");
|
||||
}
|
||||
132
crates/runner-integration/proto/runner/v1/messages.proto
Normal file
132
crates/runner-integration/proto/runner/v1/messages.proto
Normal file
|
|
@ -0,0 +1,132 @@
|
|||
syntax = "proto3";

package runner.v1;

import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";

// Request to register a new runner using a one-time registration token.
message RegisterRequest {
  string name = 1;
  string token = 2;
  repeated string agent_labels = 3 [deprecated = true];
  repeated string custom_labels = 4 [deprecated = true];
  string version = 5;
  repeated string labels = 6;
  bool ephemeral = 7;
}

message RegisterResponse {
  Runner runner = 1;
}

// Request to declare the runner's version and labels before fetching tasks.
message DeclareRequest {
  string version = 1;
  repeated string labels = 2;
}

message DeclareResponse {
  Runner runner = 1;
}

message FetchTaskRequest {
  int64 tasks_version = 1; // Runner uses `tasks_version` to compare with Gitea and determine whether new tasks may exist.
}

message FetchTaskResponse {
  Task task = 1;
  int64 tasks_version = 2; // Gitea informs the Runner of the latest version of tasks through `tasks_version`.
}

message UpdateTaskRequest {
  TaskState state = 1;
  map<string, string> outputs = 2; // The outputs of the task. Since the outputs may be large, the client does not need to send all outputs every time, only the unsent outputs.
}

message UpdateTaskResponse {
  TaskState state = 1;
  repeated string sent_outputs = 2; // The keys of the outputs that have been sent, not only the ones that have been sent this time, but also those that have been sent before.
}

message UpdateLogRequest {
  int64 task_id = 1;
  int64 index = 2; // The actual index of the first line.
  repeated LogRow rows = 3;
  bool no_more = 4; // No more logs.
}

message UpdateLogResponse {
  int64 ack_index = 1; // If all lines are received, should be index + length(lines).
}

// Runner Payload
message Runner {
  int64 id = 1;
  string uuid = 2;
  string token = 3;
  string name = 4;
  RunnerStatus status = 5;
  repeated string agent_labels = 6 [deprecated = true];
  repeated string custom_labels = 7 [deprecated = true];
  string version = 8;
  repeated string labels = 9;
  bool ephemeral = 10;
}

// RunnerStatus enumerates all runner statuses.
enum RunnerStatus {
  RUNNER_STATUS_UNSPECIFIED = 0;
  RUNNER_STATUS_IDLE = 1;
  RUNNER_STATUS_ACTIVE = 2;
  RUNNER_STATUS_OFFLINE = 3;
}

// The result of a task or a step, see https://docs.github.com/en/actions/learn-github-actions/contexts#jobs-context .
enum Result {
  RESULT_UNSPECIFIED = 0;
  RESULT_SUCCESS = 1;
  RESULT_FAILURE = 2;
  RESULT_CANCELLED = 3;
  RESULT_SKIPPED = 4;
}

// Task represents a task.
message Task {
  int64 id = 1; // A unique number for each workflow run; unlike run_id or job_id, a task_id is never reused.
  optional bytes workflow_payload = 2; // The content of the expanded workflow yaml file.
  optional google.protobuf.Struct context = 3; // See https://docs.github.com/en/actions/learn-github-actions/contexts#github-context .
  map<string, string> secrets = 4; // See https://docs.github.com/en/actions/learn-github-actions/contexts#secrets-context .
  string machine = 5 [deprecated = true]; // Unused.
  map<string, TaskNeed> needs = 6; // See https://docs.github.com/en/actions/learn-github-actions/contexts#needs-context .
  map<string, string> vars = 7; // See https://docs.github.com/en/actions/learn-github-actions/contexts#vars-context .
}

// TaskNeed represents a task need.
message TaskNeed {
  map<string, string> outputs = 1; // The set of outputs of a job that the current job depends on.
  Result result = 2; // The result of a job that the current job depends on. Possible values are success, failure, cancelled, or skipped.
}

// TaskState represents the state of a task.
message TaskState {
  int64 id = 1;
  Result result = 2;
  google.protobuf.Timestamp started_at = 3;
  google.protobuf.Timestamp stopped_at = 4;
  repeated StepState steps = 5;
}

// StepState represents the state of a step.
message StepState {
  int64 id = 1;
  Result result = 2;
  google.protobuf.Timestamp started_at = 3;
  google.protobuf.Timestamp stopped_at = 4;
  int64 log_index = 5; // Index of the first log line of the step.
  int64 log_length = 6; // How many log lines the step has.
}

// LogRow represents a row of logs.
message LogRow {
  google.protobuf.Timestamp time = 1;
  string content = 2;
}
|
||||
18
crates/runner-integration/proto/runner/v1/services.proto
Normal file
18
crates/runner-integration/proto/runner/v1/services.proto
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
syntax = "proto3";

package runner.v1;

import "runner/v1/messages.proto";

// RunnerService is the connect-rpc API a Forgejo Actions runner talks to.
service RunnerService {
  // Register registers a new runner with the server.
  rpc Register(RegisterRequest) returns (RegisterResponse) {}
  // Declare declares the runner's version and labels to Gitea before it starts fetching tasks.
  rpc Declare(DeclareRequest) returns (DeclareResponse) {}
  // FetchTask requests the next available task for execution.
  rpc FetchTask(FetchTaskRequest) returns (FetchTaskResponse) {}
  // UpdateTask updates the task status.
  rpc UpdateTask(UpdateTaskRequest) returns (UpdateTaskResponse) {}
  // UpdateLog uploads log of the task.
  rpc UpdateLog(UpdateLogRequest) returns (UpdateLogResponse) {}
}
|
||||
119
crates/runner-integration/src/connect.rs
Normal file
119
crates/runner-integration/src/connect.rs
Normal file
|
|
@ -0,0 +1,119 @@
|
|||
use miette::{IntoDiagnostic, Result, miette};
|
||||
use prost::Message;
|
||||
use tracing::{debug, instrument};
|
||||
|
||||
use crate::proto::runner::v1::{
|
||||
DeclareRequest, DeclareResponse, FetchTaskRequest, FetchTaskResponse, RegisterRequest,
|
||||
RegisterResponse, UpdateLogRequest, UpdateLogResponse, UpdateTaskRequest, UpdateTaskResponse,
|
||||
};
|
||||
|
||||
/// Connect-RPC client for the Forgejo Actions Runner API.
|
||||
///
|
||||
/// The Forgejo runner API uses the Connect protocol (HTTP/1.1 POST with raw
|
||||
/// protobuf bodies), not standard gRPC framing. Each RPC maps to:
|
||||
/// POST {base_url}/runner.v1.RunnerService/{Method}
|
||||
/// Content-Type: application/proto
|
||||
/// Authorization: Bearer {token}
|
||||
pub struct ConnectClient {
|
||||
http: reqwest::Client,
|
||||
/// Base URL for the connect-rpc endpoint, e.g.
|
||||
/// `https://forgejo.example.com/api/actions`
|
||||
base_url: String,
|
||||
}
|
||||
|
||||
impl ConnectClient {
|
||||
pub fn new(base_url: impl Into<String>) -> Self {
|
||||
let http = reqwest::Client::builder()
|
||||
.timeout(std::time::Duration::from_secs(90)) // long-poll needs generous timeout
|
||||
.build()
|
||||
.expect("failed to build HTTP client");
|
||||
Self {
|
||||
http,
|
||||
base_url: base_url.into().trim_end_matches('/').to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute a unary connect-rpc call.
|
||||
#[instrument(skip(self, req_msg, token), fields(method = %method))]
|
||||
async fn call<Req: Message, Resp: Message + Default>(
|
||||
&self,
|
||||
method: &str,
|
||||
req_msg: &Req,
|
||||
token: &str,
|
||||
) -> Result<Resp> {
|
||||
let url = format!("{}/runner.v1.RunnerService/{}", self.base_url, method);
|
||||
debug!(url = %url, "connect-rpc call");
|
||||
|
||||
let body = req_msg.encode_to_vec();
|
||||
|
||||
let resp = self
|
||||
.http
|
||||
.post(&url)
|
||||
.header("Content-Type", "application/proto")
|
||||
.header("Authorization", format!("Bearer {}", token))
|
||||
.body(body)
|
||||
.send()
|
||||
.await
|
||||
.into_diagnostic()?;
|
||||
|
||||
let status = resp.status();
|
||||
if !status.is_success() {
|
||||
let error_body = resp.text().await.unwrap_or_default();
|
||||
return Err(miette!(
|
||||
"connect-rpc {} failed: HTTP {} — {}",
|
||||
method,
|
||||
status,
|
||||
error_body
|
||||
));
|
||||
}
|
||||
|
||||
let resp_bytes = resp.bytes().await.into_diagnostic()?;
|
||||
Resp::decode(resp_bytes.as_ref()).into_diagnostic()
|
||||
}
|
||||
|
||||
/// Register this runner with the Forgejo instance.
|
||||
/// Uses the one-time registration token (not the runner token).
|
||||
pub async fn register(
|
||||
&self,
|
||||
req: &RegisterRequest,
|
||||
registration_token: &str,
|
||||
) -> Result<RegisterResponse> {
|
||||
self.call("Register", req, registration_token).await
|
||||
}
|
||||
|
||||
/// Declare runner version and labels after registration.
|
||||
pub async fn declare(
|
||||
&self,
|
||||
req: &DeclareRequest,
|
||||
runner_token: &str,
|
||||
) -> Result<DeclareResponse> {
|
||||
self.call("Declare", req, runner_token).await
|
||||
}
|
||||
|
||||
/// Long-poll for the next available task.
|
||||
pub async fn fetch_task(
|
||||
&self,
|
||||
req: &FetchTaskRequest,
|
||||
runner_token: &str,
|
||||
) -> Result<FetchTaskResponse> {
|
||||
self.call("FetchTask", req, runner_token).await
|
||||
}
|
||||
|
||||
/// Update a task's state (running, success, failure, etc.).
|
||||
pub async fn update_task(
|
||||
&self,
|
||||
req: &UpdateTaskRequest,
|
||||
runner_token: &str,
|
||||
) -> Result<UpdateTaskResponse> {
|
||||
self.call("UpdateTask", req, runner_token).await
|
||||
}
|
||||
|
||||
/// Upload log lines for a task.
|
||||
pub async fn update_log(
|
||||
&self,
|
||||
req: &UpdateLogRequest,
|
||||
runner_token: &str,
|
||||
) -> Result<UpdateLogResponse> {
|
||||
self.call("UpdateLog", req, runner_token).await
|
||||
}
|
||||
}
|
||||
216
crates/runner-integration/src/main.rs
Normal file
216
crates/runner-integration/src/main.rs
Normal file
|
|
@ -0,0 +1,216 @@
|
|||
mod connect;
|
||||
mod poller;
|
||||
mod registration;
|
||||
mod reporter;
|
||||
mod state;
|
||||
mod translator;
|
||||
|
||||
pub mod proto {
|
||||
pub mod runner {
|
||||
pub mod v1 {
|
||||
include!(concat!(env!("OUT_DIR"), "/runner.v1.rs"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use clap::Parser;
|
||||
use miette::{IntoDiagnostic, Result};
|
||||
use tokio::sync::watch;
|
||||
use tracing::info;
|
||||
|
||||
use connect::ConnectClient;
|
||||
use state::RunnerState;
|
||||
use translator::TranslateCtx;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(
|
||||
name = "solstice-runner-integration",
|
||||
version,
|
||||
about = "Solstice CI — Forgejo Runner Integration"
|
||||
)]
|
||||
struct Opts {
|
||||
/// Forgejo instance URL (e.g., https://forgejo.example.com)
|
||||
#[arg(long, env = "FORGEJO_URL")]
|
||||
forgejo_url: String,
|
||||
|
||||
/// Runner registration token (from Forgejo admin UI, needed only for first registration)
|
||||
#[arg(long, env = "RUNNER_REGISTRATION_TOKEN")]
|
||||
registration_token: Option<String>,
|
||||
|
||||
/// Runner name shown in the Forgejo UI
|
||||
#[arg(long, env = "RUNNER_NAME", default_value = "solstice-runner")]
|
||||
runner_name: String,
|
||||
|
||||
/// Comma-separated runner labels (e.g., "ubuntu-latest,self-hosted,linux-kvm")
|
||||
#[arg(long, env = "RUNNER_LABELS", default_value = "self-hosted")]
|
||||
runner_labels: String,
|
||||
|
||||
/// Path to persist runner registration state (UUID + token)
|
||||
#[arg(
|
||||
long,
|
||||
env = "RUNNER_STATE_PATH",
|
||||
default_value = "/var/lib/solstice/runner-state.json"
|
||||
)]
|
||||
state_path: String,
|
||||
|
||||
/// Maximum number of concurrent jobs
|
||||
#[arg(long, env = "MAX_CONCURRENCY", default_value_t = 4)]
|
||||
max_concurrency: usize,
|
||||
|
||||
/// Forgejo API base URL for fetching repo contents (e.g., https://forgejo.example.com/api/v1)
|
||||
#[arg(long, env = "FORGEJO_BASE_URL")]
|
||||
forgejo_base: Option<String>,
|
||||
|
||||
/// Forgejo API token (PAT) for fetching workflow files from repos
|
||||
#[arg(long, env = "FORGEJO_TOKEN")]
|
||||
forgejo_token: Option<String>,
|
||||
|
||||
// --- Standard AMQP options ---
|
||||
#[arg(long, env = "AMQP_URL")]
|
||||
amqp_url: Option<String>,
|
||||
|
||||
#[arg(long, env = "AMQP_EXCHANGE")]
|
||||
amqp_exchange: Option<String>,
|
||||
|
||||
#[arg(long, env = "AMQP_QUEUE")]
|
||||
amqp_queue: Option<String>,
|
||||
|
||||
#[arg(long, env = "AMQP_ROUTING_KEY")]
|
||||
amqp_routing_key: Option<String>,
|
||||
|
||||
/// Results queue name (defaults to solstice.runner-results.v1 to avoid
|
||||
/// competing with forge-integration)
|
||||
#[arg(
|
||||
long,
|
||||
env = "AMQP_RESULTS_QUEUE",
|
||||
default_value = "solstice.runner-results.v1"
|
||||
)]
|
||||
amqp_results_queue: String,
|
||||
|
||||
/// OTLP endpoint (e.g., http://localhost:4317)
|
||||
#[arg(long, env = "OTEL_EXPORTER_OTLP_ENDPOINT")]
|
||||
otlp: Option<String>,
|
||||
|
||||
/// Logs service base URL for fetching logs to report to Forgejo
|
||||
#[arg(long, env = "LOGS_BASE_URL")]
|
||||
logs_base_url: Option<String>,
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "multi_thread")]
|
||||
async fn main() -> Result<()> {
|
||||
let app_cfg = common::AppConfig::load("runner-integration")?;
|
||||
let _t = common::init_tracing("solstice-runner-integration")?;
|
||||
let opts = Opts::parse();
|
||||
|
||||
info!(
|
||||
forgejo_url = %opts.forgejo_url,
|
||||
runner_name = %opts.runner_name,
|
||||
labels = %opts.runner_labels,
|
||||
max_concurrency = opts.max_concurrency,
|
||||
"runner integration starting"
|
||||
);
|
||||
|
||||
// Build MQ config
|
||||
let mut mq_cfg = app_cfg.mq.clone();
|
||||
if let Some(u) = opts.amqp_url {
|
||||
mq_cfg.url = u;
|
||||
}
|
||||
if let Some(x) = opts.amqp_exchange {
|
||||
mq_cfg.exchange = x;
|
||||
}
|
||||
if let Some(q) = opts.amqp_queue {
|
||||
mq_cfg.queue = q;
|
||||
}
|
||||
if let Some(rk) = opts.amqp_routing_key {
|
||||
mq_cfg.routing_key = rk;
|
||||
}
|
||||
mq_cfg.results_queue = opts.amqp_results_queue;
|
||||
|
||||
// Build connect-rpc client
|
||||
let connect_url = format!("{}/api/actions", opts.forgejo_url.trim_end_matches('/'));
|
||||
let client = Arc::new(ConnectClient::new(connect_url));
|
||||
|
||||
// Parse labels
|
||||
let labels: Vec<String> = opts
|
||||
.runner_labels
|
||||
.split(',')
|
||||
.map(|s| s.trim().to_string())
|
||||
.filter(|s| !s.is_empty())
|
||||
.collect();
|
||||
|
||||
// Register or load existing credentials
|
||||
let identity = registration::ensure_registered(
|
||||
&client,
|
||||
&opts.state_path,
|
||||
opts.registration_token.as_deref(),
|
||||
&opts.runner_name,
|
||||
&labels,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Build shared state
|
||||
let state = Arc::new(RunnerState::new(identity, opts.max_concurrency));
|
||||
|
||||
// Translation context
|
||||
let translate_ctx = Arc::new(TranslateCtx {
|
||||
forgejo_base: opts.forgejo_base,
|
||||
forgejo_token: opts.forgejo_token,
|
||||
});
|
||||
|
||||
// Shutdown signal
|
||||
let (shutdown_tx, shutdown_rx) = watch::channel(false);
|
||||
|
||||
// Spawn poller task
|
||||
let poller_client = client.clone();
|
||||
let poller_state = state.clone();
|
||||
let poller_mq = mq_cfg.clone();
|
||||
let poller_ctx = translate_ctx.clone();
|
||||
let poller_shutdown = shutdown_rx.clone();
|
||||
let poller_task = tokio::spawn(async move {
|
||||
if let Err(e) = poller::run(
|
||||
poller_client,
|
||||
poller_state,
|
||||
poller_mq,
|
||||
poller_ctx,
|
||||
poller_shutdown,
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!(error = %e, "poller exited with error");
|
||||
}
|
||||
});
|
||||
|
||||
// Spawn reporter task
|
||||
let reporter_client = client.clone();
|
||||
let reporter_state = state.clone();
|
||||
let reporter_mq = mq_cfg.clone();
|
||||
let reporter_shutdown = shutdown_rx.clone();
|
||||
let reporter_task = tokio::spawn(async move {
|
||||
if let Err(e) = reporter::run(
|
||||
reporter_client,
|
||||
reporter_state,
|
||||
reporter_mq,
|
||||
reporter_shutdown,
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!(error = %e, "reporter exited with error");
|
||||
}
|
||||
});
|
||||
|
||||
// Wait for shutdown signal
|
||||
tokio::signal::ctrl_c().await.into_diagnostic()?;
|
||||
info!("shutdown signal received");
|
||||
let _ = shutdown_tx.send(true);
|
||||
|
||||
// Wait for tasks to finish with a timeout
|
||||
let _ = tokio::time::timeout(std::time::Duration::from_secs(30), async {
|
||||
let _ = tokio::join!(poller_task, reporter_task);
|
||||
})
|
||||
.await;
|
||||
|
||||
info!("runner integration stopped");
|
||||
Ok(())
|
||||
}
|
||||
234
crates/runner-integration/src/poller.rs
Normal file
234
crates/runner-integration/src/poller.rs
Normal file
|
|
@ -0,0 +1,234 @@
|
|||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use miette::Result;
|
||||
use tokio::sync::watch;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::connect::ConnectClient;
|
||||
use crate::proto::runner::v1::{self, FetchTaskRequest, TaskState};
|
||||
use crate::state::{RunnerState, TaskMeta};
|
||||
use crate::translator::{TranslateCtx, TranslateResult, translate_task};
|
||||
|
||||
const MAX_BACKOFF: Duration = Duration::from_secs(30);
|
||||
const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
|
||||
|
||||
/// Run the task polling loop until shutdown is signalled.
|
||||
pub async fn run(
|
||||
client: Arc<ConnectClient>,
|
||||
state: Arc<RunnerState>,
|
||||
mq_cfg: common::MqConfig,
|
||||
translate_ctx: Arc<TranslateCtx>,
|
||||
mut shutdown: watch::Receiver<bool>,
|
||||
) -> Result<()> {
|
||||
let mut tasks_version: i64 = 0;
|
||||
let mut backoff = INITIAL_BACKOFF;
|
||||
|
||||
info!("poller started");
|
||||
|
||||
loop {
|
||||
// Check shutdown
|
||||
if *shutdown.borrow() {
|
||||
info!("poller shutting down");
|
||||
break;
|
||||
}
|
||||
|
||||
// Wait for a concurrency permit
|
||||
let permit = {
|
||||
let sem = state.semaphore.clone();
|
||||
tokio::select! {
|
||||
permit = sem.acquire_owned() => match permit {
|
||||
Ok(p) => p,
|
||||
Err(_) => break, // semaphore closed
|
||||
},
|
||||
_ = shutdown.changed() => {
|
||||
info!("poller shutting down (waiting for permit)");
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Long-poll for a task
|
||||
let req = FetchTaskRequest { tasks_version };
|
||||
let resp = tokio::select! {
|
||||
r = client.fetch_task(&req, &state.identity.token) => r,
|
||||
_ = shutdown.changed() => {
|
||||
info!("poller shutting down (fetching task)");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
match resp {
|
||||
Ok(resp) => {
|
||||
tasks_version = resp.tasks_version;
|
||||
backoff = INITIAL_BACKOFF; // reset on success
|
||||
|
||||
let task = match resp.task {
|
||||
Some(t) => t,
|
||||
None => {
|
||||
// No task available — release permit and re-poll
|
||||
drop(permit);
|
||||
debug!("no task available, re-polling");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let task_id = task.id;
|
||||
info!(task_id, "received task from Forgejo");
|
||||
|
||||
// Report task as running
|
||||
if let Err(e) = report_running(&client, &state, task_id).await {
|
||||
warn!(error = %e, task_id, "failed to report task as running");
|
||||
}
|
||||
|
||||
// Translate and publish
|
||||
match translate_task(&task, &translate_ctx).await {
|
||||
Ok(TranslateResult::Jobs(jobs)) => {
|
||||
let mut published_any = false;
|
||||
for jr in &jobs {
|
||||
state.in_flight.insert(
|
||||
jr.request_id,
|
||||
TaskMeta {
|
||||
forgejo_task_id: task_id,
|
||||
repo_url: jr.repo_url.clone(),
|
||||
commit_sha: jr.commit_sha.clone(),
|
||||
started_at: Instant::now(),
|
||||
},
|
||||
);
|
||||
|
||||
match common::publish_job(&mq_cfg, jr).await {
|
||||
Ok(()) => {
|
||||
info!(
|
||||
request_id = %jr.request_id,
|
||||
task_id,
|
||||
repo = %jr.repo_url,
|
||||
sha = %jr.commit_sha,
|
||||
runs_on = ?jr.runs_on,
|
||||
"published JobRequest"
|
||||
);
|
||||
published_any = true;
|
||||
}
|
||||
Err(e) => {
|
||||
error!(error = %e, request_id = %jr.request_id, "failed to publish JobRequest");
|
||||
state.in_flight.remove(&jr.request_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if published_any {
|
||||
// Don't drop permit — it will be released by the reporter
|
||||
// when the JobResult comes back. We leak the permit into the
|
||||
// in-flight tracking. The reporter task will release it.
|
||||
std::mem::forget(permit);
|
||||
} else {
|
||||
// All publishes failed — report failure to Forgejo
|
||||
if let Err(e) = report_failure(
|
||||
&client,
|
||||
&state,
|
||||
task_id,
|
||||
"solstice-ci: failed to enqueue job(s) to message broker",
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!(error = %e, task_id, "failed to report failure");
|
||||
}
|
||||
drop(permit);
|
||||
}
|
||||
}
|
||||
Ok(TranslateResult::Unsupported(msg)) => {
|
||||
warn!(task_id, msg = %msg, "unsupported workflow");
|
||||
if let Err(e) = report_failure(&client, &state, task_id, &msg).await {
|
||||
error!(error = %e, task_id, "failed to report unsupported");
|
||||
}
|
||||
drop(permit);
|
||||
}
|
||||
Err(e) => {
|
||||
error!(error = %e, task_id, "translation error");
|
||||
if let Err(e2) = report_failure(
|
||||
&client,
|
||||
&state,
|
||||
task_id,
|
||||
&format!("solstice-ci: translation error: {}", e),
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!(error = %e2, task_id, "failed to report translation error");
|
||||
}
|
||||
drop(permit);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
drop(permit);
|
||||
warn!(error = %e, backoff_secs = backoff.as_secs(), "FetchTask failed; backing off");
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(backoff) => {}
|
||||
_ = shutdown.changed() => break,
|
||||
}
|
||||
backoff = (backoff * 2).min(MAX_BACKOFF);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("poller stopped");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn report_running(client: &ConnectClient, state: &RunnerState, task_id: i64) -> Result<()> {
|
||||
let now = prost_types::Timestamp {
|
||||
seconds: time::OffsetDateTime::now_utc().unix_timestamp(),
|
||||
nanos: 0,
|
||||
};
|
||||
let req = crate::proto::runner::v1::UpdateTaskRequest {
|
||||
state: Some(TaskState {
|
||||
id: task_id,
|
||||
result: v1::Result::Unspecified as i32,
|
||||
started_at: Some(now),
|
||||
stopped_at: None,
|
||||
steps: vec![],
|
||||
}),
|
||||
outputs: Default::default(),
|
||||
};
|
||||
client.update_task(&req, &state.identity.token).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn report_failure(
|
||||
client: &ConnectClient,
|
||||
state: &RunnerState,
|
||||
task_id: i64,
|
||||
_message: &str,
|
||||
) -> Result<()> {
|
||||
let now = prost_types::Timestamp {
|
||||
seconds: time::OffsetDateTime::now_utc().unix_timestamp(),
|
||||
nanos: 0,
|
||||
};
|
||||
let req = crate::proto::runner::v1::UpdateTaskRequest {
|
||||
state: Some(TaskState {
|
||||
id: task_id,
|
||||
result: v1::Result::Failure as i32,
|
||||
started_at: Some(now.clone()),
|
||||
stopped_at: Some(now),
|
||||
steps: vec![],
|
||||
}),
|
||||
outputs: Default::default(),
|
||||
};
|
||||
client.update_task(&req, &state.identity.token).await?;
|
||||
|
||||
// Also send the error message as a log line
|
||||
let log_req = crate::proto::runner::v1::UpdateLogRequest {
|
||||
task_id,
|
||||
index: 0,
|
||||
rows: vec![crate::proto::runner::v1::LogRow {
|
||||
time: Some(prost_types::Timestamp {
|
||||
seconds: time::OffsetDateTime::now_utc().unix_timestamp(),
|
||||
nanos: 0,
|
||||
}),
|
||||
content: _message.to_string(),
|
||||
}],
|
||||
no_more: true,
|
||||
};
|
||||
client.update_log(&log_req, &state.identity.token).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
112
crates/runner-integration/src/registration.rs
Normal file
112
crates/runner-integration/src/registration.rs
Normal file
|
|
@ -0,0 +1,112 @@
|
|||
use std::path::Path;
|
||||
|
||||
use miette::{IntoDiagnostic, Result, miette};
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::connect::ConnectClient;
|
||||
use crate::proto::runner::v1::{DeclareRequest, RegisterRequest};
|
||||
use crate::state::RunnerIdentity;
|
||||
|
||||
const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||
|
||||
/// Load existing runner credentials from disk, or register a new runner.
|
||||
pub async fn ensure_registered(
|
||||
client: &ConnectClient,
|
||||
state_path: &str,
|
||||
registration_token: Option<&str>,
|
||||
runner_name: &str,
|
||||
labels: &[String],
|
||||
) -> Result<RunnerIdentity> {
|
||||
// Try loading existing state
|
||||
if let Some(identity) = load_state(state_path) {
|
||||
info!(
|
||||
uuid = %identity.uuid,
|
||||
name = %identity.name,
|
||||
"loaded existing runner registration"
|
||||
);
|
||||
// Re-declare labels on every startup so Forgejo stays in sync
|
||||
declare(client, &identity.token, labels).await?;
|
||||
return Ok(identity);
|
||||
}
|
||||
|
||||
// No saved state — must register
|
||||
let token = registration_token.ok_or_else(|| {
|
||||
miette!(
|
||||
"no saved runner state at {state_path} and RUNNER_REGISTRATION_TOKEN is not set; \
|
||||
cannot register with Forgejo"
|
||||
)
|
||||
})?;
|
||||
|
||||
info!(name = runner_name, "registering new runner with Forgejo");
|
||||
|
||||
let req = RegisterRequest {
|
||||
name: runner_name.to_string(),
|
||||
token: token.to_string(),
|
||||
version: VERSION.to_string(),
|
||||
labels: labels.to_vec(),
|
||||
ephemeral: false,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let resp = client.register(&req, token).await?;
|
||||
let runner = resp
|
||||
.runner
|
||||
.ok_or_else(|| miette!("Forgejo returned empty runner in RegisterResponse"))?;
|
||||
|
||||
let identity = RunnerIdentity {
|
||||
id: runner.id,
|
||||
uuid: runner.uuid,
|
||||
token: runner.token,
|
||||
name: runner.name,
|
||||
registered_at: time::OffsetDateTime::now_utc().to_string(),
|
||||
};
|
||||
|
||||
save_state(state_path, &identity)?;
|
||||
info!(uuid = %identity.uuid, id = identity.id, "runner registered successfully");
|
||||
|
||||
// Declare labels after fresh registration
|
||||
declare(client, &identity.token, labels).await?;
|
||||
|
||||
Ok(identity)
|
||||
}
|
||||
|
||||
async fn declare(client: &ConnectClient, runner_token: &str, labels: &[String]) -> Result<()> {
|
||||
let req = DeclareRequest {
|
||||
version: VERSION.to_string(),
|
||||
labels: labels.to_vec(),
|
||||
};
|
||||
client.declare(&req, runner_token).await?;
|
||||
info!(labels = ?labels, "declared runner labels");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn load_state(path: &str) -> Option<RunnerIdentity> {
|
||||
let p = Path::new(path);
|
||||
if !p.exists() {
|
||||
return None;
|
||||
}
|
||||
match std::fs::read_to_string(p) {
|
||||
Ok(data) => match serde_json::from_str::<RunnerIdentity>(&data) {
|
||||
Ok(id) => Some(id),
|
||||
Err(e) => {
|
||||
warn!(error = %e, path = %path, "failed to parse runner state; will re-register");
|
||||
None
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
warn!(error = %e, path = %path, "failed to read runner state; will re-register");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn save_state(path: &str, identity: &RunnerIdentity) -> Result<()> {
|
||||
// Ensure parent directory exists
|
||||
if let Some(parent) = Path::new(path).parent() {
|
||||
std::fs::create_dir_all(parent).into_diagnostic()?;
|
||||
}
|
||||
let json = serde_json::to_string_pretty(identity).into_diagnostic()?;
|
||||
std::fs::write(path, json).into_diagnostic()?;
|
||||
info!(path = %path, "saved runner state");
|
||||
Ok(())
|
||||
}
|
||||
190
crates/runner-integration/src/reporter.rs
Normal file
190
crates/runner-integration/src/reporter.rs
Normal file
|
|
@ -0,0 +1,190 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use futures_util::StreamExt;
|
||||
use miette::{IntoDiagnostic, Result};
|
||||
use tokio::sync::watch;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::connect::ConnectClient;
|
||||
use crate::proto::runner::v1::{self, TaskState, UpdateTaskRequest};
|
||||
use crate::state::RunnerState;
|
||||
|
||||
/// Consume JobResults from RabbitMQ and report them back to Forgejo.
///
/// Sets up the AMQP topology (durable direct exchange, a dedicated results
/// queue, and a binding on the results routing key), then loops until the
/// `shutdown` watch flips to `true` or the consumer stream ends. For each
/// JSON-encoded `common::messages::JobResult`, the matching in-flight
/// Forgejo task is looked up in `state`, its outcome is reported via
/// `report_to_forgejo`, a concurrency permit is released, and the delivery
/// is acked.
pub async fn run(
    client: Arc<ConnectClient>,
    state: Arc<RunnerState>,
    mq_cfg: common::MqConfig,
    mut shutdown: watch::Receiver<bool>,
) -> Result<()> {
    let conn = lapin::Connection::connect(&mq_cfg.url, lapin::ConnectionProperties::default())
        .await
        .into_diagnostic()?;
    let channel = conn.create_channel().await.into_diagnostic()?;

    // Ensure exchange exists
    channel
        .exchange_declare(
            &mq_cfg.exchange,
            lapin::ExchangeKind::Direct,
            lapin::options::ExchangeDeclareOptions {
                durable: true,
                auto_delete: false,
                internal: false,
                nowait: false,
                passive: false,
            },
            lapin::types::FieldTable::default(),
        )
        .await
        .into_diagnostic()?;

    // Declare our own results queue (separate from forge-integration)
    let results_queue = &mq_cfg.results_queue;
    channel
        .queue_declare(
            results_queue,
            lapin::options::QueueDeclareOptions {
                durable: true,
                auto_delete: false,
                exclusive: false,
                nowait: false,
                passive: false,
            },
            lapin::types::FieldTable::default(),
        )
        .await
        .into_diagnostic()?;

    channel
        .queue_bind(
            results_queue,
            &mq_cfg.exchange,
            &mq_cfg.results_routing_key,
            lapin::options::QueueBindOptions { nowait: false },
            lapin::types::FieldTable::default(),
        )
        .await
        .into_diagnostic()?;

    // Cap unacked deliveries at 16 per consumer so a burst of results does
    // not flood this reporter.
    channel
        .basic_qos(16, lapin::options::BasicQosOptions { global: false })
        .await
        .into_diagnostic()?;

    // Manual acks (no_ack: false): a delivery is only acked after we have
    // attempted to report it (or decided to drop it).
    let mut consumer = channel
        .basic_consume(
            results_queue,
            "runner-integration",
            lapin::options::BasicConsumeOptions {
                no_ack: false,
                ..Default::default()
            },
            lapin::types::FieldTable::default(),
        )
        .await
        .into_diagnostic()?;

    info!(queue = %results_queue, "result reporter consumer started");

    loop {
        tokio::select! {
            // Shutdown notification: only exit once the flag is actually true.
            _ = shutdown.changed() => {
                if *shutdown.borrow() {
                    info!("reporter shutting down");
                    break;
                }
            }
            maybe_delivery = consumer.next() => {
                match maybe_delivery {
                    Some(Ok(d)) => {
                        let tag = d.delivery_tag;
                        match serde_json::from_slice::<common::messages::JobResult>(&d.data) {
                            Ok(jobres) => {
                                // Look up the in-flight task
                                if let Some((_, task_meta)) =
                                    state.in_flight.remove(&jobres.request_id)
                                {
                                    // A failed report is logged but not retried:
                                    // the permit is still released and the
                                    // delivery is acked below.
                                    if let Err(e) =
                                        report_to_forgejo(&client, &state, &jobres, &task_meta).await
                                    {
                                        warn!(
                                            error = %e,
                                            request_id = %jobres.request_id,
                                            task_id = task_meta.forgejo_task_id,
                                            "failed to report result to Forgejo"
                                        );
                                    }
                                    // Release the semaphore permit
                                    state.semaphore.add_permits(1);
                                }
                                // If not in our in-flight map, it's for another consumer — ack anyway
                                // NOTE(review): acking a result owned by another
                                // consumer removes it from the queue — confirm
                                // this queue is exclusive to this service.
                                channel
                                    .basic_ack(tag, lapin::options::BasicAckOptions { multiple: false })
                                    .await
                                    .into_diagnostic()?;
                            }
                            Err(e) => {
                                // Malformed payloads are acked (dropped) rather
                                // than redelivered forever.
                                warn!(error = %e, "failed to parse JobResult; acking");
                                channel
                                    .basic_ack(tag, lapin::options::BasicAckOptions { multiple: false })
                                    .await
                                    .into_diagnostic()?;
                            }
                        }
                    }
                    Some(Err(e)) => {
                        // Transient consumer error: back off briefly and retry.
                        warn!(error = %e, "consumer error; sleeping");
                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                    }
                    None => {
                        warn!("consumer stream ended");
                        break;
                    }
                }
            }
        }
    }

    Ok(())
}
|
||||
|
||||
async fn report_to_forgejo(
|
||||
client: &ConnectClient,
|
||||
state: &RunnerState,
|
||||
jobres: &common::messages::JobResult,
|
||||
task_meta: &crate::state::TaskMeta,
|
||||
) -> Result<()> {
|
||||
let now = prost_types::Timestamp {
|
||||
seconds: time::OffsetDateTime::now_utc().unix_timestamp(),
|
||||
nanos: 0,
|
||||
};
|
||||
|
||||
let result = if jobres.success {
|
||||
v1::Result::Success
|
||||
} else {
|
||||
v1::Result::Failure
|
||||
};
|
||||
|
||||
let req = UpdateTaskRequest {
|
||||
state: Some(TaskState {
|
||||
id: task_meta.forgejo_task_id,
|
||||
result: result as i32,
|
||||
started_at: None, // already reported when task started
|
||||
stopped_at: Some(now),
|
||||
steps: vec![],
|
||||
}),
|
||||
outputs: Default::default(),
|
||||
};
|
||||
|
||||
client.update_task(&req, &state.identity.token).await?;
|
||||
|
||||
info!(
|
||||
request_id = %jobres.request_id,
|
||||
task_id = task_meta.forgejo_task_id,
|
||||
success = jobres.success,
|
||||
exit_code = jobres.exit_code,
|
||||
"reported result to Forgejo"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
45
crates/runner-integration/src/state.rs
Normal file
45
crates/runner-integration/src/state.rs
Normal file
|
|
@ -0,0 +1,45 @@
|
|||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use dashmap::DashMap;
|
||||
use tokio::sync::Semaphore;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Runner identity obtained from Forgejo registration.
///
/// Serialized to disk (see the runner state file) so the registration
/// survives restarts.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RunnerIdentity {
    // Numeric runner id assigned by Forgejo.
    pub id: i64,
    // Runner UUID assigned by Forgejo.
    pub uuid: String,
    // Secret token used to authenticate subsequent runner RPCs.
    pub token: String,
    // Human-readable runner name.
    pub name: String,
    // Registration time, stored as a string — format set by the writer;
    // NOTE(review): confirm the producer's timestamp format before parsing.
    pub registered_at: String,
}
|
||||
|
||||
/// Metadata for a Forgejo task that is currently in-flight within Solstice.
#[derive(Debug)]
pub struct TaskMeta {
    // Task id assigned by Forgejo; used when reporting results back.
    pub forgejo_task_id: i64,
    // Clone URL of the repository the task runs against.
    pub repo_url: String,
    // Commit the task was triggered for.
    pub commit_sha: String,
    // Monotonic instant captured for this task; presumably set when the task
    // was dispatched — confirm at the construction site.
    #[allow(dead_code)]
    pub started_at: Instant,
}
|
||||
|
||||
/// Shared state accessible by the poller and reporter tasks.
pub struct RunnerState {
    // Identity obtained from Forgejo registration (id, uuid, auth token).
    pub identity: RunnerIdentity,
    /// Maps Solstice `request_id` → Forgejo task metadata.
    // Entries are removed by the reporter when a result arrives.
    pub in_flight: DashMap<Uuid, TaskMeta>,
    /// Controls how many tasks can be in-flight simultaneously.
    // Permits are released back by the reporter once a task's result is handled.
    pub semaphore: Arc<Semaphore>,
}
|
||||
|
||||
impl RunnerState {
|
||||
pub fn new(identity: RunnerIdentity, max_concurrency: usize) -> Self {
|
||||
Self {
|
||||
identity,
|
||||
in_flight: DashMap::new(),
|
||||
semaphore: Arc::new(Semaphore::new(max_concurrency)),
|
||||
}
|
||||
}
|
||||
}
|
||||
484
crates/runner-integration/src/translator.rs
Normal file
484
crates/runner-integration/src/translator.rs
Normal file
|
|
@ -0,0 +1,484 @@
|
|||
use miette::{IntoDiagnostic, Result, miette};
|
||||
use tracing::{debug, info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::proto::runner::v1::Task;
|
||||
|
||||
/// The result of translating a Forgejo task.
///
/// Produced by `translate_task`; `Unsupported` carries a human-readable
/// explanation intended to be surfaced to the user via Forgejo.
pub enum TranslateResult {
    /// Successfully translated into one or more JobRequests.
    Jobs(Vec<common::JobRequest>),
    /// The workflow is not supported — return this message to Forgejo as a failure.
    Unsupported(String),
}
|
||||
|
||||
/// Context needed for translation.
pub struct TranslateCtx {
    // Base URL of the Forgejo REST API used for content fetches; when `None`,
    // the tier-1 KDL lookup is skipped. Presumably includes the API prefix
    // (e.g. ".../api/v1") since "/repos/..." is appended — confirm with callers.
    pub forgejo_base: Option<String>,
    // Optional bearer token for authenticated content fetches.
    pub forgejo_token: Option<String>,
}
|
||||
|
||||
/// Translate a Forgejo Actions task into Solstice JobRequest(s).
///
/// Uses a 3-tier fallback strategy:
/// 1. Fetch `.solstice/workflow.kdl` from the repo → parse into jobs
/// 2. Parse the Actions YAML workflow_payload → extract `run` steps + matrix
/// 3. Report unsupported workflow
///
/// All jobs produced for one task share a freshly generated `group_id` so
/// downstream consumers can correlate them.
pub async fn translate_task(task: &Task, ctx: &TranslateCtx) -> Result<TranslateResult> {
    let (repo_url, owner, name, sha) = extract_repo_info(task)?;
    let group_id = Uuid::new_v4();

    // --- Tier 1: Solstice KDL workflow ---
    // An empty job list falls through to tier 2.
    if let Some(jobs) = try_kdl_workflow(ctx, &owner, &name, &repo_url, &sha, group_id).await? {
        if !jobs.is_empty() {
            info!(
                tier = 1,
                count = jobs.len(),
                repo = %repo_url,
                sha = %sha,
                "translated task via Solstice KDL workflow"
            );
            return Ok(TranslateResult::Jobs(jobs));
        }
    }

    // --- Tier 2: Actions YAML extraction ---
    if let Some(payload_bytes) = &task.workflow_payload {
        match try_actions_yaml(payload_bytes, &repo_url, &owner, &name, &sha, group_id) {
            Ok(Some(jobs)) if !jobs.is_empty() => {
                info!(
                    tier = 2,
                    count = jobs.len(),
                    repo = %repo_url,
                    sha = %sha,
                    "translated task via Actions YAML extraction"
                );
                return Ok(TranslateResult::Jobs(jobs));
            }
            Ok(_) => {
                debug!("Actions YAML produced no runnable jobs");
            }
            Err(e) => {
                // A malformed workflow is not fatal for the task as a whole:
                // report "unsupported" instead of erroring out.
                warn!(error = %e, "failed to parse Actions YAML; falling through to tier 3");
            }
        }
    }

    // --- Tier 3: Unsupported ---
    Ok(TranslateResult::Unsupported(
        "solstice-ci runner: this repository requires a .solstice/workflow.kdl file \
         or a simple GitHub Actions workflow with only `run` steps. \
         Complex Actions features (uses, containers, services, expressions) are not supported. \
         See https://solstice-ci.dev/docs/forgejo-runner for details."
            .to_string(),
    ))
}
|
||||
|
||||
/// Extract repo URL, owner, name, and SHA from the task's github context.
///
/// Returns `(repo_url, owner, name, sha)`. The clone URL is built from the
/// context's `server_url` plus `repository`; when `server_url` is absent a
/// placeholder host is used. Errors if the context, `sha`, or `repository`
/// are missing, or if `repository` is not of the form "owner/repo".
fn extract_repo_info(task: &Task) -> Result<(String, String, String, String)> {
    let ctx = task
        .context
        .as_ref()
        .ok_or_else(|| miette!("task has no github context"))?;

    let fields = &ctx.fields;

    // The context is a google.protobuf.Struct. Extract repository and sha.
    // NOTE(review): only the string form of "repository" is handled; a
    // struct-valued repository is treated as missing — confirm the payloads
    // Forgejo actually sends.
    let repo_struct = fields
        .get("repository")
        .and_then(|v| v.kind.as_ref())
        .and_then(|k| match k {
            prost_types::value::Kind::StringValue(s) => {
                // Sometimes repository is just "owner/repo" string
                Some(s.clone())
            }
            _ => None,
        });

    let sha = fields
        .get("sha")
        .and_then(|v| v.kind.as_ref())
        .and_then(|k| match k {
            prost_types::value::Kind::StringValue(s) => Some(s.clone()),
            _ => None,
        })
        .ok_or_else(|| miette!("task context missing 'sha'"))?;

    // Try to get server_url + repository for constructing clone URL
    let server_url = fields
        .get("server_url")
        .and_then(|v| v.kind.as_ref())
        .and_then(|k| match k {
            prost_types::value::Kind::StringValue(s) => Some(s.clone()),
            _ => None,
        })
        .unwrap_or_default();

    let repository = repo_struct.ok_or_else(|| miette!("task context missing 'repository'"))?;

    // repository is "owner/repo"
    let (owner, name) = repository
        .split_once('/')
        .ok_or_else(|| miette!("invalid repository format: {}", repository))?;

    let repo_url = if server_url.is_empty() {
        // Fallback host when the context carries no server_url.
        format!("https://forgejo.local/{}.git", repository)
    } else {
        format!("{}/{}.git", server_url.trim_end_matches('/'), repository)
    };

    Ok((repo_url, owner.to_string(), name.to_string(), sha))
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tier 1: Solstice KDL workflow
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Tier 1: fetch `.solstice/workflow.kdl` from the Forgejo contents API at
/// the task's commit and parse it into job requests.
///
/// Returns `Ok(None)` when no API base URL is configured or the file does
/// not exist (non-success HTTP status); network and decode failures are
/// hard errors.
async fn try_kdl_workflow(
    ctx: &TranslateCtx,
    owner: &str,
    repo: &str,
    repo_url: &str,
    sha: &str,
    group_id: Uuid,
) -> Result<Option<Vec<common::JobRequest>>> {
    // Without a configured API base we cannot fetch repo contents at all.
    let base = match ctx.forgejo_base.as_deref() {
        Some(b) => b,
        None => return Ok(None),
    };

    let url = format!(
        "{}/repos/{}/{}/contents/.solstice/workflow.kdl?ref={}",
        base.trim_end_matches('/'),
        owner,
        repo,
        sha
    );

    // NOTE(review): builds a fresh reqwest::Client per call; consider reusing
    // a shared client if this path becomes hot.
    let client = reqwest::Client::new();
    let mut req = client.get(&url);
    if let Some(tok) = ctx.forgejo_token.as_deref() {
        req = req.bearer_auth(tok);
    }

    let resp = req.send().await.into_diagnostic()?;
    if !resp.status().is_success() {
        // Any non-success status (404 etc.) is treated as "no KDL workflow".
        debug!(status = %resp.status(), "no .solstice/workflow.kdl found");
        return Ok(None);
    }

    let v: serde_json::Value = resp.json().await.into_diagnostic()?;
    let kdl_text = decode_content_api_response(&v)?;

    let jobs = parse_kdl_jobs(&kdl_text, repo_url, owner, repo, sha, group_id);
    Ok(Some(jobs))
}
|
||||
|
||||
fn decode_content_api_response(v: &serde_json::Value) -> Result<String> {
|
||||
let encoding = v
|
||||
.get("encoding")
|
||||
.and_then(|e| e.as_str())
|
||||
.unwrap_or("base64");
|
||||
|
||||
if encoding.eq_ignore_ascii_case("base64") {
|
||||
let content = v
|
||||
.get("content")
|
||||
.and_then(|c| c.as_str())
|
||||
.ok_or_else(|| miette!("missing 'content' in API response"))?;
|
||||
|
||||
use base64::Engine;
|
||||
let decoded = base64::engine::general_purpose::STANDARD
|
||||
.decode(content.replace('\n', ""))
|
||||
.into_diagnostic()?;
|
||||
String::from_utf8(decoded).into_diagnostic()
|
||||
} else {
|
||||
v.get("content")
|
||||
.and_then(|c| c.as_str())
|
||||
.map(|s| s.to_string())
|
||||
.ok_or_else(|| miette!("missing 'content' in API response"))
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse job declarations out of a Solstice KDL workflow using a
/// line-oriented scan (this is NOT a full KDL parser).
///
/// Recognizes lines of the form `job ... id="..."`, and inside the job's
/// brace block picks up `script ... path="..."` and `runs_on="..."`
/// attributes. When no job lines parse, a single default job for the whole
/// workflow file is emitted so the task still runs.
fn parse_kdl_jobs(
    kdl: &str,
    repo_url: &str,
    owner: &str,
    repo: &str,
    sha: &str,
    group_id: Uuid,
) -> Vec<common::JobRequest> {
    let mut out = Vec::new();
    let mut lines = kdl.lines().peekable();

    while let Some(line) = lines.next() {
        let l = line.trim();
        // NOTE(review): `contains("id=")` also matches keys that merely end
        // in "id" (e.g. `uuid=`) — confirm against real workflow files.
        if l.starts_with("job ") && l.contains("id=") {
            let id = capture_attr(l, "id");
            let mut runs_on = capture_attr(l, "runs_on");
            let mut script: Option<String> = None;

            // Brace-depth tracking: the job's block may open on the same
            // line ('{' suffix) or on a following line.
            let mut depth = if l.ends_with('{') { 1i32 } else { 0 };
            while let Some(ln) = lines.peek().copied() {
                let t = ln.trim();
                if t.ends_with('{') {
                    depth += 1;
                }
                if t.starts_with('}') {
                    if depth <= 0 {
                        // Closing brace with no open block: stop without
                        // consuming it (it belongs to an outer scope).
                        break;
                    }
                    depth -= 1;
                    if depth == 0 {
                        // End of this job's block — consume the brace line.
                        lines.next();
                        break;
                    }
                }
                if t.starts_with("script ") && t.contains("path=") {
                    if let Some(p) = capture_attr(t, "path") {
                        script = Some(p);
                    }
                }
                // First runs_on wins; the job line's own attribute (if any)
                // takes precedence over nested ones.
                if t.contains("runs_on=") && runs_on.is_none() {
                    runs_on = capture_attr(t, "runs_on");
                }
                lines.next();
            }

            // Only emit a job when an id was actually captured.
            if let Some(id_val) = id {
                let mut jr = common::JobRequest::new(common::SourceSystem::Forgejo, repo_url, sha);
                jr.request_id = Uuid::new_v4();
                jr.group_id = Some(group_id);
                jr.repo_owner = Some(owner.to_string());
                jr.repo_name = Some(repo.to_string());
                jr.workflow_path = Some(".solstice/workflow.kdl".to_string());
                jr.workflow_job_id = Some(id_val);
                jr.runs_on = runs_on;
                jr.script_path = script;
                out.push(jr);
            }
        }
    }

    // If no jobs parsed, create a single default job
    if out.is_empty() {
        let mut jr = common::JobRequest::new(common::SourceSystem::Forgejo, repo_url, sha);
        jr.group_id = Some(group_id);
        jr.repo_owner = Some(owner.to_string());
        jr.repo_name = Some(repo.to_string());
        jr.workflow_path = Some(".solstice/workflow.kdl".to_string());
        out.push(jr);
    }

    out
}
|
||||
|
||||
/// Extract the value of a `key="..."` or `key='...'` attribute from a line.
///
/// The key must start at a token boundary (start of line, or preceded by
/// whitespace); otherwise looking up `id` would incorrectly match inside a
/// longer key such as `uuid="..."`. Returns `None` when the attribute is
/// absent or its closing quote is missing.
fn capture_attr(line: &str, key: &str) -> Option<String> {
    for delim in ['"', '\''] {
        let pattern = format!("{}={}", key, delim);
        let mut search_from = 0usize;
        // Scan all occurrences, accepting only boundary-anchored matches.
        while let Some(rel) = line[search_from..].find(&pattern) {
            let start = search_from + rel;
            let at_boundary = start == 0
                || line[..start]
                    .chars()
                    .next_back()
                    .is_some_and(|c| c.is_whitespace());
            if at_boundary {
                let rest = &line[start + pattern.len()..];
                if let Some(end) = rest.find(delim) {
                    return Some(rest[..end].to_string());
                }
            }
            // Not a real attribute start (e.g. inside "uuid="); keep looking.
            search_from = start + 1;
        }
    }
    None
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tier 2: Actions YAML extraction
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Tier 2: extract runnable jobs from a GitHub/Forgejo Actions YAML payload.
///
/// Only simple workflows are accepted: every step must be a `run` step, and
/// jobs using `container`/`services` or any `uses:` step cause the whole
/// workflow to be rejected (`Ok(None)`, falling through to tier 3). Matrix
/// strategies are expanded into one JobRequest per combination.
///
/// Errors on invalid UTF-8, invalid YAML, or a missing `jobs` mapping.
fn try_actions_yaml(
    payload_bytes: &[u8],
    repo_url: &str,
    owner: &str,
    repo: &str,
    sha: &str,
    group_id: Uuid,
) -> Result<Option<Vec<common::JobRequest>>> {
    let yaml_str = std::str::from_utf8(payload_bytes).into_diagnostic()?;
    let doc: serde_yaml::Value = serde_yaml::from_str(yaml_str).into_diagnostic()?;

    let jobs_map = doc
        .get("jobs")
        .and_then(|j| j.as_mapping())
        .ok_or_else(|| miette!("Actions YAML has no 'jobs' mapping"))?;

    let mut results = Vec::new();

    for (_job_name, job_def) in jobs_map {
        let job_map = match job_def.as_mapping() {
            Some(m) => m,
            None => continue,
        };

        // Check for unsupported features
        if has_unsupported_features(job_map) {
            return Ok(None); // Fall through to tier 3
        }

        // Extract run steps
        let steps = match job_def.get("steps").and_then(|s| s.as_sequence()) {
            Some(s) => s,
            None => continue,
        };

        let run_steps: Vec<&str> = steps
            .iter()
            .filter_map(|step| step.get("run").and_then(|r| r.as_str()))
            .collect();

        if run_steps.is_empty() {
            // All steps use `uses:` — unsupported
            return Ok(None);
        }

        // Check if any step uses `uses:` — mixed is unsupported
        let has_uses = steps.iter().any(|step| step.get("uses").is_some());
        if has_uses {
            return Ok(None); // Fall through to tier 3
        }

        // Extract runs-on
        let runs_on_value = job_def.get("runs-on");

        // Handle matrix strategy
        let matrix = job_def
            .get("strategy")
            .and_then(|s| s.get("matrix"))
            .and_then(|m| m.as_mapping());

        // All run steps are concatenated into one shell script.
        let script = run_steps.join("\n");

        if let Some(matrix_map) = matrix {
            // Expand matrix into separate jobs
            let combos = expand_matrix(matrix_map);
            for combo in &combos {
                // Resolve runs-on: if it references a matrix variable, substitute
                let resolved_runs_on = resolve_runs_on(runs_on_value, combo);

                let mut jr = common::JobRequest::new(common::SourceSystem::Forgejo, repo_url, sha);
                jr.request_id = Uuid::new_v4();
                jr.group_id = Some(group_id);
                jr.repo_owner = Some(owner.to_string());
                jr.repo_name = Some(repo.to_string());
                jr.runs_on = resolved_runs_on;
                jr.script_path = Some(format!(".solstice-generated-{}.sh", jr.request_id));
                results.push((jr, script.clone()));
            }
        } else {
            let resolved_runs_on = runs_on_value
                .and_then(|v| v.as_str())
                .map(|s| s.to_string());

            let mut jr = common::JobRequest::new(common::SourceSystem::Forgejo, repo_url, sha);
            jr.request_id = Uuid::new_v4();
            jr.group_id = Some(group_id);
            jr.repo_owner = Some(owner.to_string());
            jr.repo_name = Some(repo.to_string());
            jr.runs_on = resolved_runs_on;
            jr.script_path = Some(format!(".solstice-generated-{}.sh", jr.request_id));
            results.push((jr, script.clone()));
        }
    }

    if results.is_empty() {
        return Ok(None);
    }

    // For tier 2, we just create the JobRequests — the script content will need
    // to be handled by the orchestrator. For now, embed a hint in the script_path.
    // TODO: Consider passing the script content via a sidecar mechanism.
    // NOTE(review): the joined script text is built but dropped here — the
    // generated script_path points at a file nothing writes yet. Confirm the
    // orchestrator's expectations before shipping.
    Ok(Some(results.into_iter().map(|(jr, _)| jr).collect()))
}
|
||||
|
||||
/// Whether a job definition uses Actions features Solstice cannot run
/// (currently the `container` and `services` keys).
fn has_unsupported_features(job_map: &serde_yaml::Mapping) -> bool {
    ["container", "services"]
        .iter()
        .any(|key| job_map.contains_key(serde_yaml::Value::String((*key).to_string())))
}
|
||||
|
||||
/// Expand a matrix mapping into all combinations.
|
||||
/// E.g., `{os: [ubuntu, fedora], version: [1, 2]}` → 4 combos.
|
||||
fn expand_matrix(matrix: &serde_yaml::Mapping) -> Vec<Vec<(String, String)>> {
|
||||
let mut axes: Vec<(String, Vec<String>)> = Vec::new();
|
||||
|
||||
for (key, value) in matrix {
|
||||
let key_str = match key.as_str() {
|
||||
Some(s) => s.to_string(),
|
||||
None => continue,
|
||||
};
|
||||
|
||||
// Skip special keys like "include", "exclude"
|
||||
if key_str == "include" || key_str == "exclude" {
|
||||
continue;
|
||||
}
|
||||
|
||||
let values: Vec<String> = match value.as_sequence() {
|
||||
Some(seq) => seq
|
||||
.iter()
|
||||
.filter_map(|v| {
|
||||
v.as_str()
|
||||
.map(|s| s.to_string())
|
||||
.or_else(|| v.as_i64().map(|i| i.to_string()))
|
||||
.or_else(|| v.as_f64().map(|f| f.to_string()))
|
||||
.or_else(|| v.as_bool().map(|b| b.to_string()))
|
||||
})
|
||||
.collect(),
|
||||
None => continue,
|
||||
};
|
||||
|
||||
if !values.is_empty() {
|
||||
axes.push((key_str, values));
|
||||
}
|
||||
}
|
||||
|
||||
if axes.is_empty() {
|
||||
return vec![vec![]];
|
||||
}
|
||||
|
||||
// Cartesian product
|
||||
let mut combos: Vec<Vec<(String, String)>> = vec![vec![]];
|
||||
for (key, values) in &axes {
|
||||
let mut new_combos = Vec::new();
|
||||
for combo in &combos {
|
||||
for val in values {
|
||||
let mut new = combo.clone();
|
||||
new.push((key.clone(), val.clone()));
|
||||
new_combos.push(new);
|
||||
}
|
||||
}
|
||||
combos = new_combos;
|
||||
}
|
||||
|
||||
combos
|
||||
}
|
||||
|
||||
/// Resolve `runs-on` which may reference a matrix variable like `${{ matrix.os }}`.
|
||||
fn resolve_runs_on(
|
||||
runs_on: Option<&serde_yaml::Value>,
|
||||
matrix_combo: &[(String, String)],
|
||||
) -> Option<String> {
|
||||
let val = runs_on?;
|
||||
let s = val.as_str()?;
|
||||
|
||||
// Check for matrix expression: ${{ matrix.KEY }}
|
||||
if s.contains("${{") && s.contains("matrix.") {
|
||||
let mut resolved = s.to_string();
|
||||
for (key, value) in matrix_combo {
|
||||
let pattern = format!("${{{{ matrix.{} }}}}", key);
|
||||
resolved = resolved.replace(&pattern, value);
|
||||
// Also handle without spaces
|
||||
let pattern_nospace = format!("${{{{matrix.{}}}}}", key);
|
||||
resolved = resolved.replace(&pattern_nospace, value);
|
||||
}
|
||||
Some(resolved)
|
||||
} else {
|
||||
Some(s.to_string())
|
||||
}
|
||||
}
|
||||
25
deploy/images/runner-integration/Containerfile
Normal file
25
deploy/images/runner-integration/Containerfile
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
# syntax=docker/dockerfile:1.7
# Build Solstice Runner Integration using upstream official images

# --- Build stage -----------------------------------------------------------
FROM docker.io/library/rust:bookworm AS builder
ENV CARGO_HOME=/cargo
WORKDIR /work
# Install protoc for tonic/prost builds
RUN apt-get update \
    && apt-get install -y --no-install-recommends protobuf-compiler libprotobuf-dev ca-certificates \
    && rm -rf /var/lib/apt/lists/*
# Configure cargo target-dir so it can be cached between layers
RUN mkdir -p /cargo && printf "[build]\ntarget-dir = \"/cargo/target\"\n" > /cargo/config.toml
COPY Cargo.toml ./
COPY crates ./crates
# Cache the registry, git checkouts, and target dir across builds; the binary
# is copied out of the cache mount so the next stage can COPY it.
RUN --mount=type=cache,target=/cargo/registry,sharing=locked \
    --mount=type=cache,target=/cargo/git,sharing=locked \
    --mount=type=cache,target=/cargo/target,sharing=locked \
    cargo build --release -p runner-integration && cp /cargo/target/release/solstice-runner-integration /solstice-runner-integration

# --- Runtime stage: slim Debian + CA certificates + the binary -------------
FROM docker.io/library/debian:bookworm-slim
RUN apt-get update \
    && apt-get install -y --no-install-recommends ca-certificates \
    && rm -rf /var/lib/apt/lists/*
COPY --from=builder /solstice-runner-integration /usr/local/bin/solstice-runner-integration
ENTRYPOINT ["/usr/local/bin/solstice-runner-integration"]
|
||||
|
|
@ -23,6 +23,7 @@ volumes:
|
|||
postgres-data:
|
||||
rabbitmq-data:
|
||||
minio-data:
|
||||
runner-state:
|
||||
|
||||
|
||||
services:
|
||||
|
|
@ -33,11 +34,18 @@ services:
|
|||
environment:
|
||||
DOCKER_API_VERSION: ${DOCKER_API_VERSION:-1.44}
|
||||
command:
|
||||
- --log.level=DEBUG
|
||||
- --accesslog=true
|
||||
- --api.dashboard=true
|
||||
- --providers.docker=true
|
||||
- --providers.docker.exposedbydefault=false
|
||||
- --entrypoints.web.address=:80
|
||||
- --entrypoints.web.http.redirections.entrypoint.to=websecure
|
||||
- --entrypoints.web.http.redirections.entrypoint.scheme=https
|
||||
- --entrypoints.websecure.address=:443
|
||||
- --entrypoints.websecure.transport.respondingTimeouts.readTimeout=0s
|
||||
- --entrypoints.websecure.transport.respondingTimeouts.writeTimeout=0s
|
||||
- --entrypoints.websecure.transport.respondingTimeouts.idleTimeout=360s
|
||||
- --certificatesresolvers.le.acme.email=${TRAEFIK_ACME_EMAIL}
|
||||
- --certificatesresolvers.le.acme.storage=/acme/acme.json
|
||||
- --certificatesresolvers.le.acme.httpchallenge=true
|
||||
|
|
@ -341,3 +349,32 @@ services:
|
|||
- traefik.http.routers.github.entrypoints=websecure
|
||||
- traefik.http.routers.github.tls.certresolver=le
|
||||
- traefik.http.services.github.loadbalancer.server.port=8082
|
||||
|
||||
runner-integration:
  build:
    context: ../..
    dockerfile: deploy/images/runner-integration/Containerfile
  image: local/solstice-runner-integration:latest
  container_name: solstice-runner-integration
  restart: unless-stopped
  environment:
    RUST_LOG: info
    # AMQP endpoint and the queue this service consumes JobResults from.
    AMQP_URL: amqp://${RABBITMQ_DEFAULT_USER}:${RABBITMQ_DEFAULT_PASS}@rabbitmq:5672/solstice-${ENV}
    AMQP_RESULTS_QUEUE: solstice.runner-results.v1
    # Forgejo runner configuration
    FORGEJO_URL: ${FORGEJO_URL}
    FORGEJO_BASE_URL: ${FORGEJO_BASE_URL}
    FORGEJO_TOKEN: ${FORGEJO_TOKEN}
    RUNNER_REGISTRATION_TOKEN: ${RUNNER_REGISTRATION_TOKEN}
    RUNNER_NAME: solstice-runner-${ENV}
    RUNNER_LABELS: ${RUNNER_LABELS:-self-hosted}
    # Persisted registration identity; survives restarts via the runner-state volume.
    RUNNER_STATE_PATH: /data/runner-state.json
    MAX_CONCURRENCY: ${RUNNER_MAX_CONCURRENCY:-4}
    LOGS_BASE_URL: https://logs.${ENV}.${DOMAIN}
  depends_on:
    rabbitmq:
      condition: service_healthy
  volumes:
    - runner-state:/data:Z
  networks:
    - core
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue