From df1a3126b1435c2adf189bd1ea2b79824f4184944096ef6553977d827e2d7cd9 Mon Sep 17 00:00:00 2001 From: Till Wegmueller Date: Tue, 18 Nov 2025 12:28:22 +0100 Subject: [PATCH] Refactor tool checks and log categorization for improved clarity and backend portability; bump version to 0.1.14 - Enhance runner tool check diagnostics with more descriptive output and JSON fields for better observability. - Replace raw SQL queries in `logs-service` with ORM-based logic for portable and backend-agnostic log categorization. - Add error category aggregation and structured summary reporting in logs-service. - Improve environment variable fallback mechanics for runner workdir selection. Signed-off-by: Till Wegmueller --- crates/logs-service/src/main.rs | 77 +++++++++++++++++++----------- crates/workflow-runner/src/main.rs | 14 ++++-- 2 files changed, 60 insertions(+), 31 deletions(-) diff --git a/crates/logs-service/src/main.rs b/crates/logs-service/src/main.rs index 9055f57..788adb3 100644 --- a/crates/logs-service/src/main.rs +++ b/crates/logs-service/src/main.rs @@ -1,7 +1,8 @@ use axum::{extract::Path, http::StatusCode, response::{IntoResponse, Response}, routing::get, Json, Router}; use clap::Parser; use miette::{IntoDiagnostic as _, Result}; -use sea_orm::{entity::prelude::*, Database, DatabaseConnection, QueryOrder, ColumnTrait, QueryFilter, Statement, DatabaseBackend, Value}; +use sea_orm::{entity::prelude::*, Database, DatabaseConnection, QueryOrder, ColumnTrait, QueryFilter, QuerySelect}; +use sea_orm::sea_query::Expr; use sea_orm_migration::MigratorTrait; use serde::Serialize; use std::net::SocketAddr; @@ -72,7 +73,7 @@ mod job_logs { impl ActiveModelBehavior for ActiveModel {} } -#[derive(Serialize)] +#[derive(Serialize, sea_orm::FromQueryResult)] struct LogCategorySummary { category: String, count: i64, @@ -83,33 +84,53 @@ struct LogCategorySummary { async fn list_logs(Path(request_id): Path, axum::extract::State(state): axum::extract::State) -> Response { let Ok(id) = Uuid::parse_str(&request_id) else { return StatusCode::BAD_REQUEST.into_response(); }; - // Query per-category summaries using backend-agnostic SQL + parameter binding - let backend = state.db.get_database_backend(); - let (sql, vals): (&str, Vec) = match backend { - DatabaseBackend::Postgres => ( - "SELECT category AS category, COUNT(*) AS count, MIN(ts) AS first_ts, MAX(ts) AS last_ts, MAX(has_error) AS has_errors FROM job_logs WHERE request_id = $1 GROUP BY category ORDER BY category", - vec![Value::Uuid(Some(Box::new(id)))] - ), - _ => ( - "SELECT category AS category, COUNT(*) AS count, MIN(ts) AS first_ts, MAX(ts) AS last_ts, MAX(has_error) AS has_errors FROM job_logs WHERE request_id = ? GROUP BY category ORDER BY category", - vec![Value::Uuid(Some(Box::new(id)))] - ), - }; - let stmt = Statement::from_sql_and_values(backend, sql, vals); - let rows = match state.db.query_all(stmt).await.into_diagnostic() { - Ok(r) => r, - Err(e) => { warn!(error = %e, request_id = %id, "failed to query log categories"); return StatusCode::INTERNAL_SERVER_ERROR.into_response(); } - }; - let mut out: Vec = Vec::new(); - for row in rows { - let category: String = row.try_get_by("category").unwrap_or_else(|_| "default".into()); - let count: i64 = row.try_get_by("count").unwrap_or(0); - let first_ts: chrono::DateTime = row.try_get_by("first_ts").unwrap_or_else(|_| chrono::Utc::now()); - let last_ts: chrono::DateTime = row.try_get_by("last_ts").unwrap_or_else(|_| chrono::Utc::now()); - let has_errors: bool = row.try_get_by("has_errors").unwrap_or(false); - out.push(LogCategorySummary { category, count, has_errors, first_ts, last_ts }); + + let query = job_logs::Entity::find() + .select_only() + .column(job_logs::Column::Category) + .expr(Expr::col(job_logs::Column::Seq).count()) + .expr(Expr::col(job_logs::Column::Ts).min()) + .expr(Expr::col(job_logs::Column::Ts).max()) + .filter(job_logs::Column::RequestId.eq(id)) + .group_by(job_logs::Column::Category) + .order_by_asc(job_logs::Column::Category); + + // Aggregate basic stats per category + let tuples: miette::Result, chrono::DateTime)>> = query + .into_tuple() + .all(&state.db) + .await + .into_diagnostic(); + + // Separately fetch categories that have any error (portable across backends) + let errs_res: miette::Result> = job_logs::Entity::find() + .select_only() + .column(job_logs::Column::Category) + .filter(job_logs::Column::RequestId.eq(id)) + .filter(job_logs::Column::HasError.eq(true)) + .group_by(job_logs::Column::Category) + .into_tuple() + .all(&state.db) + .await + .into_diagnostic(); + + match (tuples, errs_res) { + (Ok(rows), Ok(err_cats)) => { + let errset: std::collections::HashSet = err_cats.into_iter().collect(); + let out: Vec = rows + .into_iter() + .map(|(category, count, first_ts, last_ts)| LogCategorySummary { + has_errors: errset.contains(&category), + category, + count, + first_ts, + last_ts, + }) + .collect(); + Json::>(out).into_response() + } + (Err(e), _) | (_, Err(e)) => { warn!(error = %e, request_id = %id, "failed to query log categories"); StatusCode::INTERNAL_SERVER_ERROR.into_response() } } - Json(out).into_response() } async fn get_logs_by_category(Path((request_id, category)): Path<(String, String)>, axum::extract::State(state): axum::extract::State) -> Response { diff --git a/crates/workflow-runner/src/main.rs b/crates/workflow-runner/src/main.rs index ab8bb14..d418c4c 100644 --- a/crates/workflow-runner/src/main.rs +++ b/crates/workflow-runner/src/main.rs @@ -143,7 +143,12 @@ async fn preflight(repo: &str, workdir: &str) -> Result<()> { let has_tar = has_cmd("tar").await; for (tool, ok) in [("git", has_git), ("curl", has_curl), ("wget", has_wget), ("tar", has_tar)] { let lvl = if ok { "info" } else { "warn" }; - println!("{}", ndjson_line("tool_check", lvl, tool, Some(serde_json::json!({"available": ok})))); + let msg = if ok { + format!("tool {tool}: available") + } else { + format!("tool {tool}: missing") + }; + println!("{}", ndjson_line("tool_check", lvl, &msg, Some(serde_json::json!({"available": ok, "tool": tool})))); } let can_clone = has_git || (has_tar && (has_curl || has_wget)); let lvl = if can_clone { "info" } else { "error" }; @@ -393,7 +398,10 @@ async fn main() -> Result<()> { }; info!(%repo, %sha, "runner starting"); - let workdir = std::env::var("SOLSTICE_WORKDIR").unwrap_or_else(|_| "/root/work".into()); + // Workdir selection: prefer explicit SOLSTICE_WORKDIR, otherwise default to "$HOME/work" + let workdir = std::env::var("SOLSTICE_WORKDIR").ok().or_else(|| { + std::env::var("HOME").ok().map(|home| format!("{}/work", home)) + }).unwrap_or_else(|| "/root/work".into()); // Emit startup environment and tool checks let uname = Command::new("/bin/sh").arg("-lc").arg("uname -a || echo unknown").output().await.ok() @@ -415,7 +423,7 @@ async fn main() -> Result<()> { run_job_script(&workdir).await? } Err(e) => { - eprintln!("[runner] failed to prepare repo: {}", e); + eprintln!("{}", ndjson_line("env_setup", "error", &format!("failed to prepare repo: {}", e), None)); 1 } };