|
|
|
use axum::{extract::Path, http::StatusCode, response::{IntoResponse, Response}, routing::get, Json, Router};
|
|
|
|
|
use clap::Parser;
|
|
|
|
|
use miette::{IntoDiagnostic as _, Result};
|
|
|
|
use sea_orm::{entity::prelude::*, Database, DatabaseConnection, QueryOrder, ColumnTrait, QueryFilter, QuerySelect};
|
|
|
|
|
use sea_orm::sea_query::Expr;
|
|
|
|
use sea_orm_migration::MigratorTrait;
|
|
|
|
|
use serde::Serialize;
|
|
|
|
|
use std::net::SocketAddr;
|
|
|
|
|
use tracing::{info, warn};
|
|
|
|
|
use uuid::Uuid;
|
|
|
|
|
|
|
|
|
|
// CLI / environment configuration for the logs service.
// NOTE: the `///` doc comments below are clap help text — changing them
// changes the rendered --help output, so they are left as-is.
#[derive(Parser, Debug)]
#[command(name = "solstice-logs", version, about = "Solstice CI — Logs Service")]
struct Opts {
    /// HTTP bind address
    #[arg(long, env = "HTTP_ADDR", default_value = "0.0.0.0:8082")]
    http_addr: String,

    /// Database URL
    #[arg(long, env = "DATABASE_URL")]
    database_url: String,

    /// OTLP endpoint (e.g., http://localhost:4317)
    // Currently unused by main(); presumably consumed by common::init_tracing
    // via the OTEL_EXPORTER_OTLP_ENDPOINT env var — TODO confirm.
    #[arg(long, env = "OTEL_EXPORTER_OTLP_ENDPOINT")]
    otlp: Option<String>,
}
|
|
|
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
struct AppState { db: DatabaseConnection }
|
|
|
|
|
|
|
|
|
|
#[tokio::main(flavor = "multi_thread")]
|
|
|
|
|
async fn main() -> Result<()> {
|
|
|
|
|
let _t = common::init_tracing("solstice-logs-service")?;
|
|
|
|
|
let opts = Opts::parse();
|
|
|
|
|
let db = Database::connect(opts.database_url).await.into_diagnostic()?;
|
|
|
|
|
migration::Migrator::up(&db, None).await.into_diagnostic()?;
|
|
|
|
|
|
|
|
|
|
let state = AppState { db };
|
|
|
|
|
let router = Router::new()
|
|
|
|
|
.route("/jobs/{request_id}/logs", get(list_logs))
|
|
|
|
|
.route("/jobs/{request_id}/logs/{category}", get(get_logs_by_category))
|
|
|
|
|
.with_state(state);
|
|
|
|
|
|
|
|
|
|
let addr: SocketAddr = opts.http_addr.parse().expect("invalid HTTP_ADDR");
|
|
|
|
|
info!(%addr, "logs-service starting");
|
|
|
|
|
axum::serve(
|
|
|
|
|
tokio::net::TcpListener::bind(addr).await.expect("bind"),
|
|
|
|
|
router,
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
.into_diagnostic()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
mod job_logs {
    //! SeaORM entity for the `job_logs` table: one row per captured log line.
    use super::*;

    #[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
    #[sea_orm(table_name = "job_logs")]
    pub struct Model {
        // Composite primary key: (request_id, seq).
        #[sea_orm(primary_key, auto_increment = false)]
        pub request_id: Uuid,
        // Monotonic line sequence within a job; used for stable ordering.
        #[sea_orm(primary_key, auto_increment = false)]
        pub seq: i64,
        // Timestamp of the line (UTC).
        pub ts: chrono::DateTime<chrono::Utc>,
        // True when the line was captured from the process's stderr stream.
        pub stderr: bool,
        pub line: String,
        // Logical grouping used by the per-category endpoints below.
        pub category: String,
        // Optional severity; "error" triggers the [stderr] prefix in dumps.
        pub level: Option<String>,
        // Structured fields serialized to a string — format not visible here.
        pub fields: Option<String>,
        // Set when the line was classified as an error by the producer.
        pub has_error: bool,
    }

    // No relations to other entities are declared.
    #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
    pub enum Relation {}

    impl ActiveModelBehavior for ActiveModel {}
}
|
|
|
|
|
|
|
|
|
// JSON row returned by `list_logs`: aggregate stats for one log category.
// Field order is preserved because serde serializes JSON keys in declaration
// order.
#[derive(Serialize, sea_orm::FromQueryResult)]
struct LogCategorySummary {
    category: String,
    // Number of log lines in this category.
    count: i64,
    // True when any line in the category has `has_error` set.
    has_errors: bool,
    // Timestamps of the earliest and latest lines (UTC).
    first_ts: chrono::DateTime<chrono::Utc>,
    last_ts: chrono::DateTime<chrono::Utc>,
}
|
|
|
|
|
|
|
|
|
|
async fn list_logs(Path(request_id): Path<String>, axum::extract::State(state): axum::extract::State<AppState>) -> Response {
|
|
|
|
|
let Ok(id) = Uuid::parse_str(&request_id) else { return StatusCode::BAD_REQUEST.into_response(); };
|
2025-11-18 12:28:22 +01:00
|
|
|
|
|
|
|
|
let query = job_logs::Entity::find()
|
|
|
|
|
.select_only()
|
|
|
|
|
.column(job_logs::Column::Category)
|
|
|
|
|
.expr(Expr::col(job_logs::Column::Seq).count())
|
|
|
|
|
.expr(Expr::col(job_logs::Column::Ts).min())
|
|
|
|
|
.expr(Expr::col(job_logs::Column::Ts).max())
|
|
|
|
|
.filter(job_logs::Column::RequestId.eq(id))
|
|
|
|
|
.group_by(job_logs::Column::Category)
|
|
|
|
|
.order_by_asc(job_logs::Column::Category);
|
|
|
|
|
|
|
|
|
|
// Aggregate basic stats per category
|
|
|
|
|
let tuples: miette::Result<Vec<(String, i64, chrono::DateTime<chrono::Utc>, chrono::DateTime<chrono::Utc>)>> = query
|
|
|
|
|
.into_tuple()
|
|
|
|
|
.all(&state.db)
|
|
|
|
|
.await
|
|
|
|
|
.into_diagnostic();
|
|
|
|
|
|
|
|
|
|
// Separately fetch categories that have any error (portable across backends)
|
|
|
|
|
let errs_res: miette::Result<Vec<String>> = job_logs::Entity::find()
|
|
|
|
|
.select_only()
|
|
|
|
|
.column(job_logs::Column::Category)
|
|
|
|
|
.filter(job_logs::Column::RequestId.eq(id))
|
|
|
|
|
.filter(job_logs::Column::HasError.eq(true))
|
|
|
|
|
.group_by(job_logs::Column::Category)
|
|
|
|
|
.into_tuple()
|
|
|
|
|
.all(&state.db)
|
|
|
|
|
.await
|
|
|
|
|
.into_diagnostic();
|
|
|
|
|
|
|
|
|
|
match (tuples, errs_res) {
|
|
|
|
|
(Ok(rows), Ok(err_cats)) => {
|
|
|
|
|
let errset: std::collections::HashSet<String> = err_cats.into_iter().collect();
|
|
|
|
|
let out: Vec<LogCategorySummary> = rows
|
|
|
|
|
.into_iter()
|
|
|
|
|
.map(|(category, count, first_ts, last_ts)| LogCategorySummary {
|
|
|
|
|
has_errors: errset.contains(&category),
|
|
|
|
|
category,
|
|
|
|
|
count,
|
|
|
|
|
first_ts,
|
|
|
|
|
last_ts,
|
|
|
|
|
})
|
|
|
|
|
.collect();
|
|
|
|
|
Json::<Vec<LogCategorySummary>>(out).into_response()
|
|
|
|
|
}
|
|
|
|
|
(Err(e), _) | (_, Err(e)) => { warn!(error = %e, request_id = %id, "failed to query log categories"); StatusCode::INTERNAL_SERVER_ERROR.into_response() }
|
2025-11-18 11:48:09 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn get_logs_by_category(Path((request_id, category)): Path<(String, String)>, axum::extract::State(state): axum::extract::State<AppState>) -> Response {
|
|
|
|
|
let Ok(id) = Uuid::parse_str(&request_id) else { return StatusCode::BAD_REQUEST.into_response(); };
|
|
|
|
|
let rows = job_logs::Entity::find()
|
|
|
|
|
.filter(job_logs::Column::RequestId.eq(id))
|
|
|
|
|
.filter(job_logs::Column::Category.eq(category.clone()))
|
|
|
|
|
.order_by_asc(job_logs::Column::Seq)
|
|
|
|
|
.all(&state.db)
|
|
|
|
|
.await
|
|
|
|
|
.into_diagnostic();
|
|
|
|
|
match rows {
|
|
|
|
|
Ok(items) if items.is_empty() => StatusCode::NOT_FOUND.into_response(),
|
|
|
|
|
Ok(items) => {
|
|
|
|
|
let mut text = String::new();
|
|
|
|
|
for r in items {
|
|
|
|
|
if r.stderr || r.has_error || r.level.as_deref() == Some("error") {
|
|
|
|
|
text.push_str("[stderr] ");
|
|
|
|
|
}
|
|
|
|
|
text.push_str(&r.line);
|
|
|
|
|
if !text.ends_with('\n') { text.push('\n'); }
|
|
|
|
|
}
|
|
|
|
|
(
|
|
|
|
|
StatusCode::OK,
|
|
|
|
|
[(axum::http::header::CONTENT_TYPE, "text/plain; charset=utf-8")],
|
|
|
|
|
text,
|
|
|
|
|
).into_response()
|
|
|
|
|
}
|
|
|
|
|
Err(e) => { warn!(error = %e, request_id = %id, "failed to read logs"); StatusCode::INTERNAL_SERVER_ERROR.into_response() }
|
|
|
|
|
}
|
|
|
|
|
}
|