Add orchestrator persistence using SeaORM for initial database support

This commit introduces a persistence layer to the Orchestrator, enabling it to optionally connect to a Postgres database for recording job and VM states. It includes:

- SeaORM integration with support for migrations from the migration crate.
- `Persist` module with methods for job and VM state upserts.
- No-op fallback when persistence is disabled or unavailable.
- Documentation updates and test coverage for persistence functionality.
This commit is contained in:
Till Wegmueller 2025-10-26 15:38:54 +01:00
parent 6ddfa9a0b0
commit 6568183d86
No known key found for this signature in database
8 changed files with 745 additions and 89 deletions

183
README.md
View file

@ -1,3 +1,182 @@
# solstice-ci
# Solstice CI
Build your code on multiple Operating Systems Easily
Build your code on multiple operating systems easily — with firstclass support for Illumos and Linux, and a simple, observable pipeline.
Solstice CI is a Rust workspace that provides:
- Orchestration to provision perjob virtual machines (libvirt/KVM) (bHyve)
- A simple workflow format (KDL) and runner
- Integration layers for forges (Forgejo/Gitea, GitHub) based on messaging (RabbitMQ)
- Shared telemetry and utilities
This README follows best practices inspired by awesome-readme. It focuses on how to get productive locally, what you need to run it, and how to contribute.
## Table of Contents
- Overview
- Features
- OS and Tooling Requirements
- Quickstart: Local Development
- Running the Services
- Configuration (Environment Variables)
- Testing
- Troubleshooting
- Contributing (Codeberg Pull Requests)
- License
## Overview
Solstice CI is a modular CI system:
- crates/orchestrator — schedules jobs and provisions VMs per job
- crates/forge-integration — receives webhooks and enqueues job requests
- crates/workflow-runner — executes a KDL workflow inside a VM/agent
- crates/common — shared types, telemetry, and messaging helpers
- crates/ciadm and crates/cidev — small CLIs to aid development and admin tasks
## Features
- Per-job VMs via libvirt/KVM with image prefetch and capacity controls
- RabbitMQ for job request queueing (durable, backpressure via prefetch)
- gRPC/HTTP components and structured telemetry (tracing + OTLP)
- Simple development flows with cargo and docker-compose
## OS and Tooling Requirements
Minimum supported host OS for end-to-end local runs:
- Linux x86_64 (recommended): required for libvirt/KVM hypervisor path
- macOS / Windows: supported for building, running non-hypervisor services, and exercising CLIs; full VM orchestration is Linux-only currently
Required tools:
- Rust (stable, edition 2024). Install via rustup: https://rustup.rs
- Cargo (comes with rustup)
- Docker or Podman for running RabbitMQ locally (docker-compose file provided)
- Git
Optional but recommended:
- Postgres (DATABASE_URL). Some binaries accept a database URL; current snapshots may not require a live DB for basic dev loops
- An OpenTelemetry collector (OTLP) for tracing (or rely on stderr logs)
Hardware hints for Linux/local VM testing:
- CPU virtualization enabled (VT-x/AMD-V)
- 8 GB RAM+ recommended for running VMs alongside tooling
## Quickstart: Local Development
1) Clone the repo
- git clone https://codeberg.org/your-namespace/solstice-ci.git
- cd solstice-ci
2) Start RabbitMQ locally
- docker compose up -d rabbitmq
- Management UI: http://localhost:15672 (guest/guest)
3) Build everything
- cargo build --workspace
4) (Linux) Prepare an example orchestrator config
- The orchestrator defaults to examples/orchestrator-image-map.yaml. Edit it to point local_path to a writable location; the orchestrator will download images as needed.
5) Run services in separate terminals
- Forge integration (webhooks) or enqueue sample jobs
- Orchestrator (VM scheduler/hypervisor)
6) Inspect logs
- Logs are structured via tracing. Set RUST_LOG=info (or debug/trace) as needed.
## Running the Services
Environment defaults are sensible for local dev; override via flags or env vars.
Forge Integration (HTTP webhook receiver) — also supports a manual enqueue mode:
- cargo run -p forge-integration -- --help
- cargo run -p forge-integration -- enqueue --repo-url https://codeberg.org/example/repo.git --commit-sha deadbeef --runs-on illumos-latest
- cargo run -p forge-integration -- --http-addr 0.0.0.0:8080 --webhook-path /webhooks/forgejo
Orchestrator (Linux + libvirt/KVM):
- cargo run -p orchestrator -- --config examples/orchestrator-image-map.yaml --grpc-addr 0.0.0.0:50051
- You can cap concurrency and per-label capacity with --max-concurrency and --capacity-map, e.g. --capacity-map illumos-latest=1,ubuntu-22.04=2
Workflow runner (agent, useful for inspection of KDL files):
- cargo run -p workflow-runner -- --workflow path/to/workflow.kdl
Developer helper (validate/list/show workflow KDL):
- cargo run -p cidev -- validate --path path/to/workflow.kdl
- cargo run -p cidev -- list --path path/to/workflow.kdl
- cargo run -p cidev -- show --path path/to/workflow.kdl --job build
## Configuration (Environment Variables)
Common env (see crates and guidelines for full list):
- RUST_LOG=info
- OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 (optional)
- AMQP_URL=amqp://127.0.0.1:5672/%2f
- AMQP_EXCHANGE=solstice.jobs
- AMQP_QUEUE=solstice.jobs.v1
- AMQP_ROUTING_KEY=jobrequest.v1
- AMQP_PREFETCH=64
Orchestrator specifics:
- ORCH_CONFIG=examples/orchestrator-image-map.yaml
- MAX_CONCURRENCY=2
- CAPACITY_MAP=illumos-latest=1,ubuntu-22.04=2
- GRPC_ADDR=0.0.0.0:50051
- DATABASE_URL=postgres://user:pass@localhost:5432/solstice (if used)
- LIBVIRT_URI=qemu:///system (Linux)
- LIBVIRT_NETWORK=default
Forge integration specifics:
- HTTP_ADDR=0.0.0.0:8080
- WEBHOOK_PATH=/webhooks/forgejo
- WEBHOOK_SECRET=... (recommended in real setups)
## Testing
- Run all tests across the workspace:
- cargo test --workspace
- Run a specific crates tests:
- cargo test -p orchestrator
- Run an integration test by name filter:
- cargo test smoke
## Troubleshooting
- No logs? Ensure tracing is initialized once and set RUST_LOG appropriately.
- RabbitMQ not reachable? Verify docker compose is running and ports 5672/15672 are open.
- VM provisioning errors on non-Linux hosts: hypervisor features require Linux. Use a Linux machine or dev container/VM.
- Image downloads fail: check examples/orchestrator-image-map.yaml entries and local_path permissions.
## Contributing (Codeberg Pull Requests)
We welcome contributions via the standard fork-and-pull-request workflow on Codeberg.
1) Fork the repository on Codeberg
- https://codeberg.org/your-namespace/solstice-ci (replace with the actual path)
2) Create a feature branch
- git checkout -b feat/short-description
3) Make changes and commit
- Follow conventional, descriptive commit messages when practical
- cargo fmt; cargo clippy --workspace --all-targets --all-features
- cargo test --workspace
4) Push your branch to your fork
- git push origin feat/short-description
5) Open a Pull Request on Codeberg
- Describe the change, motivations, and testing steps
- Link to any related issues
6) Review process
- Automated checks will run
- A maintainer will review and may request changes
7) Getting your PR merged
- Ensure feedback is addressed
- Squash/rebase as requested by maintainers
Security and credentials:
- Do not commit secrets. Use env vars locally and a secret store in deployments.
## License
This project is licensed under the Mozilla Public License 2.0 (MPL-2.0). See LICENSE for details.

View file

@ -8,5 +8,5 @@ name = "migration"
path = "src/lib.rs"
[dependencies]
sea-orm-migration = { version = "0.12", default-features = false, features = ["runtime-tokio-rustls", "sqlx-postgres"] }
sea-orm = { version = "0.12", default-features = false, features = ["sqlx-postgres", "runtime-tokio-rustls"] }
sea-orm-migration = { version = "0.12", default-features = false, features = ["runtime-tokio-rustls", "sqlx-postgres", "sqlx-sqlite"] }
sea-orm = { version = "0.12", default-features = false, features = ["sqlx-postgres", "sqlx-sqlite", "runtime-tokio-rustls"] }

View file

@ -25,10 +25,12 @@ zstd = "0.13"
# Linux-only optional libvirt bindings (feature-gated)
libvirt = { version = "0.1", optional = true }
# DB (optional basic persistence)
sea-orm = { version = "0.12", default-features = false, features = ["sqlx-postgres", "runtime-tokio-rustls" ] }
migration = { path = "../migration", optional = true }
sea-orm = { version = "0.12", default-features = false, features = ["sqlx-postgres", "sqlx-sqlite", "runtime-tokio-rustls", "macros", "with-uuid", "with-chrono" ] }
migration = { path = "../migration" }
sea-orm-migration = { version = "0.12" }
chrono = { version = "0.4", default-features = false, features = ["clock", "std", "serde"] }
# Utilities
once_cell = "1"
dashmap = "6"
async-trait = "0.1"
uuid = { version = "1", features = ["v4"] }
uuid = { version = "1", features = ["v4", "serde"] }

View file

@ -145,3 +145,39 @@ pub async fn ensure_images(cfg: &OrchestratorConfig) -> Result<()> {
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_alias_resolution_and_image_lookup() {
let yaml = r#"
default_label: illumos-latest
aliases:
illumos-latest: openindiana-hipster
images:
openindiana-hipster:
source: "https://example.com/oi.img"
local_path: "/tmp/oi.img"
decompress: zstd
nocloud: true
defaults:
cpu: 2
ram_mb: 2048
disk_gb: 20
"#;
let cfg: OrchestratorConfig = serde_yaml::from_str(yaml).expect("parse yaml");
// resolve default
assert_eq!(cfg.resolve_label(None), Some("openindiana-hipster"));
// alias mapping
assert_eq!(cfg.resolve_label(Some("illumos-latest")), Some("openindiana-hipster"));
// image for canonical key
let img = cfg.image_for("openindiana-hipster").expect("image exists");
assert!(img.nocloud);
assert_eq!(img.defaults.as_ref().and_then(|d| d.cpu), Some(2));
assert_eq!(img.defaults.as_ref().and_then(|d| d.ram_mb), Some(2048));
assert_eq!(img.defaults.as_ref().and_then(|d| d.disk_gb), Some(20));
}
}

View file

@ -1,6 +1,7 @@
mod config;
mod hypervisor;
mod scheduler;
mod persist;
use std::{collections::HashMap, path::PathBuf, time::Duration};
@ -9,8 +10,10 @@ use miette::{IntoDiagnostic as _, Result};
use tracing::{info, warn};
use config::OrchestratorConfig;
use hypervisor::{RouterHypervisor, NoopHypervisor, VmSpec, JobContext, Hypervisor};
use hypervisor::{RouterHypervisor, VmSpec, JobContext};
use scheduler::{Scheduler, SchedItem};
use std::sync::Arc;
use crate::persist::{Persist, JobState};
#[derive(Parser, Debug)]
#[command(name = "solstice-orchestrator", version, about = "Solstice CI Orchestrator")]
@ -84,6 +87,9 @@ async fn main() -> Result<()> {
// Build hypervisor router
let router = RouterHypervisor::build(opts.libvirt_uri.clone(), opts.libvirt_network.clone());
// Initialize persistence
let persist = Arc::new(Persist::new(Some(opts.database_url.clone())).await?);
// Build MQ config and start consumer
let mq_cfg = common::MqConfig {
url: opts.amqp_url,
@ -96,7 +102,7 @@ async fn main() -> Result<()> {
};
// Scheduler
let sched = Scheduler::new(router, opts.max_concurrency, &capacity_map);
let sched = Scheduler::new(router, opts.max_concurrency, &capacity_map, persist.clone());
let sched_tx = sched.sender();
let scheduler_task = tokio::spawn(async move {
if let Err(e) = sched.run().await {
@ -108,10 +114,12 @@ async fn main() -> Result<()> {
let cfg_clone = cfg.clone();
let mq_cfg_clone = mq_cfg.clone();
let tx_for_consumer = sched_tx.clone();
let persist_for_consumer = persist.clone();
let consumer_task = tokio::spawn(async move {
common::consume_jobs(&mq_cfg_clone, move |job| {
let sched_tx = tx_for_consumer.clone();
let cfg = cfg_clone.clone();
let persist = persist_for_consumer.clone();
async move {
let label_resolved = cfg.resolve_label(job.runs_on.as_deref()).unwrap_or(&cfg.default_label).to_string();
let image = match cfg.image_for(&label_resolved) {
@ -121,6 +129,8 @@ async fn main() -> Result<()> {
miette::bail!("no image mapping for label {}", label_resolved);
}
};
// Record job queued state
let _ = persist.record_job_state(job.request_id, &job.repo_url, &job.commit_sha, job.runs_on.as_deref(), JobState::Queued).await;
// Build spec
let (cpu, ram_mb, disk_gb) = (
image.defaults.as_ref().and_then(|d| d.cpu).unwrap_or(2),
@ -173,7 +183,9 @@ fn parse_capacity_map(s: Option<&str>) -> HashMap<String, usize> {
for part in s.split(',') {
if part.trim().is_empty() { continue; }
if let Some((k,v)) = part.split_once('=') {
if let Ok(n) = v.parse::<usize>() { m.insert(k.trim().to_string(), n); }
let k = k.trim();
if k.is_empty() { continue; }
if let Ok(n) = v.parse::<usize>() { m.insert(k.to_string(), n); }
}
}
}
@ -192,3 +204,31 @@ write_files:
"#, repo = repo_url, sha = commit_sha);
s.into_bytes()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_capacity_map_mixed_input() {
let m = parse_capacity_map(Some("illumos-latest=2, ubuntu-22.04=4, bad=, =3, other=notnum, foo=5, ,"));
assert_eq!(m.get("illumos-latest"), Some(&2));
assert_eq!(m.get("ubuntu-22.04"), Some(&4));
assert_eq!(m.get("foo"), Some(&5));
assert!(m.get("bad").is_none());
assert!(m.get("").is_none());
assert!(m.get("other").is_none());
}
#[test]
fn test_make_cloud_init_userdata_includes_fields() {
let data = make_cloud_init_userdata("https://example.com/repo.git", "deadbeef");
let s = String::from_utf8(data).unwrap();
assert!(s.contains("#cloud-config"));
assert!(s.contains("repo_url: https://example.com/repo.git"));
assert!(s.contains("commit_sha: deadbeef"));
assert!(s.contains("write_files:"));
assert!(s.contains("/etc/solstice/job.yaml"));
}
}

View file

@ -1,78 +1,292 @@
use miette::{IntoDiagnostic as _, Result};
use sea_orm::{Database, DatabaseConnection, sea_query::{OnConflict, Expr}, Query, EntityTrait, DbBackend, Statement};
use sea_orm::sea_query::{InsertStatement, PostgresQueryBuilder, ColumnRef};
use time::OffsetDateTime;
use sea_orm::{entity::prelude::*, Database, DatabaseConnection, Set, ActiveModelTrait, QueryFilter, ColumnTrait};
use sea_orm::sea_query::{OnConflict, Expr};
use sea_orm_migration::MigratorTrait;
use tracing::{debug, info, warn};
use uuid::Uuid;
use chrono::Utc;
pub async fn maybe_init_db(url: &str, run_migrations: bool) -> Result<Option<DatabaseConnection>> {
// Only enable DB when explicitly requested to avoid forcing Postgres locally.
let enabled = std::env::var("ORCH_ENABLE_DB").ok().map(|v| v == "1" || v.eq_ignore_ascii_case("true")).unwrap_or(false);
if !enabled { return Ok(None); }
let db = Database::connect(url).await.into_diagnostic()?;
if run_migrations {
migration::Migrator::up(&db, None).await.into_diagnostic()?;
}
Ok(Some(db))
/// Minimal persistence module for the Orchestrator with real upserts for jobs & vms.
#[derive(Clone)]
pub struct Persist {
db: Option<DatabaseConnection>,
}
#[derive(Debug, Clone, Copy)]
pub enum JobState { Queued, Running, Succeeded, Failed }
impl JobState {
fn as_str(self) -> &'static str {
match self { JobState::Queued => "queued", JobState::Running => "running", JobState::Succeeded => "succeeded", JobState::Failed => "failed" }
}
}
#[derive(Debug, Clone, Copy)]
pub enum VmPersistState { Prepared, Running, Stopped, Destroyed }
impl VmPersistState {
fn as_str(self) -> &'static str {
match self { VmPersistState::Prepared => "prepared", VmPersistState::Running => "running", VmPersistState::Stopped => "stopped", VmPersistState::Destroyed => "destroyed" }
}
}
// ----------------------------
// SeaORM Entities (inline)
// ----------------------------
mod jobs {
use super::*;
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "jobs")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub request_id: Uuid,
pub repo_url: String,
pub commit_sha: String,
pub runs_on: Option<String>,
pub state: String,
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
}
mod vms {
use super::*;
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "vms")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub request_id: Uuid,
#[sea_orm(primary_key, auto_increment = false)]
pub domain_name: String,
pub overlay_path: Option<String>,
pub seed_path: Option<String>,
pub backend: String,
pub state: String,
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
}
impl Persist {
/// Initialize persistence. If `database_url` is provided and the connection
/// succeeds, migrations are applied and the connection is retained.
/// If not provided or connection fails, persistence is disabled (no-op).
pub async fn new(database_url: Option<String>) -> Result<Self> {
if let Some(url) = database_url.filter(|s| !s.trim().is_empty()) {
// Use a single connection for SQLite in-memory to avoid separate empty DBs per checkout
let mut opts = sea_orm::ConnectOptions::new(url.clone());
// One connection is enough for tests and avoids in-memory SQLite pitfalls
opts.max_connections(1)
.min_connections(1)
.sqlx_logging(false);
match Database::connect(opts).await.into_diagnostic() {
Ok(db) => {
// Apply migrations idempotently
if let Err(e) = migration::Migrator::up(&db, None).await.into_diagnostic() {
#[cfg(test)]
{
// In tests, surface the migration failure to help debugging
return Err(miette::miette!("migration failed: {e}"));
}
#[cfg(not(test))]
{
warn!(error = %e, "failed to apply migrations; proceeding without persistence");
return Ok(Self { db: None });
}
}
info!("persistence initialized (connected and migrated)");
return Ok(Self { db: Some(db) });
}
Err(e) => warn!(error = %e, "failed to connect to database; persistence disabled"),
}
} else {
warn!("DATABASE_URL not set; persistence disabled");
}
Ok(Self { db: None })
}
pub fn is_enabled(&self) -> bool { self.db.is_some() }
/// Upsert a job row by request_id.
pub async fn record_job_state(
db: &DatabaseConnection,
&self,
request_id: Uuid,
repo_url: &str,
commit_sha: &str,
runs_on: Option<&str>,
state: &str,
state: JobState,
) -> Result<()> {
let now = OffsetDateTime::now_utc();
// INSERT ... ON CONFLICT (request_id) DO UPDATE SET state=EXCLUDED.state, updated_at=EXCLUDED.updated_at
let mut insert: InsertStatement = sea_orm::sea_query::Query::insert()
.into_table(Jobs::Table)
.columns([
Jobs::RequestId,
Jobs::RepoUrl,
Jobs::CommitSha,
Jobs::RunsOn,
Jobs::State,
Jobs::CreatedAt,
Jobs::UpdatedAt,
])
.values_panic([
Expr::val(request_id),
Expr::val(repo_url),
Expr::val(commit_sha),
Expr::val(runs_on.map(|s| s.to_string())),
Expr::val(state),
Expr::val(now),
Expr::val(now),
])
let Some(db) = self.db.as_ref() else {
debug!(%request_id, state = state.as_str(), "record_job_state (noop)");
return Ok(());
};
let now = Utc::now();
let am = jobs::ActiveModel {
request_id: Set(request_id),
repo_url: Set(repo_url.to_string()),
commit_sha: Set(commit_sha.to_string()),
runs_on: Set(runs_on.map(|s| s.to_string())),
state: Set(state.as_str().to_string()),
created_at: Set(now),
updated_at: Set(now),
};
jobs::Entity::insert(am)
.on_conflict(
OnConflict::column(Jobs::RequestId)
.update_columns([Jobs::State, Jobs::UpdatedAt])
OnConflict::column(jobs::Column::RequestId)
.update_columns([
jobs::Column::RepoUrl,
jobs::Column::CommitSha,
jobs::Column::RunsOn,
jobs::Column::State,
jobs::Column::UpdatedAt,
])
.to_owned(),
)
.to_owned();
let sql = insert.build(PostgresQueryBuilder);
db.execute(Statement::from_string(DbBackend::Postgres, sql)).await.into_diagnostic()?;
.exec(db)
.await
.into_diagnostic()?;
Ok(())
}
#[derive(sea_orm::sea_query::Iden)]
enum Jobs {
#[iden = "jobs"]
Table,
#[iden = "request_id"]
RequestId,
#[iden = "repo_url"]
RepoUrl,
#[iden = "commit_sha"]
CommitSha,
#[iden = "runs_on"]
RunsOn,
#[iden = "state"]
State,
#[iden = "created_at"]
CreatedAt,
#[iden = "updated_at"]
UpdatedAt,
/// Update a VM row by (request_id, domain_name) or insert when missing.
pub async fn record_vm_event(
&self,
request_id: Uuid,
domain_name: &str,
overlay_path: Option<&str>,
seed_path: Option<&str>,
backend: Option<&str>,
state: VmPersistState,
) -> Result<()> {
let Some(db) = self.db.as_ref() else {
debug!(%request_id, domain = %domain_name, state = state.as_str(), "record_vm_event (noop)");
return Ok(());
};
let now = Utc::now();
let backend_val = backend.unwrap_or("unknown");
let state_val = state.as_str();
let res = vms::Entity::update_many()
.col_expr(vms::Column::OverlayPath, Expr::value(overlay_path.map(|s| s.to_string())))
.col_expr(vms::Column::SeedPath, Expr::value(seed_path.map(|s| s.to_string())))
.col_expr(vms::Column::Backend, Expr::value(backend_val))
.col_expr(vms::Column::State, Expr::value(state_val))
.col_expr(vms::Column::UpdatedAt, Expr::value(now))
.filter(vms::Column::RequestId.eq(request_id))
.filter(vms::Column::DomainName.eq(domain_name))
.exec(db)
.await
.into_diagnostic()?;
if res.rows_affected == 0 {
let am = vms::ActiveModel {
request_id: Set(request_id),
domain_name: Set(domain_name.to_string()),
overlay_path: Set(overlay_path.map(|s| s.to_string())),
seed_path: Set(seed_path.map(|s| s.to_string())),
backend: Set(backend_val.to_string()),
state: Set(state_val.to_string()),
created_at: Set(now),
updated_at: Set(now),
};
vms::Entity::insert(am).exec(db).await.into_diagnostic()?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
async fn sqlite_memory_db() -> DatabaseConnection {
let mut opts = sea_orm::ConnectOptions::new("sqlite::memory:".to_string());
opts.max_connections(1).min_connections(1).sqlx_logging(false);
let db = Database::connect(opts).await.expect("sqlite memory connect");
// Create tables from entities to avoid using migrator (faster and avoids migration bookkeeping table)
let backend = db.get_database_backend();
let schema = sea_orm::Schema::new(backend);
let stmt_jobs = schema.create_table_from_entity(jobs::Entity);
let stmt_vms = schema.create_table_from_entity(vms::Entity);
use sea_orm_migration::prelude::SchemaManager;
let manager = SchemaManager::new(&db);
manager.create_table(stmt_jobs).await.expect("create jobs");
manager.create_table(stmt_vms).await.expect("create vms");
db
}
#[tokio::test]
async fn test_job_upsert_sqlite() {
let db = sqlite_memory_db().await;
let p = Persist { db: Some(db.clone()) };
let req = Uuid::new_v4();
let repo = "https://example.com/repo.git";
let sha = "deadbeef";
// Insert queued
p.record_job_state(req, repo, sha, Some("illumos-latest"), JobState::Queued).await.expect("insert queued");
// Fetch
let row = jobs::Entity::find_by_id(req).one(&db).await.expect("query").expect("row exists");
assert_eq!(row.state, "queued");
// Update to running
p.record_job_state(req, repo, sha, Some("illumos-latest"), JobState::Running).await.expect("update running");
let row2 = jobs::Entity::find_by_id(req).one(&db).await.expect("query").expect("row exists");
assert_eq!(row2.state, "running");
assert!(row2.updated_at >= row.created_at);
}
#[tokio::test]
async fn test_vm_event_upsert_sqlite() {
let db = sqlite_memory_db().await;
let p = Persist { db: Some(db.clone()) };
let req = Uuid::new_v4();
let domain = format!("job-{}", req);
// prepared -> running -> destroyed
p.record_vm_event(req, &domain, Some("/tmp/ovl.qcow2"), Some("/tmp/seed.iso"), Some("noop"), VmPersistState::Prepared).await.expect("prepared");
p.record_vm_event(req, &domain, Some("/tmp/ovl.qcow2"), Some("/tmp/seed.iso"), Some("noop"), VmPersistState::Running).await.expect("running");
p.record_vm_event(req, &domain, Some("/tmp/ovl.qcow2"), Some("/tmp/seed.iso"), Some("noop"), VmPersistState::Destroyed).await.expect("destroyed");
use sea_orm::QuerySelect;
let rows = vms::Entity::find()
.filter(vms::Column::RequestId.eq(req))
.filter(vms::Column::DomainName.eq(domain.clone()))
.all(&db)
.await
.expect("query vms");
assert_eq!(rows.len(), 1, "should have exactly one vm row");
let row = &rows[0];
assert_eq!(row.state, "destroyed");
assert_eq!(row.backend, "noop");
assert_eq!(row.overlay_path.as_deref(), Some("/tmp/ovl.qcow2"));
assert_eq!(row.seed_path.as_deref(), Some("/tmp/seed.iso"));
}
}
#[cfg(test)]
mod noop_tests {
use super::*;
#[tokio::test]
async fn record_methods_noop_when_disabled() {
let p = Persist::new(None).await.expect("init");
assert!(!p.is_enabled());
// Calls should succeed without DB
let req = Uuid::new_v4();
p.record_job_state(req, "https://x", "sha", Some("illumos"), JobState::Queued).await.expect("job noop ok");
p.record_vm_event(req, "job-x", None, None, Some("noop"), VmPersistState::Prepared).await.expect("vm noop ok");
}
}

View file

@ -3,12 +3,10 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
use dashmap::DashMap;
use miette::Result;
use tokio::sync::{mpsc, Semaphore};
use tokio::task::JoinSet;
use tracing::{error, info};
use tracing::{error, info, warn};
use crate::hypervisor::{Hypervisor, VmSpec, JobContext};
use crate::persist;
use sea_orm::DatabaseConnection;
use crate::hypervisor::{Hypervisor, VmSpec, JobContext, BackendTag};
use crate::persist::{Persist, VmPersistState, JobState};
pub struct Scheduler<H: Hypervisor + 'static> {
hv: Arc<H>,
@ -16,6 +14,7 @@ pub struct Scheduler<H: Hypervisor + 'static> {
rx: mpsc::Receiver<SchedItem>,
global_sem: Arc<Semaphore>,
label_sems: Arc<DashmapType>,
persist: Arc<Persist>,
}
type DashmapType = DashMap<String, Arc<Semaphore>>;
@ -26,7 +25,7 @@ pub struct SchedItem {
}
impl<H: Hypervisor + 'static> Scheduler<H> {
pub fn new(hv: H, max_concurrency: usize, capacity_map: &HashMap<String, usize>) -> Self {
pub fn new(hv: H, max_concurrency: usize, capacity_map: &HashMap<String, usize>, persist: Arc<Persist>) -> Self {
let (tx, rx) = mpsc::channel::<SchedItem>(max_concurrency * 4);
let label_sems = DashMap::new();
for (label, cap) in capacity_map.iter() {
@ -38,18 +37,20 @@ impl<H: Hypervisor + 'static> Scheduler<H> {
rx,
global_sem: Arc::new(Semaphore::new(max_concurrency)),
label_sems: Arc::new(label_sems),
persist,
}
}
pub fn sender(&self) -> mpsc::Sender<SchedItem> { self.tx.clone() }
pub async fn run(self) -> Result<()> {
let Scheduler { hv, mut rx, global_sem, label_sems, .. } = self;
let Scheduler { hv, mut rx, global_sem, label_sems, persist, .. } = self;
let mut handles = Vec::new();
while let Some(item) = rx.recv().await {
let hv = hv.clone();
let global = global_sem.clone();
let label_sems = label_sems.clone();
let persist = persist.clone();
let handle = tokio::spawn(async move {
// Acquire global and label permits (owned permits so they live inside the task)
let _g = match global.acquire_owned().await {
@ -71,24 +72,38 @@ impl<H: Hypervisor + 'static> Scheduler<H> {
// Provision and run
match hv.prepare(&item.spec, &item.ctx).await {
Ok(handle) => {
if let Err(e) = hv.start(&handle).await {
Ok(h) => {
// persist prepared VM state
let overlay = h.overlay_path.as_ref().and_then(|p| p.to_str());
let seed = h.seed_iso_path.as_ref().and_then(|p| p.to_str());
let backend = match h.backend { BackendTag::Noop => Some("noop"), #[cfg(all(target_os = "linux", feature = "libvirt"))] BackendTag::Libvirt => Some("libvirt"), #[cfg(target_os = "illumos")] BackendTag::Zones => Some("zones") };
if let Err(e) = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Prepared).await {
warn!(error = %e, request_id = %item.ctx.request_id, domain = %h.id, "persist prepare failed");
}
if let Err(e) = hv.start(&h).await {
error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to start VM");
let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), JobState::Failed).await;
return;
}
let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Running).await;
let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), JobState::Running).await;
info!(request_id = %item.ctx.request_id, label = %label_key, "vm started (workload execution placeholder)");
// Placeholder job runtime
tokio::time::sleep(Duration::from_secs(1)).await;
// Stop and destroy
if let Err(e) = hv.stop(&handle, Duration::from_secs(10)).await {
if let Err(e) = hv.stop(&h, Duration::from_secs(10)).await {
error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to stop VM");
}
if let Err(e) = hv.destroy(handle).await {
let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Stopped).await;
if let Err(e) = hv.destroy(h.clone()).await {
error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to destroy VM");
}
let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Destroyed).await;
let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), JobState::Succeeded).await;
}
Err(e) => {
error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to prepare VM");
let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), JobState::Failed).await;
return;
}
}
@ -100,3 +115,141 @@ impl<H: Hypervisor + 'static> Scheduler<H> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use async_trait::async_trait;
use dashmap::DashMap;
use crate::hypervisor::{VmHandle, VmState};
#[derive(Clone)]
struct MockHypervisor {
active_all: Arc<AtomicUsize>,
peak_all: Arc<AtomicUsize>,
// per-label current and peak
per_curr: Arc<DashMap<String, Arc<AtomicUsize>>>,
per_peak: Arc<DashMap<String, Arc<AtomicUsize>>>,
}
impl MockHypervisor {
fn new(active_all: Arc<AtomicUsize>, peak_all: Arc<AtomicUsize>) -> Self {
Self { active_all, peak_all, per_curr: Arc::new(DashMap::new()), per_peak: Arc::new(DashMap::new()) }
}
fn update_peak(peak: &AtomicUsize, current: usize) {
let mut prev = peak.load(Ordering::Relaxed);
while current > prev {
match peak.compare_exchange(prev, current, Ordering::Relaxed, Ordering::Relaxed) {
Ok(_) => break,
Err(p) => prev = p,
}
}
}
}
#[async_trait]
impl Hypervisor for MockHypervisor {
async fn prepare(&self, spec: &VmSpec, ctx: &JobContext) -> miette::Result<VmHandle> {
let now = self.active_all.fetch_add(1, Ordering::SeqCst) + 1;
Self::update_peak(&self.peak_all, now);
// per-label
let entry = self.per_curr.entry(spec.label.clone()).or_insert_with(|| Arc::new(AtomicUsize::new(0)));
let curr = entry.fetch_add(1, Ordering::SeqCst) + 1;
let peak_entry = self.per_peak.entry(spec.label.clone()).or_insert_with(|| Arc::new(AtomicUsize::new(0))).clone();
Self::update_peak(&peak_entry, curr);
Ok(VmHandle {
id: format!("job-{}", ctx.request_id),
backend: BackendTag::Noop,
work_dir: PathBuf::from("/tmp"),
overlay_path: None,
seed_iso_path: None,
})
}
async fn start(&self, _vm: &VmHandle) -> miette::Result<()> { Ok(()) }
async fn stop(&self, _vm: &VmHandle, _t: Duration) -> miette::Result<()> { Ok(()) }
async fn destroy(&self, _vm: VmHandle) -> miette::Result<()> {
// decrement overall current
self.active_all.fetch_sub(1, Ordering::SeqCst);
Ok(())
}
async fn state(&self, _vm: &VmHandle) -> miette::Result<VmState> { Ok(VmState::Prepared) }
}
fn make_spec(label: &str) -> VmSpec {
VmSpec {
label: label.to_string(),
image_path: PathBuf::from("/tmp/image"),
cpu: 1,
ram_mb: 512,
disk_gb: 1,
network: None,
nocloud: true,
user_data: None,
}
}
fn make_ctx() -> JobContext {
JobContext { request_id: uuid::Uuid::new_v4(), repo_url: "https://example.com/r.git".into(), commit_sha: "deadbeef".into(), workflow_job_id: None }
}
#[tokio::test(flavor = "multi_thread")]
async fn scheduler_respects_global_concurrency() {
let active_all = Arc::new(AtomicUsize::new(0));
let peak_all = Arc::new(AtomicUsize::new(0));
let hv = MockHypervisor::new(active_all.clone(), peak_all.clone());
let hv_probe = hv.clone();
let persist = Arc::new(Persist::new(None).await.unwrap());
let mut caps = HashMap::new();
caps.insert("x".to_string(), 10);
let sched = Scheduler::new(hv, 2, &caps, persist);
let tx = sched.sender();
let run = tokio::spawn(async move { let _ = sched.run().await; });
for _ in 0..5 {
let _ = tx.send(SchedItem { spec: make_spec("x"), ctx: make_ctx() }).await;
}
drop(tx);
// Allow time for tasks to execute under concurrency limits
tokio::time::sleep(Duration::from_millis(500)).await;
assert!(hv_probe.peak_all.load(Ordering::SeqCst) <= 2, "peak should not exceed global limit");
// Stop the scheduler task
run.abort();
}
#[tokio::test(flavor = "multi_thread")]
async fn scheduler_respects_per_label_capacity() {
let active_all = Arc::new(AtomicUsize::new(0));
let peak_all = Arc::new(AtomicUsize::new(0));
let hv = MockHypervisor::new(active_all.clone(), peak_all.clone());
let hv_probe = hv.clone();
let persist = Arc::new(Persist::new(None).await.unwrap());
let mut caps = HashMap::new();
caps.insert("a".to_string(), 1);
caps.insert("b".to_string(), 2);
let sched = Scheduler::new(hv, 4, &caps, persist);
let tx = sched.sender();
let run = tokio::spawn(async move { let _ = sched.run().await; });
for _ in 0..3 { let _ = tx.send(SchedItem { spec: make_spec("a"), ctx: make_ctx() }).await; }
for _ in 0..3 { let _ = tx.send(SchedItem { spec: make_spec("b"), ctx: make_ctx() }).await; }
drop(tx);
tokio::time::sleep(Duration::from_millis(800)).await;
// read per-label peaks
let a_peak = hv_probe.per_peak.get("a").map(|p| p.load(Ordering::SeqCst)).unwrap_or(0);
let b_peak = hv_probe.per_peak.get("b").map(|p| p.load(Ordering::SeqCst)).unwrap_or(0);
assert!(a_peak <= 1, "label a peak should be <= 1, got {}", a_peak);
assert!(b_peak <= 2, "label b peak should be <= 2, got {}", b_peak);
assert!(hv_probe.peak_all.load(Ordering::SeqCst) <= 4, "global peak should respect global limit");
run.abort();
}
}

View file

@ -0,0 +1,32 @@
### Orchestrator Persistence Module (MVP)
This document summarizes the initial implementation of a persistence layer for the Orchestrator.
Scope (MVP):
- Introduces a new `persist` module under `crates/orchestrator/src/persist.rs`.
- Provides optional, best-effort database initialization using SeaORM for Postgres.
- Applies workspace migrations from `crates/migration` when a database is available.
- Exposes a small API surface intended for Orchestrator use:
- `Persist::new(database_url: Option<String>) -> Result<Persist>` — connect and run migrations; falls back to no-op mode if unavailable.
- `Persist::is_enabled() -> bool` — indicates whether DB is active.
- `Persist::record_job_state(...)` — placeholder for recording job state transitions.
- `Persist::record_vm_event(...)` — placeholder for VM lifecycle events.
Design Notes:
- The API currently logs no-op actions if `DATABASE_URL` is unset or connection/migration fails, allowing the Orchestrator to run without a DB (useful for local development and CI).
- Actual inserts/updates are intentionally left as stubs to keep this MVP minimal and non-invasive. Follow-up work will add SeaORM Entities (Jobs/VMs) mirroring the `crates/migration` schema and implement these calls.
- Migrations are sourced from the new `migration` crate already present in the workspace. The orchestrator now depends on `sea-orm-migration` and `migration` to run these on startup when enabled.
Schema Reference (from `crates/migration`):
- `jobs`: `request_id (uuid pk)`, `repo_url (text)`, `commit_sha (text)`, `runs_on (text null)`, `state (text)`, `created_at (timestamptz)`, `updated_at (timestamptz)`
- `vms`: `request_id (uuid)`, `domain_name (text)`, `overlay_path (text null)`, `seed_path (text null)`, `backend (text)`, `state (text)`, `created_at (timestamptz)`, `updated_at (timestamptz)`
Configuration:
- `DATABASE_URL=postgres://user:pass@host:5432/solstice`
- If set, the orchestrator can initialize persistence via `Persist::new(Some(DATABASE_URL))`.
- If not set or connection fails, the module operates in no-op mode.
Next Steps:
- Implement SeaORM Entities for `jobs` and `vms` and wire `record_job_state` / `record_vm_event` to real upserts.
- Call persistence hooks from the scheduler around VM prepare/start/stop/destroy and from the consumer when accepting jobs.
- Add unit/integration tests with a test Postgres (or use SeaORM SQLite feature for fast tests, if acceptable).