//! solstice-ci/crates/orchestrator/src/persist.rs
//!
//! Minimal persistence layer for the Orchestrator: jobs, VMs, job logs and
//! per-job SSH keys, with real upserts and an optional (disableable)
//! database connection.
use chrono::Utc;
use miette::{IntoDiagnostic as _, Result};
use crate::error::OrchestratorError;
2025-11-01 14:56:46 +01:00
use sea_orm::sea_query::{Expr, OnConflict};
use sea_orm::{
entity::prelude::*, ColumnTrait, Database, DatabaseConnection, QueryFilter,
Set,
2025-11-01 14:56:46 +01:00
};
use sea_orm::QueryOrder;
use sea_orm_migration::MigratorTrait;
use tracing::{debug, info, warn};
use uuid::Uuid;
/// Minimal persistence handle for the Orchestrator with real upserts for jobs & vms.
///
/// Cloning is cheap: `DatabaseConnection` is shared between clones.
#[derive(Clone)]
pub struct Persist {
    // `None` means persistence is disabled: every record/query method is a no-op.
    db: Option<DatabaseConnection>,
}
/// Lifecycle states recorded for a CI job row in the `jobs` table.
#[derive(Debug, Clone, Copy)]
pub enum JobState {
    Queued,
    Running,
    Succeeded,
    Failed,
}

impl JobState {
    /// Lowercase string stored in the `jobs.state` column.
    fn as_str(self) -> &'static str {
        match self {
            JobState::Queued => "queued",
            JobState::Running => "running",
            JobState::Succeeded => "succeeded",
            JobState::Failed => "failed",
        }
    }
}
/// Lifecycle states recorded for a VM row in the `vms` table.
#[derive(Debug, Clone, Copy)]
pub enum VmPersistState {
    Prepared,
    Running,
    Stopped,
    Destroyed,
}

impl VmPersistState {
    /// Lowercase string stored in the `vms.state` column.
    fn as_str(self) -> &'static str {
        match self {
            VmPersistState::Prepared => "prepared",
            VmPersistState::Running => "running",
            VmPersistState::Stopped => "stopped",
            VmPersistState::Destroyed => "destroyed",
        }
    }
}
// ----------------------------
// SeaORM Entities (inline)
// ----------------------------
/// SeaORM entity for the `jobs` table: one row per CI job request,
/// upserted by `Persist::record_job_state`.
mod jobs {
    use super::*;

    #[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
    #[sea_orm(table_name = "jobs")]
    pub struct Model {
        // Primary key: caller-supplied request id (not auto-generated).
        #[sea_orm(primary_key, auto_increment = false)]
        pub request_id: Uuid,
        // Repository URL and commit the job was requested for.
        pub repo_url: String,
        pub commit_sha: String,
        // Optional runner label (e.g. "illumos-latest" in the tests below).
        pub runs_on: Option<String>,
        // Lowercase job state string produced by `JobState::as_str`.
        pub state: String,
        pub created_at: chrono::DateTime<chrono::Utc>,
        pub updated_at: chrono::DateTime<chrono::Utc>,
    }

    #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
    pub enum Relation {}

    impl ActiveModelBehavior for ActiveModel {}
}
/// SeaORM entity for the `vms` table, keyed by the composite
/// (request_id, domain_name); written by `Persist::record_vm_event`.
mod vms {
    use super::*;

    #[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
    #[sea_orm(table_name = "vms")]
    pub struct Model {
        // Composite primary key, part 1: owning job's request id.
        #[sea_orm(primary_key, auto_increment = false)]
        pub request_id: Uuid,
        // Composite primary key, part 2: VM domain name.
        #[sea_orm(primary_key, auto_increment = false)]
        pub domain_name: String,
        // Disk artifacts for the VM, when known.
        pub overlay_path: Option<String>,
        pub seed_path: Option<String>,
        // Backend identifier; "unknown" is substituted when the caller passes None.
        pub backend: String,
        // Lowercase VM state string produced by `VmPersistState::as_str`.
        pub state: String,
        pub created_at: chrono::DateTime<chrono::Utc>,
        pub updated_at: chrono::DateTime<chrono::Utc>,
    }

    #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
    pub enum Relation {}

    impl ActiveModelBehavior for ActiveModel {}
}
/// SeaORM entity for the `job_logs` table: one row per log line,
/// ordered within a job by `seq`; written by `Persist::record_log_line`.
mod job_logs {
    use super::*;

    #[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
    #[sea_orm(table_name = "job_logs")]
    pub struct Model {
        // Composite primary key, part 1: owning job's request id.
        #[sea_orm(primary_key, auto_increment = false)]
        pub request_id: Uuid,
        // Composite primary key, part 2: monotonic sequence number within the job.
        #[sea_orm(primary_key, auto_increment = false)]
        pub seq: i64,
        // Timestamp assigned at record time (Utc::now in record_log_line).
        pub ts: chrono::DateTime<chrono::Utc>,
        // True when the line came from the job's stderr stream.
        pub stderr: bool,
        // The (possibly JSON-extracted) human-readable message text.
        pub line: String,
        /// Logical category to group logs (e.g., "setup", "build", "test")
        pub category: String,
        /// Log level (e.g., "info", "warn", "error")
        pub level: Option<String>,
        /// Arbitrary structured fields from NDJSON, stored as raw JSON string
        pub fields: Option<String>,
        /// Convenience flag for quick error presence checks
        pub has_error: bool,
    }

    #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
    pub enum Relation {}

    impl ActiveModelBehavior for ActiveModel {}
}
/// SeaORM entity for the `job_ssh_keys` table: one key pair per job.
///
/// NOTE(review): the private key is stored as a plain string column as far as
/// this module shows — confirm at-rest protection is handled elsewhere.
mod job_ssh_keys {
    use super::*;

    #[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
    #[sea_orm(table_name = "job_ssh_keys")]
    pub struct Model {
        // Primary key: owning job's request id (one key pair per job).
        #[sea_orm(primary_key, auto_increment = false)]
        pub request_id: Uuid,
        // OpenSSH-format key material.
        pub public_key: String,
        pub private_key: String,
        pub created_at: chrono::DateTime<chrono::Utc>,
    }

    #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
    pub enum Relation {}

    impl ActiveModelBehavior for ActiveModel {}
}
impl Persist {
/// Save per-job SSH keys (public + private OpenSSH format). No-op if persistence disabled.
pub async fn save_job_ssh_keys(&self, request_id: Uuid, public_key: &str, private_key: &str) -> Result<()> {
let Some(db) = self.db.as_ref() else { return Ok(()); };
let now = Utc::now();
let am = job_ssh_keys::ActiveModel {
request_id: Set(request_id),
public_key: Set(public_key.to_string()),
private_key: Set(private_key.to_string()),
created_at: Set(now),
};
job_ssh_keys::Entity::insert(am)
.on_conflict(
OnConflict::column(job_ssh_keys::Column::RequestId)
.update_columns([
job_ssh_keys::Column::PublicKey,
job_ssh_keys::Column::PrivateKey,
job_ssh_keys::Column::CreatedAt,
])
.to_owned(),
)
.exec(db)
.await
.into_diagnostic()?;
Ok(())
}
/// Load per-job SSH keys; returns None if absent or persistence disabled.
pub async fn get_job_ssh_keys(&self, request_id: Uuid) -> Result<Option<(String, String)>> {
let Some(db) = self.db.as_ref() else { return Ok(None); };
let row = job_ssh_keys::Entity::find_by_id(request_id).one(db).await.into_diagnostic()?;
Ok(row.map(|r| (r.public_key, r.private_key)))
}
/// Initialize persistence.
/// - If `database_url` is Some(non-empty), attempt to connect and run migrations.
/// On any failure, return an error to fail-fast so operators notice misconfiguration.
/// - If `database_url` is None or empty, persistence is disabled (no-op).
pub async fn new(database_url: Option<String>) -> Result<Self> {
if let Some(url) = database_url.filter(|s| !s.trim().is_empty()) {
// Use a single connection for SQLite in-memory to avoid separate empty DBs per checkout
let mut opts = sea_orm::ConnectOptions::new(url.clone());
// One connection is enough for tests and avoids in-memory SQLite pitfalls
opts.max_connections(1)
.min_connections(1)
.sqlx_logging(false);
let db = Database::connect(opts).await
.map_err(|e| OrchestratorError::DbConnect(e.into()))
.into_diagnostic()?;
migration::Migrator::up(&db, None)
.await
.map_err(|e| OrchestratorError::DbMigrate(e.into()))
.into_diagnostic()?;
info!("persistence initialized (connected and migrated)");
Ok(Self { db: Some(db) })
} else {
warn!("persistence disabled (no DATABASE_URL or skipped)");
Ok(Self { db: None })
}
}
2025-11-01 14:56:46 +01:00
pub fn is_enabled(&self) -> bool {
self.db.is_some()
}
/// Return aggregated logs for a job as a single text blob. Newline-terminated.
/// When persistence is disabled or no logs are found, returns Ok(None).
pub async fn get_logs_text(&self, request_id: Uuid) -> Result<Option<String>> {
let Some(db) = self.db.as_ref() else {
return Ok(None);
};
let mut out = String::new();
let rows = job_logs::Entity::find()
.filter(job_logs::Column::RequestId.eq(request_id))
.order_by_asc(job_logs::Column::Seq)
.all(db)
.await
.into_diagnostic()?;
if rows.is_empty() {
return Ok(None);
}
for r in rows {
if r.stderr {
out.push_str("[stderr] ");
}
out.push_str(&r.line);
if !out.ends_with('\n') {
out.push('\n');
}
}
Ok(Some(out))
}
/// Record a single log line for a job. No-op when persistence is disabled.
pub async fn record_log_line(
&self,
request_id: Uuid,
seq: i64,
stderr: bool,
line: &str,
) -> Result<()> {
let Some(db) = self.db.as_ref() else {
debug!(%request_id, %seq, stderr, "record_log_line (noop)");
return Ok(());
};
// Try to parse structured JSON lines to extract common fields
let mut category = "default".to_string();
let mut level_str: Option<String> = None;
let mut msg_line = line.to_string();
let mut fields_json: Option<String> = None;
if let Ok(val) = serde_json::from_str::<serde_json::Value>(line) {
// Keep original JSON for reference
fields_json = Some(line.to_string());
if let Some(c) = val.get("category").and_then(|v| v.as_str()) {
if !c.is_empty() { category = c.to_string(); }
}
if let Some(l) = val.get("level").and_then(|v| v.as_str()) {
level_str = Some(l.to_string());
}
// Prefer common keys for message
if let Some(m) = val.get("msg").or_else(|| val.get("message")).and_then(|v| v.as_str()) {
msg_line = m.to_string();
}
}
let level = level_str.or_else(|| if stderr { Some("error".to_string()) } else { Some("info".to_string()) });
let has_error = stderr || level.as_deref() == Some("error");
let am = job_logs::ActiveModel {
request_id: Set(request_id),
seq: Set(seq),
ts: Set(Utc::now()),
stderr: Set(stderr),
line: Set(msg_line),
category: Set(category),
level: Set(level),
fields: Set(fields_json),
has_error: Set(has_error),
};
job_logs::Entity::insert(am).exec(db).await.into_diagnostic()?;
Ok(())
}
/// Upsert a job row by request_id.
pub async fn record_job_state(
&self,
request_id: Uuid,
repo_url: &str,
commit_sha: &str,
runs_on: Option<&str>,
state: JobState,
) -> Result<()> {
let Some(db) = self.db.as_ref() else {
debug!(%request_id, state = state.as_str(), "record_job_state (noop)");
return Ok(());
};
let now = Utc::now();
let am = jobs::ActiveModel {
request_id: Set(request_id),
repo_url: Set(repo_url.to_string()),
commit_sha: Set(commit_sha.to_string()),
runs_on: Set(runs_on.map(|s| s.to_string())),
state: Set(state.as_str().to_string()),
created_at: Set(now),
updated_at: Set(now),
};
jobs::Entity::insert(am)
.on_conflict(
OnConflict::column(jobs::Column::RequestId)
.update_columns([
jobs::Column::RepoUrl,
jobs::Column::CommitSha,
jobs::Column::RunsOn,
jobs::Column::State,
jobs::Column::UpdatedAt,
])
.to_owned(),
)
.exec(db)
.await
.into_diagnostic()?;
Ok(())
}
/// Update a VM row by (request_id, domain_name) or insert when missing.
pub async fn record_vm_event(
&self,
request_id: Uuid,
domain_name: &str,
overlay_path: Option<&str>,
seed_path: Option<&str>,
backend: Option<&str>,
state: VmPersistState,
) -> Result<()> {
let Some(db) = self.db.as_ref() else {
debug!(%request_id, domain = %domain_name, state = state.as_str(), "record_vm_event (noop)");
return Ok(());
};
let now = Utc::now();
let backend_val = backend.unwrap_or("unknown");
let state_val = state.as_str();
let res = vms::Entity::update_many()
2025-11-01 14:56:46 +01:00
.col_expr(
vms::Column::OverlayPath,
Expr::value(overlay_path.map(|s| s.to_string())),
)
.col_expr(
vms::Column::SeedPath,
Expr::value(seed_path.map(|s| s.to_string())),
)
.col_expr(vms::Column::Backend, Expr::value(backend_val))
.col_expr(vms::Column::State, Expr::value(state_val))
.col_expr(vms::Column::UpdatedAt, Expr::value(now))
.filter(vms::Column::RequestId.eq(request_id))
.filter(vms::Column::DomainName.eq(domain_name))
.exec(db)
.await
.into_diagnostic()?;
if res.rows_affected == 0 {
let am = vms::ActiveModel {
request_id: Set(request_id),
domain_name: Set(domain_name.to_string()),
overlay_path: Set(overlay_path.map(|s| s.to_string())),
seed_path: Set(seed_path.map(|s| s.to_string())),
backend: Set(backend_val.to_string()),
state: Set(state_val.to_string()),
created_at: Set(now),
updated_at: Set(now),
};
vms::Entity::insert(am).exec(db).await.into_diagnostic()?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an in-memory SQLite database with the `jobs` and `vms` tables
    /// created directly from the entity definitions (skips the migrator and
    /// its bookkeeping table; faster for unit tests).
    async fn sqlite_memory_db() -> DatabaseConnection {
        let mut opts = sea_orm::ConnectOptions::new("sqlite::memory:".to_string());
        opts.max_connections(1)
            .min_connections(1)
            .sqlx_logging(false);
        let db = Database::connect(opts)
            .await
            .expect("sqlite memory connect");
        let backend = db.get_database_backend();
        let schema = sea_orm::Schema::new(backend);
        let stmt_jobs = schema.create_table_from_entity(jobs::Entity);
        let stmt_vms = schema.create_table_from_entity(vms::Entity);
        use sea_orm_migration::prelude::SchemaManager;
        let manager = SchemaManager::new(&db);
        manager.create_table(stmt_jobs).await.expect("create jobs");
        manager.create_table(stmt_vms).await.expect("create vms");
        db
    }

    /// A second record_job_state for the same request id must update the
    /// existing row in place, not insert a duplicate.
    #[tokio::test]
    async fn test_job_upsert_sqlite() {
        let db = sqlite_memory_db().await;
        let p = Persist {
            db: Some(db.clone()),
        };
        let req = Uuid::new_v4();
        let repo = "https://example.com/repo.git";
        let sha = "deadbeef";
        // Insert queued
        p.record_job_state(req, repo, sha, Some("illumos-latest"), JobState::Queued)
            .await
            .expect("insert queued");
        // Fetch
        let row = jobs::Entity::find_by_id(req)
            .one(&db)
            .await
            .expect("query")
            .expect("row exists");
        assert_eq!(row.state, "queued");
        // Update to running
        p.record_job_state(req, repo, sha, Some("illumos-latest"), JobState::Running)
            .await
            .expect("update running");
        let row2 = jobs::Entity::find_by_id(req)
            .one(&db)
            .await
            .expect("query")
            .expect("row exists");
        assert_eq!(row2.state, "running");
        assert!(row2.updated_at >= row.created_at);
    }

    /// Repeated record_vm_event calls for one (request_id, domain) must keep
    /// a single row whose state tracks the latest event.
    #[tokio::test]
    async fn test_vm_event_upsert_sqlite() {
        let db = sqlite_memory_db().await;
        let p = Persist {
            db: Some(db.clone()),
        };
        let req = Uuid::new_v4();
        let domain = format!("job-{}", req);
        // prepared -> running -> destroyed
        p.record_vm_event(
            req,
            &domain,
            Some("/tmp/ovl.qcow2"),
            Some("/tmp/seed.iso"),
            Some("noop"),
            VmPersistState::Prepared,
        )
        .await
        .expect("prepared");
        p.record_vm_event(
            req,
            &domain,
            Some("/tmp/ovl.qcow2"),
            Some("/tmp/seed.iso"),
            Some("noop"),
            VmPersistState::Running,
        )
        .await
        .expect("running");
        p.record_vm_event(
            req,
            &domain,
            Some("/tmp/ovl.qcow2"),
            Some("/tmp/seed.iso"),
            Some("noop"),
            VmPersistState::Destroyed,
        )
        .await
        .expect("destroyed");
        let rows = vms::Entity::find()
            .filter(vms::Column::RequestId.eq(req))
            .filter(vms::Column::DomainName.eq(domain.clone()))
            .all(&db)
            .await
            .expect("query vms");
        assert_eq!(rows.len(), 1, "should have exactly one vm row");
        let row = &rows[0];
        assert_eq!(row.state, "destroyed");
        assert_eq!(row.backend, "noop");
        assert_eq!(row.overlay_path.as_deref(), Some("/tmp/ovl.qcow2"));
        assert_eq!(row.seed_path.as_deref(), Some("/tmp/seed.iso"));
    }
}
#[cfg(test)]
mod noop_tests {
    use super::*;

    /// With no DATABASE_URL, every record method must succeed as a no-op.
    #[tokio::test]
    async fn record_methods_noop_when_disabled() {
        let p = Persist::new(None).await.expect("init");
        assert!(!p.is_enabled());
        // Calls should succeed without DB
        let req = Uuid::new_v4();
        p.record_job_state(req, "https://x", "sha", Some("illumos"), JobState::Queued)
            .await
            .expect("job noop ok");
        p.record_vm_event(
            req,
            "job-x",
            None,
            None,
            Some("noop"),
            VmPersistState::Prepared,
        )
        .await
        .expect("vm noop ok");
    }
}