diff --git a/.idea/solstice-ci.iml b/.idea/solstice-ci.iml index bdfeb09..928b227 100644 --- a/.idea/solstice-ci.iml +++ b/.idea/solstice-ci.iml @@ -11,6 +11,7 @@ + diff --git a/Cargo.toml b/Cargo.toml index 8f7efb2..425f993 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,6 @@ [workspace] members = ["crates/*"] -resolver = "3" \ No newline at end of file +resolver = "3" + +[patch.crates-io] +# Ensure single sea-orm version resolution across crates if needed \ No newline at end of file diff --git a/crates/common/src/telemetry.rs b/crates/common/src/telemetry.rs index 1ae6805..e393cf7 100644 --- a/crates/common/src/telemetry.rs +++ b/crates/common/src/telemetry.rs @@ -13,7 +13,8 @@ pub fn init_tracing(_service_name: &str) -> miette::Result { let fmt_layer = fmt::layer() .with_target(false) .with_writer(nb_writer) - .with_ansi(atty::is(atty::Stream::Stderr)); + // Force-disable ANSI to keep logs plain for serial capture and gRPC forwarding + .with_ansi(false); let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); diff --git a/crates/forge-integration/src/main.rs b/crates/forge-integration/src/main.rs index 9c05703..627f1e7 100644 --- a/crates/forge-integration/src/main.rs +++ b/crates/forge-integration/src/main.rs @@ -83,10 +83,14 @@ struct Opts { #[arg(long, env = "FORGE_CONTEXT", default_value = "solstice/ci")] forge_context: String, - /// Orchestrator HTTP base for logs (e.g., http://localhost:8081) - #[arg(long, env = "ORCH_HTTP_BASE")] + /// Orchestrator HTTP base for logs (deprecated; use LOGS_BASE_URL) + #[arg(long, env = "ORCH_HTTP_BASE")] // Deprecated orch_http_base: Option, + /// Logs service base URL (e.g., http://logs.local:8082) + #[arg(long, env = "LOGS_BASE_URL")] + logs_base_url: Option, + /// S3-compatible endpoint for Garage/MinIO (e.g., http://localhost:9000) #[arg(long, env = "S3_ENDPOINT")] s3_endpoint: Option, @@ -112,7 +116,8 @@ struct AppState { forgejo_base: Option, forgejo_token: Option, forge_context: String, - orch_http_base: Option, + orch_http_base: Option, // deprecated + logs_base_url: Option, s3_endpoint: Option, s3_bucket: Option, runs_on_default: Option, @@ -179,6 +184,7 @@ async fn main() -> Result<()> { forgejo_token: opts.forgejo_token, forge_context: opts.forge_context, orch_http_base: opts.orch_http_base, + logs_base_url: opts.logs_base_url, s3_endpoint: opts.s3_endpoint, s3_bucket: opts.s3_bucket, runs_on_default: opts.runs_on_default, diff --git a/crates/logs-service/Cargo.toml b/crates/logs-service/Cargo.toml new file mode 100644 index 0000000..7b6ae46 --- /dev/null +++ b/crates/logs-service/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "logs-service" +version = "0.1.0" +edition = "2024" + +[dependencies] +common = { path = "../common" } +clap = { version = "4", features = ["derive", "env"] } +axum = { version = "0.8", features = ["macros"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +miette = { version = "7", features = ["fancy"] } +tracing = "0.1" +tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal", "net"] } +sea-orm = { version = "1.1.17", default-features = false, features = ["sqlx-postgres", "sqlx-sqlite", "runtime-tokio-rustls", "macros", "with-uuid", "with-chrono" ] } +sea-orm-migration = { version = "1.1.17" } +migration = { path = "../migration" } +uuid = { version = "1", features = ["v4", "serde"] } +chrono = { version = "0.4", default-features = false, features = ["clock", "std", "serde"] } diff --git a/crates/logs-service/src/main.rs b/crates/logs-service/src/main.rs new file mode 100644 index 0000000..9055f57 --- /dev/null +++ b/crates/logs-service/src/main.rs @@ -0,0 +1,143 @@ +use axum::{extract::Path, http::StatusCode, response::{IntoResponse, Response}, routing::get, Json, Router}; +use clap::Parser; +use miette::{IntoDiagnostic as _, Result}; +use sea_orm::{entity::prelude::*, Database, DatabaseConnection, QueryOrder, ColumnTrait, QueryFilter, Statement, DatabaseBackend, Value}; +use sea_orm_migration::MigratorTrait; +use serde::Serialize; +use std::net::SocketAddr; +use tracing::{info, warn}; +use uuid::Uuid; + +#[derive(Parser, Debug)] +#[command(name = "solstice-logs", version, about = "Solstice CI — Logs Service")] +struct Opts { + /// HTTP bind address + #[arg(long, env = "HTTP_ADDR", default_value = "0.0.0.0:8082")] + http_addr: String, + + /// Database URL + #[arg(long, env = "DATABASE_URL")] + database_url: String, + + /// OTLP endpoint (e.g., http://localhost:4317) + #[arg(long, env = "OTEL_EXPORTER_OTLP_ENDPOINT")] + otlp: Option, +} + +#[derive(Clone)] +struct AppState { db: DatabaseConnection } + +#[tokio::main(flavor = "multi_thread")] +async fn main() -> Result<()> { + let _t = common::init_tracing("solstice-logs-service")?; + let opts = Opts::parse(); + let db = Database::connect(opts.database_url).await.into_diagnostic()?; + migration::Migrator::up(&db, None).await.into_diagnostic()?; + + let state = AppState { db }; + let router = Router::new() + .route("/jobs/{request_id}/logs", get(list_logs)) + .route("/jobs/{request_id}/logs/{category}", get(get_logs_by_category)) + .with_state(state); + + let addr: SocketAddr = opts.http_addr.parse().expect("invalid HTTP_ADDR"); + info!(%addr, "logs-service starting"); + axum::serve( + tokio::net::TcpListener::bind(addr).await.expect("bind"), + router, + ) + .await + .into_diagnostic() +} + +mod job_logs { + use super::*; + #[derive(Clone, Debug, PartialEq, DeriveEntityModel)] + #[sea_orm(table_name = "job_logs")] + pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub request_id: Uuid, + #[sea_orm(primary_key, auto_increment = false)] + pub seq: i64, + pub ts: chrono::DateTime, + pub stderr: bool, + pub line: String, + pub category: String, + pub level: Option, + pub fields: Option, + pub has_error: bool, + } + #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] + pub enum Relation {} + impl ActiveModelBehavior for ActiveModel {} +} + +#[derive(Serialize)] +struct LogCategorySummary { + category: String, + count: i64, + has_errors: bool, + first_ts: chrono::DateTime, + last_ts: chrono::DateTime, +} + +async fn list_logs(Path(request_id): Path, axum::extract::State(state): axum::extract::State) -> Response { + let Ok(id) = Uuid::parse_str(&request_id) else { return StatusCode::BAD_REQUEST.into_response(); }; + // Query per-category summaries using backend-agnostic SQL + parameter binding + let backend = state.db.get_database_backend(); + let (sql, vals): (&str, Vec) = match backend { + DatabaseBackend::Postgres => ( + "SELECT category AS category, COUNT(*) AS count, MIN(ts) AS first_ts, MAX(ts) AS last_ts, MAX(has_error) AS has_errors FROM job_logs WHERE request_id = $1 GROUP BY category ORDER BY category", + vec![Value::Uuid(Some(Box::new(id)))] + ), + _ => ( + "SELECT category AS category, COUNT(*) AS count, MIN(ts) AS first_ts, MAX(ts) AS last_ts, MAX(has_error) AS has_errors FROM job_logs WHERE request_id = ? GROUP BY category ORDER BY category", + vec![Value::Uuid(Some(Box::new(id)))] + ), + }; + let stmt = Statement::from_sql_and_values(backend, sql, vals); + let rows = match state.db.query_all(stmt).await.into_diagnostic() { + Ok(r) => r, + Err(e) => { warn!(error = %e, request_id = %id, "failed to query log categories"); return StatusCode::INTERNAL_SERVER_ERROR.into_response(); } + }; + let mut out: Vec = Vec::new(); + for row in rows { + let category: String = row.try_get_by("category").unwrap_or_else(|_| "default".into()); + let count: i64 = row.try_get_by("count").unwrap_or(0); + let first_ts: chrono::DateTime = row.try_get_by("first_ts").unwrap_or_else(|_| chrono::Utc::now()); + let last_ts: chrono::DateTime = row.try_get_by("last_ts").unwrap_or_else(|_| chrono::Utc::now()); + let has_errors: bool = row.try_get_by("has_errors").unwrap_or(false); + out.push(LogCategorySummary { category, count, has_errors, first_ts, last_ts }); + } + Json(out).into_response() +} + +async fn get_logs_by_category(Path((request_id, category)): Path<(String, String)>, axum::extract::State(state): axum::extract::State) -> Response { + let Ok(id) = Uuid::parse_str(&request_id) else { return StatusCode::BAD_REQUEST.into_response(); }; + let rows = job_logs::Entity::find() + .filter(job_logs::Column::RequestId.eq(id)) + .filter(job_logs::Column::Category.eq(category.clone())) + .order_by_asc(job_logs::Column::Seq) + .all(&state.db) + .await + .into_diagnostic(); + match rows { + Ok(items) if items.is_empty() => StatusCode::NOT_FOUND.into_response(), + Ok(items) => { + let mut text = String::new(); + for r in items { + if r.stderr || r.has_error || r.level.as_deref() == Some("error") { + text.push_str("[stderr] "); + } + text.push_str(&r.line); + if !text.ends_with('\n') { text.push('\n'); } + } + ( + StatusCode::OK, + [(axum::http::header::CONTENT_TYPE, "text/plain; charset=utf-8")], + text, + ).into_response() + } + Err(e) => { warn!(error = %e, request_id = %id, "failed to read logs"); StatusCode::INTERNAL_SERVER_ERROR.into_response() } + } +} diff --git a/crates/migration/src/lib.rs b/crates/migration/src/lib.rs index 27bd15e..620651c 100644 --- a/crates/migration/src/lib.rs +++ b/crates/migration/src/lib.rs @@ -10,6 +10,7 @@ impl MigratorTrait for Migrator { Box::new(m2025_10_25_000002_create_vms::Migration), Box::new(m2025_11_02_000003_create_job_logs::Migration), Box::new(m2025_11_15_000004_create_job_ssh_keys::Migration), + Box::new(m2025_11_18_000005_alter_job_logs_add_category_fields::Migration), ] } } @@ -258,3 +259,81 @@ mod m2025_11_15_000004_create_job_ssh_keys { } } } + + +mod m2025_11_18_000005_alter_job_logs_add_category_fields { + use super::*; + + pub struct Migration; + + impl sea_orm_migration::prelude::MigrationName for Migration { + fn name(&self) -> &str { + "m2025_11_18_000005_alter_job_logs_add_category_fields" + } + } + + #[derive(Iden)] + enum JobLogs { + Table, + RequestId, + Seq, + Ts, + Stderr, + Line, + Category, + Level, + Fields, + HasError, + } + + #[async_trait::async_trait] + impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Add new columns if they don't exist + manager + .alter_table( + Table::alter() + .table(JobLogs::Table) + .add_column_if_not_exists(ColumnDef::new(JobLogs::Category).string().not_null().default("default")) + .add_column_if_not_exists(ColumnDef::new(JobLogs::Level).string().null()) + .add_column_if_not_exists(ColumnDef::new(JobLogs::Fields).text().null()) + .add_column_if_not_exists(ColumnDef::new(JobLogs::HasError).boolean().not_null().default(false)) + .to_owned(), + ) + .await?; + + // Composite index to speed fetching per category + manager + .create_index( + Index::create() + .name("idx_job_logs_req_cat_seq") + .table(JobLogs::Table) + .col(JobLogs::RequestId) + .col(JobLogs::Category) + .col(JobLogs::Seq) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Drop the composite index + manager + .drop_index(Index::drop().name("idx_job_logs_req_cat_seq").table(JobLogs::Table).to_owned()) + .await?; + + // Drop added columns + manager + .alter_table( + Table::alter() + .table(JobLogs::Table) + .drop_column(JobLogs::Category) + .drop_column(JobLogs::Level) + .drop_column(JobLogs::Fields) + .drop_column(JobLogs::HasError) + .to_owned(), + ) + .await + } + } +} diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index ee5d988..e7513e9 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "orchestrator" -version = "0.1.11" +version = "0.1.13" edition = "2024" build = "build.rs" diff --git a/crates/orchestrator/src/http.rs b/crates/orchestrator/src/http.rs index 16ffc02..c5581ca 100644 --- a/crates/orchestrator/src/http.rs +++ b/crates/orchestrator/src/http.rs @@ -14,33 +14,29 @@ pub struct HttpState { pub fn build_router(persist: Arc) -> Router { let state = HttpState { persist }; Router::new() - .route("/jobs/{request_id}/logs", get(get_logs)) + .route("/jobs/{request_id}/logs", get(get_logs_moved)) .with_state(state) } -async fn get_logs( +async fn get_logs_moved( Path(request_id): Path, - axum::extract::State(state): axum::extract::State, + _state: axum::extract::State, ) -> Response { - let Ok(id) = Uuid::parse_str(&request_id) else { - return StatusCode::BAD_REQUEST.into_response(); + let base = std::env::var("LOGS_BASE_URL").ok(); + let msg = if let Some(b) = base.as_ref() { + format!("Logs have moved: {}/jobs/{}/logs", b.trim_end_matches('/'), request_id) + } else { + "Logs endpoint moved to logs-service; set LOGS_BASE_URL to enable 302 redirects".to_string() }; - if !state.persist.is_enabled() { - return (StatusCode::SERVICE_UNAVAILABLE, "persistence disabled").into_response(); - } - match state.persist.get_logs_text(id).await { - Ok(Some(text)) => ( - StatusCode::OK, - [(axum::http::header::CONTENT_TYPE, "text/plain; charset=utf-8")], - text, - ) - .into_response(), - Ok(None) => StatusCode::NOT_FOUND.into_response(), - Err(e) => { - warn!(error = %e, request_id = %id, "failed to read logs"); - StatusCode::INTERNAL_SERVER_ERROR.into_response() - } + if let Some(b) = base { + let loc = format!("{}/jobs/{}/logs", b.trim_end_matches('/'), request_id); + return ( + StatusCode::MOVED_PERMANENTLY, + [(axum::http::header::LOCATION, loc.as_str())], + msg, + ).into_response(); } + (StatusCode::GONE, msg).into_response() } pub async fn serve(addr: SocketAddr, persist: Arc, shutdown: impl std::future::Future) { diff --git a/crates/orchestrator/src/persist.rs b/crates/orchestrator/src/persist.rs index ebe7179..a304870 100644 --- a/crates/orchestrator/src/persist.rs +++ b/crates/orchestrator/src/persist.rs @@ -118,6 +118,14 @@ mod job_logs { pub ts: chrono::DateTime, pub stderr: bool, pub line: String, + /// Logical category to group logs (e.g., "setup", "build", "test") + pub category: String, + /// Log level (e.g., "info", "warn", "error") + pub level: Option, + /// Arbitrary structured fields from NDJSON, stored as raw JSON string + pub fields: Option, + /// Convenience flag for quick error presence checks + pub has_error: bool, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -249,12 +257,37 @@ impl Persist { debug!(%request_id, %seq, stderr, "record_log_line (noop)"); return Ok(()); }; + // Try to parse structured JSON lines to extract common fields + let mut category = "default".to_string(); + let mut level_str: Option = None; + let mut msg_line = line.to_string(); + let mut fields_json: Option = None; + if let Ok(val) = serde_json::from_str::(line) { + // Keep original JSON for reference + fields_json = Some(line.to_string()); + if let Some(c) = val.get("category").and_then(|v| v.as_str()) { + if !c.is_empty() { category = c.to_string(); } + } + if let Some(l) = val.get("level").and_then(|v| v.as_str()) { + level_str = Some(l.to_string()); + } + // Prefer common keys for message + if let Some(m) = val.get("msg").or_else(|| val.get("message")).and_then(|v| v.as_str()) { + msg_line = m.to_string(); + } + } + let level = level_str.or_else(|| if stderr { Some("error".to_string()) } else { Some("info".to_string()) }); + let has_error = stderr || level.as_deref() == Some("error"); let am = job_logs::ActiveModel { request_id: Set(request_id), seq: Set(seq), ts: Set(Utc::now()), stderr: Set(stderr), - line: Set(line.to_string()), + line: Set(msg_line), + category: Set(category), + level: Set(level), + fields: Set(fields_json), + has_error: Set(has_error), }; job_logs::Entity::insert(am).exec(db).await.into_diagnostic()?; Ok(()) diff --git a/crates/orchestrator/src/scheduler.rs b/crates/orchestrator/src/scheduler.rs index b356e7f..a88895b 100644 --- a/crates/orchestrator/src/scheduler.rs +++ b/crates/orchestrator/src/scheduler.rs @@ -44,6 +44,50 @@ pub struct ExecConfig { pub libvirt_network: String, } +// Strip ANSI escape sequences (colors/control) from a string for clean log storage. +fn strip_ansi(input: &str) -> String { + let bytes = input.as_bytes(); + let mut out = String::with_capacity(bytes.len()); + let mut i = 0; + while i < bytes.len() { + let b = bytes[i]; + if b == 0x1b { // ESC + if i + 1 < bytes.len() { + let b1 = bytes[i + 1]; + // CSI: ESC [ ... final byte in 0x40..=0x7E + if b1 == b'[' { + i += 2; + while i < bytes.len() { + let c = bytes[i]; + if (0x40..=0x7E).contains(&c) { i += 1; break; } + i += 1; + } + continue; + } + // OSC: ESC ] ... BEL (0x07) or ST (ESC \) + if b1 == b']' { + i += 2; + while i < bytes.len() { + if bytes[i] == 0x07 { i += 1; break; } + if bytes[i] == 0x1b && i + 1 < bytes.len() && bytes[i + 1] == b'\\' { i += 2; break; } + i += 1; + } + continue; + } + // Other 2-byte sequences like ESC ( B, ESC ) 0, etc.: skip ESC and next char + i += 2; + continue; + } + // trailing ESC, drop + i += 1; + continue; + } + out.push(b as char); + i += 1; + } + out +} + impl Scheduler { pub fn new( hv: H, @@ -162,6 +206,7 @@ impl Scheduler { // Attempt to discover guest IP (libvirt only) and run the runner over SSH let mut success = false; let mut exit_code: i32 = 1; + let mut failure_summary: Option = None; #[cfg(all(target_os = "linux", feature = "libvirt"))] { @@ -205,14 +250,28 @@ impl Scheduler { ).await { Ok((ok, code, lines)) => { success = ok; exit_code = code; - // Persist lines + // Persist lines and capture last non-empty stderr for summary let mut seq: i64 = 0; + let mut last_stderr: Option = None; for (is_stderr, line) in lines { - let _ = persist.record_log_line(item.ctx.request_id, seq, is_stderr, &line).await; + let line_ref = line.trim_end_matches(['\n', '\r']); + if is_stderr && !line_ref.trim().is_empty() { + last_stderr = Some(line_ref.to_string()); + } + let _ = persist.record_log_line(item.ctx.request_id, seq, is_stderr, line_ref).await; seq += 1; } + if !success { + failure_summary = Some(match last_stderr { + Some(ref msg) => format!("job script failed: exit_code={} — {}", exit_code, msg), + None => format!("job script failed: exit_code={}", exit_code), + }); + } }, - Err(e) => { warn!(error = %e, request_id = %item.ctx.request_id, ip = %ip, "ssh runner execution failed"); } + Err(e) => { + failure_summary = Some(format!("ssh/runner error: {}", e)); + warn!(error = %e, request_id = %item.ctx.request_id, ip = %ip, "ssh runner execution failed"); + } } } None => { @@ -252,7 +311,7 @@ impl Scheduler { // Persist final state and publish result let final_state = if success { JobState::Succeeded } else { JobState::Failed }; let _ = persist.record_job_state(item.ctx.request_id, &item.ctx.repo_url, &item.ctx.commit_sha, Some(&item.spec.label), final_state).await; - let mut result = JobResult::new(item.ctx.request_id, item.ctx.repo_url.clone(), item.ctx.commit_sha.clone(), success, exit_code, None); + let mut result = JobResult::new(item.ctx.request_id, item.ctx.repo_url.clone(), item.ctx.commit_sha.clone(), success, exit_code, failure_summary.clone()); if let Err(e) = publish_job_result(&mq_cfg_in, &result).await { warn!(error = %e, request_id = %item.ctx.request_id, "failed to publish JobResult"); } @@ -311,8 +370,13 @@ async fn tail_console_to_joblog(persist: Arc, request_id: Uuid, console continue; } let trimmed = buf.trim_end_matches(['\n', '\r']); - // Prefix to indicate source is VM console before the workflow-runner started streaming. - let line = if trimmed.is_empty() { String::from("[boot]") } else { format!("[boot] {}", trimmed) }; + // Emit as NDJSON so logs-service can categorize this as 'boot' + let obj = serde_json::json!({ + "category": "boot", + "level": "info", + "msg": trimmed + }); + let line = obj.to_string(); let _ = persist.record_log_line(request_id, seq, false, &line).await; seq += 1; } @@ -333,7 +397,12 @@ async fn snapshot_console_to_joblog(persist: Arc, request_id: Uuid, con let mut seq: i64 = -1_000_000; // keep consistent ordering before runner logs for raw in content.lines() { let trimmed = raw.trim_end_matches(['\n', '\r']); - let line = if trimmed.is_empty() { String::from("[boot]") } else { format!("[boot] {}", trimmed) }; + let obj = serde_json::json!({ + "category": "boot", + "level": "info", + "msg": trimmed + }); + let line = obj.to_string(); let _ = persist.record_log_line(request_id, seq, false, &line).await; seq += 1; } @@ -636,16 +705,48 @@ async fn run_job_via_ssh_owned( let mut channel = sess.channel_session() .map_err(OrchestratorError::ChannelSession) .into_diagnostic()?; + // Best-effort: request a PTY to encourage line-buffered output from the runner + let _ = channel.request_pty("xterm", None, None); channel.exec(&format!("sh -lc '{}'", cmd)) .map_err(OrchestratorError::Exec) .into_diagnostic()?; - let mut out = String::new(); - let mut err = String::new(); - channel.read_to_string(&mut out).ok(); - channel.stderr().read_to_string(&mut err).ok(); - channel.wait_close().ok(); + // Switch to non-blocking so we can interleave stdout/stderr without deadlock + sess.set_blocking(false); + + use std::io::Read as _; + let mut out_acc: Vec = Vec::new(); + let mut err_acc: Vec = Vec::new(); + let mut tmp = [0u8; 4096]; + // Read until remote closes + loop { + let mut progressed = false; + match channel.read(&mut tmp) { + Ok(0) => {}, + Ok(n) => { out_acc.extend_from_slice(&tmp[..n]); progressed = true; }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}, + Err(e) => return Err(miette::miette!("ssh stdout read error: {}", e)), + } + match channel.stderr().read(&mut tmp) { + Ok(0) => {}, + Ok(n) => { err_acc.extend_from_slice(&tmp[..n]); progressed = true; }, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}, + Err(e) => return Err(miette::miette!("ssh stderr read error: {}", e)), + } + if channel.eof() { break; } + if !progressed { + // avoid busy spin + std::thread::sleep(std::time::Duration::from_millis(20)); + } + } + // Restore blocking for status fetch (defensive) + sess.set_blocking(true); + let _ = channel.wait_close(); let exit_code = channel.exit_status().unwrap_or(1); + + // Convert buffers into lines (strip ANSI color/escape sequences) + let out = strip_ansi(&String::from_utf8_lossy(&out_acc)); + let err = strip_ansi(&String::from_utf8_lossy(&err_acc)); let mut lines: Vec<(bool, String)> = Vec::new(); for l in out.lines() { lines.push((false, l.to_string())); } for l in err.lines() { lines.push((true, l.to_string())); } diff --git a/crates/workflow-runner/Cargo.toml b/crates/workflow-runner/Cargo.toml index 7f50755..c3e2d69 100644 --- a/crates/workflow-runner/Cargo.toml +++ b/crates/workflow-runner/Cargo.toml @@ -12,10 +12,9 @@ common = { path = "../common" } clap = { version = "4", features = ["derive", "env"] } miette = { version = "7", features = ["fancy"] } tracing = "0.1" -tokio = { version = "1", features = ["rt-multi-thread", "macros", "process", "fs", "io-util"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros", "process", "fs", "io-util", "net", "time"] } serde = { version = "1", features = ["derive"] } serde_yaml = "0.9" -# gRPC client -tonic = { version = "0.14", features = ["transport"] } -tokio-stream = "0.1" +serde_json = "1" +# Needs to be present to force aws-lc-rs to use bindgen for cross-compilation aws-lc-rs = { version = "1", features = ["bindgen"] } \ No newline at end of file diff --git a/crates/workflow-runner/src/main.rs b/crates/workflow-runner/src/main.rs index 58256bd..ab8bb14 100644 --- a/crates/workflow-runner/src/main.rs +++ b/crates/workflow-runner/src/main.rs @@ -1,17 +1,34 @@ use clap::Parser; -use common::runner::v1::{JobEnd, LogChunk, LogItem, log_item::Event, runner_client::RunnerClient}; use miette::{IntoDiagnostic as _, Result}; use serde::Deserialize; use std::process::Stdio; -use tokio::sync::mpsc; +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; use tokio::{ fs, io::{AsyncBufReadExt, BufReader}, process::Command, }; -use tokio_stream::wrappers::ReceiverStream; use tracing::{error, info, warn}; +fn ndjson_line(category: &str, level: &str, msg: &str, extra: Option) -> String { + let mut obj = serde_json::json!({ + "category": category, + "level": level, + "msg": msg, + }); + if let Some(ext) = extra { + if let Some(map) = obj.as_object_mut() { + if let Some(eo) = ext.as_object() { + for (k, v) in eo.iter() { + map.insert(k.clone(), v.clone()); + } + } + } + } + obj.to_string() +} + #[derive(Parser, Debug)] #[command( name = "solstice-runner", @@ -73,7 +90,103 @@ async fn has_cmd(name: &str) -> bool { .unwrap_or(false) } +async fn check_writable(dir: &str) -> bool { + if fs::create_dir_all(dir).await.is_err() { return false; } + let test_path = format!("{}/.solstice-writecheck", dir.trim_end_matches('/')); + match fs::write(&test_path, b"ok").await { + Ok(_) => { + let _ = fs::remove_file(&test_path).await; true + } + Err(_) => false, + } +} + +fn parse_repo_host_port(repo: &str) -> Option<(String, u16)> { + let r = repo.trim(); + if let Some(rest) = r.strip_prefix("https://") { + let host = rest.split('/').next()?.split('@').last()?.split(':').next()?; // ignore embedded user + return Some((host.to_string(), 443)); + } + if let Some(rest) = r.strip_prefix("http://") { + let host = rest.split('/').next()?.split('@').last()?.split(':').next()?; + return Some((host.to_string(), 80)); + } + if let Some(rest) = r.strip_prefix("ssh://") { + // ssh://[user@]host/owner/repo + let first = rest.split('/').next()?; + let host = first.split('@').last()?.split(':').next()?; + return Some((host.to_string(), 22)); + } + // scp-like: user@host:owner/repo.git + if let Some(at) = r.find('@') { + if let Some(colon) = r[at+1..].find(':') { // ensure host: + let host = &r[at+1..at+1+colon]; + if !host.is_empty() { return Some((host.to_string(), 22)); } + } + } + None +} + +async fn check_network_connect(host: &str, port: u16, timeout_ms: u64) -> bool { + use tokio::time::{timeout, Duration}; + match timeout(Duration::from_millis(timeout_ms), tokio::net::TcpStream::connect((host, port))).await { + Ok(Ok(_stream)) => { true } + _ => false, + } +} + +async fn preflight(repo: &str, workdir: &str) -> Result<()> { + // Tool availability + let has_git = has_cmd("git").await; + let has_curl = has_cmd("curl").await; + let has_wget = has_cmd("wget").await; + let has_tar = has_cmd("tar").await; + for (tool, ok) in [("git", has_git), ("curl", has_curl), ("wget", has_wget), ("tar", has_tar)] { + let lvl = if ok { "info" } else { "warn" }; + println!("{}", ndjson_line("tool_check", lvl, tool, Some(serde_json::json!({"available": ok})))); + } + let can_clone = has_git || (has_tar && (has_curl || has_wget)); + let lvl = if can_clone { "info" } else { "error" }; + println!( + "{}", + ndjson_line( + "env_setup", + lvl, + "clone capability", + Some(serde_json::json!({ + "git": has_git, + "tar": has_tar, + "curl": has_curl, + "wget": has_wget, + "can_clone": can_clone + })) + ) + ); + if !can_clone { + return Err(miette::miette!("no available method to fetch repository: need git or (tar and (curl|wget))")); + } + + // Workdir writability + let writable = check_writable(workdir).await; + let lvl = if writable { "info" } else { "error" }; + println!("{}", ndjson_line("env_setup", lvl, "workdir writable", Some(serde_json::json!({"path": workdir, "writable": writable})))); + if !writable { return Err(miette::miette!("workdir is not writable: {}", workdir)); } + + // Network reachability (best-effort) + if let Some((host, port)) = parse_repo_host_port(repo) { + let ok = check_network_connect(&host, port, 2000).await; + let lvl = if ok { "info" } else { "warn" }; + println!("{}", ndjson_line("env", lvl, "network connectivity", Some(serde_json::json!({"host": host, "port": port, "reachable": ok})))); + } + + Ok(()) +} + async fn fetch_repo_via_archive(repo_https: &str, sha: &str, workdir: &str) -> Result<()> { + // Announce chosen method + println!("{}", ndjson_line("env_setup", "info", "fetch via http archive", Some(serde_json::json!({ + "url": format!("{}/archive/{}.tar.gz", repo_https.trim_end_matches('.').trim_end_matches(".git"), sha) + })))); // Gitea/Codeberg archive URL pattern: https://codeberg.org///archive/.tar.gz let base = repo_https.trim_end_matches('.').trim_end_matches(".git"); let url = format!("{}/archive/{}.tar.gz", base, sha); @@ -170,43 +283,15 @@ async fn ensure_repo(repo: &str, sha: &str, workdir: &str) -> Result<()> { } } -async fn run_job_script_streamed( - workdir: &str, - tx: Option>, - request_id: &str, -) -> Result { +async fn run_job_script(workdir: &str) -> Result { let script = format!("{}/.solstice/job.sh", workdir); if !fs::try_exists(&script).await.into_diagnostic()? { warn!(path = %script, "job script not found"); - if let Some(tx0) = tx.as_ref() { - let _ = tx0 - .send(LogItem { - request_id: request_id.to_string(), - event: Some(Event::Log(LogChunk { - line: format!("[runner] job script not found at {}", script), - stderr: true, - })), - }) - .await; - } else { - eprintln!("[runner] job script not found at {}", script); - } + eprintln!("{}", ndjson_line("job_run", "error", &format!("job script not found at {}", script), None)); return Ok(1); } // Emit explicit pre-exec line to aid diagnostics - if let Some(tx0) = tx.as_ref() { - let _ = tx0 - .send(LogItem { - request_id: request_id.to_string(), - event: Some(Event::Log(LogChunk { - line: format!("[runner] executing {}", script), - stderr: false, - })), - }) - .await; - } else { - println!("[runner] executing {}", script); - } + println!("{}", ndjson_line("job_run", "info", &format!("executing {}", script), None)); let _ = run_shell(&format!("chmod +x {} || true", script)).await?; let mut cmd = Command::new("/bin/sh"); @@ -216,88 +301,78 @@ async fn run_job_script_streamed( .stderr(Stdio::piped()); let mut child = cmd.spawn().into_diagnostic()?; - if let Some(tx) = tx.clone() { - if let Some(stdout) = child.stdout.take() { - let mut reader = BufReader::new(stdout); - let tx2 = tx.clone(); - let req = request_id.to_string(); - tokio::spawn(async move { - loop { - let mut buf = Vec::with_capacity(256); - match reader.read_until(b'\n', &mut buf).await { - Ok(0) => break, - Ok(_) => { - let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string(); - // Always echo to console so serial captures logs even without gRPC - println!("{}", line); - let _ = tx2 - .send(LogItem { - request_id: req.clone(), - event: Some(Event::Log(LogChunk { - line, - stderr: false, - })), - }) - .await; - } - Err(e) => { - let _ = tx2 - .send(LogItem { - request_id: req.clone(), - event: Some(Event::Log(LogChunk { - line: format!("[runner] error reading stdout: {e}"), - stderr: true, - })), - }) - .await; - break; - } + // Buffer the last N stderr lines for failure summary + let last_err: Arc>> = Arc::new(Mutex::new(VecDeque::with_capacity(20))); + + // Attach readers to child stdout/stderr so logs stream as NDJSON categorized under job_run + if let Some(stdout) = child.stdout.take() { + let mut reader = BufReader::new(stdout); + tokio::spawn(async move { + loop { + let mut buf = Vec::with_capacity(256); + match reader.read_until(b'\n', &mut buf).await { + Ok(0) => break, + Ok(_) => { + let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string(); + println!("{}", ndjson_line("job_run", "info", &line, None)); + } + Err(e) => { + eprintln!("{}", ndjson_line("job_run", "error", &format!("error reading stdout: {}", e), None)); + break; } } - }); - } - if let Some(stderr) = child.stderr.take() { - let mut reader = BufReader::new(stderr); - let tx2 = tx.clone(); - let req = request_id.to_string(); - tokio::spawn(async move { - loop { - let mut buf = Vec::with_capacity(256); - match reader.read_until(b'\n', &mut buf).await { - Ok(0) => break, - Ok(_) => { - let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string(); - let _ = tx2 - .send(LogItem { - request_id: req.clone(), - event: Some(Event::Log(LogChunk { line, stderr: true })), - }) - .await; - } - Err(e) => { - let _ = tx2 - .send(LogItem { - request_id: req.clone(), - event: Some(Event::Log(LogChunk { - line: format!("[runner] error reading stderr: {e}"), - stderr: true, - })), - }) - .await; - break; + } + }); + } + if let Some(stderr) = child.stderr.take() { + let mut reader = BufReader::new(stderr); + let last_err2 = last_err.clone(); + tokio::spawn(async move { + loop { + let mut buf = Vec::with_capacity(256); + match reader.read_until(b'\n', &mut buf).await { + Ok(0) => break, + Ok(_) => { + let line = String::from_utf8_lossy(&buf).trim_end_matches(['\n', '\r']).to_string(); + eprintln!("{}", ndjson_line("job_run", "error", &line, None)); + if let Ok(mut dq) = last_err2.lock() { + if dq.len() == 20 { dq.pop_front(); } + dq.push_back(line); } } + Err(e) => { + eprintln!("{}", ndjson_line("job_run", "error", &format!("error reading stderr: {}", e), None)); + break; + } } - }); - } - } else { - // If no streaming, still attach to child I/O to avoid blocking - let _ = child.stdout.take(); - let _ = child.stderr.take(); + } + }); } let status = child.wait().await.into_diagnostic()?; - Ok(status.code().unwrap_or(1)) + let code = status.code().unwrap_or(1); + + if code != 0 { + // Emit a concise failure summary (structured) + eprintln!("{}", ndjson_line("job_run", "error", &format!("job script exited with code {}", code), None)); + + // Include recent stderr lines for context (structured) + let lines: Vec = last_err + .lock() + .ok() + .map(|dq| dq.iter().cloned().collect()) + .unwrap_or_default(); + if lines.is_empty() { + eprintln!("{}", ndjson_line("job_run", "warn", "no stderr lines were captured from the script", None)); + } else { + eprintln!("{}", ndjson_line("job_run", "info", "recent stderr lines follow", None)); + for l in lines { + eprintln!("{}", ndjson_line("job_run", "error", &l, None)); + } + } + } + + Ok(code) } #[tokio::main(flavor = "multi_thread")] @@ -320,92 +395,31 @@ async fn main() -> Result<()> { info!(%repo, %sha, "runner starting"); let workdir = std::env::var("SOLSTICE_WORKDIR").unwrap_or_else(|_| "/root/work".into()); - // Setup gRPC streaming if orchestrator address and request id are provided - let orch_addr = std::env::var("SOLSTICE_ORCH_ADDR").ok(); - let request_id_env = std::env::var("SOLSTICE_REQUEST_ID").ok(); - // Use provided request id or empty string (server will warn if invalid) - let request_id_effective = request_id_env.unwrap_or_default(); - let mut tx_opt: Option> = None; - let mut stream_handle: Option> = None; - if let Some(addr) = orch_addr.clone() { - match RunnerClient::connect(format!("http://{addr}")).await { - Ok(mut client) => { - let (tx, rx) = mpsc::channel::(512); - let stream = ReceiverStream::new(rx); - // Spawn client task and keep a handle so we can await graceful flush on shutdown. - let handle = tokio::spawn(async move { - if let Err(e) = client.stream_logs(stream).await { - warn!(error = %e, "log stream to orchestrator terminated with error"); - } - }); - stream_handle = Some(handle); - tx_opt = Some(tx); - } - Err(e) => { - // Explicitly inform console so serial capture gets this - eprintln!("[runner] failed to connect to orchestrator gRPC at {}: {e}. Falling back to serial console only.", addr); - } - } - } - // Emit a first line visibly regardless of streaming - println!("runner starting: repo={} sha={}", repo, sha); - if let Some(ref tx) = tx_opt { - let _ = tx - .send(LogItem { - request_id: request_id_effective.clone(), - event: Some(Event::Log(LogChunk { - line: format!("runner starting: repo={repo} sha={sha}"), - stderr: false, - })), - }) - .await; + // Emit startup environment and tool checks + let uname = Command::new("/bin/sh").arg("-lc").arg("uname -a || echo unknown").output().await.ok() + .and_then(|o| String::from_utf8(o.stdout).ok()).unwrap_or_else(|| "unknown".into()); + println!("{}", ndjson_line("env", "info", "system", Some(serde_json::json!({"uname": uname.trim()})))); + + // Preflight environment checks (tools, workdir, network) + if let Err(e) = preflight(&repo, &workdir).await { + eprintln!("{}", ndjson_line("env_setup", "error", &format!("preflight failed: {}", e), None)); + std::process::exit(1); } + // Announce workspace + println!("{}", ndjson_line("env_setup", "info", "workdir", Some(serde_json::json!({"path": workdir})))); + let code = match ensure_repo(&repo, &sha, &workdir).await { Ok(_) => { // proceed to run job script - run_job_script_streamed(&workdir, tx_opt.clone(), &request_id_effective).await? + run_job_script(&workdir).await? } Err(e) => { - // Report checkout failure to orchestrator and return non-zero - if let Some(tx) = tx_opt.clone() { - let _ = tx - .send(LogItem { - request_id: request_id_effective.clone(), - event: Some(Event::Log(LogChunk { - line: format!("[runner] failed to prepare repo: {e}"), - stderr: true, - })), - }) - .await; - } + eprintln!("[runner] failed to prepare repo: {}", e); 1 } }; - // Send JobEnd if streaming enabled - if let Some(tx) = tx_opt.clone() { - let _ = tx - .send(LogItem { - request_id: request_id_effective.clone(), - event: Some(Event::End(JobEnd { - exit_code: code, - success: code == 0, - repo_url: repo.clone(), - commit_sha: sha.clone(), - })), - }) - .await; - // Drop the last sender to close the client stream and allow server to flush - drop(tx); - tx_opt = None; - // Give the client task a brief moment to flush and then await it - tokio::time::sleep(std::time::Duration::from_millis(150)).await; - if let Some(h) = stream_handle { - let _ = h.await; - } - } - if code != 0 { error!(exit_code = code, "job script failed"); std::process::exit(code); diff --git a/deploy/images/logs-service/Containerfile b/deploy/images/logs-service/Containerfile new file mode 100644 index 0000000..9d639b8 --- /dev/null +++ b/deploy/images/logs-service/Containerfile @@ -0,0 +1,29 @@ +# syntax=docker/dockerfile:1.7 +# Build Solstice Logs Service using upstream official images (no sccache) + +FROM docker.io/library/rust:bookworm AS builder +ENV CARGO_HOME=/cargo +WORKDIR /work +# Install build dependencies: protoc (for common crate), DB headers +RUN apt-get update \ + && apt-get install -y --no-install-recommends \ + protobuf-compiler pkg-config libsqlite3-dev libpq-dev ca-certificates \ + && rm -rf /var/lib/apt/lists/* +# Configure cargo target-dir so it can be cached between layers +RUN mkdir -p /cargo && printf "[build]\ntarget-dir = \"/cargo/target\"\n" > /cargo/config.toml +# Pre-copy manifests for better caching +COPY Cargo.toml ./ +COPY crates ./crates +RUN --mount=type=cache,target=/cargo/registry \ + --mount=type=cache,target=/cargo/git \ + --mount=type=cache,target=/cargo/target \ + cargo build --release -p logs-service && cp /cargo/target/release/logs-service /logs-service + +FROM docker.io/library/debian:bookworm-slim +RUN apt-get update \ + && apt-get install -y --no-install-recommends \ + libsqlite3-0 libpq5 ca-certificates \ + && rm -rf /var/lib/apt/lists/* +COPY --from=builder /logs-service /usr/local/bin/logs-service +EXPOSE 8082 +ENTRYPOINT ["/usr/local/bin/logs-service"] diff --git a/deploy/podman/compose.yml b/deploy/podman/compose.yml index 6a7afe5..b2c31e5 100644 --- a/deploy/podman/compose.yml +++ b/deploy/podman/compose.yml @@ -193,6 +193,8 @@ services: AMQP_QUEUE: solstice.jobs.v1 AMQP_ROUTING_KEY: jobrequest.v1 HTTP_ADDR: 0.0.0.0:8081 + # URL where logs-service is exposed (used for redirects) + LOGS_BASE_URL: https://logs.${ENV}.${DOMAIN} # Paths inside the container to runner binaries that will be uploaded over SSH RUNNER_LINUX_PATH: /opt/solstice/runners/solstice-runner-linux RUNNER_ILLUMOS_PATH: /opt/solstice/runners/solstice-runner-illumos @@ -240,21 +242,31 @@ services: - traefik.http.routers.api.tls.certresolver=le - traefik.http.services.api.loadbalancer.server.port=8081 - orchestrator-logs-proxy: - image: docker.io/library/nginx:alpine - container_name: solstice-orchestrator-logs + logs-service: + build: + context: ../.. + dockerfile: deploy/images/logs-service/Containerfile + image: local/solstice-logs-service:latest + container_name: solstice-logs-service restart: unless-stopped - volumes: - - ./nginx/orchestrator-logs.conf:/etc/nginx/conf.d/default.conf:ro,Z + environment: + RUST_LOG: info + DATABASE_URL: postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/solstice_${ENV} + HTTP_ADDR: 0.0.0.0:8082 + depends_on: + postgres: + condition: service_healthy + postgres-setup: + condition: service_completed_successfully networks: - core labels: - traefik.enable=true - # Expose orchestrator HTTP (logs) running on the host via a tiny proxy + # Expose logs service at logs.${ENV}.${DOMAIN} - traefik.http.routers.logs.rule=Host(`logs.${ENV}.${DOMAIN}`) - traefik.http.routers.logs.entrypoints=websecure - traefik.http.routers.logs.tls.certresolver=le - - traefik.http.services.logs.loadbalancer.server.port=80 + - traefik.http.services.logs.loadbalancer.server.port=8082 forge-integration: build: @@ -275,6 +287,8 @@ services: WEBHOOK_SECRET: ${WEBHOOK_SECRET} FORGEJO_TOKEN: ${FORGEJO_TOKEN} FORGEJO_BASE_URL: ${FORGEJO_BASE_URL} + # URL where logs-service is exposed (used for commit status links) + LOGS_BASE_URL: https://logs.${ENV}.${DOMAIN} depends_on: rabbitmq: condition: service_healthy