diff --git a/.idea/data_source_mapping.xml b/.idea/data_source_mapping.xml deleted file mode 100644 index 1c4d39c..0000000 --- a/.idea/data_source_mapping.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/crates/migration/src/lib.rs b/crates/migration/src/lib.rs index f7fbd59..27bd15e 100644 --- a/crates/migration/src/lib.rs +++ b/crates/migration/src/lib.rs @@ -9,6 +9,7 @@ impl MigratorTrait for Migrator { Box::new(m2025_10_25_000001_create_jobs::Migration), Box::new(m2025_10_25_000002_create_vms::Migration), Box::new(m2025_11_02_000003_create_job_logs::Migration), + Box::new(m2025_11_15_000004_create_job_ssh_keys::Migration), ] } } @@ -206,3 +207,54 @@ mod m2025_11_02_000003_create_job_logs { } } } + + +mod m2025_11_15_000004_create_job_ssh_keys { + use super::*; + + pub struct Migration; + + impl sea_orm_migration::prelude::MigrationName for Migration { + fn name(&self) -> &str { + "m2025_11_15_000004_create_job_ssh_keys" + } + } + + #[derive(Iden)] + enum JobSshKeys { + Table, + RequestId, + PublicKey, + PrivateKey, + CreatedAt, + } + + #[async_trait::async_trait] + impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(JobSshKeys::Table) + .if_not_exists() + .col(ColumnDef::new(JobSshKeys::RequestId).uuid().not_null().primary_key()) + .col(ColumnDef::new(JobSshKeys::PublicKey).text().not_null()) + .col(ColumnDef::new(JobSshKeys::PrivateKey).text().not_null()) + .col( + ColumnDef::new(JobSshKeys::CreatedAt) + .timestamp_with_time_zone() + .not_null() + .default(Expr::current_timestamp()), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(JobSshKeys::Table).to_owned()) + .await + } + } +} diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index 29ef72d..070ef0f 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -13,16 +13,18 @@ common = { path = "../common" } clap = { version = "4", features = ["derive", "env"] } miette = { version = "7", features = ["fancy"] } tracing = "0.1" -tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal", "fs", "io-util"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal", "fs", "io-util", "process", "net"] } serde = { version = "1", features = ["derive"] } serde_json = "1" serde_yaml = "0.9" config = { version = "0.15", default-features = false, features = ["yaml"] } reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2", "gzip", "brotli", "zstd"] } -# HTTP server for logs +# HTTP server for logs (runner serving removed) axum = { version = "0.8", features = ["macros"] } -# gRPC server -tonic = { version = "0.14", features = ["transport"] } +# SSH client for upload/exec/logs +ssh2 = "0.9" +ssh-key = { version = "0.6", features = ["ed25519"] } +rand_core = "0.6" # Compression/decompression zstd = "0.13" # DB (optional basic persistence) diff --git a/crates/orchestrator/src/config.rs b/crates/orchestrator/src/config.rs index afa864f..56574e2 100644 --- a/crates/orchestrator/src/config.rs +++ b/crates/orchestrator/src/config.rs @@ -47,6 +47,8 @@ pub struct ImageDefaults { pub cpu: Option, pub ram_mb: Option, pub disk_gb: Option, + /// Default SSH username for this image (e.g., ubuntu, root) + pub ssh_user: Option, } #[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] diff --git a/crates/orchestrator/src/http.rs b/crates/orchestrator/src/http.rs index 2d37dbe..16ffc02 100644 --- a/crates/orchestrator/src/http.rs +++ b/crates/orchestrator/src/http.rs @@ -1,9 +1,7 @@ use axum::{extract::Path, http::StatusCode, response::{IntoResponse, Response}, routing::get, Router}; use std::net::SocketAddr; -use std::path::PathBuf; use std::sync::Arc; -use tokio::fs; -use tracing::{debug, info, warn}; +use tracing::{info, warn}; use uuid::Uuid; use crate::persist::Persist; @@ -11,14 +9,12 @@ use crate::persist::Persist; #[derive(Clone)] pub struct HttpState { persist: Arc, - runner_dir: Option, } -pub fn build_router(persist: Arc, runner_dir: Option) -> Router { - let state = HttpState { persist, runner_dir }; +pub fn build_router(persist: Arc) -> Router { + let state = HttpState { persist }; Router::new() .route("/jobs/{request_id}/logs", get(get_logs)) - .route("/runners/{name}", get(get_runner)) .with_state(state) } @@ -47,43 +43,8 @@ async fn get_logs( } } -async fn get_runner( - Path(name): Path, - axum::extract::State(state): axum::extract::State, -) -> Response { - let Some(dir) = state.runner_dir.as_ref() else { - return (StatusCode::SERVICE_UNAVAILABLE, "runner serving disabled").into_response(); - }; - // Basic validation: prevent path traversal; allow only simple file names - if name.contains('/') || name.contains('\\') || name.starts_with('.') { - return StatusCode::BAD_REQUEST.into_response(); - } - let path = dir.join(&name); - if !path.starts_with(dir) { - return StatusCode::BAD_REQUEST.into_response(); - } - match fs::read(&path).await { - Ok(bytes) => { - debug!(path = %path.display(), size = bytes.len(), "serving runner binary"); - ( - StatusCode::OK, - [ - (axum::http::header::CONTENT_TYPE, "application/octet-stream"), - (axum::http::header::CONTENT_DISPOSITION, &format!("attachment; filename=\"{}\"", name)), - ], - bytes, - ) - .into_response() - } - Err(e) => { - debug!(error = %e, path = %path.display(), "runner file not found"); - StatusCode::NOT_FOUND.into_response() - } - } -} - -pub async fn serve(addr: SocketAddr, persist: Arc, runner_dir: Option, shutdown: impl std::future::Future) { - let app = build_router(persist, runner_dir); +pub async fn serve(addr: SocketAddr, persist: Arc, shutdown: impl std::future::Future) { + let app = build_router(persist); info!(%addr, "http server starting"); let listener = tokio::net::TcpListener::bind(addr).await.expect("bind http"); let server = axum::serve(listener, app); diff --git a/crates/orchestrator/src/hypervisor.rs b/crates/orchestrator/src/hypervisor.rs index a9cad78..d503d9a 100644 --- a/crates/orchestrator/src/hypervisor.rs +++ b/crates/orchestrator/src/hypervisor.rs @@ -33,6 +33,14 @@ pub struct JobContext { pub repo_url: String, pub commit_sha: String, pub workflow_job_id: Option, + /// Username to SSH into the guest. If None, fall back to ExecConfig.ssh_user + pub ssh_user: Option, + /// Path to private key file for this job (preferred over in-memory) + pub ssh_private_key_path: Option, + /// OpenSSH-formatted private key PEM used for SSH auth for this job + pub ssh_private_key_pem: Option, + /// OpenSSH-formatted public key (authorized_keys) for this job + pub ssh_public_key: Option, } #[derive(Debug, Clone)] @@ -371,16 +379,34 @@ impl Hypervisor for LibvirtHypervisor { // Relax permissions on overlay so host qemu can access it let _ = std::fs::set_permissions(&overlay, std::fs::Permissions::from_mode(0o666)); - // Build NoCloud seed ISO if user_data provided + // Build NoCloud seed ISO if user_data provided, or synthesize from per-job SSH key let mut seed_iso: Option = None; - if let Some(ref user_data) = spec.user_data { + // Prefer spec-provided user-data; otherwise, if we have a per-job SSH public key, generate minimal cloud-config + let user_data_opt: Option> = if let Some(ref ud) = spec.user_data { + Some(ud.clone()) + } else if let Some(ref pubkey) = ctx.ssh_public_key { + let s = format!( + r#"#cloud-config +users: + - default +ssh_authorized_keys: + - {ssh_pubkey} +"#, + ssh_pubkey = pubkey.trim() + ); + Some(s.into_bytes()) + } else { + None + }; + + if let Some(user_data) = user_data_opt { let seed_dir = work_dir.join("seed"); tokio::fs::create_dir_all(&seed_dir) .await .into_diagnostic()?; let ud_path = seed_dir.join("user-data"); let md_path = seed_dir.join("meta-data"); - tokio::fs::write(&ud_path, user_data) + tokio::fs::write(&ud_path, &user_data) .await .into_diagnostic()?; let meta = format!("instance-id: {}\nlocal-hostname: {}\n", id, id); diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index 335974d..ed38051 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -1,11 +1,12 @@ mod config; -mod grpc; mod hypervisor; mod persist; mod scheduler; mod http; use std::{collections::HashMap, path::PathBuf, time::Duration}; +#[cfg(unix)] +use std::os::unix::fs::PermissionsExt; use clap::Parser; use miette::{IntoDiagnostic as _, Result}; @@ -14,7 +15,7 @@ use tracing::{info, warn}; use crate::persist::{JobState, Persist}; use config::OrchestratorConfig; use hypervisor::{JobContext, RouterHypervisor, VmSpec}; -use scheduler::{SchedItem, Scheduler}; +use scheduler::{SchedItem, Scheduler, ExecConfig}; use std::sync::Arc; use tokio::sync::Notify; @@ -29,10 +30,6 @@ struct Opts { #[arg(long, env = "ORCH_CONFIG")] config: Option, - /// Directory to serve runner binaries from (filenames are requested via /runners/{name}) - #[arg(long, env = "RUNNER_DIR")] - runner_dir: Option, - /// Global max concurrency for VM provisioning/execution #[arg(long, env = "MAX_CONCURRENCY", default_value_t = 2)] max_concurrency: usize, @@ -45,20 +42,6 @@ struct Opts { #[arg(long, env = "CAPACITY_MAP")] capacity_map: Option, - /// gRPC listen address (e.g., 0.0.0.0:50051) - #[arg(long, env = "GRPC_ADDR", default_value = "0.0.0.0:50051")] - grpc_addr: String, - - /// Public contact address for runners to stream logs to (host:port). Overrides detection. - #[arg(long = "orch-contact-addr", env = "ORCH_CONTACT_ADDR")] - orch_contact_addr: Option, - - /// Public base URL where runner binaries are served (preferred). Example: https://runner.svc.domain - /// The orchestrator will append /runners/{filename} to construct full URLs. - #[arg(long = "runner-base-url", env = "SOLSTICE_RUNNER_BASE_URL")] - runner_base_url: Option, - - /// Postgres connection string (if empty, persistence is disabled) #[arg( long, @@ -99,13 +82,26 @@ struct Opts { #[arg(long, env = "OTEL_EXPORTER_OTLP_ENDPOINT")] otlp: Option, - /// Placeholder VM run time in seconds (temporary until agent wiring) - #[arg(long, env = "VM_PLACEHOLDER_RUN_SECS", default_value_t = 3600)] - vm_placeholder_run_secs: u64, - /// HTTP listen address for exposing basic endpoints (logs) #[arg(long, env = "HTTP_ADDR", default_value = "0.0.0.0:8081")] http_addr: String, + + + /// Local path to Linux runner binary to upload + #[arg(long, env = "RUNNER_LINUX_PATH", default_value = "./target/release/solstice-runner-linux")] + runner_linux_path: String, + + /// Local path to illumos runner binary to upload + #[arg(long, env = "RUNNER_ILLUMOS_PATH", default_value = "./target/release/solstice-runner-illumos")] + runner_illumos_path: String, + + /// Remote path where runner will be uploaded and executed + #[arg(long, env = "REMOTE_RUNNER_PATH", default_value = "/usr/local/bin/solstice-runner")] + remote_runner_path: String, + + /// SSH connect timeout (seconds) + #[arg(long, env = "SSH_CONNECT_TIMEOUT_SECS", default_value_t = 300)] + ssh_connect_timeout_secs: u64, } #[tokio::main(flavor = "multi_thread")] @@ -114,7 +110,7 @@ async fn main() -> Result<()> { let app_cfg = common::AppConfig::load("orchestrator")?; let _t = common::init_tracing("solstice-orchestrator")?; let opts = Opts::parse(); - info!(grpc_addr = %opts.grpc_addr, db = %opts.database_url, amqp = %opts.amqp_url, "orchestrator starting"); + info!(db = %opts.database_url, amqp = %opts.amqp_url, "orchestrator starting"); // Load orchestrator config (image map) and ensure images are present locally let cfg = OrchestratorConfig::load(opts.config.as_deref()).await?; @@ -149,54 +145,24 @@ async fn main() -> Result<()> { // prefetch: if not provided, default to max_concurrency mq_cfg.prefetch = opts.amqp_prefetch.unwrap_or(opts.max_concurrency as u16); - // Start gRPC server for runner log streaming - let grpc_addr: std::net::SocketAddr = opts.grpc_addr.parse().into_diagnostic()?; - let mq_cfg_for_grpc = mq_cfg.clone(); - let persist_for_grpc = persist.clone(); - let (grpc_shutdown_tx, grpc_shutdown_rx) = tokio::sync::oneshot::channel::<()>(); - let grpc_task = tokio::spawn(async move { - let _ = crate::grpc::serve_with_shutdown(grpc_addr, mq_cfg_for_grpc, persist_for_grpc, async move { - let _ = grpc_shutdown_rx.await; - }) - .await; - }); - - // Orchestrator contact address for runner to dial back (auto-detect if not provided) - let orch_contact = match &opts.orch_contact_addr { - Some(v) if !v.trim().is_empty() => v.clone(), - _ => detect_contact_addr(&opts), - }; - info!(contact = %orch_contact, "orchestrator contact address determined"); - - // Compose default runner URLs served by this orchestrator (if runner_dir configured) - let (runner_url_default, runner_urls_default) = if opts.runner_dir.is_some() { - // Derive host from ORCH_CONTACT_ADDR (host:port) and port from HTTP_ADDR - let http_host = orch_contact.split(':').next().unwrap_or("127.0.0.1"); - let http_port = opts - .http_addr - .rsplit(':') - .next() - .unwrap_or("8081"); - let base = format!("http://{}:{}", http_host, http_port); - let (single_url, multi_urls) = build_runner_urls(&base); - // Log concrete OS URLs for local serving - let mut parts = multi_urls.split_whitespace(); - let linux_url = parts.next().unwrap_or(""); - let illumos_url = parts.next().unwrap_or(""); - info!(linux = %linux_url, illumos = %illumos_url, "serving runner binaries via orchestrator HTTP"); - (single_url, multi_urls) - } else { - (String::new(), String::new()) - }; + // No gRPC runner log streaming; logs are collected via SSH session or serial console. // Scheduler + let exec_cfg = ExecConfig { + remote_runner_path: opts.remote_runner_path.clone(), + runner_linux_path: opts.runner_linux_path.clone(), + runner_illumos_path: opts.runner_illumos_path.clone(), + ssh_connect_timeout_secs: opts.ssh_connect_timeout_secs, + }; + let sched = Scheduler::new( router, opts.max_concurrency, &capacity_map, persist.clone(), - Duration::from_secs(opts.vm_placeholder_run_secs), + Duration::from_secs(3600), Arc::new(mq_cfg.clone()), + exec_cfg, ); let sched_shutdown = Arc::new(Notify::new()); let sched_tx = sched.sender(); @@ -207,28 +173,18 @@ async fn main() -> Result<()> { } }); - // Determine runner URLs to inject into cloud-init (prefer base URL) - let (runner_url_env, runner_urls_env) = if let Some(base) = opts - .runner_base_url - .as_ref() - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - { - let (u1, u2) = build_runner_urls(base); - info!(base = %base, url = %u1, urls = %u2, "using public runner base URL"); - (u1, u2) - } else { - // Fall back to URLs served by this orchestrator's HTTP (when RUNNER_DIR configured) - (runner_url_default.clone(), runner_urls_default.clone()) - }; + // Determine runner URLs to inject into cloud-init with precedence: + // 1) Explicit SOLSTICE_RUNNER_URL(S) overrides (used verbatim) + // 2) Public base URL (SOLSTICE_RUNNER_BASE_URL) -> append /runners/{filename} + // 3) Local serving via orchestrator HTTP when RUNNER_DIR is configured + // Runner URLs removed; SSH will be used to upload and execute the runner. // Consumer: enqueue and ack-on-accept let cfg_clone = cfg.clone(); let mq_cfg_clone = mq_cfg.clone(); let tx_for_consumer = sched_tx.clone(); let persist_for_consumer = persist.clone(); - let runner_url_for_consumer = runner_url_env.clone(); - let runner_urls_for_consumer = runner_urls_env.clone(); + let default_ssh_user = std::sync::Arc::new(String::from("ubuntu")); // Start consumer that can be shut down cooperatively on ctrl-c let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); let consumer_task = tokio::spawn(async move { @@ -238,9 +194,7 @@ async fn main() -> Result<()> { let sched_tx = tx_for_consumer.clone(); let cfg = cfg_clone.clone(); let persist = persist_for_consumer.clone(); - let orch_contact_val = orch_contact.clone(); - let runner_url_in = runner_url_for_consumer.clone(); - let runner_urls_in = runner_urls_for_consumer.clone(); + let default_ssh_user = default_ssh_user.clone(); async move { let label_resolved = cfg.resolve_label(job.runs_on.as_deref()).unwrap_or(&cfg.default_label).to_string(); let image = match cfg.image_for(&label_resolved) { @@ -252,6 +206,64 @@ async fn main() -> Result<()> { }; // Record job queued state let _ = persist.record_job_state(job.request_id, &job.repo_url, &job.commit_sha, job.runs_on.as_deref(), JobState::Queued).await; + + // Load per-job SSH keypair from DB when available; otherwise generate and persist + let (pubkey_text, privkey_text) = match persist.get_job_ssh_keys(job.request_id).await { + Ok(Some((pubk, privk))) => { + info!(request_id = %job.request_id, "loaded existing per-job SSH keys from DB"); + (pubk, privk) + } + Ok(None) | Err(_) => { + // Generate per-job Ed25519 SSH keypair (OpenSSH formats) and persist it + use ssh_key::{Algorithm, PrivateKey, rand_core::OsRng}; + let mut rng = OsRng; + let sk = PrivateKey::random(&mut rng, Algorithm::Ed25519) + .map_err(|e| miette::miette!("ssh keygen failed: {e}"))?; + let pk = sk.public_key(); + let pub_txt = pk.to_openssh().map_err(|e| miette::miette!("encode pubkey: {e}"))?; + // Encode private key in OpenSSH format (Zeroizing -> String) + let priv_txt_z = sk.to_openssh(ssh_key::LineEnding::LF).map_err(|e| miette::miette!("encode privkey: {e}"))?; + let priv_txt: String = priv_txt_z.to_string(); + let _ = persist.save_job_ssh_keys(job.request_id, &pub_txt, &priv_txt).await; + info!(request_id = %job.request_id, "generated and saved new per-job SSH keys"); + (pub_txt, priv_txt) + } + }; + + // Determine SSH username from image defaults or fall back to CLI option + let ssh_user = image + .defaults + .as_ref() + .and_then(|d| d.ssh_user.clone()) + .unwrap_or_else(|| (*default_ssh_user).clone()); + + // Create per-job key directory and write keys to disk + let keys_dir = { + let id = job.request_id.to_string(); + let preferred = std::path::Path::new("/var/lib/solstice-ci/keys").join(&id); + if std::fs::create_dir_all(&preferred).is_ok() { + // lock down directory if possible + #[cfg(unix)] + let _ = std::fs::set_permissions(&preferred, std::fs::Permissions::from_mode(0o700)); + preferred + } else { + let fallback = std::env::temp_dir().join("solstice-keys").join(&id); + let _ = std::fs::create_dir_all(&fallback); + #[cfg(unix)] + let _ = std::fs::set_permissions(&fallback, std::fs::Permissions::from_mode(0o700)); + fallback + } + }; + let priv_path = keys_dir.join("id_ed25519"); + let pub_path = keys_dir.join("id_ed25519.pub"); + // Best-effort writes; errors are non-fatal since we still keep the key in-memory and in DB + let _ = std::fs::write(&priv_path, privkey_text.as_bytes()); + #[cfg(unix)] + let _ = std::fs::set_permissions(&priv_path, std::fs::Permissions::from_mode(0o600)); + let _ = std::fs::write(&pub_path, pubkey_text.as_bytes()); + #[cfg(unix)] + let _ = std::fs::set_permissions(&pub_path, std::fs::Permissions::from_mode(0o644)); + // Build spec let (cpu, ram_mb, disk_gb) = ( image.defaults.as_ref().and_then(|d| d.cpu).unwrap_or(2), @@ -268,7 +280,7 @@ async fn main() -> Result<()> { disk_gb, network: None, // libvirt network handled in backend nocloud: image.nocloud, - user_data: Some(make_cloud_init_userdata(&job.repo_url, &job.commit_sha, job.request_id, &orch_contact_val, &runner_url_in, &runner_urls_in)), + user_data: Some(make_cloud_init_userdata(&job.repo_url, &job.commit_sha, job.request_id, &pubkey_text)), }; if !spec.nocloud { warn!(label = %label_resolved, "image is not marked nocloud=true; cloud-init may not work"); @@ -278,6 +290,10 @@ async fn main() -> Result<()> { repo_url: job.repo_url, commit_sha: job.commit_sha, workflow_job_id: job.workflow_job_id, + ssh_user: Some(ssh_user), + ssh_private_key_path: Some(priv_path.display().to_string()), + ssh_private_key_pem: Some(privkey_text.to_string()), + ssh_public_key: Some(pubkey_text), }; sched_tx.send(SchedItem { spec, ctx, original }).await.into_diagnostic()?; Ok(()) // ack on accept @@ -288,10 +304,9 @@ async fn main() -> Result<()> { // Start HTTP server let http_addr: std::net::SocketAddr = opts.http_addr.parse().into_diagnostic()?; let persist_for_http = persist.clone(); - let runner_dir_for_http = opts.runner_dir.clone(); let (http_shutdown_tx, http_shutdown_rx) = tokio::sync::oneshot::channel::<()>(); let http_task = tokio::spawn(async move { - crate::http::serve(http_addr, persist_for_http, runner_dir_for_http, async move { + http::serve(http_addr, persist_for_http, async move { let _ = http_shutdown_rx.await; }).await; }); @@ -305,17 +320,14 @@ async fn main() -> Result<()> { // Ask scheduler to shut down cooperatively (interrupt placeholders) sched_shutdown.notify_waiters(); - // Stop gRPC server - let _ = grpc_shutdown_tx.send(()); - // Stop HTTP server let _ = http_shutdown_tx.send(()); // Drop sender to let scheduler drain and exit drop(sched_tx); - // Wait for consumer, scheduler and grpc to finish concurrently - let (_c_res, _s_res, _g_res, _h_res) = tokio::join!(consumer_task, scheduler_task, grpc_task, http_task); + // Wait for consumer, scheduler and http to finish concurrently + let (_c_res, _s_res, _h_res) = tokio::join!(consumer_task, scheduler_task, http_task); Ok(()) } @@ -341,147 +353,19 @@ fn parse_capacity_map(s: Option<&str>) -> HashMap { m } -fn detect_contact_addr(opts: &Opts) -> String { - // Extract host and port from GRPC_ADDR (format host:port). - let (host_part, port_part) = match opts.grpc_addr.rsplit_once(':') { - Some((h, p)) => (h.to_string(), p.to_string()), - None => (opts.grpc_addr.clone(), String::from("")), - }; - - // If host is already a specific address (not any/unspecified), keep it. - let host_trim = host_part.trim(); - let is_unspecified = host_trim == "0.0.0.0" || host_trim == "::" || host_trim == "[::]" || host_trim.is_empty(); - if !is_unspecified { - return opts.grpc_addr.clone(); - } - - // Try platform-specific detection - #[cfg(all(target_os = "linux"))] - { - // Attempt to read libvirt network XML to obtain the NAT gateway IP (reachable from guests). - if let Some(ip) = detect_libvirt_network_ip(&opts.libvirt_network) { - let port = if port_part.is_empty() { String::from("50051") } else { port_part.clone() }; - return format!("{}:{}", ip, port); - } - // Fallback to external IP detection - if let Some(ip) = detect_external_ip() { - let port = if port_part.is_empty() { String::from("50051") } else { port_part.clone() }; - return format!("{}:{}", ip, port); - } - // Last resort - return format!("127.0.0.1:{}", if port_part.is_empty() { "50051" } else { &port_part }); - } - - #[cfg(target_os = "illumos")] - { - if let Some(ip) = detect_external_ip() { - let port = if port_part.is_empty() { String::from("50051") } else { port_part.clone() }; - return format!("{}:{}", ip, port); - } - return format!("127.0.0.1:{}", if port_part.is_empty() { "50051" } else { &port_part }); - } - - // Other platforms: best-effort external IP - if let Some(ip) = detect_external_ip() { - let port = if port_part.is_empty() { String::from("50051") } else { port_part }; - return format!("{}:{}", ip, port); - } - opts.grpc_addr.clone() -} - -#[cfg(any(target_os = "linux", target_os = "illumos"))] -fn detect_external_ip() -> Option { - use std::net::{SocketAddr, UdpSocket}; - // UDP connect trick: no packets are actually sent, but OS picks a route and local addr. - let target: SocketAddr = "1.1.1.1:80".parse().ok()?; - let sock = UdpSocket::bind("0.0.0.0:0").ok()?; - sock.connect(target).ok()?; - let local = sock.local_addr().ok()?; - Some(local.ip().to_string()) -} - -#[cfg(target_os = "linux")] -fn detect_libvirt_network_ip(name: &str) -> Option { - use std::fs; - let candidates = vec![ - format!("/etc/libvirt/qemu/networks/{}.xml", name), - format!("/etc/libvirt/qemu/networks/autostart/{}.xml", name), - format!("/var/lib/libvirt/network/{}.xml", name), - format!("/var/lib/libvirt/qemu/networks/{}.xml", name), - ]; - for p in candidates { - if let Ok(xml) = fs::read_to_string(&p) { - // Look for or with double quotes - if let Some(ip) = extract_ip_from_libvirt_network_xml(&xml) { - return Some(ip); - } - } - } - None -} - -#[cfg(target_os = "linux")] -fn extract_ip_from_libvirt_network_xml(xml: &str) -> Option { - // Very small string-based parser to avoid extra dependencies - // Find "').map(|e| s + e).unwrap_or(xml.len()); - let segment = &xml[s..end]; - if let Some(val) = extract_attr_value(segment, "address") { - return Some(val.to_string()); - } - idx = end + 1; - if idx >= xml.len() { break; } - } - None -} - -#[cfg(target_os = "linux")] -fn extract_attr_value<'a>(tag: &'a str, key: &'a str) -> Option<&'a str> { - // Search for key='value' or key="value" - if let Some(pos) = tag.find(key) { - let rest = &tag[pos + key.len()..]; - let rest = rest.trim_start(); - if rest.starts_with('=') { - let rest = &rest[1..].trim_start(); - let quote = rest.chars().next()?; - if quote == '\'' || quote == '"' { - let rest2 = &rest[1..]; - if let Some(end) = rest2.find(quote) { - return Some(&rest2[..end]); - } - } - } - } - None -} - -fn build_runner_urls(base: &str) -> (String, String) { - // Normalize base and ensure it ends with /runners - let trimmed = base.trim_end_matches('/'); - let base = if trimmed.ends_with("/runners") { - trimmed.to_string() - } else { - format!("{}/runners", trimmed) - }; - let single = format!("{}/{}", base, "solstice-runner"); - let linux = format!("{}/{}", base, "solstice-runner-linux"); - let illumos = format!("{}/{}", base, "solstice-runner-illumos"); - (single, format!("{} {}", linux, illumos)) -} fn make_cloud_init_userdata( repo_url: &str, commit_sha: &str, - request_id: uuid::Uuid, - orch_addr: &str, - runner_url: &str, - runner_urls: &str, + _request_id: uuid::Uuid, + ssh_pubkey: &str, ) -> Vec { let s = format!( r#"#cloud-config +users: + - default +ssh_authorized_keys: + - {ssh_pubkey} write_files: - path: /etc/solstice/job.yaml permissions: '0644' @@ -489,82 +373,10 @@ write_files: content: | repo_url: {repo} commit_sha: {sha} - - path: /usr/local/bin/solstice-bootstrap.sh - permissions: '0755' - owner: root:root - content: | - #!/bin/sh - set -eu - echo "Solstice: bootstrapping workflow runner for {sha}" | tee /dev/console - RUNNER="/usr/local/bin/solstice-runner" - # Runner URL(s) provided by orchestrator (local dev) if set - RUNNER_SINGLE='{runner_url}' - RUNNER_URLS='{runner_urls}' - if [ ! -x "$RUNNER" ]; then - mkdir -p /usr/local/bin - # Helper to download from a URL to $RUNNER - fetch_runner() {{ - U="$1" - [ -z "$U" ] && return 1 - if command -v curl >/dev/null 2>&1; then - curl -fSL "$U" -o "$RUNNER" || return 1 - elif command -v wget >/dev/null 2>&1; then - wget -O "$RUNNER" "$U" || return 1 - else - return 1 - fi - chmod +x "$RUNNER" 2>/dev/null || true - return 0 - }} - OS=$(uname -s 2>/dev/null || echo unknown) - # Prefer single URL if provided - if [ -n "$RUNNER_SINGLE" ]; then - fetch_runner "$RUNNER_SINGLE" || true - fi - # If still missing, iterate URLs with a basic OS-based preference - if [ ! -x "$RUNNER" ] && [ -n "$RUNNER_URLS" ]; then - for U in $RUNNER_URLS; do - case "$OS" in - Linux) - echo "$U" | grep -qi linux || continue ;; - SunOS) - echo "$U" | grep -qi illumos || continue ;; - *) ;; - esac - fetch_runner "$U" && break || true - done - fi - # As a final fallback, try all URLs regardless of OS tag - if [ ! -x "$RUNNER" ] && [ -n "$RUNNER_URLS" ]; then - for U in $RUNNER_URLS; do - fetch_runner "$U" && break || true - done - fi - if [ ! -x "$RUNNER" ]; then - echo 'runner URL(s) not provided or curl/wget missing' | tee /dev/console - fi - fi - export SOLSTICE_REPO_URL='{repo}' - export SOLSTICE_COMMIT_SHA='{sha}' - export SOLSTICE_JOB_FILE='/etc/solstice/job.yaml' - export SOLSTICE_ORCH_ADDR='{orch_addr}' - export SOLSTICE_REQUEST_ID='{req_id}' - if [ -x "$RUNNER" ]; then - "$RUNNER" || true - else - echo 'solstice-runner not found; nothing to execute' | tee /dev/console - fi - echo 'Solstice: runner complete, powering off' | tee /dev/console - (command -v poweroff >/dev/null 2>&1 && poweroff) || (command -v shutdown >/dev/null 2>&1 && shutdown -y -i5 -g0) || true -runcmd: - - [ /usr/local/bin/solstice-bootstrap.sh ] "#, repo = repo_url, sha = commit_sha, - req_id = request_id, - orch_addr = orch_addr, - runner_url = runner_url, - runner_urls = runner_urls + ssh_pubkey = ssh_pubkey.trim(), ); s.into_bytes() } @@ -586,18 +398,6 @@ mod tests { assert!(m.get("other").is_none()); } - #[test] - fn test_build_runner_urls_variants() { - let (s, m) = super::build_runner_urls("https://runner.svc.example"); - assert_eq!(s, "https://runner.svc.example/runners/solstice-runner"); - assert_eq!(m, "https://runner.svc.example/runners/solstice-runner-linux https://runner.svc.example/runners/solstice-runner-illumos"); - let (s2, m2) = super::build_runner_urls("https://runner.svc.example/"); - assert_eq!(s2, s); - assert_eq!(m2, m); - let (s3, m3) = super::build_runner_urls("https://runner.svc.example/runners"); - assert_eq!(s3, s); - assert_eq!(m3, m); - } #[test] fn test_make_cloud_init_userdata_includes_fields() { @@ -606,19 +406,13 @@ mod tests { "https://example.com/repo.git", "deadbeef", req_id, - "127.0.0.1:50051", - "http://127.0.0.1:8081/runners/solstice-runner", - "http://127.0.0.1:8081/runners/solstice-runner-linux http://127.0.0.1:8081/runners/solstice-runner-illumos", + "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIEfakepubkey user@example", ); let s = String::from_utf8(data).unwrap(); assert!(s.contains("#cloud-config")); assert!(s.contains("repo_url: https://example.com/repo.git")); assert!(s.contains("commit_sha: deadbeef")); - assert!(s.contains("write_files:")); + assert!(s.contains("ssh_authorized_keys:")); assert!(s.contains("/etc/solstice/job.yaml")); - assert!(s.contains("runcmd:")); - assert!(s.contains("powering off")); - assert!(s.contains("SOLSTICE_ORCH_ADDR")); - assert!(s.contains(&req_id.to_string())); } } diff --git a/crates/orchestrator/src/persist.rs b/crates/orchestrator/src/persist.rs index 8936b66..a8c86a2 100644 --- a/crates/orchestrator/src/persist.rs +++ b/crates/orchestrator/src/persist.rs @@ -2,7 +2,7 @@ use chrono::Utc; use miette::{IntoDiagnostic as _, Result}; use sea_orm::sea_query::{Expr, OnConflict}; use sea_orm::{ - entity::prelude::*, ActiveModelTrait, ColumnTrait, Database, DatabaseConnection, QueryFilter, + entity::prelude::*, ColumnTrait, Database, DatabaseConnection, QueryFilter, Set, }; use sea_orm::QueryOrder; @@ -125,7 +125,58 @@ mod job_logs { impl ActiveModelBehavior for ActiveModel {} } +mod job_ssh_keys { + use super::*; + + #[derive(Clone, Debug, PartialEq, DeriveEntityModel)] + #[sea_orm(table_name = "job_ssh_keys")] + pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub request_id: Uuid, + pub public_key: String, + pub private_key: String, + pub created_at: chrono::DateTime, + } + + #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] + pub enum Relation {} + + impl ActiveModelBehavior for ActiveModel {} +} + impl Persist { + /// Save per-job SSH keys (public + private OpenSSH format). No-op if persistence disabled. + pub async fn save_job_ssh_keys(&self, request_id: Uuid, public_key: &str, private_key: &str) -> Result<()> { + let Some(db) = self.db.as_ref() else { return Ok(()); }; + let now = Utc::now(); + let am = job_ssh_keys::ActiveModel { + request_id: Set(request_id), + public_key: Set(public_key.to_string()), + private_key: Set(private_key.to_string()), + created_at: Set(now), + }; + job_ssh_keys::Entity::insert(am) + .on_conflict( + OnConflict::column(job_ssh_keys::Column::RequestId) + .update_columns([ + job_ssh_keys::Column::PublicKey, + job_ssh_keys::Column::PrivateKey, + job_ssh_keys::Column::CreatedAt, + ]) + .to_owned(), + ) + .exec(db) + .await + .into_diagnostic()?; + Ok(()) + } + + /// Load per-job SSH keys; returns None if absent or persistence disabled. + pub async fn get_job_ssh_keys(&self, request_id: Uuid) -> Result> { + let Some(db) = self.db.as_ref() else { return Ok(None); }; + let row = job_ssh_keys::Entity::find_by_id(request_id).one(db).await.into_diagnostic()?; + Ok(row.map(|r| (r.public_key, r.private_key))) + } /// Initialize persistence. /// - If `database_url` is Some(non-empty), attempt to connect and run migrations. /// On any failure, return an error to fail-fast so operators notice misconfiguration. diff --git a/crates/orchestrator/src/scheduler.rs b/crates/orchestrator/src/scheduler.rs index 280ffaf..19ea2a0 100644 --- a/crates/orchestrator/src/scheduler.rs +++ b/crates/orchestrator/src/scheduler.rs @@ -1,14 +1,17 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; - +use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; use dashmap::DashMap; -use miette::Result; +use miette::{IntoDiagnostic as _, Result}; use tokio::sync::{Notify, Semaphore, mpsc}; use tracing::{error, info, warn}; -use common::{publish_deadletter, DeadLetter, JobRequest}; +use common::{publish_deadletter, publish_job_result, DeadLetter, JobRequest, messages::JobResult}; use crate::hypervisor::{BackendTag, Hypervisor, JobContext, VmSpec}; use crate::persist::{JobState, Persist, VmPersistState}; +use tokio::io::{AsyncBufReadExt, BufReader}; +use uuid::Uuid; +use tokio::fs::File; + pub struct Scheduler { mq_cfg: Arc, hv: Arc, @@ -18,6 +21,7 @@ pub struct Scheduler { label_sems: Arc, persist: Arc, placeholder_runtime: Duration, + exec: Arc, } type DashmapType = DashMap>; @@ -29,6 +33,13 @@ pub struct SchedItem { pub original: Option, } +pub struct ExecConfig { + pub remote_runner_path: String, + pub runner_linux_path: String, + pub runner_illumos_path: String, + pub ssh_connect_timeout_secs: u64, +} + impl Scheduler { pub fn new( hv: H, @@ -37,6 +48,7 @@ impl Scheduler { persist: Arc, placeholder_runtime: Duration, mq_cfg: Arc, + exec: ExecConfig, ) -> Self { let (tx, rx) = mpsc::channel::(max_concurrency * 4); let label_sems = DashMap::new(); @@ -52,6 +64,7 @@ impl Scheduler { label_sems: Arc::new(label_sems), persist, placeholder_runtime, + exec: Arc::new(exec), } } @@ -67,16 +80,16 @@ impl Scheduler { global_sem, label_sems, persist, - placeholder_runtime, + placeholder_runtime: _placeholder_runtime, + exec, .. } = self; let mut handles = Vec::new(); - let mut shutting_down = false; + let exec_shared = exec.clone(); 'scheduler: loop { tokio::select! { - // Only react once to shutdown and then focus on draining the channel - _ = shutdown.notified(), if !shutting_down => { - shutting_down = true; + // React to shutdown and break; tasks in-flight will finish and be awaited below + _ = shutdown.notified() => { info!("scheduler: shutdown requested; will drain queue and interrupt placeholders"); break 'scheduler; } @@ -87,8 +100,8 @@ impl Scheduler { let global = global_sem.clone(); let label_sems = label_sems.clone(); let persist = persist.clone(); - let shutdown = shutdown.clone(); let mq_cfg_in = mq_cfg.clone(); + let exec_shared_inner = exec_shared.clone(); let handle = tokio::spawn(async move { // Acquire global and label permits (owned permits so they live inside the task) let _g = match global.acquire_owned().await { Ok(p) => p, Err(_) => return }; @@ -125,76 +138,86 @@ impl Scheduler { } let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Running).await; let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), JobState::Running).await; - info!(request_id = %item.ctx.request_id, label = %label_key, "vm started (monitoring for completion)"); - // Monitor VM state until it stops or until placeholder_runtime elapses; end early on shutdown - let start_time = std::time::Instant::now(); - let mut timed_out = false; - let mut natural_stop = false; - loop { - // Check current state first - if let Ok(crate::hypervisor::VmState::Stopped) = hv.state(&h).await { - info!(request_id = %item.ctx.request_id, label = %label_key, "vm reported stopped"); - natural_stop = true; - break; - } - if start_time.elapsed() >= placeholder_runtime { - timed_out = true; - break; - } - // Wait either for shutdown signal or a short delay before next poll - tokio::select! { - _ = shutdown.notified() => { - info!(request_id = %item.ctx.request_id, label = %label_key, "shutdown: ending early"); - break; + info!(request_id = %item.ctx.request_id, label = %label_key, "vm started; establishing SSH session to run workflow"); + + // Start serial console tailer to capture early boot logs into job log + let console_path = h.work_dir.join("console.log"); + let persist_for_tailer = persist.clone(); + let req_for_tailer = item.ctx.request_id; + let mut tailer_opt = Some(tokio::spawn(async move { + let _ = tail_console_to_joblog(persist_for_tailer, req_for_tailer, console_path).await; + })); + + // Attempt to discover guest IP (libvirt only) and run the runner over SSH + let mut success = false; + let mut exit_code: i32 = 1; + + #[cfg(all(target_os = "linux", feature = "libvirt"))] + { + let exec_cfg = exec_shared_inner.clone(); + match discover_guest_ip_virsh(&h.id, Duration::from_secs(exec_cfg.ssh_connect_timeout_secs.min(300))).await { + Some(ip) => { + let ip_owned = ip.clone(); + let user = item.ctx.ssh_user.clone().unwrap_or_else(|| "ubuntu".to_string()); + let per_job_key_path = item.ctx.ssh_private_key_path.as_ref().map(|s| expand_tilde(s)); + let key_mem_opt = item.ctx.ssh_private_key_pem.clone(); + // Choose correct runner binary based on label (illumos vs linux) + let runner_path = if is_illumos_label(&item.spec.label) { + &exec_cfg.runner_illumos_path + } else { + &exec_cfg.runner_linux_path + }; + let local_runner = expand_tilde(runner_path); + info!(label = %item.spec.label, runner = %runner_path, "selected runner binary"); + let remote_path = PathBuf::from(&exec_cfg.remote_runner_path); + let repo_url = item.ctx.repo_url.clone(); + let commit_sha = item.ctx.commit_sha.clone(); + let request_id = item.ctx.request_id; + match run_job_via_ssh_owned(ip_owned, user, key_mem_opt, per_job_key_path, local_runner, remote_path, repo_url, commit_sha, request_id).await { + Ok((ok, code, lines)) => { + success = ok; exit_code = code; + // Persist lines + let mut seq: i64 = 0; + for (is_stderr, line) in lines { + let _ = persist.record_log_line(item.ctx.request_id, seq, is_stderr, &line).await; + seq += 1; + } + }, + Err(e) => { warn!(error = %e, request_id = %item.ctx.request_id, ip = %ip, "ssh runner execution failed"); } + } + } + None => { + warn!(request_id = %item.ctx.request_id, label = %label_key, "failed to determine guest IP via virsh"); } - _ = tokio::time::sleep(Duration::from_secs(2)) => {} } } - if timed_out { - // Freeze/suspend VM for debugging; keep artifacts and domain defined - if let Err(e) = hv.suspend(&h).await { - warn!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to suspend VM after timeout"); - } - let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Stopped).await; - let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), JobState::Failed).await; - // Log where to find console log for libvirt guests - let console_hint = h.work_dir.join("console.log"); - info!(request_id = %item.ctx.request_id, label = %label_key, domain = %h.id, console = %console_hint.display(), "vm timeout: suspended and kept for debugging"); - info!(request_id = %item.ctx.request_id, label = %label_key, "job finished: {} {}", item.ctx.request_id, "failed"); - } else { - // Stop and destroy on natural completion - if let Err(e) = hv.stop(&h, Duration::from_secs(10)).await { - error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to stop VM"); - } - let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Stopped).await; - // Heuristic: if the VM stopped very quickly and we have no runner logs, treat as failure - let ran_secs = start_time.elapsed().as_secs(); - let logs_text = match persist.get_logs_text(item.ctx.request_id).await { - Ok(opt) => opt.unwrap_or_default(), - Err(e) => { warn!(error = %e, request_id = %item.ctx.request_id, "failed to fetch logs for completion check"); String::new() } - }; - let logs_nonempty = logs_text.trim().chars().next().is_some(); - let mut succeeded = true; - let mut reason: &str = "succeeded"; - if ran_secs < 15 && !logs_nonempty { - // Likely cloud-init/runner never started; mark failed - succeeded = false; - reason = "failed"; - warn!(request_id = %item.ctx.request_id, label = %label_key, ran_secs, "vm stopped quickly with no logs; marking job failed"); - } - if let Err(e) = hv.destroy(h.clone()).await { - error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to destroy VM"); - } - let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Destroyed).await; - let _ = persist.record_job_state( - item.ctx.request_id, - &item.ctx.repo_url, - &item.ctx.commit_sha, - Some(&item.spec.label), - if succeeded { JobState::Succeeded } else { JobState::Failed } - ).await; - info!(request_id = %item.ctx.request_id, label = %label_key, "job finished: {} {}", item.ctx.request_id, reason); + + #[cfg(not(all(target_os = "linux", feature = "libvirt")))] + { + warn!(request_id = %item.ctx.request_id, label = %label_key, "SSH execution not supported on this platform/backend; skipping"); } + + // Stop and destroy VM after attempting execution + if let Err(e) = hv.stop(&h, Duration::from_secs(10)).await { + error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to stop VM"); + } + let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Stopped).await; + if let Err(e) = hv.destroy(h.clone()).await { + error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to destroy VM"); + } + let _ = persist.record_vm_event(item.ctx.request_id, &h.id, overlay, seed, backend, VmPersistState::Destroyed).await; + + // Stop console tailer + if let Some(t) = tailer_opt.take() { t.abort(); } + + // Persist final state and publish result + let final_state = if success { JobState::Succeeded } else { JobState::Failed }; + let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), final_state).await; + let mut result = JobResult::new(item.ctx.request_id, item.ctx.repo_url.clone(), item.ctx.commit_sha.clone(), success, exit_code, None); + if let Err(e) = publish_job_result(&mq_cfg_in, &result).await { + warn!(error = %e, request_id = %item.ctx.request_id, "failed to publish JobResult"); + } + info!(request_id = %item.ctx.request_id, label = %label_key, exit_code, success, "job finished"); } Err(e) => { error!(error = %e, request_id = %item.ctx.request_id, label = %label_key, "failed to prepare VM"); @@ -225,13 +248,177 @@ impl Scheduler { info!("scheduler: completed"); Ok(()) } +} - pub async fn run(self) -> Result<()> { - // Backward-compat: run until channel closed (no external shutdown) - self.run_with_shutdown(Arc::new(Notify::new())).await +// Tail the VM serial console file and record lines into the job log until cancelled. +async fn tail_console_to_joblog(persist: Arc, request_id: Uuid, console_path: PathBuf) -> miette::Result<()> { + use tokio::time::{sleep, Duration}; + use miette::IntoDiagnostic as _; + + // Negative sequence numbers for early console logs so they sort before runner logs (which start at 0). + let mut seq: i64 = -1_000_000; // ample headroom + + loop { + match File::open(&console_path).await { + Ok(file) => { + let mut reader = BufReader::new(file); + let mut buf = String::new(); + loop { + buf.clear(); + let n = reader.read_line(&mut buf).await.into_diagnostic()?; + if n == 0 { + // No new data yet; yield and retry + sleep(Duration::from_millis(200)).await; + continue; + } + let trimmed = buf.trim_end_matches(['\n', '\r']); + // Prefix to indicate source is VM console before the workflow-runner started streaming. + let line = if trimmed.is_empty() { String::from("[boot]") } else { format!("[boot] {}", trimmed) }; + let _ = persist.record_log_line(request_id, seq, false, &line).await; + seq += 1; + } + } + Err(_) => { + // File may not exist yet; back off briefly before retrying + sleep(Duration::from_millis(200)).await; + } + } } } +#[cfg(all(target_os = "linux", feature = "libvirt"))] +async fn discover_guest_ip_virsh(domain: &str, _timeout: Duration) -> Option { + use tokio::task; + // Best-effort: call `virsh domifaddr --source lease` + let dom = domain.to_string(); + let out = task::spawn_blocking(move || { + std::process::Command::new("virsh") + .args(["domifaddr", &dom, "--source", "lease"]) + .output() + }) + .await + .ok()? + .ok()?; + if !out.status.success() { + return None; + } + let s = String::from_utf8_lossy(&out.stdout); + for line in s.lines() { + // crude parse: look for IPv4 address in the line + if line.contains("ipv4") { + // Line format often contains '192.168.x.y/24' + if let Some(slash) = line.find('/') { + // find last whitespace before slash + let start = line[..slash].rfind(' ').map(|i| i + 1).unwrap_or(0); + let ip = &line[start..slash]; + if ip.chars().all(|c| c.is_ascii_digit() || c == '.') { + return Some(ip.to_string()); + } + } + } + // Fallback: extract first a.b.c.d sequence + let mut cur = String::new(); + for ch in line.chars() { + if ch.is_ascii_digit() || ch == '.' { + cur.push(ch); + } else { + if cur.split('.').count() == 4 { return Some(cur.clone()); } + cur.clear(); + } + } + if cur.split('.').count() == 4 { return Some(cur); } + } + None +} + +#[cfg(all(target_os = "linux", feature = "libvirt"))] +async fn run_job_via_ssh_owned( + ip: String, + user: String, + key_mem_opt: Option, + per_job_key_path: Option, + local_runner: PathBuf, + remote_path: PathBuf, + repo_url: String, + commit_sha: String, + request_id: uuid::Uuid, +) -> miette::Result<(bool, i32, Vec<(bool, String)>)> { + use ssh2::Session; + use std::fs::File as StdFile; + use std::io::Read; + use std::net::TcpStream; + use tokio::task; + + let (ok, code, lines) = task::spawn_blocking(move || -> miette::Result<(bool, i32, Vec<(bool, String)>)> { + // Connect TCP + let tcp = TcpStream::connect(format!("{}:22", ip)).map_err(|e| miette::miette!("tcp connect failed: {e}"))?; + let mut sess = Session::new().or_else(|_| Err(miette::miette!("ssh session new failed")))?; + sess.set_tcp_stream(tcp); + sess.handshake().map_err(|e| miette::miette!("ssh handshake: {e}"))?; + // Authenticate priority: memory key -> per-job file -> global file + if let Some(ref key_pem) = key_mem_opt { + sess.userauth_pubkey_memory(&user, None, key_pem, None) + .map_err(|e| miette::miette!("ssh auth failed for {user} (memory key): {e}"))?; + } else if let Some(ref p) = per_job_key_path { + sess.userauth_pubkey_file(&user, None, p, None) + .map_err(|e| miette::miette!("ssh auth failed for {user} (per-job file): {e}"))?; + } else { + return Err(miette::miette!("no SSH key available for job (neither in-memory nor per-job file)")); + } + if !sess.authenticated() { + return Err(miette::miette!("ssh not authenticated")); + } + // Upload runner via SFTP + let sftp = sess.sftp().map_err(|e| miette::miette!("sftp init failed: {e}"))?; + let mut local = StdFile::open(&local_runner).map_err(|e| miette::miette!("open local runner: {e}"))?; + let mut buf = Vec::new(); + local.read_to_end(&mut buf).map_err(|e| miette::miette!("read runner: {e}"))?; + let mut remote = sftp.create(&remote_path).map_err(|e| miette::miette!("sftp create: {e}"))?; + use std::io::Write as _; + remote.write_all(&buf).map_err(|e| miette::miette!("sftp write: {e}"))?; + let remote_file_stat = ssh2::FileStat { size: None, uid: None, gid: None, perm: Some(0o755), atime: None, mtime: None }; + let _ = sftp.setstat(&remote_path, remote_file_stat); + + // Build command + let cmd = format!( + "export SOLSTICE_REPO_URL=\"{}\" SOLSTICE_COMMIT_SHA=\"{}\" SOLSTICE_REQUEST_ID=\"{}\"; {}", + repo_url, commit_sha, request_id, remote_path.display() + ); + + let mut channel = sess.channel_session().map_err(|e| miette::miette!("channel session: {e}"))?; + channel.exec(&format!("sh -lc '{}'", cmd)).map_err(|e| miette::miette!("exec: {e}"))?; + + let mut out = String::new(); + let mut err = String::new(); + channel.read_to_string(&mut out).ok(); + channel.stderr().read_to_string(&mut err).ok(); + channel.wait_close().ok(); + let exit_code = channel.exit_status().unwrap_or(1); + let mut lines: Vec<(bool, String)> = Vec::new(); + for l in out.lines() { lines.push((false, l.to_string())); } + for l in err.lines() { lines.push((true, l.to_string())); } + Ok((exit_code == 0, exit_code, lines)) + }).await.into_diagnostic()??; + + Ok((ok, code, lines)) +} + +#[cfg(all(target_os = "linux", feature = "libvirt"))] +fn is_illumos_label(label: &str) -> bool { + let l = label.to_ascii_lowercase(); + l.contains("illumos") || l.contains("omnios") || l.contains("openindiana") || l.contains("oi-hipster") +} + +#[cfg(all(target_os = "linux", feature = "libvirt"))] +fn expand_tilde(path: &str) -> PathBuf { + if let Some(rest) = path.strip_prefix("~/") { + if let Ok(home) = std::env::var("HOME") { + return PathBuf::from(home).join(rest); + } + } + PathBuf::from(path) +} + #[cfg(test)] mod tests { use super::*; @@ -346,6 +533,10 @@ mod tests { repo_url: "https://example.com/r.git".into(), commit_sha: "deadbeef".into(), workflow_job_id: None, + ssh_user: None, + ssh_private_key_path: None, + ssh_private_key_pem: None, + ssh_public_key: None, } } @@ -360,10 +551,16 @@ mod tests { let mut caps = HashMap::new(); caps.insert("x".to_string(), 10); - let sched = Scheduler::new(hv, 2, &caps, persist, Duration::from_millis(10), Arc::new(common::MqConfig::default())); + let exec = ExecConfig { + remote_runner_path: "/usr/local/bin/solstice-runner".into(), + runner_linux_path: "/tmp/runner-linux".into(), + runner_illumos_path: "/tmp/runner-illumos".into(), + ssh_connect_timeout_secs: 30, + }; + let sched = Scheduler::new(hv, 2, &caps, persist, Duration::from_millis(10), Arc::new(common::MqConfig::default()), exec); let tx = sched.sender(); let run = tokio::spawn(async move { - let _ = sched.run().await; + let _ = sched.run_with_shutdown(Arc::new(Notify::new())).await; }); for _ in 0..5 { @@ -398,10 +595,16 @@ mod tests { caps.insert("a".to_string(), 1); caps.insert("b".to_string(), 2); - let sched = Scheduler::new(hv, 4, &caps, persist, Duration::from_millis(10), Arc::new(common::MqConfig::default())); + let exec = ExecConfig { + remote_runner_path: "/usr/local/bin/solstice-runner".into(), + runner_linux_path: "/tmp/runner-linux".into(), + runner_illumos_path: "/tmp/runner-illumos".into(), + ssh_connect_timeout_secs: 30, + }; + let sched = Scheduler::new(hv, 4, &caps, persist, Duration::from_millis(10), Arc::new(common::MqConfig::default()), exec); let tx = sched.sender(); let run = tokio::spawn(async move { - let _ = sched.run().await; + let _ = sched.run_with_shutdown(Arc::new(Notify::new())).await; }); for _ in 0..3 { diff --git a/crates/workflow-runner/src/main.rs b/crates/workflow-runner/src/main.rs index 2170a95..58256bd 100644 --- a/crates/workflow-runner/src/main.rs +++ b/crates/workflow-runner/src/main.rs @@ -188,6 +188,8 @@ async fn run_job_script_streamed( })), }) .await; + } else { + eprintln!("[runner] job script not found at {}", script); } return Ok(1); } @@ -202,6 +204,8 @@ async fn run_job_script_streamed( })), }) .await; + } else { + println!("[runner] executing {}", script); } let _ = run_shell(&format!("chmod +x {} || true", script)).await?; @@ -224,6 +228,8 @@ async fn run_job_script_streamed( Ok(0) => break, Ok(_) => { let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string(); + // Always echo to console so serial captures logs even without gRPC + println!("{}", line); let _ = tx2 .send(LogItem { request_id: req.clone(), @@ -322,36 +328,38 @@ async fn main() -> Result<()> { let mut tx_opt: Option> = None; let mut stream_handle: Option> = None; if let Some(addr) = orch_addr.clone() { - let (tx, rx) = mpsc::channel::(512); - let stream = ReceiverStream::new(rx); - // Spawn client task and keep a handle so we can await graceful flush on shutdown. - let handle = tokio::spawn(async move { - match RunnerClient::connect(format!("http://{addr}")).await { - Ok(mut client) => { + match RunnerClient::connect(format!("http://{addr}")).await { + Ok(mut client) => { + let (tx, rx) = mpsc::channel::(512); + let stream = ReceiverStream::new(rx); + // Spawn client task and keep a handle so we can await graceful flush on shutdown. + let handle = tokio::spawn(async move { if let Err(e) = client.stream_logs(stream).await { warn!(error = %e, "log stream to orchestrator terminated with error"); } - } - Err(e) => { - warn!(error = %e, "failed to connect to orchestrator gRPC; logs will not be streamed"); - } + }); + stream_handle = Some(handle); + tx_opt = Some(tx); + } + Err(e) => { + // Explicitly inform console so serial capture gets this + eprintln!("[runner] failed to connect to orchestrator gRPC at {}: {e}. Falling back to serial console only.", addr); } - }); - stream_handle = Some(handle); - tx_opt = Some(tx); - // Send a first line - if let Some(ref tx) = tx_opt { - let _ = tx - .send(LogItem { - request_id: request_id_effective.clone(), - event: Some(Event::Log(LogChunk { - line: format!("runner starting: repo={repo} sha={sha}"), - stderr: false, - })), - }) - .await; } } + // Emit a first line visibly regardless of streaming + println!("runner starting: repo={} sha={}", repo, sha); + if let Some(ref tx) = tx_opt { + let _ = tx + .send(LogItem { + request_id: request_id_effective.clone(), + event: Some(Event::Log(LogChunk { + line: format!("runner starting: repo={repo} sha={sha}"), + stderr: false, + })), + }) + .await; + } let code = match ensure_repo(&repo, &sha, &workdir).await { Ok(_) => { diff --git a/deploy/podman/.env.sample b/deploy/podman/.env.sample index fe2171a..3f4962d 100644 --- a/deploy/podman/.env.sample +++ b/deploy/podman/.env.sample @@ -52,20 +52,14 @@ ORCH_IMAGES_DIR=/var/lib/solstice/images # Host working directory for per-VM overlays and logs; mounted read-write # The libvirt backend will use /var/lib/solstice-ci inside the container; map it to a persistent host path. ORCH_WORK_DIR=/var/lib/solstice-ci -# Host directory containing workflow runner binaries to be served by the orchestrator -# Files in this directory are served read-only at http(s)://runner.svc.${DOMAIN}/runners/{filename} +# Host directory containing workflow runner binaries +# These files are mounted into the orchestrator at /opt/solstice/runners and uploaded into VMs over SSH. # Default points to the workspace target/runners where mise tasks may place built artifacts. RUNNER_DIR_HOST=../../target/runners -# When orchestrator runs behind NAT or in containers, set the public contact address -# that VMs can reach for gRPC log streaming (host:port). This overrides autodetection. -# Example: grpc.${ENV}.${DOMAIN}:443 (when terminated by Traefik) or a public IP:port -ORCH_CONTACT_ADDR= - -# Preferred: Provide a public base URL for runner binaries; the orchestrator will construct -# full URLs like ${SOLSTICE_RUNNER_BASE_URL}/runners/solstice-runner(-linux|-illumos) -# Example: https://runner.svc.${DOMAIN} -SOLSTICE_RUNNER_BASE_URL= +# SSH connectivity tuning for orchestrator -> VM +# Timeout (seconds) for establishing SSH connection to newly started VMs +SSH_CONNECT_TIMEOUT_SECS=300 # Forge Integration secrets (set per deployment) # Shared secret used to validate Forgejo/Gitea webhooks (X-Gitea-Signature HMAC-SHA256) diff --git a/deploy/podman/compose.yml b/deploy/podman/compose.yml index b8803e0..aa5004b 100644 --- a/deploy/podman/compose.yml +++ b/deploy/podman/compose.yml @@ -188,17 +188,17 @@ services: AMQP_EXCHANGE: solstice.jobs AMQP_QUEUE: solstice.jobs.v1 AMQP_ROUTING_KEY: jobrequest.v1 - GRPC_ADDR: 0.0.0.0:50051 HTTP_ADDR: 0.0.0.0:8081 - # Directory inside the container to serve runner binaries from - RUNNER_DIR: /runners + # Paths inside the container to runner binaries that will be uploaded over SSH + RUNNER_LINUX_PATH: /opt/solstice/runners/solstice-runner-linux + RUNNER_ILLUMOS_PATH: /opt/solstice/runners/solstice-runner-illumos + # Remote path on the VM where the runner will be uploaded and executed + REMOTE_RUNNER_PATH: /usr/local/bin/solstice-runner + # SSH connect timeout for reaching the VM (seconds) + SSH_CONNECT_TIMEOUT_SECS: ${SSH_CONNECT_TIMEOUT_SECS:-300} # Libvirt configuration for Linux/KVM LIBVIRT_URI: ${LIBVIRT_URI:-qemu:///system} LIBVIRT_NETWORK: ${LIBVIRT_NETWORK:-default} - # Public contact address for runners to stream logs to (host:port); overrides autodetection - ORCH_CONTACT_ADDR: ${ORCH_CONTACT_ADDR} - # Preferred: public base URL for runner binaries - SOLSTICE_RUNNER_BASE_URL: ${SOLSTICE_RUNNER_BASE_URL} depends_on: postgres: condition: service_healthy @@ -214,8 +214,8 @@ services: - ${ORCH_IMAGES_DIR:-/var/lib/solstice/images}:/var/lib/solstice/images:Z # Writable bind for per-VM overlays and console logs (used by libvirt backend) - ${ORCH_WORK_DIR:-/var/lib/solstice-ci}:/var/lib/solstice-ci:Z - # Read-only bind for locally built workflow runner binaries to be served by the orchestrator - - ${RUNNER_DIR_HOST:-../../target/runners}:/runners:ro,Z + # Read-only bind for locally built workflow runner binaries; orchestrator will upload over SSH + - ${RUNNER_DIR_HOST:-../../target/runners}:/opt/solstice/runners:ro,Z # Libvirt control sockets (ro is sufficient for read-only, but write is needed to create domains) - /var/run/libvirt/libvirt-sock:/var/run/libvirt/libvirt-sock:Z - /var/run/libvirt/libvirt-sock-ro:/var/run/libvirt/libvirt-sock-ro:Z @@ -235,12 +235,6 @@ services: - traefik.http.routers.api.entrypoints=websecure - traefik.http.routers.api.tls.certresolver=le - traefik.http.services.api.loadbalancer.server.port=8081 - # gRPC on grpc.${ENV}.${DOMAIN} (TLS, h2) - - traefik.tcp.routers.grpc.rule=HostSNI(`grpc.${ENV}.${DOMAIN}`) - - traefik.tcp.routers.grpc.entrypoints=websecure - - traefik.tcp.routers.grpc.tls=true - - traefik.tcp.routers.grpc.tls.certresolver=le - - traefik.tcp.services.grpc.loadbalancer.server.port=50051 forge-integration: build: diff --git a/examples/orchestrator-image-map.yaml b/examples/orchestrator-image-map.yaml index 43bd293..fead7f8 100644 --- a/examples/orchestrator-image-map.yaml +++ b/examples/orchestrator-image-map.yaml @@ -8,7 +8,7 @@ # and which backend it targets. All images should support NoCloud metadata. # Default label to use when a job doesn't specify runs_on -default_label: illumos-latest +default_label: ubuntu-22.04 # Optional label aliases aliases: @@ -41,6 +41,7 @@ images: cpu: 2 ram_mb: 2048 disk_gb: 40 + ssh_user: root # OpenIndiana Hipster cloud image kept for reference (no longer default) openindiana-hipster: @@ -63,3 +64,4 @@ images: cpu: 2 ram_mb: 2048 disk_gb: 40 + ssh_user: ubuntu