From 633e76bad1808de6912bbe8f44ae7c470e3e8d8c Mon Sep 17 00:00:00 2001 From: Till Wegmueller Date: Tue, 7 Apr 2026 17:18:53 +0200 Subject: [PATCH] Reformat vm-manager crate with rustfmt Formatting-only pass over the vm-manager crate: re-wraps long `map_err`/`unwrap_or_else` closure chains and the multi-line diagnostic help() string to rustfmt's preferred layout in error.rs, image.rs, oci.rs, provision.rs and vmfile.rs. No functional changes. Signed-off-by: Till Wegmueller --- crates/vm-manager/src/error.rs | 4 +- crates/vm-manager/src/image.rs | 10 ++--- crates/vm-manager/src/oci.rs | 13 ++++--- crates/vm-manager/src/provision.rs | 38 ++++++++---------- crates/vm-manager/src/vmfile.rs | 62 +++++++++++++++--------------- 5 files changed, 61 insertions(+), 66 deletions(-) diff --git a/crates/vm-manager/src/error.rs b/crates/vm-manager/src/error.rs index 696bf13..735c81e 100644 --- a/crates/vm-manager/src/error.rs +++ b/crates/vm-manager/src/error.rs @@ -160,7 +160,9 @@ pub enum VmError { #[error("failed to pull OCI artifact {reference}: {detail}")] #[diagnostic( code(vm_manager::oci::pull_failed), - help("check that the OCI reference is correct and the registry is reachable. For ghcr.io, ensure GITHUB_TOKEN is set in the environment.") + help( + "check that the OCI reference is correct and the registry is reachable. For ghcr.io, ensure GITHUB_TOKEN is set in the environment." + ) )] OciPullFailed { reference: String, detail: String }, diff --git a/crates/vm-manager/src/image.rs b/crates/vm-manager/src/image.rs index faa5711..5f32c94 100644 --- a/crates/vm-manager/src/image.rs +++ b/crates/vm-manager/src/image.rs @@ -66,12 +66,10 @@ impl ImageManager { /// Pull a QCOW2 image from an OCI registry into the cache directory. 
pub async fn pull_oci(&self, reference: &str, name: Option<&str>) -> Result { - let file_name = name - .map(|n| format!("{n}.qcow2")) - .unwrap_or_else(|| { - let sanitized = reference.replace('/', "_").replace(':', "_"); - format!("{sanitized}.qcow2") - }); + let file_name = name.map(|n| format!("{n}.qcow2")).unwrap_or_else(|| { + let sanitized = reference.replace('/', "_").replace(':', "_"); + format!("{sanitized}.qcow2") + }); let dest = self.cache.join(&file_name); if dest.exists() { info!(reference, dest = %dest.display(), "OCI image already cached; skipping pull"); diff --git a/crates/vm-manager/src/oci.rs b/crates/vm-manager/src/oci.rs index 534e72c..bd32841 100644 --- a/crates/vm-manager/src/oci.rs +++ b/crates/vm-manager/src/oci.rs @@ -9,12 +9,13 @@ const QCOW2_LAYER_MEDIA_TYPE: &str = "application/vnd.cloudnebula.qcow2.layer.v1 /// Pull a QCOW2 image stored as an OCI artifact from a registry. pub async fn pull_qcow2(reference_str: &str) -> Result> { - let reference: Reference = reference_str.parse().map_err(|e: oci_client::ParseError| { - VmError::OciPullFailed { - reference: reference_str.to_string(), - detail: format!("invalid OCI reference: {e}"), - } - })?; + let reference: Reference = + reference_str + .parse() + .map_err(|e: oci_client::ParseError| VmError::OciPullFailed { + reference: reference_str.to_string(), + detail: format!("invalid OCI reference: {e}"), + })?; let auth = resolve_auth(&reference); diff --git a/crates/vm-manager/src/provision.rs b/crates/vm-manager/src/provision.rs index 0f29b89..6915cd4 100644 --- a/crates/vm-manager/src/provision.rs +++ b/crates/vm-manager/src/provision.rs @@ -68,17 +68,14 @@ fn run_shell( if let Some(ref cmd) = shell.inline { info!(vm = %vm_name, step, cmd = %cmd, "running inline shell provision"); - let (stdout, stderr, exit_code) = ssh::exec_streaming( - sess, - cmd, - std::io::stdout(), - std::io::stderr(), - ) - .map_err(|e| VmError::ProvisionFailed { - vm: vm_name.into(), - step, - detail: 
format!("shell exec: {e}"), - })?; + let (stdout, stderr, exit_code) = + ssh::exec_streaming(sess, cmd, std::io::stdout(), std::io::stderr()).map_err(|e| { + VmError::ProvisionFailed { + vm: vm_name.into(), + step, + detail: format!("shell exec: {e}"), + } + })?; if let Some(dir) = log_dir { append_provision_log(dir, step, cmd, &stdout, &stderr); @@ -110,17 +107,14 @@ fn run_shell( // Make executable and run let run_cmd = format!("chmod +x {remote_path_str} && {remote_path_str}"); - let (stdout, stderr, exit_code) = ssh::exec_streaming( - sess, - &run_cmd, - std::io::stdout(), - std::io::stderr(), - ) - .map_err(|e| VmError::ProvisionFailed { - vm: vm_name.into(), - step, - detail: format!("script exec: {e}"), - })?; + let (stdout, stderr, exit_code) = + ssh::exec_streaming(sess, &run_cmd, std::io::stdout(), std::io::stderr()).map_err( + |e| VmError::ProvisionFailed { + vm: vm_name.into(), + step, + detail: format!("script exec: {e}"), + }, + )?; if let Some(dir) = log_dir { append_provision_log(dir, step, script_raw, &stdout, &stderr); diff --git a/crates/vm-manager/src/vmfile.rs b/crates/vm-manager/src/vmfile.rs index 9338d12..35b7184 100644 --- a/crates/vm-manager/src/vmfile.rs +++ b/crates/vm-manager/src/vmfile.rs @@ -321,11 +321,13 @@ fn parse_vm_def(name: &str, doc: &KdlDocument) -> Result { // SSH let ssh = if let Some(ssh_node) = doc.get("ssh") { - let ssh_doc = ssh_node.children().ok_or_else(|| VmError::VmFileValidation { - vm: name.into(), - detail: "ssh block must have a body".into(), - hint: "add at least a user: ssh { user \"vm\" }".into(), - })?; + let ssh_doc = ssh_node + .children() + .ok_or_else(|| VmError::VmFileValidation { + vm: name.into(), + detail: "ssh block must have a body".into(), + hint: "add at least a user: ssh { user \"vm\" }".into(), + })?; let user = ssh_doc .get_arg("user") .and_then(|v| v.as_string()) @@ -493,17 +495,18 @@ fn generate_ssh_keypair(vm_name: &str) -> Result<(String, String)> { } })?; - let pub_openssh = 
sk.public_key().to_openssh().map_err(|e| { - VmError::SshKeygenFailed { + let pub_openssh = sk + .public_key() + .to_openssh() + .map_err(|e| VmError::SshKeygenFailed { detail: format!("serialize public key: {e}"), - } - })?; + })?; - let priv_pem = sk.to_openssh(LineEnding::LF).map_err(|e| { - VmError::SshKeygenFailed { + let priv_pem = sk + .to_openssh(LineEnding::LF) + .map_err(|e| VmError::SshKeygenFailed { detail: format!("serialize private key: {e}"), - } - })?; + })?; Ok((pub_openssh, priv_pem.to_string())) } @@ -528,14 +531,13 @@ async fn resolve_cloud_init_and_ssh( if let Some(ci) = &def.cloud_init { if let Some(raw_path) = &ci.user_data { let p = resolve_path(raw_path, base_dir); - let data = - tokio::fs::read(&p) - .await - .map_err(|e| VmError::VmFileValidation { - vm: def.name.clone(), - detail: format!("cannot read user-data at {}: {e}", p.display()), - hint: "check the user-data path".into(), - })?; + let data = tokio::fs::read(&p) + .await + .map_err(|e| VmError::VmFileValidation { + vm: def.name.clone(), + detail: format!("cannot read user-data at {}: {e}", p.display()), + hint: "check the user-data path".into(), + })?; let cloud_init = Some(CloudInitConfig { user_data: data, instance_id: Some(def.name.clone()), @@ -551,14 +553,13 @@ async fn resolve_cloud_init_and_ssh( if let Some(ci) = &def.cloud_init { if let Some(key_raw) = &ci.ssh_key { let key_path = resolve_path(key_raw, base_dir); - let pubkey = - tokio::fs::read_to_string(&key_path) - .await - .map_err(|e| VmError::VmFileValidation { - vm: def.name.clone(), - detail: format!("cannot read ssh-key at {}: {e}", key_path.display()), - hint: "check the ssh-key path".into(), - })?; + let pubkey = tokio::fs::read_to_string(&key_path).await.map_err(|e| { + VmError::VmFileValidation { + vm: def.name.clone(), + detail: format!("cannot read ssh-key at {}: {e}", key_path.display()), + hint: "check the ssh-key path".into(), + } + })?; let (user_data, _meta) = build_cloud_config(ssh_user, pubkey.trim(), 
&def.name, hostname); let cloud_init = Some(CloudInitConfig { @@ -576,8 +577,7 @@ async fn resolve_cloud_init_and_ssh( info!(vm = %def.name, "generating Ed25519 SSH keypair for cloud-init"); let (pub_openssh, priv_pem) = generate_ssh_keypair(&def.name)?; - let (user_data, _meta) = - build_cloud_config(ssh_user, &pub_openssh, &def.name, hostname); + let (user_data, _meta) = build_cloud_config(ssh_user, &pub_openssh, &def.name, hostname); let cloud_init = Some(CloudInitConfig { user_data, instance_id: Some(def.name.clone()),