use chrono::Utc; use miette::{IntoDiagnostic as _, Result}; use sea_orm::sea_query::{Expr, OnConflict}; use sea_orm::{ ActiveModelTrait, ColumnTrait, Database, DatabaseConnection, QueryFilter, Set, entity::prelude::*, }; use sea_orm_migration::MigratorTrait; use tracing::{debug, info, warn}; use uuid::Uuid; /// Minimal persistence module for the Orchestrator with real upserts for jobs & vms. #[derive(Clone)] pub struct Persist { db: Option, } #[derive(Debug, Clone, Copy)] pub enum JobState { Queued, Running, Succeeded, Failed, } impl JobState { fn as_str(self) -> &'static str { match self { JobState::Queued => "queued", JobState::Running => "running", JobState::Succeeded => "succeeded", JobState::Failed => "failed", } } } #[derive(Debug, Clone, Copy)] pub enum VmPersistState { Prepared, Running, Stopped, Destroyed, } impl VmPersistState { fn as_str(self) -> &'static str { match self { VmPersistState::Prepared => "prepared", VmPersistState::Running => "running", VmPersistState::Stopped => "stopped", VmPersistState::Destroyed => "destroyed", } } } // ---------------------------- // SeaORM Entities (inline) // ---------------------------- mod jobs { use super::*; #[derive(Clone, Debug, PartialEq, DeriveEntityModel)] #[sea_orm(table_name = "jobs")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub request_id: Uuid, pub repo_url: String, pub commit_sha: String, pub runs_on: Option, pub state: String, pub created_at: chrono::DateTime, pub updated_at: chrono::DateTime, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation {} impl ActiveModelBehavior for ActiveModel {} } mod vms { use super::*; #[derive(Clone, Debug, PartialEq, DeriveEntityModel)] #[sea_orm(table_name = "vms")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub request_id: Uuid, #[sea_orm(primary_key, auto_increment = false)] pub domain_name: String, pub overlay_path: Option, pub seed_path: Option, pub backend: String, pub state: String, pub created_at: chrono::DateTime, pub updated_at: chrono::DateTime, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation {} impl ActiveModelBehavior for ActiveModel {} } impl Persist { /// Initialize persistence. If `database_url` is provided and the connection /// succeeds, migrations are applied and the connection is retained. /// If not provided or connection fails, persistence is disabled (no-op). pub async fn new(database_url: Option) -> Result { if let Some(url) = database_url.filter(|s| !s.trim().is_empty()) { // Use a single connection for SQLite in-memory to avoid separate empty DBs per checkout let mut opts = sea_orm::ConnectOptions::new(url.clone()); // One connection is enough for tests and avoids in-memory SQLite pitfalls opts.max_connections(1) .min_connections(1) .sqlx_logging(false); match Database::connect(opts).await.into_diagnostic() { Ok(db) => { // Apply migrations idempotently if let Err(e) = migration::Migrator::up(&db, None).await.into_diagnostic() { #[cfg(test)] { // In tests, surface the migration failure to help debugging return Err(miette::miette!("migration failed: {e}")); } #[cfg(not(test))] { warn!(error = %e, "failed to apply migrations; proceeding without persistence"); return Ok(Self { db: None }); } } info!("persistence initialized (connected and migrated)"); return Ok(Self { db: Some(db) }); } Err(e) => warn!(error = %e, "failed to connect to database; persistence disabled"), } } else { warn!("DATABASE_URL not set; persistence disabled"); } Ok(Self { db: None }) } pub fn is_enabled(&self) -> bool { self.db.is_some() } /// Upsert a job row by request_id. pub async fn record_job_state( &self, request_id: Uuid, repo_url: &str, commit_sha: &str, runs_on: Option<&str>, state: JobState, ) -> Result<()> { let Some(db) = self.db.as_ref() else { debug!(%request_id, state = state.as_str(), "record_job_state (noop)"); return Ok(()); }; let now = Utc::now(); let am = jobs::ActiveModel { request_id: Set(request_id), repo_url: Set(repo_url.to_string()), commit_sha: Set(commit_sha.to_string()), runs_on: Set(runs_on.map(|s| s.to_string())), state: Set(state.as_str().to_string()), created_at: Set(now), updated_at: Set(now), }; jobs::Entity::insert(am) .on_conflict( OnConflict::column(jobs::Column::RequestId) .update_columns([ jobs::Column::RepoUrl, jobs::Column::CommitSha, jobs::Column::RunsOn, jobs::Column::State, jobs::Column::UpdatedAt, ]) .to_owned(), ) .exec(db) .await .into_diagnostic()?; Ok(()) } /// Update a VM row by (request_id, domain_name) or insert when missing. pub async fn record_vm_event( &self, request_id: Uuid, domain_name: &str, overlay_path: Option<&str>, seed_path: Option<&str>, backend: Option<&str>, state: VmPersistState, ) -> Result<()> { let Some(db) = self.db.as_ref() else { debug!(%request_id, domain = %domain_name, state = state.as_str(), "record_vm_event (noop)"); return Ok(()); }; let now = Utc::now(); let backend_val = backend.unwrap_or("unknown"); let state_val = state.as_str(); let res = vms::Entity::update_many() .col_expr( vms::Column::OverlayPath, Expr::value(overlay_path.map(|s| s.to_string())), ) .col_expr( vms::Column::SeedPath, Expr::value(seed_path.map(|s| s.to_string())), ) .col_expr(vms::Column::Backend, Expr::value(backend_val)) .col_expr(vms::Column::State, Expr::value(state_val)) .col_expr(vms::Column::UpdatedAt, Expr::value(now)) .filter(vms::Column::RequestId.eq(request_id)) .filter(vms::Column::DomainName.eq(domain_name)) .exec(db) .await .into_diagnostic()?; if res.rows_affected == 0 { let am = vms::ActiveModel { request_id: Set(request_id), domain_name: Set(domain_name.to_string()), overlay_path: Set(overlay_path.map(|s| s.to_string())), seed_path: Set(seed_path.map(|s| s.to_string())), backend: Set(backend_val.to_string()), state: Set(state_val.to_string()), created_at: Set(now), updated_at: Set(now), }; vms::Entity::insert(am).exec(db).await.into_diagnostic()?; } Ok(()) } } #[cfg(test)] mod tests { use super::*; async fn sqlite_memory_db() -> DatabaseConnection { let mut opts = sea_orm::ConnectOptions::new("sqlite::memory:".to_string()); opts.max_connections(1) .min_connections(1) .sqlx_logging(false); let db = Database::connect(opts) .await .expect("sqlite memory connect"); // Create tables from entities to avoid using migrator (faster and avoids migration bookkeeping table) let backend = db.get_database_backend(); let schema = sea_orm::Schema::new(backend); let stmt_jobs = schema.create_table_from_entity(jobs::Entity); let stmt_vms = schema.create_table_from_entity(vms::Entity); use sea_orm_migration::prelude::SchemaManager; let manager = SchemaManager::new(&db); manager.create_table(stmt_jobs).await.expect("create jobs"); manager.create_table(stmt_vms).await.expect("create vms"); db } #[tokio::test] async fn test_job_upsert_sqlite() { let db = sqlite_memory_db().await; let p = Persist { db: Some(db.clone()), }; let req = Uuid::new_v4(); let repo = "https://example.com/repo.git"; let sha = "deadbeef"; // Insert queued p.record_job_state(req, repo, sha, Some("illumos-latest"), JobState::Queued) .await .expect("insert queued"); // Fetch let row = jobs::Entity::find_by_id(req) .one(&db) .await .expect("query") .expect("row exists"); assert_eq!(row.state, "queued"); // Update to running p.record_job_state(req, repo, sha, Some("illumos-latest"), JobState::Running) .await .expect("update running"); let row2 = jobs::Entity::find_by_id(req) .one(&db) .await .expect("query") .expect("row exists"); assert_eq!(row2.state, "running"); assert!(row2.updated_at >= row.created_at); } #[tokio::test] async fn test_vm_event_upsert_sqlite() { let db = sqlite_memory_db().await; let p = Persist { db: Some(db.clone()), }; let req = Uuid::new_v4(); let domain = format!("job-{}", req); // prepared -> running -> destroyed p.record_vm_event( req, &domain, Some("/tmp/ovl.qcow2"), Some("/tmp/seed.iso"), Some("noop"), VmPersistState::Prepared, ) .await .expect("prepared"); p.record_vm_event( req, &domain, Some("/tmp/ovl.qcow2"), Some("/tmp/seed.iso"), Some("noop"), VmPersistState::Running, ) .await .expect("running"); p.record_vm_event( req, &domain, Some("/tmp/ovl.qcow2"), Some("/tmp/seed.iso"), Some("noop"), VmPersistState::Destroyed, ) .await .expect("destroyed"); use sea_orm::QuerySelect; let rows = vms::Entity::find() .filter(vms::Column::RequestId.eq(req)) .filter(vms::Column::DomainName.eq(domain.clone())) .all(&db) .await .expect("query vms"); assert_eq!(rows.len(), 1, "should have exactly one vm row"); let row = &rows[0]; assert_eq!(row.state, "destroyed"); assert_eq!(row.backend, "noop"); assert_eq!(row.overlay_path.as_deref(), Some("/tmp/ovl.qcow2")); assert_eq!(row.seed_path.as_deref(), Some("/tmp/seed.iso")); } } #[cfg(test)] mod noop_tests { use super::*; #[tokio::test] async fn record_methods_noop_when_disabled() { let p = Persist::new(None).await.expect("init"); assert!(!p.is_enabled()); // Calls should succeed without DB let req = Uuid::new_v4(); p.record_job_state(req, "https://x", "sha", Some("illumos"), JobState::Queued) .await .expect("job noop ok"); p.record_vm_event( req, "job-x", None, None, Some("noop"), VmPersistState::Prepared, ) .await .expect("vm noop ok"); } }