barycenter/src/jobs.rs

190 lines
6.4 KiB
Rust
Raw Normal View History

feat: add admin GraphQL API, background jobs, and user sync CLI Major Features: - Admin GraphQL API with dual endpoints (Seaography + custom) - Background job scheduler with execution tracking - Idempotent user sync CLI for Kubernetes deployments - Secure PUT /properties endpoint with Bearer token auth Admin GraphQL API: - Entity CRUD via Seaography at /admin/graphql - Custom job management API at /admin/jobs - Mutations: triggerJob - Queries: jobLogs, availableJobs - GraphiQL playgrounds for both endpoints Background Jobs: - tokio-cron-scheduler integration - Automated cleanup of expired sessions (hourly) - Automated cleanup of expired refresh tokens (hourly) - Job execution tracking in database - Manual job triggering via GraphQL User Sync CLI: - Command: barycenter sync-users --file users.json - Idempotent user synchronization from JSON - Creates new users with hashed passwords - Updates existing users (enabled, email_verified, email) - Syncs custom properties per user - Perfect for Kubernetes init containers Security Enhancements: - PUT /properties endpoint requires Bearer token - Users can only modify their own properties - Public registration disabled by default - Admin API on separate port for network isolation Database: - New job_executions table for job tracking - User update functions (update_user, update_user_email) - PostgreSQL + SQLite support maintained Configuration: - allow_public_registration setting (default: false) - admin_port setting (default: main port + 1) Documentation: - Comprehensive Kubernetes deployment guide - User sync JSON schema and examples - Init container and CronJob examples - Production deployment patterns Files Added: - src/admin_graphql.rs - GraphQL schema builders - src/admin_mutations.rs - Custom mutations and queries - src/jobs.rs - Job scheduler and tracking - src/user_sync.rs - User sync logic - src/entities/ - SeaORM entities (8 entities) - docs/kubernetes-deployment.md - K8s deployment guide - users.json.example - User 
sync example Dependencies: - tokio-cron-scheduler 0.13 - seaography 1.1.4 - async-graphql 7.0 - async-graphql-axum 7.0 🤖 Generated with Claude Code (https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-30 18:06:50 +01:00
use crate::entities;
use crate::errors::CrabError;
use crate::storage;
use chrono::Utc;
use sea_orm::{ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, QueryFilter, Set};
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{error, info};
/// Initialize and start the job scheduler with all background tasks
pub async fn init_scheduler(db: DatabaseConnection) -> Result<JobScheduler, CrabError> {
let sched = JobScheduler::new()
.await
.map_err(|e| CrabError::Other(format!("Failed to create job scheduler: {}", e)))?;
let db_clone = db.clone();
// Cleanup expired sessions job - runs every hour
let cleanup_sessions_job = Job::new_async("0 0 * * * *", move |_uuid, _l| {
let db = db_clone.clone();
Box::pin(async move {
info!("Running cleanup_expired_sessions job");
let execution_id = start_job_execution(&db, "cleanup_expired_sessions")
.await
.ok();
match storage::cleanup_expired_sessions(&db).await {
Ok(count) => {
info!("Cleaned up {} expired sessions", count);
if let Some(id) = execution_id {
let _ = complete_job_execution(&db, id, true, None, Some(count as i64))
.await;
}
}
Err(e) => {
error!("Failed to cleanup expired sessions: {}", e);
if let Some(id) = execution_id {
let _ = complete_job_execution(
&db,
id,
false,
Some(e.to_string()),
None,
)
.await;
}
}
}
})
})
.map_err(|e| CrabError::Other(format!("Failed to create cleanup sessions job: {}", e)))?;
sched
.add(cleanup_sessions_job)
.await
.map_err(|e| CrabError::Other(format!("Failed to add cleanup sessions job: {}", e)))?;
let db_clone = db.clone();
// Cleanup expired refresh tokens job - runs every hour at 30 minutes past
let cleanup_tokens_job = Job::new_async("0 30 * * * *", move |_uuid, _l| {
let db = db_clone.clone();
Box::pin(async move {
info!("Running cleanup_expired_refresh_tokens job");
let execution_id = start_job_execution(&db, "cleanup_expired_refresh_tokens")
.await
.ok();
match storage::cleanup_expired_refresh_tokens(&db).await {
Ok(count) => {
info!("Cleaned up {} expired refresh tokens", count);
if let Some(id) = execution_id {
let _ = complete_job_execution(&db, id, true, None, Some(count as i64))
.await;
}
}
Err(e) => {
error!("Failed to cleanup expired refresh tokens: {}", e);
if let Some(id) = execution_id {
let _ = complete_job_execution(
&db,
id,
false,
Some(e.to_string()),
None,
)
.await;
}
}
}
})
})
.map_err(|e| CrabError::Other(format!("Failed to create cleanup tokens job: {}", e)))?;
sched
.add(cleanup_tokens_job)
.await
.map_err(|e| CrabError::Other(format!("Failed to add cleanup tokens job: {}", e)))?;
// Start the scheduler
sched
.start()
.await
.map_err(|e| CrabError::Other(format!("Failed to start job scheduler: {}", e)))?;
info!("Job scheduler started with {} jobs", 2);
Ok(sched)
}
/// Record the start of a job execution.
///
/// Inserts a `job_executions` row with `started_at = now` and all completion
/// fields NULL, returning the database-assigned row id for later use with
/// [`complete_job_execution`].
///
/// # Errors
///
/// Propagates any database error from the insert.
pub async fn start_job_execution(
    db: &DatabaseConnection,
    job_name: &str,
) -> Result<i64, CrabError> {
    use entities::job_execution;
    let now = Utc::now().timestamp();
    let execution = job_execution::ActiveModel {
        job_name: Set(job_name.to_string()),
        started_at: Set(now),
        completed_at: Set(None),
        success: Set(None),
        error_message: Set(None),
        records_processed: Set(None),
        // `id` stays NotSet so the database assigns it. Setting it to a
        // literal 0 would insert id=0 explicitly, bypassing the sequence /
        // autoincrement and colliding on the second insert.
        ..Default::default()
    };
    let result = execution.insert(db).await?;
    Ok(result.id)
}
/// Record the completion of a job execution.
///
/// Looks up the execution row by id and stamps it with the completion time,
/// outcome flag (stored as 1/0), optional error message, and optional record
/// count. A missing row is silently ignored — there is nothing to update.
///
/// # Errors
///
/// Propagates any database error from the lookup or the update.
pub async fn complete_job_execution(
    db: &DatabaseConnection,
    execution_id: i64,
    success: bool,
    error_message: Option<String>,
    records_processed: Option<i64>,
) -> Result<(), CrabError> {
    use entities::job_execution::{Column, Entity};
    let finished_at = Utc::now().timestamp();
    let found = Entity::find()
        .filter(Column::Id.eq(execution_id))
        .one(db)
        .await?;
    let Some(row) = found else {
        // No matching execution row; nothing to record.
        return Ok(());
    };
    let mut model = row.into_active_model();
    model.completed_at = Set(Some(finished_at));
    model.success = Set(Some(if success { 1 } else { 0 }));
    model.error_message = Set(error_message);
    model.records_processed = Set(records_processed);
    model.update(db).await?;
    Ok(())
}
/// Manually trigger a job by name (useful for admin API).
///
/// Validates the name, records a `job_executions` row, runs the matching
/// cleanup routine, and records the outcome. Always returns `Ok(())` once
/// the job has been dispatched, even if the job itself failed — the failure
/// is captured in the execution record and the log.
///
/// # Errors
///
/// Returns `CrabError::Other` for an unknown job name, or a database error
/// if the execution tracking rows cannot be written.
pub async fn trigger_job_manually(
    db: &DatabaseConnection,
    job_name: &str,
) -> Result<(), CrabError> {
    // Validate before recording anything, so an unknown name never leaves a
    // dangling execution row that can never be completed.
    if !matches!(
        job_name,
        "cleanup_expired_sessions" | "cleanup_expired_refresh_tokens"
    ) {
        return Err(CrabError::Other(format!("Unknown job name: {}", job_name)));
    }
    info!("Manually triggering job: {}", job_name);
    let execution_id = start_job_execution(db, job_name).await?;
    let result = match job_name {
        "cleanup_expired_sessions" => storage::cleanup_expired_sessions(db).await,
        "cleanup_expired_refresh_tokens" => storage::cleanup_expired_refresh_tokens(db).await,
        _ => unreachable!("job name validated above"),
    };
    match result {
        Ok(count) => {
            info!("Manually triggered job {} completed: {} records", job_name, count);
            complete_job_execution(db, execution_id, true, None, Some(count as i64)).await?;
        }
        Err(e) => {
            error!("Manually triggered job {} failed: {}", job_name, e);
            complete_job_execution(db, execution_id, false, Some(e.to_string()), None).await?;
        }
    }
    Ok(())
}