Refactor VM lifecycle handling and improve guest IP discovery, bump version to 0.1.6

- Adjust stopping, destroying, and persisting VM lifecycle events to ensure better sequencing and avoid races.
- Enhance `discover_guest_ip_virsh` with detailed logging, structured attempt tracking, and robust fallback mechanisms.
- Introduce `Attempt` struct to capture detailed command execution context for debugging.
- Update console log handling to snapshot logs early, minimizing race conditions.
- Bump orchestrator version to 0.1.6.

Signed-off-by: Till Wegmueller <toasterson@gmail.com>
This commit is contained in:
Till Wegmueller 2025-11-17 21:34:19 +01:00
parent d5faf319ab
commit bf94664a30
No known key found for this signature in database
2 changed files with 69 additions and 23 deletions

View file

@ -1,6 +1,6 @@
[package] [package]
name = "orchestrator" name = "orchestrator"
version = "0.1.5" version = "0.1.6"
edition = "2024" edition = "2024"
build = "build.rs" build = "build.rs"

View file

@ -217,17 +217,13 @@ impl<H: Hypervisor + 'static> Scheduler<H> {
warn!(request_id = %item.ctx.request_id, label = %label_key, "SSH execution not supported on this platform/backend; skipping"); warn!(request_id = %item.ctx.request_id, label = %label_key, "SSH execution not supported on this platform/backend; skipping");
} }
// Stop and destroy VM after attempting execution // Stop VM after attempting execution
if let Err(e) = hv.stop(&h, Duration::from_secs(10)).await { if let Err(e) = hv.stop(&h, Duration::from_secs(10)).await {
error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to stop VM"); error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to stop VM");
} }
let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Stopped).await; let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Stopped).await;
if let Err(e) = hv.destroy(h.clone()).await {
error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to destroy VM");
}
let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Destroyed).await;
// Stop console tailer // Stop console tailer before we snapshot the file to avoid races
if let Some(t) = tailer_opt.take() { t.abort(); } if let Some(t) = tailer_opt.take() { t.abort(); }
// If no logs were captured (e.g., SSH never connected), snapshot the final console log // If no logs were captured (e.g., SSH never connected), snapshot the final console log
@ -238,6 +234,12 @@ impl<H: Hypervisor + 'static> Scheduler<H> {
} }
} }
// Destroy VM and then persist final destroyed state
if let Err(e) = hv.destroy(h.clone()).await {
error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to destroy VM");
}
let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Destroyed).await;
// Persist final state and publish result // Persist final state and publish result
let final_state = if success { JobState::Succeeded } else { JobState::Failed }; let final_state = if success { JobState::Succeeded } else { JobState::Failed };
let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), final_state).await; let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), final_state).await;
@ -339,7 +341,7 @@ async fn snapshot_console_to_joblog(persist: Arc<Persist>, request_id: Uuid, con
async fn discover_guest_ip_virsh(domain: &str, timeout: Duration) -> Option<String> { async fn discover_guest_ip_virsh(domain: &str, timeout: Duration) -> Option<String> {
use tokio::{task, time::{sleep, Instant, Duration}}; use tokio::{task, time::{sleep, Instant, Duration}};
use std::process::Command; use std::process::Command;
use tracing::debug; use tracing::{debug, warn};
fn parse_ipv4_from_text(s: &str) -> Option<String> { fn parse_ipv4_from_text(s: &str) -> Option<String> {
for line in s.lines() { for line in s.lines() {
@ -362,33 +364,66 @@ async fn discover_guest_ip_virsh(domain: &str, timeout: Duration) -> Option<Stri
None None
} }
async fn run_cmd(args: &[&str]) -> Option<String> { fn preview_bytes(b: &[u8]) -> String {
let s = String::from_utf8_lossy(b);
let s = s.trim();
let mut out = s.to_string();
if out.len() > 800 { out.truncate(800); out.push_str(""); }
out
}
#[derive(Debug, Clone)]
struct Attempt {
cmd: String,
ok: bool,
status: Option<i32>,
stdout: String,
stderr: String,
}
async fn run_cmd(args: &[&str]) -> Attempt {
let args_vec = args.iter().map(|s| s.to_string()).collect::<Vec<_>>(); let args_vec = args.iter().map(|s| s.to_string()).collect::<Vec<_>>();
task::spawn_blocking(move || { let cmd_desc = format!("virsh {}", args_vec.join(" "));
match task::spawn_blocking(move || {
Command::new("virsh").args(&args_vec).output() Command::new("virsh").args(&args_vec).output()
}).await.ok()?.ok().and_then(|out| { }).await {
if out.status.success() { Ok(Ok(out)) => {
Some(String::from_utf8_lossy(&out.stdout).to_string()) let ok = out.status.success();
} else { None } let status = out.status.code();
}) let stdout = preview_bytes(&out.stdout);
let stderr = preview_bytes(&out.stderr);
Attempt { cmd: cmd_desc, ok, status, stdout, stderr }
}
other => {
// spawn or io error
Attempt { cmd: cmd_desc, ok: false, status: None, stdout: String::new(), stderr: format!("spawn error: {:?}", other) }
}
}
} }
let deadline = Instant::now() + timeout; let deadline = Instant::now() + timeout;
let mut last_attempts: Vec<Attempt> = Vec::new();
while Instant::now() < deadline { while Instant::now() < deadline {
last_attempts.clear();
// 1) Try domifaddr via agent then lease then default // 1) Try domifaddr via agent then lease then default
for source in [Some("agent"), Some("lease"), None] { for source in [Some("agent"), Some("lease"), None] {
let mut args = vec!["domifaddr", domain]; let mut args = vec!["domifaddr", domain];
if let Some(src) = source { args.push("--source"); args.push(src); } if let Some(src) = source { args.push("--source"); args.push(src); }
if let Some(out) = run_cmd(&args).await { let att = run_cmd(&args).await;
if let Some(ip) = parse_ipv4_from_text(&out) { debug!(domain=%domain, method=%format!("domifaddr/{:?}", source), ip=%ip, "discovered IP"); return Some(ip); } debug!(domain=%domain, method=%format!("domifaddr/{:?}", source), ok=att.ok, status=?att.status, stdout=%att.stdout, stderr=%att.stderr, cmd=%att.cmd, "virsh attempt");
if att.ok {
if let Some(ip) = parse_ipv4_from_text(&att.stdout) { debug!(domain=%domain, method=%format!("domifaddr/{:?}", source), ip=%ip, "discovered IP"); return Some(ip); }
} }
last_attempts.push(att);
} }
// 2) Try domiflist to get MAC and possibly network name // 2) Try domiflist to get MAC and possibly network name
let mut mac: Option<String> = None; let mut mac: Option<String> = None;
let mut net_name: Option<String> = None; let mut net_name: Option<String> = None;
if let Some(out) = run_cmd(&["domiflist", domain]).await { let att_domiflist = run_cmd(&["domiflist", domain]).await;
for line in out.lines().skip(2) { // skip header lines debug!(domain=%domain, method="domiflist", ok=att_domiflist.ok, status=?att_domiflist.status, stdout=%att_domiflist.stdout, stderr=%att_domiflist.stderr, cmd=%att_domiflist.cmd, "virsh attempt");
if att_domiflist.ok {
for line in att_domiflist.stdout.lines().skip(2) { // skip header lines
let cols: Vec<&str> = line.split_whitespace().collect(); let cols: Vec<&str> = line.split_whitespace().collect();
if cols.len() >= 5 { if cols.len() >= 5 {
// columns: Interface Type Source Model MAC // columns: Interface Type Source Model MAC
@ -398,6 +433,7 @@ async fn discover_guest_ip_virsh(domain: &str, timeout: Duration) -> Option<Stri
} }
} }
} }
last_attempts.push(att_domiflist);
// Fallback to env if domiflist didn't give a Source network // Fallback to env if domiflist didn't give a Source network
if net_name.is_none() { if net_name.is_none() {
if let Ok(env_net) = std::env::var("LIBVIRT_NETWORK") { if !env_net.is_empty() { net_name = Some(env_net); } } if let Ok(env_net) = std::env::var("LIBVIRT_NETWORK") { if !env_net.is_empty() { net_name = Some(env_net); } }
@ -419,21 +455,31 @@ async fn discover_guest_ip_virsh(domain: &str, timeout: Duration) -> Option<Stri
} }
} }
// 2b) Try virsh net-dhcp-leases <network> // 2b) Try virsh net-dhcp-leases <network>
if let Some(net) = net_name { if let Some(net) = net_name.clone() {
if let Some(out) = run_cmd(&["net-dhcp-leases", &net]).await { let att_leases = run_cmd(&["net-dhcp-leases", &net]).await;
debug!(domain=%domain, method="net-dhcp-leases", ok=att_leases.ok, status=?att_leases.status, stdout=%att_leases.stdout, stderr=%att_leases.stderr, cmd=%att_leases.cmd, "virsh attempt");
if att_leases.ok {
if let Some(ref mac_s) = mac { if let Some(ref mac_s) = mac {
for line in out.lines() { for line in att_leases.stdout.lines() {
if line.to_ascii_lowercase().contains(mac_s.as_str()) { if line.to_ascii_lowercase().contains(mac_s.as_str()) {
if let Some(ip) = parse_ipv4_from_text(line) { debug!(domain=%domain, method="net-dhcp-leases", ip=%ip, "discovered IP"); return Some(ip); } if let Some(ip) = parse_ipv4_from_text(line) { debug!(domain=%domain, method="net-dhcp-leases", ip=%ip, "discovered IP"); return Some(ip); }
} }
} }
} }
if let Some(ip) = parse_ipv4_from_text(&out) { debug!(domain=%domain, method="net-dhcp-leases-any", ip=%ip, "discovered IP"); return Some(ip); } if let Some(ip) = parse_ipv4_from_text(&att_leases.stdout) { debug!(domain=%domain, method="net-dhcp-leases-any", ip=%ip, "discovered IP"); return Some(ip); }
} }
last_attempts.push(att_leases);
} }
sleep(Duration::from_secs(1)).await; sleep(Duration::from_secs(1)).await;
} }
// No IP found; emit a summary at warn level with the last attempts' details
if !last_attempts.is_empty() {
for att in last_attempts {
warn!(domain=%domain, cmd=%att.cmd, ok=att.ok, status=?att.status, stdout=%att.stdout, stderr=%att.stderr, "virsh attempt summary");
}
}
None None
} }