Add VM suspend handling, persistence updates, and orchestrator enhancements

This commit introduces:
- VM suspend support for timeout scenarios, allowing investigation of frozen states.
- Enhanced orchestrator persistence initialization with skip option for faster startup.
- Improvements to orchestrator logging, job state tracking, and VM runtime monitoring.
- Updates to CI tasks for capturing job request IDs and tracking completion statuses.
- Extended hypervisor capabilities, including libvirt console logging configuration.
This commit is contained in:
Till Wegmueller 2025-11-01 18:38:17 +01:00
parent f753265a79
commit 9597bbf64d
No known key found for this signature in database
7 changed files with 172 additions and 63 deletions

View file

@ -72,6 +72,8 @@ ILLUMOS_URL="http://$HOST_IP:$SOL_RUNNER_PORT_ILLUMOS/solstice-runner-illumos"
export SOLSTICE_RUNNER_URLS="$LINUX_URL $ILLUMOS_URL"
# Start orchestrator in background (inherits env including SOLSTICE_RUNNER_URLS/ORCH_CONTACT_ADDR)
# Speed up startup by skipping persistence unless explicitly disabled
export ORCH_SKIP_PERSIST=${ORCH_SKIP_PERSIST:-true}
LOGFILE=${SOL_ORCH_LOG:-"$ROOT_DIR/target/orchestrator.vm-build.log"}
echo "Starting orchestrator... (logs: $LOGFILE)" >&2
(
@ -84,10 +86,14 @@ echo " Linux: $LINUX_URL" >&2
echo " Illumos: $ILLUMOS_URL" >&2
echo "Orchestrator contact: $ORCH_CONTACT_ADDR" >&2
# Give it a moment to start
sleep 3
echo "Waiting for orchestrator to start..." >&2
# shellcheck disable=SC2034
for i in {1..20}; do
if grep -q "orchestrator starting" "$LOGFILE" 2>/dev/null; then break; fi
sleep 0.5
done
# Enqueue two jobs: one Linux, one Illumos
# Enqueue two jobs (one Linux, one Illumos) and capture their request IDs
SOL_REPO_URL=${SOL_REPO_URL:-$(git -C "$ROOT_DIR" remote get-url origin 2>/dev/null || true)}
SOL_COMMIT_SHA=${SOL_COMMIT_SHA:-$(git -C "$ROOT_DIR" rev-parse HEAD 2>/dev/null || true)}
if [[ -z "${SOL_REPO_URL}" || -z "${SOL_COMMIT_SHA}" ]]; then
@ -95,22 +101,51 @@ if [[ -z "${SOL_REPO_URL}" || -z "${SOL_COMMIT_SHA}" ]]; then
fi
# Linux (Ubuntu image in example config)
SOL_RUNS_ON=ubuntu-22.04 "$ROOT_DIR/.mise/tasks/run/forge-enqueue"
REQ_LINUX=$(SOL_RUNS_ON=ubuntu-22.04 "$ROOT_DIR/.mise/tasks/run/forge-enqueue")
# Illumos (default label / alias)
SOL_RUNS_ON=illumos-latest "$ROOT_DIR/.mise/tasks/run/forge-enqueue"
REQ_ILLUMOS=$(SOL_RUNS_ON=illumos-latest "$ROOT_DIR/.mise/tasks/run/forge-enqueue")
# Tail orchestrator logs for a while
TAIL_SECS=${SOL_TAIL_SECS:-30}
echo "Tailing orchestrator logs for ${TAIL_SECS}s..." >&2
if command -v timeout >/dev/null 2>&1; then
(timeout ${TAIL_SECS}s tail -f "$LOGFILE" || true) 2>/dev/null
elif command -v gtimeout >/dev/null 2>&1; then
(gtimeout ${TAIL_SECS}s tail -f "$LOGFILE" || true) 2>/dev/null
else
tail -f "$LOGFILE" &
TAIL_PID=$!
sleep "$TAIL_SECS" || true
kill "$TAIL_PID" 2>/dev/null || true
echo "Enqueued request IDs:" >&2
echo " Linux: $REQ_LINUX" >&2
echo " Illumos: $REQ_ILLUMOS" >&2
# Wait for both jobs to finish by watching orchestrator logs
TIMEOUT_SECS=${SOL_WAIT_TIMEOUT_SECS:-3600}
DEADLINE=$(( $(date +%s) + TIMEOUT_SECS ))
DONE_LINUX=""
DONE_ILLUMOS=""
STATUS_LINUX=""
STATUS_ILLUMOS=""
echo "Waiting up to ${TIMEOUT_SECS}s for jobs to finish..." >&2
while :; do
now=$(date +%s)
if [[ $now -ge $DEADLINE ]]; then
echo "Timeout waiting for jobs to finish" >&2
exit 124
fi
if [[ -z "$DONE_LINUX" ]]; then
if grep -q "job finished: ${REQ_LINUX} succeeded" "$LOGFILE"; then DONE_LINUX=1; STATUS_LINUX=ok; fi
if grep -q "job finished: ${REQ_LINUX} failed" "$LOGFILE"; then DONE_LINUX=1; STATUS_LINUX=failed; fi
fi
if [[ -z "$DONE_ILLUMOS" ]]; then
if grep -q "job finished: ${REQ_ILLUMOS} succeeded" "$LOGFILE"; then DONE_ILLUMOS=1; STATUS_ILLUMOS=ok; fi
if grep -q "job finished: ${REQ_ILLUMOS} failed" "$LOGFILE"; then DONE_ILLUMOS=1; STATUS_ILLUMOS=failed; fi
fi
if [[ -n "$DONE_LINUX" && -n "$DONE_ILLUMOS" ]]; then
break
fi
sleep 2
done
echo "Job results:" >&2
echo " Linux: $STATUS_LINUX" >&2
echo " Illumos: $STATUS_ILLUMOS" >&2
# Exit non-zero if any failed
if [[ "$STATUS_LINUX" != "ok" || "$STATUS_ILLUMOS" != "ok" ]]; then
echo "One or more jobs failed" >&2
exit 1
fi
echo "Done. Logs at $LOGFILE" >&2

View file

@ -10,6 +10,6 @@ export AMQP_ROUTING_KEY=${AMQP_ROUTING_KEY:-jobrequest.v1}
export AMQP_PREFETCH=${AMQP_PREFETCH:-2}
export GRPC_ADDR=${GRPC_ADDR:-0.0.0.0:50051}
# For Linux + libvirt users, customize via LIBVIRT_URI and LIBVIRT_NETWORK
exec cargo run -p orchestrator -- \
exec cargo run -p orchestrator --features libvirt -- \
--config "$ORCH_CONFIG" \
--grpc-addr "$GRPC_ADDR"

View file

@ -113,6 +113,8 @@ async fn main() -> Result<()> {
let mut jr = common::JobRequest::new(common::SourceSystem::Manual, repo_url, commit_sha);
jr.runs_on = runs_on;
common::publish_job(&mq_cfg, &jr).await?;
// Print just the request_id on stdout so scripts can capture it reliably.
println!("{}", jr.request_id);
info!(request_id = %jr.request_id, "enqueued job request");
return Ok(());
}

View file

@ -56,6 +56,7 @@ pub trait Hypervisor: Send + Sync {
async fn prepare(&self, spec: &VmSpec, ctx: &JobContext) -> Result<VmHandle>;
async fn start(&self, vm: &VmHandle) -> Result<()>;
async fn stop(&self, vm: &VmHandle, graceful_timeout: Duration) -> Result<()>;
async fn suspend(&self, vm: &VmHandle) -> Result<()>;
async fn destroy(&self, vm: VmHandle) -> Result<()>;
async fn state(&self, _vm: &VmHandle) -> Result<VmState> {
Ok(VmState::Prepared)
@ -162,6 +163,27 @@ impl Hypervisor for RouterHypervisor {
_ => self.noop.stop(vm, t).await,
}
}
/// Pause the VM in place, routing the call to the backend that owns this
/// handle. When the matching backend object was not constructed on this
/// build/host, the request is delegated to the no-op hypervisor instead.
async fn suspend(&self, vm: &VmHandle) -> Result<()> {
    match vm.backend {
        #[cfg(all(target_os = "linux", feature = "libvirt"))]
        BackendTag::Libvirt => match self.libvirt {
            Some(ref hv) => hv.suspend(vm).await,
            None => self.noop.suspend(vm).await,
        },
        #[cfg(target_os = "illumos")]
        BackendTag::Zones => match self.zones {
            Some(ref hv) => hv.suspend(vm).await,
            None => self.noop.suspend(vm).await,
        },
        _ => self.noop.suspend(vm).await,
    }
}
async fn destroy(&self, vm: VmHandle) -> Result<()> {
match vm.backend {
#[cfg(all(target_os = "linux", feature = "libvirt"))]
@ -235,6 +257,10 @@ impl Hypervisor for NoopHypervisor {
info!(id = %vm.id, "noop stop");
Ok(())
}
/// No-op backend: there is no real VM to pause, so just log the request
/// and report success.
async fn suspend(&self, vm: &VmHandle) -> Result<()> {
info!(id = %vm.id, "noop suspend");
Ok(())
}
async fn destroy(&self, vm: VmHandle) -> Result<()> {
info!(id = %vm.id, "noop destroy");
Ok(())
@ -389,6 +415,11 @@ impl Hypervisor for LibvirtHypervisor {
seed_iso = Some(iso_path);
}
// Serial console log file path
let console_log = work_dir.join("console.log");
let console_log_str = console_log.display().to_string();
info!(domain = %id, console = %console_log_str, "serial console will be logged to file");
// Domain XML
let xml = {
let mem = spec.ram_mb;
@ -398,8 +429,8 @@ impl Hypervisor for LibvirtHypervisor {
let net = self.network.clone();
let cdrom = seed_str.map(|p| format!("<disk type='file' device='cdrom'>\n <driver name='qemu' type='raw'/>\n <source file='{}'/>\n <target dev='hdb' bus='ide'/>\n <readonly/>\n</disk>", p)).unwrap_or_default();
format!(
"<domain type='kvm'>\n<name>{}</name>\n<memory unit='MiB'>{}</memory>\n<vcpu>{}</vcpu>\n<os>\n <type arch='x86_64' machine='pc'>hvm</type>\n <boot dev='hd'/>\n</os>\n<features><acpi/></features>\n<devices>\n <disk type='file' device='disk'>\n <driver name='qemu' type='qcow2' cache='none'/>\n <source file='{}'/>\n <target dev='vda' bus='virtio'/>\n </disk>\n {}\n <interface type='network'>\n <source network='{}'/>\n <model type='virtio'/>\n </interface>\n <graphics type='vnc' autoport='yes' listen='127.0.0.1'/>\n <serial type='pty'>\n <target port='0'/>\n </serial>\n <console type='pty'>\n <target type='serial' port='0'/>\n </console>\n</devices>\n<on_poweroff>destroy</on_poweroff>\n<on_crash>destroy</on_crash>\n</domain>",
id, mem, vcpus, overlay_str, cdrom, net
"<domain type='kvm'>\n<name>{}</name>\n<memory unit='MiB'>{}</memory>\n<vcpu>{}</vcpu>\n<os>\n <type arch='x86_64' machine='pc'>hvm</type>\n <boot dev='hd'/>\n</os>\n<features><acpi/></features>\n<devices>\n <disk type='file' device='disk'>\n <driver name='qemu' type='qcow2' cache='none'/>\n <source file='{}'/>\n <target dev='vda' bus='virtio'/>\n </disk>\n {}\n <interface type='network'>\n <source network='{}'/>\n <model type='virtio'/>\n </interface>\n <graphics type='vnc' autoport='yes' listen='127.0.0.1'/>\n <serial type='pty'>\n <target port='0'/>\n </serial>\n <serial type='file'>\n <source path='{}'/>\n <target port='1'/>\n </serial>\n <console type='pty'>\n <target type='serial' port='0'/>\n </console>\n</devices>\n<on_poweroff>destroy</on_poweroff>\n<on_crash>destroy</on_crash>\n</domain>",
id, mem, vcpus, overlay_str, cdrom, net, console_log_str
)
};
@ -474,6 +505,23 @@ impl Hypervisor for LibvirtHypervisor {
info!(domain = %vm.id, "libvirt stopped");
Ok(())
}
/// Suspend (pause) a libvirt domain so its frozen state can be inspected,
/// e.g. after a job timeout. The domain is looked up by name (the VM id).
///
/// Returns an error if the libvirt connection, the domain lookup, or the
/// suspend call itself fails.
async fn suspend(&self, vm: &VmHandle) -> Result<()> {
let id = vm.id.clone();
let uri = self.uri.clone();
// The `virt` bindings are synchronous FFI; run them on a blocking
// thread so the async executor is not stalled.
tokio::task::spawn_blocking(move || -> miette::Result<()> {
use virt::{connect::Connect, domain::Domain};
let conn = Connect::open(Some(&uri))
.map_err(|e| miette::miette!("libvirt connect failed: {e}"))?;
let dom = Domain::lookup_by_name(&conn, &id)
.map_err(|e| miette::miette!("lookup domain failed: {e}"))?;
dom.suspend().map_err(|e| miette::miette!("domain suspend failed: {e}"))?;
Ok(())
})
.await
// First `?` surfaces a panicked/cancelled join; the second propagates
// the libvirt error produced inside the closure.
.into_diagnostic()??;
info!(domain = %vm.id, "libvirt suspended");
Ok(())
}
async fn destroy(&self, vm: VmHandle) -> Result<()> {
let id = vm.id.clone();
@ -617,4 +665,7 @@ impl Hypervisor for ZonesHypervisor {
/// Tear down a zone. Currently a stub that reports success without doing
/// any work.
/// NOTE(review): callers cannot distinguish this from a real teardown —
/// confirm that silently skipping cleanup is intended on illumos.
async fn destroy(&self, _vm: VmHandle) -> Result<()> {
Ok(())
}
/// Suspend a zone. Currently a stub that reports success without pausing
/// anything, so timeout handling will not actually freeze the guest here.
/// NOTE(review): unlike the noop backend this does not even log — consider
/// whether a warning or an unsupported error would be more honest.
async fn suspend(&self, _vm: &VmHandle) -> Result<()> {
Ok(())
}
}

View file

@ -32,6 +32,10 @@ struct Opts {
#[arg(long, env = "MAX_CONCURRENCY", default_value_t = 2)]
max_concurrency: usize,
/// Skip persistence initialization (faster startup; disables DB writes)
#[arg(long = "skip-persistence", env = "ORCH_SKIP_PERSIST", default_value_t = false)]
skip_persistence: bool,
/// Per-label capacity map (e.g., illumos-latest=2,ubuntu-22.04=4)
#[arg(long, env = "CAPACITY_MAP")]
capacity_map: Option<String>,
@ -40,11 +44,11 @@ struct Opts {
#[arg(long, env = "GRPC_ADDR", default_value = "0.0.0.0:50051")]
grpc_addr: String,
/// Postgres connection string
/// Postgres connection string (if empty, persistence is disabled)
#[arg(
long,
env = "DATABASE_URL",
default_value = "postgres://user:pass@localhost:5432/solstice"
default_value = ""
)]
database_url: String,
@ -81,7 +85,7 @@ struct Opts {
otlp: Option<String>,
/// Placeholder VM run time in seconds (temporary until agent wiring)
#[arg(long, env = "VM_PLACEHOLDER_RUN_SECS", default_value_t = 300)]
#[arg(long, env = "VM_PLACEHOLDER_RUN_SECS", default_value_t = 3600)]
vm_placeholder_run_secs: u64,
}
@ -101,8 +105,12 @@ async fn main() -> Result<()> {
// Build hypervisor router
let router = RouterHypervisor::build(opts.libvirt_uri.clone(), opts.libvirt_network.clone());
// Initialize persistence
let persist = Arc::new(Persist::new(Some(opts.database_url.clone())).await?);
// Initialize persistence (optional). Skip when requested for faster startup.
let persist = if opts.skip_persistence {
Arc::new(Persist::new(None).await?)
} else {
Arc::new(Persist::new(Some(opts.database_url.clone())).await?)
};
// Build MQ config and start consumer
let mq_cfg = common::MqConfig {

View file

@ -2,8 +2,8 @@ use chrono::Utc;
use miette::{IntoDiagnostic as _, Result};
use sea_orm::sea_query::{Expr, OnConflict};
use sea_orm::{
ActiveModelTrait, ColumnTrait, Database, DatabaseConnection, QueryFilter, Set,
entity::prelude::*,
entity::prelude::*, ActiveModelTrait, ColumnTrait, Database, DatabaseConnection, QueryFilter,
Set,
};
use sea_orm_migration::MigratorTrait;
use tracing::{debug, info, warn};
@ -104,9 +104,10 @@ mod vms {
}
impl Persist {
/// Initialize persistence. If `database_url` is provided and the connection
/// succeeds, migrations are applied and the connection is retained.
/// If not provided or connection fails, persistence is disabled (no-op).
/// Initialize persistence.
/// - If `database_url` is Some(non-empty), attempt to connect and run migrations.
/// On any failure, return an error to fail-fast so operators notice misconfiguration.
/// - If `database_url` is None or empty, persistence is disabled (no-op).
pub async fn new(database_url: Option<String>) -> Result<Self> {
if let Some(url) = database_url.filter(|s| !s.trim().is_empty()) {
// Use a single connection for SQLite in-memory to avoid separate empty DBs per checkout
@ -115,30 +116,18 @@ impl Persist {
opts.max_connections(1)
.min_connections(1)
.sqlx_logging(false);
match Database::connect(opts).await.into_diagnostic() {
Ok(db) => {
// Apply migrations idempotently
if let Err(e) = migration::Migrator::up(&db, None).await.into_diagnostic() {
#[cfg(test)]
{
// In tests, surface the migration failure to help debugging
return Err(miette::miette!("migration failed: {e}"));
}
#[cfg(not(test))]
{
warn!(error = %e, "failed to apply migrations; proceeding without persistence");
return Ok(Self { db: None });
}
}
info!("persistence initialized (connected and migrated)");
return Ok(Self { db: Some(db) });
}
Err(e) => warn!(error = %e, "failed to connect to database; persistence disabled"),
}
let db = Database::connect(opts).await.into_diagnostic()
.map_err(|e| miette::miette!("failed to connect to database: {e}"))?;
migration::Migrator::up(&db, None)
.await
.into_diagnostic()
.map_err(|e| miette::miette!("failed to apply migrations: {e}"))?;
info!("persistence initialized (connected and migrated)");
Ok(Self { db: Some(db) })
} else {
warn!("DATABASE_URL not set; persistence disabled");
warn!("persistence disabled (no DATABASE_URL or skipped)");
Ok(Self { db: None })
}
Ok(Self { db: None })
}
pub fn is_enabled(&self) -> bool {
@ -335,7 +324,6 @@ mod tests {
)
.await
.expect("destroyed");
use sea_orm::QuerySelect;
let rows = vms::Entity::find()
.filter(vms::Column::RequestId.eq(req))
.filter(vms::Column::DomainName.eq(domain.clone()))

View file

@ -105,6 +105,7 @@ impl<H: Hypervisor + 'static> Scheduler<H> {
if let Err(e) = hv.start(&h).await {
error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to start VM");
let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), JobState::Failed).await;
info!(request_id = %item.ctx.request_id, label = %label_key, "job finished: {} {}", item.ctx.request_id, "failed");
return;
}
let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Running).await;
@ -112,13 +113,19 @@ impl<H: Hypervisor + 'static> Scheduler<H> {
info!(request_id = %item.ctx.request_id, label = %label_key, "vm started (monitoring for completion)");
// Monitor VM state until it stops or until placeholder_runtime elapses; end early on shutdown
let start_time = std::time::Instant::now();
let mut timed_out = false;
let mut natural_stop = false;
loop {
// Check current state first
if let Ok(crate::hypervisor::VmState::Stopped) = hv.state(&h).await {
info!(request_id = %item.ctx.request_id, label = %label_key, "vm reported stopped");
natural_stop = true;
break;
}
if start_time.elapsed() >= placeholder_runtime {
timed_out = true;
break;
}
if start_time.elapsed() >= placeholder_runtime { break; }
// Wait either for shutdown signal or a short delay before next poll
tokio::select! {
_ = shutdown.notified() => {
@ -128,20 +135,35 @@ impl<H: Hypervisor + 'static> Scheduler<H> {
_ = tokio::time::sleep(Duration::from_secs(2)) => {}
}
}
// Stop and destroy
if let Err(e) = hv.stop(&h, Duration::from_secs(10)).await {
error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to stop VM");
if timed_out {
// Freeze/suspend VM for debugging; keep artifacts and domain defined
if let Err(e) = hv.suspend(&h).await {
warn!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to suspend VM after timeout");
}
let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Stopped).await;
let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), JobState::Failed).await;
// Log where to find console log for libvirt guests
let console_hint = h.work_dir.join("console.log");
info!(request_id = %item.ctx.request_id, label = %label_key, domain = %h.id, console = %console_hint.display(), "vm timeout: suspended and kept for debugging");
info!(request_id = %item.ctx.request_id, label = %label_key, "job finished: {} {}", item.ctx.request_id, "failed");
} else {
// Stop and destroy on natural completion
if let Err(e) = hv.stop(&h, Duration::from_secs(10)).await {
error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to stop VM");
}
let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Stopped).await;
if let Err(e) = hv.destroy(h.clone()).await {
error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to destroy VM");
}
let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Destroyed).await;
let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), JobState::Succeeded).await;
info!(request_id = %item.ctx.request_id, label = %label_key, "job finished: {} {}", item.ctx.request_id, "succeeded");
}
let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Stopped).await;
if let Err(e) = hv.destroy(h.clone()).await {
error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to destroy VM");
}
let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Destroyed).await;
let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), JobState::Succeeded).await;
}
Err(e) => {
error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to prepare VM");
let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), JobState::Failed).await;
info!(request_id = %item.ctx.request_id, label = %label_key, "job finished: {} {}", item.ctx.request_id, "failed");
return;
}
}
@ -243,6 +265,9 @@ mod tests {
/// Test stub: stopping always succeeds and records nothing.
async fn stop(&self, _vm: &VmHandle, _t: Duration) -> miette::Result<()> {
Ok(())
}
/// Test stub: suspending always succeeds so scheduler timeout paths can be
/// exercised without a real hypervisor.
async fn suspend(&self, _vm: &VmHandle) -> miette::Result<()> {
Ok(())
}
async fn destroy(&self, vm: VmHandle) -> miette::Result<()> {
// decrement overall current
self.active_all.fetch_sub(1, Ordering::SeqCst);