use crate::error::OrchestratorError; use chrono::Utc; use miette::{IntoDiagnostic as _, Result}; use sea_orm::QueryOrder; use sea_orm::sea_query::{Expr, OnConflict}; use sea_orm::{ColumnTrait, Database, DatabaseConnection, QueryFilter, Set, entity::prelude::*}; use sea_orm_migration::MigratorTrait; use tracing::{debug, info, warn}; use uuid::Uuid; /// Minimal persistence module for the Orchestrator with real upserts for jobs & vms. #[derive(Clone)] pub struct Persist { db: Option, } #[derive(Debug, Clone, Copy)] pub enum JobState { Queued, Running, Succeeded, Failed, } impl JobState { fn as_str(self) -> &'static str { match self { JobState::Queued => "queued", JobState::Running => "running", JobState::Succeeded => "succeeded", JobState::Failed => "failed", } } } #[derive(Debug, Clone, Copy)] pub enum VmPersistState { Prepared, Running, Stopped, Destroyed, } impl VmPersistState { fn as_str(self) -> &'static str { match self { VmPersistState::Prepared => "prepared", VmPersistState::Running => "running", VmPersistState::Stopped => "stopped", VmPersistState::Destroyed => "destroyed", } } } // ---------------------------- // SeaORM Entities (inline) // ---------------------------- mod jobs { use super::*; #[derive(Clone, Debug, PartialEq, DeriveEntityModel)] #[sea_orm(table_name = "jobs")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub request_id: Uuid, pub repo_url: String, pub commit_sha: String, pub runs_on: Option, pub state: String, pub created_at: chrono::DateTime, pub updated_at: chrono::DateTime, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation {} impl ActiveModelBehavior for ActiveModel {} } mod vms { use super::*; #[derive(Clone, Debug, PartialEq, DeriveEntityModel)] #[sea_orm(table_name = "vms")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub request_id: Uuid, #[sea_orm(primary_key, auto_increment = false)] pub domain_name: String, pub overlay_path: Option, pub seed_path: Option, pub 
backend: String, pub state: String, pub created_at: chrono::DateTime, pub updated_at: chrono::DateTime, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation {} impl ActiveModelBehavior for ActiveModel {} } mod job_logs { use super::*; #[derive(Clone, Debug, PartialEq, DeriveEntityModel)] #[sea_orm(table_name = "job_logs")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub request_id: Uuid, #[sea_orm(primary_key, auto_increment = false)] pub seq: i64, pub ts: chrono::DateTime, pub stderr: bool, pub line: String, /// Logical category to group logs (e.g., "setup", "build", "test") pub category: String, /// Log level (e.g., "info", "warn", "error") pub level: Option, /// Arbitrary structured fields from NDJSON, stored as raw JSON string pub fields: Option, /// Convenience flag for quick error presence checks pub has_error: bool, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation {} impl ActiveModelBehavior for ActiveModel {} } mod job_ssh_keys { use super::*; #[derive(Clone, Debug, PartialEq, DeriveEntityModel)] #[sea_orm(table_name = "job_ssh_keys")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub request_id: Uuid, pub public_key: String, pub private_key: String, pub created_at: chrono::DateTime, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation {} impl ActiveModelBehavior for ActiveModel {} } impl Persist { /// Save per-job SSH keys (public + private OpenSSH format). No-op if persistence disabled. 
pub async fn save_job_ssh_keys( &self, request_id: Uuid, public_key: &str, private_key: &str, ) -> Result<()> { let Some(db) = self.db.as_ref() else { return Ok(()); }; let now = Utc::now(); let am = job_ssh_keys::ActiveModel { request_id: Set(request_id), public_key: Set(public_key.to_string()), private_key: Set(private_key.to_string()), created_at: Set(now), }; job_ssh_keys::Entity::insert(am) .on_conflict( OnConflict::column(job_ssh_keys::Column::RequestId) .update_columns([ job_ssh_keys::Column::PublicKey, job_ssh_keys::Column::PrivateKey, job_ssh_keys::Column::CreatedAt, ]) .to_owned(), ) .exec(db) .await .into_diagnostic()?; Ok(()) } /// Load per-job SSH keys; returns None if absent or persistence disabled. pub async fn get_job_ssh_keys(&self, request_id: Uuid) -> Result> { let Some(db) = self.db.as_ref() else { return Ok(None); }; let row = job_ssh_keys::Entity::find_by_id(request_id) .one(db) .await .into_diagnostic()?; Ok(row.map(|r| (r.public_key, r.private_key))) } /// Initialize persistence. /// - If `database_url` is Some(non-empty), attempt to connect and run migrations. /// On any failure, return an error to fail-fast so operators notice misconfiguration. /// - If `database_url` is None or empty, persistence is disabled (no-op). 
pub async fn new(database_url: Option) -> Result { if let Some(url) = database_url.filter(|s| !s.trim().is_empty()) { // Use a single connection for SQLite in-memory to avoid separate empty DBs per checkout let mut opts = sea_orm::ConnectOptions::new(url.clone()); // One connection is enough for tests and avoids in-memory SQLite pitfalls opts.max_connections(1) .min_connections(1) .sqlx_logging(false); let db = Database::connect(opts) .await .map_err(|e| OrchestratorError::DbConnect(e.into())) .into_diagnostic()?; migration::Migrator::up(&db, None) .await .map_err(|e| OrchestratorError::DbMigrate(e.into())) .into_diagnostic()?; info!("persistence initialized (connected and migrated)"); Ok(Self { db: Some(db) }) } else { warn!("persistence disabled (no DATABASE_URL or skipped)"); Ok(Self { db: None }) } } pub fn is_enabled(&self) -> bool { self.db.is_some() } /// Return aggregated logs for a job as a single text blob. Newline-terminated. /// When persistence is disabled or no logs are found, returns Ok(None). pub async fn get_logs_text(&self, request_id: Uuid) -> Result> { let Some(db) = self.db.as_ref() else { return Ok(None); }; let mut out = String::new(); let rows = job_logs::Entity::find() .filter(job_logs::Column::RequestId.eq(request_id)) .order_by_asc(job_logs::Column::Seq) .all(db) .await .into_diagnostic()?; if rows.is_empty() { return Ok(None); } for r in rows { if r.stderr { out.push_str("[stderr] "); } out.push_str(&r.line); if !out.ends_with('\n') { out.push('\n'); } } Ok(Some(out)) } /// Record a single log line for a job. No-op when persistence is disabled. 
pub async fn record_log_line( &self, request_id: Uuid, seq: i64, stderr: bool, line: &str, ) -> Result<()> { let Some(db) = self.db.as_ref() else { debug!(%request_id, %seq, stderr, "record_log_line (noop)"); return Ok(()); }; // Try to parse structured JSON lines to extract common fields let mut category = "default".to_string(); let mut level_str: Option = None; let mut msg_line = line.to_string(); let mut fields_json: Option = None; if let Ok(val) = serde_json::from_str::(line) { // Keep original JSON for reference fields_json = Some(line.to_string()); if let Some(c) = val.get("category").and_then(|v| v.as_str()) { if !c.is_empty() { category = c.to_string(); } } if let Some(l) = val.get("level").and_then(|v| v.as_str()) { level_str = Some(l.to_string()); } // Prefer common keys for message if let Some(m) = val .get("msg") .or_else(|| val.get("message")) .and_then(|v| v.as_str()) { msg_line = m.to_string(); } } let level = level_str.or_else(|| { if stderr { Some("error".to_string()) } else { Some("info".to_string()) } }); let has_error = stderr || level.as_deref() == Some("error"); let am = job_logs::ActiveModel { request_id: Set(request_id), seq: Set(seq), ts: Set(Utc::now()), stderr: Set(stderr), line: Set(msg_line), category: Set(category), level: Set(level), fields: Set(fields_json), has_error: Set(has_error), }; job_logs::Entity::insert(am) .exec(db) .await .into_diagnostic()?; Ok(()) } /// Upsert a job row by request_id. 
///
/// On first insert both `created_at` and `updated_at` are set to the current time. On
/// conflict only `repo_url`, `commit_sha`, `runs_on`, `state` and `updated_at` are
/// overwritten, so `created_at` keeps its original value. No-op when persistence is disabled.
///
/// # Errors
/// Returns the database error, converted to a diagnostic, when the upsert fails.
pub async fn record_job_state(
    &self,
    request_id: Uuid,
    repo_url: &str,
    commit_sha: &str,
    runs_on: Option<&str>,
    state: JobState,
) -> Result<()> {
    let Some(db) = self.db.as_ref() else {
        debug!(%request_id, state = state.as_str(), "record_job_state (noop)");
        return Ok(());
    };

    let now = Utc::now();
    let am = jobs::ActiveModel {
        request_id: Set(request_id),
        repo_url: Set(repo_url.to_string()),
        commit_sha: Set(commit_sha.to_string()),
        runs_on: Set(runs_on.map(|s| s.to_string())),
        state: Set(state.as_str().to_string()),
        created_at: Set(now),
        updated_at: Set(now),
    };
    jobs::Entity::insert(am)
        .on_conflict(
            OnConflict::column(jobs::Column::RequestId)
                .update_columns([
                    jobs::Column::RepoUrl,
                    jobs::Column::CommitSha,
                    jobs::Column::RunsOn,
                    jobs::Column::State,
                    jobs::Column::UpdatedAt,
                ])
                .to_owned(),
        )
        .exec(db)
        .await
        .into_diagnostic()?;
    Ok(())
}

/// Update a VM row by (request_id, domain_name) or insert when missing.
///
/// A missing `backend` is stored as `"unknown"`. The update overwrites `overlay_path` and
/// `seed_path` with the supplied values, including `None`. No-op when persistence is
/// disabled.
///
/// # Errors
/// Returns the database error, converted to a diagnostic, when the update or the fallback
/// insert fails.
pub async fn record_vm_event(
    &self,
    request_id: Uuid,
    domain_name: &str,
    overlay_path: Option<&str>,
    seed_path: Option<&str>,
    backend: Option<&str>,
    state: VmPersistState,
) -> Result<()> {
    let Some(db) = self.db.as_ref() else {
        debug!(%request_id, domain = %domain_name, state = state.as_str(), "record_vm_event (noop)");
        return Ok(());
    };

    let now = Utc::now();
    let backend_val = backend.unwrap_or("unknown");
    let state_val = state.as_str();

    // First try to update an existing row for this (request_id, domain_name).
    let res = vms::Entity::update_many()
        .col_expr(
            vms::Column::OverlayPath,
            Expr::value(overlay_path.map(|s| s.to_string())),
        )
        .col_expr(
            vms::Column::SeedPath,
            Expr::value(seed_path.map(|s| s.to_string())),
        )
        .col_expr(vms::Column::Backend, Expr::value(backend_val))
        .col_expr(vms::Column::State, Expr::value(state_val))
        .col_expr(vms::Column::UpdatedAt, Expr::value(now))
        .filter(vms::Column::RequestId.eq(request_id))
        .filter(vms::Column::DomainName.eq(domain_name))
        .exec(db)
        .await
        .into_diagnostic()?;

    // Nothing matched, so the row does not exist yet: insert it.
    // NOTE(review): update-then-insert is two statements, so two concurrent first events for
    // the same key could both reach the insert. Confirm callers serialize events per VM.
    if res.rows_affected == 0 {
        let am = vms::ActiveModel {
            request_id: Set(request_id),
            domain_name: Set(domain_name.to_string()),
            overlay_path: Set(overlay_path.map(|s| s.to_string())),
            seed_path: Set(seed_path.map(|s| s.to_string())),
            backend: Set(backend_val.to_string()),
            state: Set(state_val.to_string()),
            created_at: Set(now),
            updated_at: Set(now),
        };
        vms::Entity::insert(am).exec(db).await.into_diagnostic()?;
    }
    Ok(())
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Open an in-memory SQLite database with the `jobs` and `vms` tables created.
    async fn sqlite_memory_db() -> DatabaseConnection {
        use sea_orm_migration::prelude::SchemaManager;

        let mut opts = sea_orm::ConnectOptions::new("sqlite::memory:".to_string());
        opts.max_connections(1)
            .min_connections(1)
            .sqlx_logging(false);
        let db = Database::connect(opts)
            .await
            .expect("sqlite memory connect");

        // Create tables from entities to avoid using migrator (faster and avoids migration
        // bookkeeping table).
        let backend = db.get_database_backend();
        let schema = sea_orm::Schema::new(backend);
        let stmt_jobs = schema.create_table_from_entity(jobs::Entity);
        let stmt_vms = schema.create_table_from_entity(vms::Entity);

        let manager = SchemaManager::new(&db);
        manager.create_table(stmt_jobs).await.expect("create jobs");
        manager.create_table(stmt_vms).await.expect("create vms");
        db
    }

    #[tokio::test]
    async fn test_job_upsert_sqlite() {
        let db = sqlite_memory_db().await;
        let p = Persist {
            db: Some(db.clone()),
        };
        let req = Uuid::new_v4();
        let repo = "https://example.com/repo.git";
        let sha = "deadbeef";

        // Insert queued
        p.record_job_state(req, repo, sha, Some("illumos-latest"), JobState::Queued)
            .await
            .expect("insert queued");

        // Fetch
        let row = jobs::Entity::find_by_id(req)
            .one(&db)
            .await
            .expect("query")
            .expect("row exists");
        assert_eq!(row.state, "queued");

        // Update to running
        p.record_job_state(req, repo, sha, Some("illumos-latest"), JobState::Running)
            .await
            .expect("update running");
        let row2 = jobs::Entity::find_by_id(req)
            .one(&db)
            .await
            .expect("query")
            .expect("row exists");
        assert_eq!(row2.state, "running");
        assert!(row2.updated_at >= row.created_at);
    }

    #[tokio::test]
    async fn test_vm_event_upsert_sqlite() {
        let db = sqlite_memory_db().await;
        let p = Persist {
            db: Some(db.clone()),
        };
        let req = Uuid::new_v4();
        let domain = format!("job-{}", req);

        // prepared -> running -> destroyed
        p.record_vm_event(
            req,
            &domain,
            Some("/tmp/ovl.qcow2"),
            Some("/tmp/seed.iso"),
            Some("noop"),
            VmPersistState::Prepared,
        )
        .await
        .expect("prepared");
        p.record_vm_event(
            req,
            &domain,
            Some("/tmp/ovl.qcow2"),
            Some("/tmp/seed.iso"),
            Some("noop"),
            VmPersistState::Running,
        )
        .await
        .expect("running");
        p.record_vm_event(
            req,
            &domain,
            Some("/tmp/ovl.qcow2"),
            Some("/tmp/seed.iso"),
            Some("noop"),
            VmPersistState::Destroyed,
        )
        .await
        .expect("destroyed");

        let rows = vms::Entity::find()
            .filter(vms::Column::RequestId.eq(req))
            .filter(vms::Column::DomainName.eq(domain.as_str()))
            .all(&db)
            .await
            .expect("query vms");
        assert_eq!(rows.len(), 1, "should have exactly one vm row");
        let row = &rows[0];
        assert_eq!(row.state, "destroyed");
        assert_eq!(row.backend, "noop");
        assert_eq!(row.overlay_path.as_deref(), Some("/tmp/ovl.qcow2"));
        assert_eq!(row.seed_path.as_deref(), Some("/tmp/seed.iso"));
    }
}

#[cfg(test)]
mod noop_tests {
    use super::*;

    #[tokio::test]
    async fn record_methods_noop_when_disabled() {
        let p = Persist::new(None).await.expect("init");
        assert!(!p.is_enabled());

        // Calls should succeed without DB
        let req = Uuid::new_v4();
        p.record_job_state(req, "https://x", "sha", Some("illumos"), JobState::Queued)
            .await
            .expect("job noop ok");
        p.record_vm_event(
            req,
            "job-x",
            None,
            None,
            Some("noop"),
            VmPersistState::Prepared,
        )
        .await
        .expect("vm noop ok");
    }
}