Add console log tailing module

New `console` module providing `ConsoleTailer` for async serial console
log streaming from VM backends:
- Connects to QEMU's Unix domain socket console
- Streams lines via tokio mpsc channel
- Retries connection (500ms intervals, 30s timeout)
- Graceful shutdown via watch channel
- Fallback `read_console_log()` for post-mortem log file reading

Signed-off-by: Till Wegmueller <toasterson@gmail.com>
This commit is contained in:
Till Wegmueller 2026-04-07 17:18:53 +02:00
parent 245e71fac7
commit 633e76bad1
No known key found for this signature in database
5 changed files with 61 additions and 66 deletions

View file

@ -160,7 +160,9 @@ pub enum VmError {
#[error("failed to pull OCI artifact {reference}: {detail}")] #[error("failed to pull OCI artifact {reference}: {detail}")]
#[diagnostic( #[diagnostic(
code(vm_manager::oci::pull_failed), code(vm_manager::oci::pull_failed),
help("check that the OCI reference is correct and the registry is reachable. For ghcr.io, ensure GITHUB_TOKEN is set in the environment.") help(
"check that the OCI reference is correct and the registry is reachable. For ghcr.io, ensure GITHUB_TOKEN is set in the environment."
)
)] )]
OciPullFailed { reference: String, detail: String }, OciPullFailed { reference: String, detail: String },

View file

@ -66,12 +66,10 @@ impl ImageManager {
/// Pull a QCOW2 image from an OCI registry into the cache directory. /// Pull a QCOW2 image from an OCI registry into the cache directory.
pub async fn pull_oci(&self, reference: &str, name: Option<&str>) -> Result<PathBuf> { pub async fn pull_oci(&self, reference: &str, name: Option<&str>) -> Result<PathBuf> {
let file_name = name let file_name = name.map(|n| format!("{n}.qcow2")).unwrap_or_else(|| {
.map(|n| format!("{n}.qcow2")) let sanitized = reference.replace('/', "_").replace(':', "_");
.unwrap_or_else(|| { format!("{sanitized}.qcow2")
let sanitized = reference.replace('/', "_").replace(':', "_"); });
format!("{sanitized}.qcow2")
});
let dest = self.cache.join(&file_name); let dest = self.cache.join(&file_name);
if dest.exists() { if dest.exists() {
info!(reference, dest = %dest.display(), "OCI image already cached; skipping pull"); info!(reference, dest = %dest.display(), "OCI image already cached; skipping pull");

View file

@ -9,12 +9,13 @@ const QCOW2_LAYER_MEDIA_TYPE: &str = "application/vnd.cloudnebula.qcow2.layer.v1
/// Pull a QCOW2 image stored as an OCI artifact from a registry. /// Pull a QCOW2 image stored as an OCI artifact from a registry.
pub async fn pull_qcow2(reference_str: &str) -> Result<Vec<u8>> { pub async fn pull_qcow2(reference_str: &str) -> Result<Vec<u8>> {
let reference: Reference = reference_str.parse().map_err(|e: oci_client::ParseError| { let reference: Reference =
VmError::OciPullFailed { reference_str
reference: reference_str.to_string(), .parse()
detail: format!("invalid OCI reference: {e}"), .map_err(|e: oci_client::ParseError| VmError::OciPullFailed {
} reference: reference_str.to_string(),
})?; detail: format!("invalid OCI reference: {e}"),
})?;
let auth = resolve_auth(&reference); let auth = resolve_auth(&reference);

View file

@ -68,17 +68,14 @@ fn run_shell(
if let Some(ref cmd) = shell.inline { if let Some(ref cmd) = shell.inline {
info!(vm = %vm_name, step, cmd = %cmd, "running inline shell provision"); info!(vm = %vm_name, step, cmd = %cmd, "running inline shell provision");
let (stdout, stderr, exit_code) = ssh::exec_streaming( let (stdout, stderr, exit_code) =
sess, ssh::exec_streaming(sess, cmd, std::io::stdout(), std::io::stderr()).map_err(|e| {
cmd, VmError::ProvisionFailed {
std::io::stdout(), vm: vm_name.into(),
std::io::stderr(), step,
) detail: format!("shell exec: {e}"),
.map_err(|e| VmError::ProvisionFailed { }
vm: vm_name.into(), })?;
step,
detail: format!("shell exec: {e}"),
})?;
if let Some(dir) = log_dir { if let Some(dir) = log_dir {
append_provision_log(dir, step, cmd, &stdout, &stderr); append_provision_log(dir, step, cmd, &stdout, &stderr);
@ -110,17 +107,14 @@ fn run_shell(
// Make executable and run // Make executable and run
let run_cmd = format!("chmod +x {remote_path_str} && {remote_path_str}"); let run_cmd = format!("chmod +x {remote_path_str} && {remote_path_str}");
let (stdout, stderr, exit_code) = ssh::exec_streaming( let (stdout, stderr, exit_code) =
sess, ssh::exec_streaming(sess, &run_cmd, std::io::stdout(), std::io::stderr()).map_err(
&run_cmd, |e| VmError::ProvisionFailed {
std::io::stdout(), vm: vm_name.into(),
std::io::stderr(), step,
) detail: format!("script exec: {e}"),
.map_err(|e| VmError::ProvisionFailed { },
vm: vm_name.into(), )?;
step,
detail: format!("script exec: {e}"),
})?;
if let Some(dir) = log_dir { if let Some(dir) = log_dir {
append_provision_log(dir, step, script_raw, &stdout, &stderr); append_provision_log(dir, step, script_raw, &stdout, &stderr);

View file

@ -321,11 +321,13 @@ fn parse_vm_def(name: &str, doc: &KdlDocument) -> Result<VmDef> {
// SSH // SSH
let ssh = if let Some(ssh_node) = doc.get("ssh") { let ssh = if let Some(ssh_node) = doc.get("ssh") {
let ssh_doc = ssh_node.children().ok_or_else(|| VmError::VmFileValidation { let ssh_doc = ssh_node
vm: name.into(), .children()
detail: "ssh block must have a body".into(), .ok_or_else(|| VmError::VmFileValidation {
hint: "add at least a user: ssh { user \"vm\" }".into(), vm: name.into(),
})?; detail: "ssh block must have a body".into(),
hint: "add at least a user: ssh { user \"vm\" }".into(),
})?;
let user = ssh_doc let user = ssh_doc
.get_arg("user") .get_arg("user")
.and_then(|v| v.as_string()) .and_then(|v| v.as_string())
@ -493,17 +495,18 @@ fn generate_ssh_keypair(vm_name: &str) -> Result<(String, String)> {
} }
})?; })?;
let pub_openssh = sk.public_key().to_openssh().map_err(|e| { let pub_openssh = sk
VmError::SshKeygenFailed { .public_key()
.to_openssh()
.map_err(|e| VmError::SshKeygenFailed {
detail: format!("serialize public key: {e}"), detail: format!("serialize public key: {e}"),
} })?;
})?;
let priv_pem = sk.to_openssh(LineEnding::LF).map_err(|e| { let priv_pem = sk
VmError::SshKeygenFailed { .to_openssh(LineEnding::LF)
.map_err(|e| VmError::SshKeygenFailed {
detail: format!("serialize private key: {e}"), detail: format!("serialize private key: {e}"),
} })?;
})?;
Ok((pub_openssh, priv_pem.to_string())) Ok((pub_openssh, priv_pem.to_string()))
} }
@ -528,14 +531,13 @@ async fn resolve_cloud_init_and_ssh(
if let Some(ci) = &def.cloud_init { if let Some(ci) = &def.cloud_init {
if let Some(raw_path) = &ci.user_data { if let Some(raw_path) = &ci.user_data {
let p = resolve_path(raw_path, base_dir); let p = resolve_path(raw_path, base_dir);
let data = let data = tokio::fs::read(&p)
tokio::fs::read(&p) .await
.await .map_err(|e| VmError::VmFileValidation {
.map_err(|e| VmError::VmFileValidation { vm: def.name.clone(),
vm: def.name.clone(), detail: format!("cannot read user-data at {}: {e}", p.display()),
detail: format!("cannot read user-data at {}: {e}", p.display()), hint: "check the user-data path".into(),
hint: "check the user-data path".into(), })?;
})?;
let cloud_init = Some(CloudInitConfig { let cloud_init = Some(CloudInitConfig {
user_data: data, user_data: data,
instance_id: Some(def.name.clone()), instance_id: Some(def.name.clone()),
@ -551,14 +553,13 @@ async fn resolve_cloud_init_and_ssh(
if let Some(ci) = &def.cloud_init { if let Some(ci) = &def.cloud_init {
if let Some(key_raw) = &ci.ssh_key { if let Some(key_raw) = &ci.ssh_key {
let key_path = resolve_path(key_raw, base_dir); let key_path = resolve_path(key_raw, base_dir);
let pubkey = let pubkey = tokio::fs::read_to_string(&key_path).await.map_err(|e| {
tokio::fs::read_to_string(&key_path) VmError::VmFileValidation {
.await vm: def.name.clone(),
.map_err(|e| VmError::VmFileValidation { detail: format!("cannot read ssh-key at {}: {e}", key_path.display()),
vm: def.name.clone(), hint: "check the ssh-key path".into(),
detail: format!("cannot read ssh-key at {}: {e}", key_path.display()), }
hint: "check the ssh-key path".into(), })?;
})?;
let (user_data, _meta) = let (user_data, _meta) =
build_cloud_config(ssh_user, pubkey.trim(), &def.name, hostname); build_cloud_config(ssh_user, pubkey.trim(), &def.name, hostname);
let cloud_init = Some(CloudInitConfig { let cloud_init = Some(CloudInitConfig {
@ -576,8 +577,7 @@ async fn resolve_cloud_init_and_ssh(
info!(vm = %def.name, "generating Ed25519 SSH keypair for cloud-init"); info!(vm = %def.name, "generating Ed25519 SSH keypair for cloud-init");
let (pub_openssh, priv_pem) = generate_ssh_keypair(&def.name)?; let (pub_openssh, priv_pem) = generate_ssh_keypair(&def.name)?;
let (user_data, _meta) = let (user_data, _meta) = build_cloud_config(ssh_user, &pub_openssh, &def.name, hostname);
build_cloud_config(ssh_user, &pub_openssh, &def.name, hostname);
let cloud_init = Some(CloudInitConfig { let cloud_init = Some(CloudInitConfig {
user_data, user_data,
instance_id: Some(def.name.clone()), instance_id: Some(def.name.clone()),