diff --git a/.cargo/config.toml b/.cargo/config.toml index 256028a..73ff438 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,5 +1,5 @@ -# Cargo configuration for Solstice CI -# vm-manager is referenced via path dep in orchestrator's Cargo.toml. -# For local development, ensure the vm-manager repo is available at: -# ../vm-manager (relative to solstice-ci root) -# or create a symlink: ln -s /path/to/vm-manager ../vm-manager +# Local path override for development — uses local vm-manager checkout +# instead of fetching from GitHub on every build. +# Remove or comment out to use the git dependency from Cargo.toml. +[patch."https://github.com/CloudNebulaProject/vm-manager.git"] +vm-manager = { path = "../vm-manager/crates/vm-manager" } diff --git a/crates/orchestrator/src/config.rs b/crates/orchestrator/src/config.rs index 65f1c04..25eafd1 100644 --- a/crates/orchestrator/src/config.rs +++ b/crates/orchestrator/src/config.rs @@ -1,6 +1,5 @@ use std::{ collections::BTreeMap, - fs, path::{Path, PathBuf}, }; @@ -53,17 +52,13 @@ pub struct ImageDefaults { #[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] +#[derive(Default)] pub enum Decompress { Zstd, + #[default] None, } -impl Default for Decompress { - fn default() -> Self { - Decompress::None - } -} - impl OrchestratorConfig { pub async fn load(path: Option<&Path>) -> Result { let path = match path { diff --git a/crates/orchestrator/src/http.rs b/crates/orchestrator/src/http.rs index fabfed1..6ad9958 100644 --- a/crates/orchestrator/src/http.rs +++ b/crates/orchestrator/src/http.rs @@ -7,8 +7,7 @@ use axum::{ }; use std::net::SocketAddr; use std::sync::Arc; -use tracing::{info, warn}; -use uuid::Uuid; +use tracing::info; use crate::persist::Persist; @@ -61,7 +60,7 @@ pub async fn serve( .await .expect("bind http"); let server = axum::serve(listener, app); - let _ = tokio::select! { + tokio::select! { _ = server => {}, _ = shutdown => {}, }; diff --git a/crates/orchestrator/src/persist.rs b/crates/orchestrator/src/persist.rs index fa99afb..e924647 100644 --- a/crates/orchestrator/src/persist.rs +++ b/crates/orchestrator/src/persist.rs @@ -275,10 +275,10 @@ impl Persist { if let Ok(val) = serde_json::from_str::(line) { // Keep original JSON for reference fields_json = Some(line.to_string()); - if let Some(c) = val.get("category").and_then(|v| v.as_str()) { - if !c.is_empty() { - category = c.to_string(); - } + if let Some(c) = val.get("category").and_then(|v| v.as_str()) + && !c.is_empty() + { + category = c.to_string(); } if let Some(l) = val.get("level").and_then(|v| v.as_str()) { level_str = Some(l.to_string()); diff --git a/crates/orchestrator/src/scheduler.rs b/crates/orchestrator/src/scheduler.rs index a42fc5b..51cabf7 100644 --- a/crates/orchestrator/src/scheduler.rs +++ b/crates/orchestrator/src/scheduler.rs @@ -1,7 +1,6 @@ -use crate::error::OrchestratorError; use common::{DeadLetter, JobRequest, messages::JobResult, publish_deadletter, publish_job_result}; use dashmap::DashMap; -use miette::{IntoDiagnostic as _, Result}; +use miette::Result; use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; use tokio::sync::{Notify, Semaphore, mpsc}; use tracing::{error, info, warn}; @@ -9,9 +8,7 @@ use tracing::{error, info, warn}; use crate::hypervisor::{BackendTag, Hypervisor, JobContext, VmSpec}; use crate::persist::{JobState, Persist, VmPersistState}; -use tokio::fs::File; -use tokio::io::{AsyncBufReadExt, BufReader}; -use uuid::Uuid; +use tokio::io::AsyncBufReadExt; pub struct Scheduler { mq_cfg: Arc, @@ -359,8 +356,8 @@ impl Scheduler { let _ = console_recorder.await; // If no logs were captured, snapshot the console log file as fallback - if let Ok(None) = persist.get_logs_text(item.ctx.request_id).await { - if let Ok(lines) = vm_manager::console::read_console_log(&h.work_dir).await { + if let Ok(None) = persist.get_logs_text(item.ctx.request_id).await + && let Ok(lines) = vm_manager::console::read_console_log(&h.work_dir).await { let mut seq: i64 = -1_000_000; for line in lines { let obj = serde_json::json!({ @@ -372,7 +369,6 @@ impl Scheduler { seq += 1; } } - } // Destroy VM and then persist final destroyed state if let Err(e) = hv.destroy(h.clone()).await { @@ -383,7 +379,7 @@ impl Scheduler { // Persist final state and publish result let final_state = if success { JobState::Succeeded } else { JobState::Failed }; let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), final_state).await; - let mut result = JobResult::new(item.ctx.request_id, item.ctx.repo_url.clone(), item.ctx.commit_sha.clone(), success, exit_code, failure_summary.clone()); + let result = JobResult::new(item.ctx.request_id, item.ctx.repo_url.clone(), item.ctx.commit_sha.clone(), success, exit_code, failure_summary.clone()); if let Err(e) = publish_job_result(&mq_cfg_in, &result).await { warn!(error = %e, request_id = %item.ctx.request_id, "failed to publish JobResult"); } @@ -400,7 +396,6 @@ impl Scheduler { } } info!(request_id = %item.ctx.request_id, label = %label_key, "job finished: {} {}", item.ctx.request_id, "failed"); - return; } } }); @@ -420,473 +415,6 @@ impl Scheduler { } } -// Tail the VM serial console file and record lines into the job log until cancelled. -async fn tail_console_to_joblog( - persist: Arc, - request_id: Uuid, - console_path: PathBuf, -) -> miette::Result<()> { - use miette::IntoDiagnostic as _; - use tokio::time::{Duration, sleep}; - - // Negative sequence numbers for early console logs so they sort before runner logs (which start at 0). - let mut seq: i64 = -1_000_000; // ample headroom - - loop { - match File::open(&console_path).await { - Ok(file) => { - let mut reader = BufReader::new(file); - let mut buf = String::new(); - loop { - buf.clear(); - let n = reader.read_line(&mut buf).await.into_diagnostic()?; - if n == 0 { - // No new data yet; yield and retry - sleep(Duration::from_millis(200)).await; - continue; - } - let trimmed = buf.trim_end_matches(['\n', '\r']); - // Emit as NDJSON so logs-service can categorize this as 'boot' - let obj = serde_json::json!({ - "category": "boot", - "level": "info", - "msg": trimmed - }); - let line = obj.to_string(); - let _ = persist.record_log_line(request_id, seq, false, &line).await; - seq += 1; - } - } - Err(_) => { - // File may not exist yet; back off briefly before retrying - sleep(Duration::from_millis(200)).await; - } - } - } -} - -// Snapshot the entire console log file once and persist its lines with negative seq numbers. -async fn snapshot_console_to_joblog( - persist: Arc, - request_id: Uuid, - console_path: PathBuf, -) -> miette::Result<()> { - use miette::IntoDiagnostic as _; - match tokio::fs::read_to_string(&console_path).await { - Ok(content) => { - let mut seq: i64 = -1_000_000; // keep consistent ordering before runner logs - for raw in content.lines() { - let trimmed = raw.trim_end_matches(['\n', '\r']); - let obj = serde_json::json!({ - "category": "boot", - "level": "info", - "msg": trimmed - }); - let line = obj.to_string(); - let _ = persist.record_log_line(request_id, seq, false, &line).await; - seq += 1; - } - Ok(()) - } - Err(e) => { - // If file missing or unreadable, just return error up to caller for logging - Err(e).into_diagnostic() - } - } -} - -#[cfg(all(target_os = "linux", feature = "libvirt"))] -async fn discover_guest_ip_virsh( - domain: &str, - timeout: Duration, - libvirt_uri: &str, - libvirt_network: Option<&str>, -) -> Option { - use std::process::Command; - use tokio::{ - task, - time::{Duration, Instant, sleep}, - }; - use tracing::{debug, warn}; - - fn parse_ipv4_from_text(s: &str) -> Option { - for line in s.lines() { - if let Some(slash) = line.find('/') { - let start = line[..slash].rfind(' ').map(|i| i + 1).unwrap_or(0); - let ip = &line[start..slash]; - if ip.chars().all(|c| c.is_ascii_digit() || c == '.') { - return Some(ip.to_string()); - } - } - let mut cur = String::new(); - for ch in line.chars() { - if ch.is_ascii_digit() || ch == '.' { - cur.push(ch); - } else { - if cur.split('.').count() == 4 { - return Some(cur.clone()); - } - cur.clear(); - } - } - if cur.split('.').count() == 4 { - return Some(cur); - } - } - None - } - - fn preview_bytes(b: &[u8]) -> String { - let s = String::from_utf8_lossy(b); - let s = s.trim(); - let mut out = s.to_string(); - if out.len() > 800 { - out.truncate(800); - out.push_str("…"); - } - out - } - - #[derive(Debug, Clone)] - struct Attempt { - cmd: String, - ok: bool, - status: Option, - stdout: String, - stderr: String, - } - - async fn run_cmd(args: &[&str], uri: &str) -> Attempt { - let args_vec = { - let mut v = Vec::with_capacity(args.len() + 2); - v.push("-c".to_string()); - v.push(uri.to_string()); - v.extend(args.iter().map(|s| s.to_string())); - v - }; - let cmd_desc = format!("virsh {}", args_vec.join(" ")); - match task::spawn_blocking(move || { - let mut cmd = Command::new("virsh"); - cmd.args(&args_vec); - cmd.output() - }) - .await - { - Ok(Ok(out)) => { - let ok = out.status.success(); - let status = out.status.code(); - let stdout = preview_bytes(&out.stdout); - let stderr = preview_bytes(&out.stderr); - Attempt { - cmd: cmd_desc, - ok, - status, - stdout, - stderr, - } - } - other => Attempt { - cmd: cmd_desc, - ok: false, - status: None, - stdout: String::new(), - stderr: format!("spawn error: {:?}", other), - }, - } - } - - let deadline = Instant::now() + timeout; - let mut last_attempts: Vec = Vec::new(); - while Instant::now() < deadline { - last_attempts.clear(); - // 1) Try domifaddr via agent then lease then default - for source in [Some("agent"), Some("lease"), None] { - let mut args = vec!["domifaddr", domain]; - if let Some(src) = source { - args.push("--source"); - args.push(src); - } - let att = run_cmd(&args, libvirt_uri).await; - debug!(domain=%domain, method=%format!("domifaddr/{:?}", source), ok=att.ok, status=?att.status, stdout=%att.stdout, stderr=%att.stderr, cmd=%att.cmd, "virsh attempt"); - if att.ok { - if let Some(ip) = parse_ipv4_from_text(&att.stdout) { - debug!(domain=%domain, method=%format!("domifaddr/{:?}", source), ip=%ip, "discovered IP"); - return Some(ip); - } - } - last_attempts.push(att); - } - - // 2) Try domiflist to get MAC and possibly network name - let mut mac: Option = None; - let mut net_name: Option = None; - let att_domiflist = run_cmd(&["domiflist", domain], libvirt_uri).await; - debug!(domain=%domain, method="domiflist", ok=att_domiflist.ok, status=?att_domiflist.status, stdout=%att_domiflist.stdout, stderr=%att_domiflist.stderr, cmd=%att_domiflist.cmd, "virsh attempt"); - if att_domiflist.ok { - for line in att_domiflist.stdout.lines().skip(2) { - // skip header lines - let cols: Vec<&str> = line.split_whitespace().collect(); - if cols.len() >= 5 { - // columns: Interface Type Source Model MAC - net_name = Some(cols[2].to_string()); - mac = Some(cols[4].to_ascii_lowercase()); - break; - } - } - } - last_attempts.push(att_domiflist); - // Fallback to env if domiflist didn't give a Source network - if net_name.is_none() { - if let Some(n) = libvirt_network { - if !n.is_empty() { - net_name = Some(n.to_string()); - } - } - } - // 2a) Parse leases file if we have network and MAC - if let (Some(net), Some(mac_s)) = (net_name.clone(), mac.clone()) { - let path = format!("/var/lib/libvirt/dnsmasq/{}.leases", net); - let content_opt = task::spawn_blocking(move || std::fs::read_to_string(path)) - .await - .ok() - .and_then(|r| r.ok()); - if let Some(content) = content_opt { - let mut best_ip: Option = None; - let mut best_epoch: i64 = -1; - for line in content.lines() { - // Format: epoch MAC IP hostname clientid - let parts: Vec<&str> = line.split_whitespace().collect(); - if parts.len() >= 3 && parts[1].eq_ignore_ascii_case(mac_s.as_str()) { - let epoch: i64 = parts[0].parse::().unwrap_or(0); - if epoch > best_epoch { - best_epoch = epoch; - best_ip = Some(parts[2].to_string()); - } - } - } - if let Some(ip) = best_ip { - debug!(domain=%domain, method="dnsmasq.leases", ip=%ip, "discovered IP"); - return Some(ip); - } - } - } - // 2b) Try virsh net-dhcp-leases - if let Some(net) = net_name.clone() { - let att_leases = run_cmd(&["net-dhcp-leases", &net], libvirt_uri).await; - debug!(domain=%domain, method="net-dhcp-leases", ok=att_leases.ok, status=?att_leases.status, stdout=%att_leases.stdout, stderr=%att_leases.stderr, cmd=%att_leases.cmd, "virsh attempt"); - if att_leases.ok { - if let Some(ref mac_s) = mac { - for line in att_leases.stdout.lines() { - if line.to_ascii_lowercase().contains(mac_s.as_str()) { - if let Some(ip) = parse_ipv4_from_text(line) { - debug!(domain=%domain, method="net-dhcp-leases", ip=%ip, "discovered IP"); - return Some(ip); - } - } - } - } - } - last_attempts.push(att_leases); - } - - sleep(Duration::from_secs(1)).await; - } - - // No IP found; emit a summary at warn level with the last attempts' details - if !last_attempts.is_empty() { - for att in last_attempts { - warn!(domain=%domain, cmd=%att.cmd, ok=att.ok, status=?att.status, stdout=%att.stdout, stderr=%att.stderr, "virsh attempt summary"); - } - } - None -} - -#[cfg(all(target_os = "linux", feature = "libvirt"))] -async fn run_job_via_ssh_with_retry( - ip: String, - user: String, - key_mem_opt: Option, - per_job_key_path: Option, - local_runner: PathBuf, - remote_path: PathBuf, - repo_url: String, - commit_sha: String, - request_id: uuid::Uuid, - timeout: Duration, -) -> miette::Result<(bool, i32, Vec<(bool, String)>)> { - use tokio::time::{Instant, sleep}; - use tracing::warn; - - let deadline = Instant::now() + timeout; - let mut attempt: u32 = 0; - let mut backoff = Duration::from_secs(1); - loop { - attempt += 1; - match run_job_via_ssh_owned( - ip.clone(), - user.clone(), - key_mem_opt.clone(), - per_job_key_path.clone(), - local_runner.clone(), - remote_path.clone(), - repo_url.clone(), - commit_sha.clone(), - request_id, - ) - .await - { - Ok(r) => return Ok(r), - Err(e) => { - let now = Instant::now(); - if now >= deadline { - return Err(e); // give up with last error - } - warn!( - attempt, - remaining_ms = (deadline - now).as_millis() as u64, - ip = %ip, - request_id = %request_id, - error = %e, - "SSH connect/exec not ready yet; will retry" - ); - // Cap backoff at 5s and don’t sleep past the deadline - let sleep_dur = std::cmp::min(backoff, deadline.saturating_duration_since(now)); - sleep(sleep_dur).await; - backoff = std::cmp::min(backoff.saturating_mul(2), Duration::from_secs(5)); - } - } - } -} - -#[cfg(all(target_os = "linux", feature = "libvirt"))] -async fn run_job_via_ssh_owned( - ip: String, - user: String, - key_mem_opt: Option, - per_job_key_path: Option, - local_runner: PathBuf, - remote_path: PathBuf, - repo_url: String, - commit_sha: String, - request_id: uuid::Uuid, -) -> miette::Result<(bool, i32, Vec<(bool, String)>)> { - use ssh2::Session; - use std::fs::File as StdFile; - use std::io::Read; - use std::net::TcpStream; - use tokio::task; - - let (ok, code, lines) = task::spawn_blocking(move || -> miette::Result<(bool, i32, Vec<(bool, String)>)> { - // Connect TCP - let tcp = TcpStream::connect(format!("{}:22", ip)) - .map_err(OrchestratorError::TcpConnect) - .into_diagnostic()?; - let mut sess = Session::new().map_err(|_| OrchestratorError::SshSessionNew).into_diagnostic()?; - sess.set_tcp_stream(tcp); - sess.handshake().map_err(OrchestratorError::SshHandshake).into_diagnostic()?; - // Authenticate priority: memory key -> per-job file -> global file - if let Some(ref key_pem) = key_mem_opt { - sess.userauth_pubkey_memory(&user, None, key_pem, None) - .map_err(|e| OrchestratorError::SshAuth { user: user.clone(), source: e }) - .into_diagnostic()?; - } else if let Some(ref p) = per_job_key_path { - sess.userauth_pubkey_file(&user, None, p, None) - .map_err(|e| OrchestratorError::SshAuth { user: user.clone(), source: e }) - .into_diagnostic()?; - } else { - return Err(miette::miette!("no SSH key available for job (neither in-memory nor per-job file)")); - } - if !sess.authenticated() { - return Err(miette::miette!("ssh not authenticated")); - } - // Upload runner via SFTP atomically: write to temp, close, then rename over final path - let sftp = sess.sftp().map_err(OrchestratorError::SftpInit).into_diagnostic()?; - let mut local = StdFile::open(&local_runner) - .map_err(OrchestratorError::OpenLocalRunner) - .into_diagnostic()?; - let mut buf = Vec::new(); - local.read_to_end(&mut buf).map_err(OrchestratorError::ReadRunner).into_diagnostic()?; - // Temp remote path - let tmp_remote = { - let mut p = remote_path.clone(); - let tmp_name = format!("{}.tmp", p.file_name().and_then(|s| s.to_str()).unwrap_or("solstice-runner")); - p.set_file_name(tmp_name); - p - }; - let mut remote_tmp = sftp.create(&tmp_remote) - .map_err(OrchestratorError::SftpCreate) - .into_diagnostic()?; - use std::io::Write as _; - remote_tmp.write_all(&buf) - .map_err(OrchestratorError::SftpWrite) - .into_diagnostic()?; - let tmp_file_stat = ssh2::FileStat { size: None, uid: None, gid: None, perm: Some(0o755), atime: None, mtime: None }; - let _ = sftp.setstat(&tmp_remote, tmp_file_stat); - // Ensure remote file handle is closed before attempting exec/rename - drop(remote_tmp); - // Rename temp to final atomically; overwrite if exists - let _ = sftp.rename(&tmp_remote, &remote_path, Some(ssh2::RenameFlags::OVERWRITE)); - - // Build command - let cmd = format!( - "export SOLSTICE_REPO_URL=\"{}\" SOLSTICE_COMMIT_SHA=\"{}\" SOLSTICE_REQUEST_ID=\"{}\"; {}", - repo_url, commit_sha, request_id, remote_path.display() - ); - - let mut channel = sess.channel_session() - .map_err(OrchestratorError::ChannelSession) - .into_diagnostic()?; - // Best-effort: request a PTY to encourage line-buffered output from the runner - let _ = channel.request_pty("xterm", None, None); - channel.exec(&format!("sh -lc '{}'", cmd)) - .map_err(OrchestratorError::Exec) - .into_diagnostic()?; - - // Switch to non-blocking so we can interleave stdout/stderr without deadlock - sess.set_blocking(false); - - use std::io::Read as _; - let mut out_acc: Vec = Vec::new(); - let mut err_acc: Vec = Vec::new(); - let mut tmp = [0u8; 4096]; - // Read until remote closes - loop { - let mut progressed = false; - match channel.read(&mut tmp) { - Ok(0) => {}, - Ok(n) => { out_acc.extend_from_slice(&tmp[..n]); progressed = true; }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}, - Err(e) => return Err(miette::miette!("ssh stdout read error: {}", e)), - } - match channel.stderr().read(&mut tmp) { - Ok(0) => {}, - Ok(n) => { err_acc.extend_from_slice(&tmp[..n]); progressed = true; }, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}, - Err(e) => return Err(miette::miette!("ssh stderr read error: {}", e)), - } - if channel.eof() { break; } - if !progressed { - // avoid busy spin - std::thread::sleep(std::time::Duration::from_millis(20)); - } - } - // Restore blocking for status fetch (defensive) - sess.set_blocking(true); - let _ = channel.wait_close(); - let exit_code = channel.exit_status().unwrap_or(1); - - // Convert buffers into lines (strip ANSI color/escape sequences) - let out = strip_ansi(&String::from_utf8_lossy(&out_acc)); - let err = strip_ansi(&String::from_utf8_lossy(&err_acc)); - let mut lines: Vec<(bool, String)> = Vec::new(); - for l in out.lines() { lines.push((false, l.to_string())); } - for l in err.lines() { lines.push((true, l.to_string())); } - Ok((exit_code == 0, exit_code, lines)) - }).await.into_diagnostic()??; - - Ok((ok, code, lines)) -} - fn is_illumos_label(label: &str) -> bool { let l = label.to_ascii_lowercase(); l.contains("illumos") @@ -896,10 +424,10 @@ fn is_illumos_label(label: &str) -> bool { } fn expand_tilde(path: &str) -> PathBuf { - if let Some(rest) = path.strip_prefix("~/") { - if let Ok(home) = std::env::var("HOME") { - return PathBuf::from(home).join(rest); - } + if let Some(rest) = path.strip_prefix("~/") + && let Ok(home) = std::env::var("HOME") + { + return PathBuf::from(home).join(rest); } PathBuf::from(path) } @@ -1048,8 +576,6 @@ mod tests { runner_illumos_path: "/tmp/runner-illumos".into(), ssh_connect_timeout_secs: 30, boot_wait_secs: 0, - libvirt_uri: "qemu:///system".into(), - libvirt_network: "default".into(), }; let sched = Scheduler::new( hv, @@ -1103,8 +629,6 @@ mod tests { runner_illumos_path: "/tmp/runner-illumos".into(), ssh_connect_timeout_secs: 30, boot_wait_secs: 0, - libvirt_uri: "qemu:///system".into(), - libvirt_network: "default".into(), }; let sched = Scheduler::new( hv, diff --git a/crates/runner-integration/src/poller.rs b/crates/runner-integration/src/poller.rs index bf14816..8b6bf99 100644 --- a/crates/runner-integration/src/poller.rs +++ b/crates/runner-integration/src/poller.rs @@ -233,7 +233,7 @@ async fn report_failure( state: Some(TaskState { id: task_id, result: v1::Result::Failure as i32, - started_at: Some(now.clone()), + started_at: Some(now), stopped_at: Some(now), steps: vec![], }), diff --git a/crates/runner-integration/src/reporter.rs b/crates/runner-integration/src/reporter.rs index 559408d..7a45545 100644 --- a/crates/runner-integration/src/reporter.rs +++ b/crates/runner-integration/src/reporter.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use futures_util::StreamExt; use miette::{IntoDiagnostic, Result}; use tokio::sync::watch; -use tracing::{debug, info, warn}; +use tracing::{info, warn}; use crate::connect::ConnectClient; use crate::proto::runner::v1::{self, StepState, TaskState, UpdateLogRequest, UpdateTaskRequest}; @@ -221,8 +221,8 @@ async fn report_to_forgejo( step_states.push(StepState { id: step_idx as i64, result: result as i32, - started_at: Some(now.clone()), - stopped_at: Some(now.clone()), + started_at: Some(now), + stopped_at: Some(now), log_index: cursor, log_length: step_lines, }); @@ -239,8 +239,8 @@ async fn report_to_forgejo( step_states.push(StepState { id: 0, result: result as i32, - started_at: Some(now.clone()), - stopped_at: Some(now.clone()), + started_at: Some(now), + stopped_at: Some(now), log_index: setup_lines, log_length: work_lines, }); diff --git a/crates/runner-integration/src/streamer.rs b/crates/runner-integration/src/streamer.rs index a6733bb..73989e6 100644 --- a/crates/runner-integration/src/streamer.rs +++ b/crates/runner-integration/src/streamer.rs @@ -154,7 +154,7 @@ async fn poll_and_send( let rows: Vec = new_lines .iter() .map(|line| LogRow { - time: Some(now.clone()), + time: Some(now), content: line.clone(), }) .collect(); @@ -208,7 +208,7 @@ async fn poll_and_send( step_states.push(StepState { id: step_idx as i64, result: v1::Result::Unspecified as i32, // still running - started_at: Some(now.clone()), + started_at: Some(now), stopped_at: None, log_index: cursor, log_length: step_lines, diff --git a/crates/runner-integration/src/translator.rs b/crates/runner-integration/src/translator.rs index fab3bac..353490a 100644 --- a/crates/runner-integration/src/translator.rs +++ b/crates/runner-integration/src/translator.rs @@ -30,17 +30,17 @@ pub async fn translate_task(task: &Task, ctx: &TranslateCtx) -> Result