feat: add admin GraphQL API, background jobs, and user sync CLI

Major Features:
- Admin GraphQL API with dual endpoints (Seaography + custom)
- Background job scheduler with execution tracking
- Idempotent user sync CLI for Kubernetes deployments
- Secure PUT /properties endpoint with Bearer token auth

Admin GraphQL API:
- Entity CRUD via Seaography at /admin/graphql
- Custom job management API at /admin/jobs
- Mutations: triggerJob
- Queries: jobLogs, availableJobs
- GraphiQL playgrounds for both endpoints

Background Jobs:
- tokio-cron-scheduler integration
- Automated cleanup of expired sessions (hourly)
- Automated cleanup of expired refresh tokens (hourly)
- Job execution tracking in database
- Manual job triggering via GraphQL

User Sync CLI:
- Command: barycenter sync-users --file users.json
- Idempotent user synchronization from JSON
- Creates new users with hashed passwords
- Updates existing users (enabled, email_verified, email)
- Syncs custom properties per user
- Designed for Kubernetes init containers

Security Enhancements:
- PUT /properties endpoint requires Bearer token
- Users can only modify their own properties
- Public registration disabled by default
- Admin API on separate port for network isolation

Database:
- New job_executions table for job tracking
- User update functions (update_user, update_user_email)
- PostgreSQL + SQLite support maintained

Configuration:
- allow_public_registration setting (default: false)
- admin_port setting (default: main port + 1)
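A minimal sketch of these settings in `config.toml` (values are illustrative):

```toml
[server]
host = "0.0.0.0"
port = 8080
# Optional; defaults to the main port + 1 when omitted
admin_port = 8081
# Public self-registration is off by default
allow_public_registration = false
```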

Documentation:
- Comprehensive Kubernetes deployment guide
- User sync JSON schema and examples
- Init container and CronJob examples
- Production deployment patterns

Files Added:
- src/admin_graphql.rs - GraphQL schema builders
- src/admin_mutations.rs - Custom mutations and queries
- src/jobs.rs - Job scheduler and tracking
- src/user_sync.rs - User sync logic
- src/entities/ - SeaORM entities (8 entities)
- docs/kubernetes-deployment.md - K8s deployment guide
- users.json.example - User sync example

Dependencies:
- tokio-cron-scheduler 0.13
- seaography 1.1.4
- async-graphql 7.0
- async-graphql-axum 7.0

🤖 Generated with Claude Code (https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Till Wegmueller 2025-11-30 18:06:50 +01:00
parent 06ff10dda9
commit a1056bb237
22 changed files with 3331 additions and 349 deletions


@@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
## Project Overview
Barycenter is an OpenID Connect Identity Provider (IdP) implementing OAuth 2.0 Authorization Code flow with PKCE. The project is written in Rust using axum for the web framework, SeaORM for database access (SQLite and PostgreSQL), and josekit for JOSE/JWT operations.
## Build and Development Commands
@@ -68,6 +68,27 @@ The application loads configuration from:
Environment variables use double underscores as separators for nested keys.
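For example, a nested key like `server.port` maps to an environment variable as follows (the `BARYCENTER` prefix matches the database example below):

```bash
# Double underscores separate nesting levels: [server].port -> SERVER__PORT
export BARYCENTER__SERVER__PORT=8080
echo "$BARYCENTER__SERVER__PORT"
```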
### Database Configuration
Barycenter supports both SQLite and PostgreSQL databases. The database backend is automatically detected from the connection string:
**SQLite (default):**
```toml
[database]
url = "sqlite://barycenter.db?mode=rwc"
```
**PostgreSQL:**
```toml
[database]
url = "postgresql://user:password@localhost/barycenter"
```
Or via environment variable:
```bash
export BARYCENTER__DATABASE__URL="postgresql://user:password@localhost/barycenter"
```
## Architecture and Module Structure
### Entry Point (`src/main.rs`)
@@ -81,14 +102,14 @@ The application initializes in this order:
### Settings (`src/settings.rs`)
Manages configuration with four main sections:
- `Server`: listen address and public base URL (issuer)
- `Database`: database connection string (SQLite or PostgreSQL)
- `Keys`: JWKS and private key paths, signing algorithm
- `Federation`: trust anchor URLs (future use)
The `issuer()` method returns the OAuth issuer URL, preferring `public_base_url` or falling back to `http://{host}:{port}`.
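As a sketch, the `issuer()` fallback described above behaves like this (struct and field names are assumptions for illustration, not the actual `src/settings.rs` definitions):

```rust
// Illustrative sketch of the issuer() fallback; names are assumed.
struct Server {
    host: String,
    port: u16,
    public_base_url: Option<String>,
}

impl Server {
    /// Prefer the configured public base URL; otherwise derive
    /// one from the listen address.
    fn issuer(&self) -> String {
        self.public_base_url
            .clone()
            .unwrap_or_else(|| format!("http://{}:{}", self.host, self.port))
    }
}

fn main() {
    let local = Server {
        host: "127.0.0.1".into(),
        port: 8080,
        public_base_url: None,
    };
    println!("{}", local.issuer()); // http://127.0.0.1:8080
}
```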
### Storage (`src/storage.rs`)
Database layer with raw SQL using SeaORM's `DatabaseConnection`. Supports both SQLite and PostgreSQL backends, automatically detected from the connection string. Tables:
- `clients`: OAuth client registrations (client_id, client_secret, redirect_uris)
- `auth_codes`: Authorization codes with PKCE challenge, subject, scope, nonce
- `access_tokens`: Bearer tokens with subject, scope, expiration

Cargo.lock (generated): diff suppressed because it is too large.


@@ -23,8 +23,8 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_with = "3"
# SeaORM for SQLite and PostgreSQL
sea-orm = { version = "1", default-features = false, features = ["sqlx-sqlite", "sqlx-postgres", "runtime-tokio-rustls", "macros"] }
# JOSE / JWKS & JWT
josekit = "0.10"
@@ -48,6 +48,14 @@ tower_governor = "0.4"
regex = "1"
url = "2"
# GraphQL Admin API
seaography = { version = "1", features = ["with-decimal", "with-chrono", "with-uuid"] }
async-graphql = "7"
async-graphql-axum = "7"
# Background job scheduler
tokio-cron-scheduler = "0.13"
[dev-dependencies]
openidconnect = { version = "4", features = ["reqwest-blocking"] }
oauth2 = "5"

docs/kubernetes-deployment.md (new file)
@@ -0,0 +1,931 @@
# Kubernetes Deployment Guide
This guide covers deploying Barycenter in Kubernetes with user sync, persistent storage, and proper configuration management.
## Table of Contents
- [Quick Start](#quick-start)
- [Architecture Overview](#architecture-overview)
- [Prerequisites](#prerequisites)
- [Configuration](#configuration)
- [Storage](#storage)
- [User Management](#user-management)
- [Deployment](#deployment)
- [Services](#services)
- [Complete Example](#complete-example)
- [Production Considerations](#production-considerations)
## Quick Start
```bash
# Create namespace
kubectl create namespace barycenter
# Apply all manifests
kubectl apply -f deploy/kubernetes/
```
## Architecture Overview
```
┌─────────────────────────────────────────────────────────┐
│ Kubernetes Cluster │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ Barycenter Deployment │ │
│ │ │ │
│ │ ┌─────────────────┐ ┌──────────────────────┐ │ │
│ │ │ Init Container │───▶│ Main Container │ │ │
│ │ │ (User Sync) │ │ (OIDC Server) │ │ │
│ │ └─────────────────┘ └──────────────────────┘ │ │
│ │ │ │ │ │
│ │ └────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌────────────────────┐ │ │
│ │ │ PersistentVolume │ │ │
│ │ │ (SQLite/Data) │ │ │
│ │ └────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────┐ ┌────────────────┐ │
│ │ ConfigMap │ │ Secret │ │
│ │ (config.toml) │ │ (users.json) │ │
│ └────────────────┘ └────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Services │ │
│ │ ┌──────────────┐ ┌──────────────────┐ │ │
│ │ │ Public │ │ Admin │ │ │
│ │ │ (Port 8080) │ │ (Port 8081) │ │ │
│ │ └──────────────┘ └──────────────────┘ │ │
│ └─────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
```
## Prerequisites
- Kubernetes cluster (1.20+)
- `kubectl` configured
- Container registry access (or Docker Hub)
- Persistent storage provisioner (optional, for production)
## Configuration
### ConfigMap for Application Configuration
Create `barycenter-config.yaml`:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: barycenter-config
  namespace: barycenter
data:
  config.toml: |
    [server]
    host = "0.0.0.0"
    port = 8080
    admin_port = 8081
    # IMPORTANT: Set this to your actual domain in production
    public_base_url = "https://auth.example.com"
    allow_public_registration = false

    [database]
    # SQLite for simplicity (use PostgreSQL in production)
    url = "sqlite:///data/barycenter.db?mode=rwc"

    [keys]
    jwks_path = "/data/jwks.json"
    private_key_path = "/data/private_key.pem"
    alg = "RS256"

    [federation]
    trust_anchors = []
```
### Secret for User Data
Create `barycenter-users-secret.yaml`:
```yaml
apiVersion: v1
kind: Secret
metadata:
  name: barycenter-users
  namespace: barycenter
type: Opaque
stringData:
  users.json: |
    {
      "users": [
        {
          "username": "admin",
          "email": "admin@example.com",
          "password": "CHANGE-ME-IN-PRODUCTION",
          "enabled": true,
          "email_verified": true,
          "properties": {
            "role": "administrator",
            "display_name": "System Administrator"
          }
        },
        {
          "username": "app-service",
          "email": "service@example.com",
          "password": "service-account-password",
          "enabled": true,
          "email_verified": true,
          "properties": {
            "role": "service_account",
            "display_name": "Application Service Account"
          }
        }
      ]
    }
```
**IMPORTANT:** In production, generate this secret from a secure source:
```bash
# Generate from template with environment variables
cat users.json.template | envsubst | kubectl create secret generic barycenter-users \
--namespace=barycenter \
--from-file=users.json=/dev/stdin \
--dry-run=client -o yaml | kubectl apply -f -
```
## Storage
### Development: EmptyDir (Data lost on pod restart)
```yaml
volumes:
  - name: data
    emptyDir: {}
```
### Production: PersistentVolumeClaim
Create `barycenter-pvc.yaml`:
```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: barycenter-data
  namespace: barycenter
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
  # Optional: Use a specific storage class
  # storageClassName: fast-ssd
```
## User Management
### Init Container for User Sync
The init container runs before the main application starts and syncs users from the secret:
```yaml
initContainers:
  - name: user-sync
    image: your-registry/barycenter:latest
    command:
      - barycenter
      - sync-users
      - --file
      - /secrets/users.json
    volumeMounts:
      - name: users-secret
        mountPath: /secrets
        readOnly: true
      - name: data
        mountPath: /data
      - name: config
        mountPath: /app
        readOnly: true
    env:
      - name: RUST_LOG
        value: info
```
### Standalone User Sync Job
For updating users without redeploying:
Create `barycenter-user-sync-job.yaml`:
```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: barycenter-user-sync
  namespace: barycenter
spec:
  backoffLimit: 3
  template:
    metadata:
      labels:
        app: barycenter
        component: user-sync
    spec:
      restartPolicy: OnFailure
      containers:
        - name: user-sync
          image: your-registry/barycenter:latest
          command:
            - barycenter
            - sync-users
            - --file
            - /secrets/users.json
          volumeMounts:
            - name: users-secret
              mountPath: /secrets
              readOnly: true
            - name: data
              mountPath: /data
            - name: config
              mountPath: /app
              readOnly: true
          env:
            - name: RUST_LOG
              value: info
      volumes:
        - name: users-secret
          secret:
            secretName: barycenter-users
        - name: data
          persistentVolumeClaim:
            claimName: barycenter-data
        - name: config
          configMap:
            name: barycenter-config
```
Run with:
```bash
kubectl apply -f barycenter-user-sync-job.yaml
# Watch progress
kubectl logs -f job/barycenter-user-sync -n barycenter
# Clean up job after completion
kubectl delete job barycenter-user-sync -n barycenter
```
### CronJob for Periodic User Sync
Create `barycenter-user-sync-cronjob.yaml`:
```yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: barycenter-user-sync
  namespace: barycenter
spec:
  # Run every hour
  schedule: "0 * * * *"
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      backoffLimit: 2
      template:
        metadata:
          labels:
            app: barycenter
            component: user-sync
        spec:
          restartPolicy: OnFailure
          containers:
            - name: user-sync
              image: your-registry/barycenter:latest
              command:
                - barycenter
                - sync-users
                - --file
                - /secrets/users.json
              volumeMounts:
                - name: users-secret
                  mountPath: /secrets
                  readOnly: true
                - name: data
                  mountPath: /data
                - name: config
                  mountPath: /app
                  readOnly: true
              resources:
                requests:
                  memory: "128Mi"
                  cpu: "100m"
                limits:
                  memory: "256Mi"
                  cpu: "200m"
          volumes:
            - name: users-secret
              secret:
                secretName: barycenter-users
            - name: data
              persistentVolumeClaim:
                claimName: barycenter-data
            - name: config
              configMap:
                name: barycenter-config
```
## Deployment
### Main Deployment
Create `barycenter-deployment.yaml`:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: barycenter
  namespace: barycenter
  labels:
    app: barycenter
spec:
  replicas: 1  # NOTE: SQLite supports only 1 replica. Use PostgreSQL for HA.
  selector:
    matchLabels:
      app: barycenter
  template:
    metadata:
      labels:
        app: barycenter
    spec:
      # Init container syncs users before app starts
      initContainers:
        - name: user-sync
          image: your-registry/barycenter:latest
          command:
            - barycenter
            - sync-users
            - --file
            - /secrets/users.json
          volumeMounts:
            - name: users-secret
              mountPath: /secrets
              readOnly: true
            - name: data
              mountPath: /data
            - name: config
              mountPath: /app
              readOnly: true
          env:
            - name: RUST_LOG
              value: info
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "256Mi"
              cpu: "200m"
      # Main application container
      containers:
        - name: barycenter
          image: your-registry/barycenter:latest
          ports:
            - name: public
              containerPort: 8080
              protocol: TCP
            - name: admin
              containerPort: 8081
              protocol: TCP
          volumeMounts:
            - name: data
              mountPath: /data
            - name: config
              mountPath: /app
              readOnly: true
          env:
            - name: RUST_LOG
              value: info
          # Liveness probe - checks if app is alive
          livenessProbe:
            httpGet:
              path: /.well-known/openid-configuration
              port: public
            initialDelaySeconds: 10
            periodSeconds: 30
            timeoutSeconds: 5
            failureThreshold: 3
          # Readiness probe - checks if app is ready to serve traffic
          readinessProbe:
            httpGet:
              path: /.well-known/openid-configuration
              port: public
            initialDelaySeconds: 5
            periodSeconds: 10
            timeoutSeconds: 3
            failureThreshold: 3
          resources:
            requests:
              memory: "256Mi"
              cpu: "200m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          securityContext:
            runAsNonRoot: true
            runAsUser: 1000
            readOnlyRootFilesystem: false
            allowPrivilegeEscalation: false
            capabilities:
              drop:
                - ALL
      volumes:
        - name: users-secret
          secret:
            secretName: barycenter-users
        - name: data
          persistentVolumeClaim:
            claimName: barycenter-data
        - name: config
          configMap:
            name: barycenter-config
      # Security context for the pod
      securityContext:
        fsGroup: 1000
```
## Services
### Public Service (OIDC Endpoints)
Create `barycenter-service-public.yaml`:
```yaml
apiVersion: v1
kind: Service
metadata:
  name: barycenter-public
  namespace: barycenter
  labels:
    app: barycenter
    component: public
spec:
  type: ClusterIP
  ports:
    - port: 8080
      targetPort: public
      protocol: TCP
      name: http
  selector:
    app: barycenter
```
### Admin Service (GraphQL API)
Create `barycenter-service-admin.yaml`:
```yaml
apiVersion: v1
kind: Service
metadata:
  name: barycenter-admin
  namespace: barycenter
  labels:
    app: barycenter
    component: admin
spec:
  type: ClusterIP
  ports:
    - port: 8081
      targetPort: admin
      protocol: TCP
      name: http
  selector:
    app: barycenter
```
### Ingress (Optional)
Create `barycenter-ingress.yaml`:
```yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: barycenter
  namespace: barycenter
  annotations:
    # cert-manager for TLS
    cert-manager.io/cluster-issuer: letsencrypt-prod
    # nginx ingress specific
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
    nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
spec:
  ingressClassName: nginx
  tls:
    - hosts:
        - auth.example.com
      secretName: barycenter-tls
  rules:
    # Public OIDC endpoints
    - host: auth.example.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: barycenter-public
                port:
                  number: 8080
---
# Separate ingress for admin (restrict access)
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: barycenter-admin
  namespace: barycenter
  annotations:
    cert-manager.io/cluster-issuer: letsencrypt-prod
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
    # Restrict to internal IPs only
    nginx.ingress.kubernetes.io/whitelist-source-range: "10.0.0.0/8,172.16.0.0/12,192.168.0.0/16"
spec:
  ingressClassName: nginx
  tls:
    - hosts:
        - admin.auth.example.com
      secretName: barycenter-admin-tls
  rules:
    - host: admin.auth.example.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: barycenter-admin
                port:
                  number: 8081
```
## Complete Example
Create a directory structure:
```
deploy/kubernetes/
├── namespace.yaml
├── configmap.yaml
├── secret.yaml
├── pvc.yaml
├── deployment.yaml
├── service-public.yaml
├── service-admin.yaml
└── ingress.yaml
```
### namespace.yaml
```yaml
apiVersion: v1
kind: Namespace
metadata:
  name: barycenter
  labels:
    name: barycenter
```
### Apply All Resources
```bash
# Create namespace first
kubectl apply -f deploy/kubernetes/namespace.yaml
# Apply configuration and storage
kubectl apply -f deploy/kubernetes/configmap.yaml
kubectl apply -f deploy/kubernetes/secret.yaml
kubectl apply -f deploy/kubernetes/pvc.yaml
# Deploy application
kubectl apply -f deploy/kubernetes/deployment.yaml
# Expose services
kubectl apply -f deploy/kubernetes/service-public.yaml
kubectl apply -f deploy/kubernetes/service-admin.yaml
# Optional: Create ingress
kubectl apply -f deploy/kubernetes/ingress.yaml
```
### Verify Deployment
```bash
# Check all resources
kubectl get all -n barycenter
# Check init container logs (user sync)
kubectl logs -n barycenter deployment/barycenter -c user-sync
# Check main container logs
kubectl logs -n barycenter deployment/barycenter -c barycenter -f
# Check services
kubectl get svc -n barycenter
# Port forward for testing
kubectl port-forward -n barycenter svc/barycenter-public 8080:8080
kubectl port-forward -n barycenter svc/barycenter-admin 8081:8081
# Test OIDC discovery
curl http://localhost:8080/.well-known/openid-configuration
# Test admin GraphQL
curl http://localhost:8081/admin/playground
```
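With the admin port forwarded, a job can be triggered and its history inspected through the jobs endpoint. A sketch of the operations (names follow the camelCase GraphQL forms of the `triggerJob` mutation and `jobLogs` query from this commit):

```graphql
mutation TriggerCleanup {
  triggerJob(jobName: "cleanup_expired_sessions") {
    success
    message
    jobName
  }
}

query RecentLogs {
  jobLogs(jobName: "cleanup_expired_sessions", limit: 10) {
    id
    startedAt
    completedAt
    success
    errorMessage
  }
}
```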
## Production Considerations
### High Availability
**SQLite Limitation:**
- SQLite only supports single writer
- For HA, use PostgreSQL instead
**PostgreSQL Setup:**
1. Update `configmap.yaml`:
```toml
[database]
url = "postgresql://barycenter:password@postgres-service:5432/barycenter"
```
2. Deploy PostgreSQL (or use cloud provider):
```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: postgres
  namespace: barycenter
spec:
  serviceName: postgres
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
        - name: postgres
          image: postgres:16
          ports:
            - containerPort: 5432
          env:
            - name: POSTGRES_DB
              value: barycenter
            - name: POSTGRES_USER
              value: barycenter
            - name: POSTGRES_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: postgres-secret
                  key: password
          volumeMounts:
            - name: data
              mountPath: /var/lib/postgresql/data
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 20Gi
```
3. Scale deployment:
```yaml
spec:
  replicas: 3  # Now safe with PostgreSQL
```
### Security Hardening
1. **Network Policies:**
```yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: barycenter-network-policy
  namespace: barycenter
spec:
  podSelector:
    matchLabels:
      app: barycenter
  policyTypes:
    - Ingress
    - Egress
  ingress:
    # Allow from ingress controller
    - from:
        - namespaceSelector:
            matchLabels:
              name: ingress-nginx
      ports:
        - protocol: TCP
          port: 8080
    # Admin access only from internal
    - from:
        - podSelector:
            matchLabels:
              role: admin
      ports:
        - protocol: TCP
          port: 8081
  egress:
    # Allow DNS
    - to:
        - namespaceSelector:
            matchLabels:
              name: kube-system
      ports:
        - protocol: UDP
          port: 53
    # Allow PostgreSQL
    - to:
        - podSelector:
            matchLabels:
              app: postgres
      ports:
        - protocol: TCP
          port: 5432
```
2. **Pod Security Standards:**
```yaml
apiVersion: v1
kind: Namespace
metadata:
  name: barycenter
  labels:
    pod-security.kubernetes.io/enforce: restricted
    pod-security.kubernetes.io/audit: restricted
    pod-security.kubernetes.io/warn: restricted
```
3. **Resource Quotas:**
```yaml
apiVersion: v1
kind: ResourceQuota
metadata:
  name: barycenter-quota
  namespace: barycenter
spec:
  hard:
    requests.cpu: "2"
    requests.memory: 4Gi
    limits.cpu: "4"
    limits.memory: 8Gi
    persistentvolumeclaims: "5"
```
### Monitoring
1. **ServiceMonitor (Prometheus Operator):**
```yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: barycenter
  namespace: barycenter
spec:
  selector:
    matchLabels:
      app: barycenter
  endpoints:
    - port: http
      interval: 30s
      path: /metrics  # Add metrics endpoint to Barycenter
```
2. **Logging:**
```yaml
# Add to deployment
env:
  - name: RUST_LOG
    value: "info,barycenter=debug"
```
### Backup
```yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: barycenter-backup
  namespace: barycenter
spec:
  schedule: "0 2 * * *"  # Daily at 2 AM
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: backup
              image: alpine:latest
              command:
                - sh
                - -c
                - |
                  apk add --no-cache sqlite
                  sqlite3 /data/barycenter.db ".backup /backup/barycenter-$(date +%Y%m%d-%H%M%S).db"
                  # Upload to S3/GCS/etc
              volumeMounts:
                - name: data
                  mountPath: /data
                  readOnly: true
                - name: backup
                  mountPath: /backup
          volumes:
            - name: data
              persistentVolumeClaim:
                claimName: barycenter-data
            - name: backup
              persistentVolumeClaim:
                claimName: barycenter-backup
          restartPolicy: OnFailure
```
## Troubleshooting
### Check Init Container
```bash
# View init container logs
kubectl logs -n barycenter deployment/barycenter -c user-sync
# Common issues:
# - Secret not found: Check kubectl get secret -n barycenter
# - Permission denied: Check fsGroup and securityContext
# - Database locked: Check if multiple pods are running with SQLite
```
### Check Main Container
```bash
# View logs
kubectl logs -n barycenter deployment/barycenter -c barycenter -f
# Exec into pod
kubectl exec -it -n barycenter deployment/barycenter -- sh
# Check database
ls -la /data/
```
### Update Users
```bash
# Method 1: Update secret and restart
kubectl delete secret barycenter-users -n barycenter
kubectl create secret generic barycenter-users \
--from-file=users.json=./users.json \
-n barycenter
kubectl rollout restart deployment/barycenter -n barycenter
# Method 2: Run sync job
kubectl apply -f barycenter-user-sync-job.yaml
kubectl logs -f job/barycenter-user-sync -n barycenter
```
## Summary
You now have:
- ✅ Complete Kubernetes deployment setup
- ✅ User sync via init containers
- ✅ Persistent storage configuration
- ✅ Service exposure (public + admin)
- ✅ Production-ready configurations
- ✅ HA setup with PostgreSQL
- ✅ Security hardening options
- ✅ Monitoring and backup strategies
For the actual Helm chart deployment, see `deploy/helm/barycenter/`.

src/admin_graphql.rs (new file)
@@ -0,0 +1,113 @@
use async_graphql::dynamic::Schema as DynamicSchema;
use async_graphql::EmptySubscription;
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
use axum::{
    extract::State,
    response::IntoResponse,
    routing::{get, post},
    Router,
};
use sea_orm::DatabaseConnection;
use std::sync::Arc;

use crate::admin_mutations::{AdminMutation, AdminQuery};
use crate::entities;

/// Initialize the Seaography admin GraphQL schema with all entities
pub fn build_seaography_schema(db: DatabaseConnection) -> DynamicSchema {
    use seaography::{Builder, BuilderContext};

    // Create a static BuilderContext for Seaography
    let context: &'static BuilderContext = Box::leak(Box::new(BuilderContext::default()));
    let mut builder = Builder::new(context, db.clone());

    // Register all entities
    builder.register_entity::<entities::user::Entity>(vec![]);
    builder.register_entity::<entities::client::Entity>(vec![]);
    builder.register_entity::<entities::session::Entity>(vec![]);
    builder.register_entity::<entities::access_token::Entity>(vec![]);
    builder.register_entity::<entities::auth_code::Entity>(vec![]);
    builder.register_entity::<entities::refresh_token::Entity>(vec![]);
    builder.register_entity::<entities::property::Entity>(vec![]);
    builder.register_entity::<entities::job_execution::Entity>(vec![]);

    // Build and return the schema
    builder.schema_builder().finish().unwrap()
}

/// Build custom job management GraphQL schema
pub fn build_jobs_schema(
    db: DatabaseConnection,
) -> async_graphql::Schema<AdminQuery, AdminMutation, EmptySubscription> {
    async_graphql::Schema::build(AdminQuery, AdminMutation, EmptySubscription)
        .data(Arc::new(db))
        .finish()
}

#[derive(Clone)]
pub struct SeaographyState {
    pub schema: DynamicSchema,
}

#[derive(Clone)]
pub struct JobsState {
    pub schema: async_graphql::Schema<AdminQuery, AdminMutation, EmptySubscription>,
}

/// Seaography GraphQL POST handler for entity CRUD
async fn seaography_handler(
    State(state): State<Arc<SeaographyState>>,
    req: GraphQLRequest,
) -> GraphQLResponse {
    state.schema.execute(req.into_inner()).await.into()
}

/// Jobs GraphQL POST handler for job management
async fn jobs_handler(
    State(state): State<Arc<JobsState>>,
    req: GraphQLRequest,
) -> GraphQLResponse {
    state.schema.execute(req.into_inner()).await.into()
}

/// Seaography GraphQL playground (GraphiQL) handler
async fn seaography_playground() -> impl IntoResponse {
    axum::response::Html(
        async_graphql::http::GraphiQLSource::build()
            .endpoint("/admin/graphql")
            .finish(),
    )
}

/// Jobs GraphQL playground (GraphiQL) handler
async fn jobs_playground() -> impl IntoResponse {
    axum::response::Html(
        async_graphql::http::GraphiQLSource::build()
            .endpoint("/admin/jobs")
            .finish(),
    )
}

/// Create the admin API router with both Seaography and custom job endpoints
pub fn router(
    seaography_schema: DynamicSchema,
    jobs_schema: async_graphql::Schema<AdminQuery, AdminMutation, EmptySubscription>,
) -> Router {
    let seaography_state = Arc::new(SeaographyState {
        schema: seaography_schema,
    });
    let jobs_state = Arc::new(JobsState {
        schema: jobs_schema,
    });

    Router::new()
        // Seaography entity CRUD
        .route("/admin/graphql", post(seaography_handler))
        .route("/admin/playground", get(seaography_playground))
        .with_state(seaography_state)
        // Custom job management
        .route("/admin/jobs", post(jobs_handler))
        .route("/admin/jobs/playground", get(jobs_playground))
        .with_state(jobs_state)
}

src/admin_mutations.rs (new file)
@@ -0,0 +1,132 @@
use async_graphql::*;
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, QuerySelect};
use std::sync::Arc;

use crate::jobs;

/// Custom mutations for admin operations
#[derive(Default)]
pub struct AdminMutation;

#[Object]
impl AdminMutation {
    /// Manually trigger a background job by name
    async fn trigger_job(&self, ctx: &Context<'_>, job_name: String) -> Result<JobTriggerResult> {
        let db = ctx
            .data::<Arc<DatabaseConnection>>()
            .map_err(|_| Error::new("Database connection not available"))?;

        match jobs::trigger_job_manually(db.as_ref(), &job_name).await {
            Ok(_) => Ok(JobTriggerResult {
                success: true,
                message: format!("Job '{}' triggered successfully", job_name),
                job_name,
            }),
            Err(e) => Ok(JobTriggerResult {
                success: false,
                message: format!("Failed to trigger job '{}': {}", job_name, e),
                job_name,
            }),
        }
    }
}

/// Result of triggering a job
#[derive(SimpleObject)]
pub struct JobTriggerResult {
    pub success: bool,
    pub message: String,
    pub job_name: String,
}

/// Custom queries for admin operations
#[derive(Default)]
pub struct AdminQuery;

#[Object]
impl AdminQuery {
    /// Get recent job executions with optional filtering
    async fn job_logs(
        &self,
        ctx: &Context<'_>,
        #[graphql(desc = "Filter by job name")] job_name: Option<String>,
        #[graphql(desc = "Limit number of results", default = 100)] limit: i64,
        #[graphql(desc = "Only show failed jobs")] only_failures: Option<bool>,
    ) -> Result<Vec<JobLog>> {
        let db = ctx
            .data::<Arc<DatabaseConnection>>()
            .map_err(|_| Error::new("Database connection not available"))?;

        use crate::entities::job_execution::{Column, Entity};
        use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QueryOrder};

        let mut query = Entity::find();

        // Filter by job name if provided
        if let Some(name) = job_name {
            query = query.filter(Column::JobName.eq(name));
        }

        // Filter by failures if requested
        if let Some(true) = only_failures {
            query = query.filter(Column::Success.eq(0));
        }

        // Order by most recent first and limit
        let results = query
            .order_by_desc(Column::StartedAt)
            .limit(limit as u64)
            .all(db.as_ref())
            .await
            .map_err(|e| Error::new(format!("Database error: {}", e)))?;

        Ok(results
            .into_iter()
            .map(|model| JobLog {
                id: model.id,
                job_name: model.job_name,
                started_at: model.started_at,
                completed_at: model.completed_at,
                success: model.success,
                error_message: model.error_message,
                records_processed: model.records_processed,
            })
            .collect())
    }

    /// Get list of available jobs that can be triggered
    async fn available_jobs(&self) -> Result<Vec<JobInfo>> {
        Ok(vec![
            JobInfo {
                name: "cleanup_expired_sessions".to_string(),
                description: "Clean up expired user sessions".to_string(),
                schedule: "Hourly at :00".to_string(),
            },
            JobInfo {
                name: "cleanup_expired_refresh_tokens".to_string(),
                description: "Clean up expired refresh tokens".to_string(),
                schedule: "Hourly at :30".to_string(),
            },
        ])
    }
}

/// Job log entry
#[derive(SimpleObject)]
pub struct JobLog {
    pub id: i64,
    pub job_name: String,
    pub started_at: i64,
    pub completed_at: Option<i64>,
    pub success: Option<i64>,
    pub error_message: Option<String>,
    pub records_processed: Option<i64>,
}

/// Information about an available job
#[derive(SimpleObject)]
pub struct JobInfo {
    pub name: String,
    pub description: String,
    pub schedule: String,
}

src/entities/access_token.rs (new file)
@@ -0,0 +1,20 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "access_tokens")]
pub struct Model {
    #[sea_orm(primary_key, auto_increment = false)]
    pub token: String,
    pub client_id: String,
    pub subject: String,
    pub scope: String,
    pub created_at: i64,
    pub expires_at: i64,
    pub revoked: i64,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

src/entities/auth_code.rs (new file)
@@ -0,0 +1,25 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "auth_codes")]
pub struct Model {
    #[sea_orm(primary_key, auto_increment = false)]
    pub code: String,
    pub client_id: String,
    pub redirect_uri: String,
    pub scope: String,
    pub subject: String,
    pub nonce: Option<String>,
    pub code_challenge: String,
    pub code_challenge_method: String,
    pub created_at: i64,
    pub expires_at: i64,
    pub consumed: i64,
    pub auth_time: Option<i64>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

src/entities/client.rs (new file)
@@ -0,0 +1,18 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "clients")]
pub struct Model {
    #[sea_orm(primary_key, auto_increment = false)]
    pub client_id: String,
    pub client_secret: String,
    pub client_name: Option<String>,
    pub redirect_uris: String, // JSON-encoded Vec<String>
    pub created_at: i64,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

src/entities/job_execution.rs Normal file

@@ -0,0 +1,20 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "job_executions")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = true)]
pub id: i64,
pub job_name: String,
pub started_at: i64,
pub completed_at: Option<i64>,
pub success: Option<i64>, // 0 = failure, 1 = success, NULL = running
pub error_message: Option<String>,
pub records_processed: Option<i64>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
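The `success` column is deliberately tri-state: `NULL` while the job is still running, `0` on failure, `1` on success. A dependency-free sketch of how a consumer might decode it (the `JobStatus` enum and helper are illustrative, not part of the actual entity):

```rust
/// Tri-state job status decoded from the `success` column of
/// `job_executions`: NULL = still running, 0 = failed, 1 = succeeded.
#[derive(Debug, PartialEq)]
enum JobStatus {
    Running,
    Failed,
    Succeeded,
}

/// Map the raw column value onto the status. Any non-zero value is
/// treated as success, matching the 0/1 convention noted above.
fn decode_status(success: Option<i64>) -> JobStatus {
    match success {
        None => JobStatus::Running,
        Some(0) => JobStatus::Failed,
        Some(_) => JobStatus::Succeeded,
    }
}
```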

src/entities/mod.rs Normal file

@@ -0,0 +1,17 @@
pub mod access_token;
pub mod auth_code;
pub mod client;
pub mod job_execution;
pub mod property;
pub mod refresh_token;
pub mod session;
pub mod user;
pub use access_token::Entity as AccessToken;
pub use auth_code::Entity as AuthCode;
pub use client::Entity as Client;
pub use job_execution::Entity as JobExecution;
pub use property::Entity as Property;
pub use refresh_token::Entity as RefreshToken;
pub use session::Entity as Session;
pub use user::Entity as User;

src/entities/property.rs Normal file

@@ -0,0 +1,18 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "properties")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub owner: String,
#[sea_orm(primary_key, auto_increment = false)]
pub key: String,
pub value: String, // JSON-encoded Value
pub updated_at: i64,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

src/entities/refresh_token.rs Normal file

@@ -0,0 +1,21 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "refresh_tokens")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub token: String,
pub client_id: String,
pub subject: String,
pub scope: String,
pub created_at: i64,
pub expires_at: i64,
pub revoked: i64,
pub parent_token: Option<String>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

src/entities/session.rs Normal file

@@ -0,0 +1,20 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "sessions")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub session_id: String,
pub subject: String,
pub auth_time: i64,
pub created_at: i64,
pub expires_at: i64,
pub user_agent: Option<String>,
pub ip_address: Option<String>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

src/entities/user.rs Normal file

@@ -0,0 +1,21 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "users")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub subject: String,
#[sea_orm(unique)]
pub username: String,
pub password_hash: String,
pub email: Option<String>,
pub email_verified: i64,
pub created_at: i64,
pub enabled: i64,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

src/jobs.rs Normal file

@@ -0,0 +1,189 @@
use crate::entities;
use crate::errors::CrabError;
use crate::storage;
use chrono::Utc;
use sea_orm::{ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, QueryFilter, Set};
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{error, info};
/// Initialize and start the job scheduler with all background tasks
pub async fn init_scheduler(db: DatabaseConnection) -> Result<JobScheduler, CrabError> {
let sched = JobScheduler::new()
.await
.map_err(|e| CrabError::Other(format!("Failed to create job scheduler: {}", e)))?;
let db_clone = db.clone();
// Cleanup expired sessions job - runs every hour
let cleanup_sessions_job = Job::new_async("0 0 * * * *", move |_uuid, _l| {
let db = db_clone.clone();
Box::pin(async move {
info!("Running cleanup_expired_sessions job");
let execution_id = start_job_execution(&db, "cleanup_expired_sessions")
.await
.ok();
match storage::cleanup_expired_sessions(&db).await {
Ok(count) => {
info!("Cleaned up {} expired sessions", count);
if let Some(id) = execution_id {
let _ = complete_job_execution(&db, id, true, None, Some(count as i64))
.await;
}
}
Err(e) => {
error!("Failed to cleanup expired sessions: {}", e);
if let Some(id) = execution_id {
let _ = complete_job_execution(
&db,
id,
false,
Some(e.to_string()),
None,
)
.await;
}
}
}
})
})
.map_err(|e| CrabError::Other(format!("Failed to create cleanup sessions job: {}", e)))?;
sched
.add(cleanup_sessions_job)
.await
.map_err(|e| CrabError::Other(format!("Failed to add cleanup sessions job: {}", e)))?;
let db_clone = db.clone();
// Cleanup expired refresh tokens job - runs every hour at 30 minutes past
let cleanup_tokens_job = Job::new_async("0 30 * * * *", move |_uuid, _l| {
let db = db_clone.clone();
Box::pin(async move {
info!("Running cleanup_expired_refresh_tokens job");
let execution_id = start_job_execution(&db, "cleanup_expired_refresh_tokens")
.await
.ok();
match storage::cleanup_expired_refresh_tokens(&db).await {
Ok(count) => {
info!("Cleaned up {} expired refresh tokens", count);
if let Some(id) = execution_id {
let _ = complete_job_execution(&db, id, true, None, Some(count as i64))
.await;
}
}
Err(e) => {
error!("Failed to cleanup expired refresh tokens: {}", e);
if let Some(id) = execution_id {
let _ = complete_job_execution(
&db,
id,
false,
Some(e.to_string()),
None,
)
.await;
}
}
}
})
})
.map_err(|e| CrabError::Other(format!("Failed to create cleanup tokens job: {}", e)))?;
sched
.add(cleanup_tokens_job)
.await
.map_err(|e| CrabError::Other(format!("Failed to add cleanup tokens job: {}", e)))?;
// Start the scheduler
sched
.start()
.await
.map_err(|e| CrabError::Other(format!("Failed to start job scheduler: {}", e)))?;
info!("Job scheduler started with 2 jobs");
Ok(sched)
}
/// Record the start of a job execution
pub async fn start_job_execution(
db: &DatabaseConnection,
job_name: &str,
) -> Result<i64, CrabError> {
use entities::job_execution;
let now = Utc::now().timestamp();
let execution = job_execution::ActiveModel {
id: sea_orm::ActiveValue::NotSet, // let the database assign the auto-increment id
job_name: Set(job_name.to_string()),
started_at: Set(now),
completed_at: Set(None),
success: Set(None),
error_message: Set(None),
records_processed: Set(None),
};
let result = execution.insert(db).await?;
Ok(result.id)
}
/// Record the completion of a job execution
pub async fn complete_job_execution(
db: &DatabaseConnection,
execution_id: i64,
success: bool,
error_message: Option<String>,
records_processed: Option<i64>,
) -> Result<(), CrabError> {
use entities::job_execution::{Column, Entity};
let now = Utc::now().timestamp();
if let Some(execution) = Entity::find()
.filter(Column::Id.eq(execution_id))
.one(db)
.await?
{
let mut active: entities::job_execution::ActiveModel = execution.into_active_model();
active.completed_at = Set(Some(now));
active.success = Set(Some(if success { 1 } else { 0 }));
active.error_message = Set(error_message);
active.records_processed = Set(records_processed);
active.update(db).await?;
}
Ok(())
}
/// Manually trigger a job by name (useful for admin API)
pub async fn trigger_job_manually(
db: &DatabaseConnection,
job_name: &str,
) -> Result<(), CrabError> {
info!("Manually triggering job: {}", job_name);
let execution_id = start_job_execution(db, job_name).await?;
let result = match job_name {
"cleanup_expired_sessions" => storage::cleanup_expired_sessions(db).await,
"cleanup_expired_refresh_tokens" => storage::cleanup_expired_refresh_tokens(db).await,
_ => {
return Err(CrabError::Other(format!("Unknown job name: {}", job_name)));
}
};
match result {
Ok(count) => {
info!("Manually triggered job {} completed: {} records", job_name, count);
complete_job_execution(db, execution_id, true, None, Some(count as i64)).await?;
}
Err(e) => {
error!("Manually triggered job {} failed: {}", job_name, e);
complete_job_execution(db, execution_id, false, Some(e.to_string()), None).await?;
}
}
Ok(())
}
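The cleanup jobs above use tokio-cron-scheduler's six-field cron syntax (seconds first), so `"0 0 * * * *"` fires at the top of every hour and `"0 30 * * * *"` at half past. Assuming the top-of-hour schedule, the next fire time is plain clock arithmetic — a hypothetical helper for illustration, not part of the scheduler's API:

```rust
/// For the hourly schedule "0 0 * * * *" (sec min hour dom mon dow),
/// the next fire time is the next top-of-hour strictly after `now`
/// (a Unix timestamp in seconds).
fn next_top_of_hour(now: i64) -> i64 {
    // Round down to the start of the current hour, then add one hour.
    now - now.rem_euclid(3600) + 3600
}
```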

src/main.rs

@@ -1,8 +1,13 @@
+mod admin_graphql;
+mod admin_mutations;
+mod entities;
 mod errors;
+mod jobs;
 mod jwks;
 mod session;
 mod settings;
 mod storage;
+mod user_sync;
 mod web;

 use clap::Parser;
@@ -19,6 +24,19 @@ struct Cli {
     /// Path to configuration file
     #[arg(short, long, default_value = "config.toml")]
     config: String,
+
+    #[command(subcommand)]
+    command: Option<Command>,
+}
+
+#[derive(Parser, Debug)]
+enum Command {
+    /// Sync users from a JSON file (idempotent)
+    SyncUsers {
+        /// Path to JSON file containing users
+        #[arg(short, long)]
+        file: String,
+    },
 }

 #[tokio::main]
@@ -36,14 +54,34 @@ async fn main() -> Result<()> {
     // init storage (database)
     let db = storage::init(&settings.database).await?;

-    // ensure test users exist
-    ensure_test_users(&db).await?;
+    // Handle subcommands
+    match cli.command {
+        Some(Command::SyncUsers { file }) => {
+            // Run user sync and exit
+            user_sync::sync_users_from_file(&db, &file).await?;
+            tracing::info!("User sync completed successfully");
+            return Ok(());
+        }
+        None => {
+            // Normal server startup
+            // ensure test users exist
+            ensure_test_users(&db).await?;

-    // init jwks (generate if missing)
-    let jwks_mgr = jwks::JwksManager::new(settings.keys.clone()).await?;
+            // init jwks (generate if missing)
+            let jwks_mgr = jwks::JwksManager::new(settings.keys.clone()).await?;

-    // start web server
-    web::serve(settings, db, jwks_mgr).await?;
+            // build admin GraphQL schemas
+            let seaography_schema = admin_graphql::build_seaography_schema(db.clone());
+            let jobs_schema = admin_graphql::build_jobs_schema(db.clone());
+
+            // init and start background job scheduler
+            let _scheduler = jobs::init_scheduler(db.clone()).await?;
+
+            // start web server (includes both public and admin servers)
+            web::serve(settings, db, jwks_mgr, seaography_schema, jobs_schema).await?;
+        }
+    }

     Ok(())
 }

src/settings.rs

@@ -16,11 +16,23 @@ pub struct Server {
     pub port: u16,
     /// If set, this is used as the issuer/public base URL, e.g., https://idp.example.com
     pub public_base_url: Option<String>,
+    /// Enable public user registration. If false, only the admin API can create users.
+    #[serde(default = "default_allow_public_registration")]
+    pub allow_public_registration: bool,
+    /// Admin GraphQL API port (defaults to port + 1)
+    pub admin_port: Option<u16>,
+}
+
+fn default_allow_public_registration() -> bool {
+    false // Secure by default - registration disabled
 }

 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Database {
-    /// SeaORM/SQLx connection string, e.g., sqlite://barycenter.db?mode=rwc
+    /// SeaORM/SQLx connection string
+    /// Examples:
+    /// - SQLite: sqlite://barycenter.db?mode=rwc
+    /// - PostgreSQL: postgresql://user:password@localhost/barycenter
     pub url: String,
 }

@@ -48,6 +60,8 @@ impl Default for Server {
             host: "0.0.0.0".to_string(),
             port: 8080,
             public_base_url: None,
+            allow_public_registration: false,
+            admin_port: None, // Defaults to port + 1 if not set
         }
     }
 }
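With `admin_port` optional, the effective admin port falls back to the main port plus one. A minimal sketch of that resolution (the helper name is hypothetical; the real lookup happens wherever the admin server binds):

```rust
/// Resolve the admin API port: use the configured `admin_port` when set,
/// otherwise default to the main port + 1 (saturating at u16::MAX so the
/// fallback can never overflow).
fn effective_admin_port(port: u16, admin_port: Option<u16>) -> u16 {
    admin_port.unwrap_or(port.saturating_add(1))
}
```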

src/storage.rs

@@ -1,9 +1,13 @@
+use crate::entities;
 use crate::errors::CrabError;
 use crate::settings::Database as DbCfg;
 use base64ct::Encoding;
 use chrono::Utc;
 use rand::RngCore;
-use sea_orm::{ConnectionTrait, Database, DatabaseConnection, DbBackend, Statement};
+use sea_orm::{
+    ActiveModelTrait, ActiveValue::NotSet, ColumnTrait, ConnectionTrait, Database,
+    DatabaseConnection, DbBackend, EntityTrait, QueryFilter, Set, Statement,
+};
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
@@ -83,37 +87,50 @@ pub struct RefreshToken {
     pub parent_token: Option<String>, // For token rotation tracking
 }

+fn detect_backend(url: &str) -> DbBackend {
+    if url.starts_with("postgres://") || url.starts_with("postgresql://") {
+        DbBackend::Postgres
+    } else {
+        DbBackend::Sqlite
+    }
+}
+
 pub async fn init(cfg: &DbCfg) -> Result<DatabaseConnection, CrabError> {
     let db = Database::connect(&cfg.url).await?;
+    let backend = detect_backend(&cfg.url);

     // bootstrap schema
-    db.execute(Statement::from_string(
-        DbBackend::Sqlite,
-        "PRAGMA foreign_keys = ON",
-    ))
-    .await?;
+    // Enable foreign keys for SQLite only
+    if backend == DbBackend::Sqlite {
+        db.execute(Statement::from_string(
+            DbBackend::Sqlite,
+            "PRAGMA foreign_keys = ON",
+        ))
+        .await?;
+    }

     db.execute(Statement::from_string(
-        DbBackend::Sqlite,
+        backend,
         r#"
         CREATE TABLE IF NOT EXISTS clients (
             client_id TEXT PRIMARY KEY,
             client_secret TEXT NOT NULL,
             client_name TEXT,
             redirect_uris TEXT NOT NULL,
-            created_at INTEGER NOT NULL
+            created_at BIGINT NOT NULL
         )
         "#,
     ))
     .await?;

     db.execute(Statement::from_string(
-        DbBackend::Sqlite,
+        backend,
         r#"
         CREATE TABLE IF NOT EXISTS properties (
             owner TEXT NOT NULL,
             key TEXT NOT NULL,
             value TEXT NOT NULL,
-            updated_at INTEGER NOT NULL,
+            updated_at BIGINT NOT NULL,
             PRIMARY KEY(owner, key)
         )
         "#,
@@ -121,7 +138,7 @@ pub async fn init(cfg: &DbCfg) -> Result<DatabaseConnection, CrabError> {
     .await?;

     db.execute(Statement::from_string(
-        DbBackend::Sqlite,
+        backend,
         r#"
         CREATE TABLE IF NOT EXISTS auth_codes (
             code TEXT PRIMARY KEY,
@@ -132,56 +149,56 @@ pub async fn init(cfg: &DbCfg) -> Result<DatabaseConnection, CrabError> {
             nonce TEXT,
             code_challenge TEXT NOT NULL,
             code_challenge_method TEXT NOT NULL,
-            created_at INTEGER NOT NULL,
-            expires_at INTEGER NOT NULL,
-            consumed INTEGER NOT NULL DEFAULT 0,
-            auth_time INTEGER
+            created_at BIGINT NOT NULL,
+            expires_at BIGINT NOT NULL,
+            consumed BIGINT NOT NULL DEFAULT 0,
+            auth_time BIGINT
         )
         "#,
     ))
     .await?;

     db.execute(Statement::from_string(
-        DbBackend::Sqlite,
+        backend,
         r#"
         CREATE TABLE IF NOT EXISTS access_tokens (
             token TEXT PRIMARY KEY,
             client_id TEXT NOT NULL,
             subject TEXT NOT NULL,
             scope TEXT NOT NULL,
-            created_at INTEGER NOT NULL,
-            expires_at INTEGER NOT NULL,
-            revoked INTEGER NOT NULL DEFAULT 0
+            created_at BIGINT NOT NULL,
+            expires_at BIGINT NOT NULL,
+            revoked BIGINT NOT NULL DEFAULT 0
         )
         "#,
     ))
     .await?;

     db.execute(Statement::from_string(
-        DbBackend::Sqlite,
+        backend,
         r#"
         CREATE TABLE IF NOT EXISTS users (
             subject TEXT PRIMARY KEY,
             username TEXT NOT NULL UNIQUE,
             password_hash TEXT NOT NULL,
             email TEXT,
-            email_verified INTEGER NOT NULL DEFAULT 0,
-            created_at INTEGER NOT NULL,
-            enabled INTEGER NOT NULL DEFAULT 1
+            email_verified BIGINT NOT NULL DEFAULT 0,
+            created_at BIGINT NOT NULL,
+            enabled BIGINT NOT NULL DEFAULT 1
         )
         "#,
     ))
     .await?;

     db.execute(Statement::from_string(
-        DbBackend::Sqlite,
+        backend,
         r#"
         CREATE TABLE IF NOT EXISTS sessions (
             session_id TEXT PRIMARY KEY,
             subject TEXT NOT NULL,
-            auth_time INTEGER NOT NULL,
-            created_at INTEGER NOT NULL,
-            expires_at INTEGER NOT NULL,
+            auth_time BIGINT NOT NULL,
+            created_at BIGINT NOT NULL,
+            expires_at BIGINT NOT NULL,
             user_agent TEXT,
             ip_address TEXT
         )
@@ -190,22 +207,22 @@ pub async fn init(cfg: &DbCfg) -> Result<DatabaseConnection, CrabError> {
     .await?;

     db.execute(Statement::from_string(
-        DbBackend::Sqlite,
+        backend,
         "CREATE INDEX IF NOT EXISTS idx_sessions_expires ON sessions(expires_at)",
     ))
     .await?;

     db.execute(Statement::from_string(
-        DbBackend::Sqlite,
+        backend,
         r#"
         CREATE TABLE IF NOT EXISTS refresh_tokens (
             token TEXT PRIMARY KEY,
             client_id TEXT NOT NULL,
             subject TEXT NOT NULL,
             scope TEXT NOT NULL,
-            created_at INTEGER NOT NULL,
-            expires_at INTEGER NOT NULL,
-            revoked INTEGER NOT NULL DEFAULT 0,
+            created_at BIGINT NOT NULL,
+            expires_at BIGINT NOT NULL,
+            revoked BIGINT NOT NULL DEFAULT 0,
             parent_token TEXT
         )
         "#,
@@ -213,11 +230,41 @@ pub async fn init(cfg: &DbCfg) -> Result<DatabaseConnection, CrabError> {
     .await?;

     db.execute(Statement::from_string(
-        DbBackend::Sqlite,
+        backend,
         "CREATE INDEX IF NOT EXISTS idx_refresh_tokens_expires ON refresh_tokens(expires_at)",
     ))
     .await?;

+    // Job executions table for tracking background job runs
+    let id_type = match backend {
+        DbBackend::Postgres => "BIGSERIAL PRIMARY KEY",
+        _ => "INTEGER PRIMARY KEY AUTOINCREMENT",
+    };
+    db.execute(Statement::from_string(
+        backend,
+        format!(
+            r#"
+            CREATE TABLE IF NOT EXISTS job_executions (
+                id {},
+                job_name TEXT NOT NULL,
+                started_at BIGINT NOT NULL,
+                completed_at BIGINT,
+                success BIGINT,
+                error_message TEXT,
+                records_processed BIGINT
+            )
+            "#,
+            id_type
+        ),
+    ))
+    .await?;
+
+    db.execute(Statement::from_string(
+        backend,
+        "CREATE INDEX IF NOT EXISTS idx_job_executions_started ON job_executions(started_at)",
+    ))
+    .await?;
+
     Ok(db)
 }
@@ -227,19 +274,15 @@ pub async fn create_client(db: &DatabaseConnection, input: NewClient) -> Result<
     let created_at = Utc::now().timestamp();
     let redirect_uris_json = serde_json::to_string(&input.redirect_uris)?;

-    db.execute(Statement::from_sql_and_values(
-        DbBackend::Sqlite,
-        r#"INSERT INTO clients (client_id, client_secret, client_name, redirect_uris, created_at)
-           VALUES (?, ?, ?, ?, ?)"#,
-        [
-            client_id.clone().into(),
-            client_secret.clone().into(),
-            input.client_name.clone().into(),
-            redirect_uris_json.into(),
-            created_at.into(),
-        ],
-    ))
-    .await?;
+    let client = entities::client::ActiveModel {
+        client_id: Set(client_id.clone()),
+        client_secret: Set(client_secret.clone()),
+        client_name: Set(input.client_name.clone()),
+        redirect_uris: Set(redirect_uris_json),
+        created_at: Set(created_at),
+    };
+
+    client.insert(db).await?;

     Ok(Client {
         client_id,
@@ -255,16 +298,15 @@ pub async fn get_property(
     owner: &str,
     key: &str,
 ) -> Result<Option<Value>, CrabError> {
-    if let Some(row) = db
-        .query_one(Statement::from_sql_and_values(
-            DbBackend::Sqlite,
-            "SELECT value FROM properties WHERE owner = ? AND key = ?",
-            [owner.into(), key.into()],
-        ))
-        .await?
-    {
-        let value_str: String = row.try_get("", "value").unwrap_or_default();
-        let json: Value = serde_json::from_str(&value_str)?;
+    use entities::property::{Column, Entity};
+
+    if let Some(model) = Entity::find()
+        .filter(Column::Owner.eq(owner))
+        .filter(Column::Key.eq(key))
+        .one(db)
+        .await?
+    {
+        let json: Value = serde_json::from_str(&model.value)?;
         Ok(Some(json))
     } else {
         Ok(None)
@@ -277,16 +319,28 @@ pub async fn set_property(
     key: &str,
     value: &Value,
 ) -> Result<(), CrabError> {
+    use entities::property::{Column, Entity};
+    use sea_orm::sea_query::OnConflict;
+
     let now = Utc::now().timestamp();
     let json = serde_json::to_string(value)?;

-    db.execute(Statement::from_sql_and_values(
-        DbBackend::Sqlite,
-        r#"INSERT INTO properties (owner, key, value, updated_at)
-           VALUES (?, ?, ?, ?)
-           ON CONFLICT(owner, key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at"#,
-        [owner.into(), key.into(), json.into(), now.into()],
-    ))
-    .await?;
+    let property = entities::property::ActiveModel {
+        owner: Set(owner.to_string()),
+        key: Set(key.to_string()),
+        value: Set(json.clone()),
+        updated_at: Set(now),
+    };
+
+    Entity::insert(property)
+        .on_conflict(
+            OnConflict::columns([Column::Owner, Column::Key])
+                .update_columns([Column::Value, Column::UpdatedAt])
+                .to_owned(),
+        )
+        .exec(db)
+        .await?;

     Ok(())
 }
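The `OnConflict` upsert makes `set_property` idempotent on the `(owner, key)` composite key: re-inserting an existing pair updates `value` and `updated_at` in place. The same semantics modeled on a plain `HashMap` (a sketch of the contract, not the storage layer itself):

```rust
use std::collections::HashMap;

/// In-memory stand-in for the properties table:
/// (owner, key) -> (value, updated_at).
type Store = HashMap<(String, String), (String, i64)>;

/// (owner, key) is the composite primary key; inserting on an existing
/// key overwrites value and updated_at, as ON CONFLICT ... DO UPDATE does.
fn upsert_property(store: &mut Store, owner: &str, key: &str, value: &str, now: i64) {
    store.insert((owner.to_owned(), key.to_owned()), (value.to_owned(), now));
}
```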
@@ -294,21 +348,21 @@ pub async fn get_client(
     db: &DatabaseConnection,
     client_id: &str,
 ) -> Result<Option<Client>, CrabError> {
-    if let Some(row) = db
-        .query_one(Statement::from_sql_and_values(
-            DbBackend::Sqlite,
-            r#"SELECT client_id, client_secret, client_name, redirect_uris, created_at FROM clients WHERE client_id = ?"#,
-            [client_id.into()],
-        ))
-        .await?
-    {
-        let client_id: String = row.try_get("", "client_id").unwrap_or_default();
-        let client_secret: String = row.try_get("", "client_secret").unwrap_or_default();
-        let client_name: Option<String> = row.try_get("", "client_name").ok();
-        let redirect_uris_json: String = row.try_get("", "redirect_uris").unwrap_or_default();
-        let redirect_uris: Vec<String> = serde_json::from_str(&redirect_uris_json).unwrap_or_default();
-        let created_at: i64 = row.try_get("", "created_at").unwrap_or_default();
-        Ok(Some(Client { client_id, client_secret, client_name, redirect_uris, created_at }))
+    use entities::client::{Column, Entity};
+
+    if let Some(model) = Entity::find()
+        .filter(Column::ClientId.eq(client_id))
+        .one(db)
+        .await?
+    {
+        let redirect_uris: Vec<String> = serde_json::from_str(&model.redirect_uris)?;
+        Ok(Some(Client {
+            client_id: model.client_id,
+            client_secret: model.client_secret,
+            client_name: model.client_name,
+            redirect_uris,
+            created_at: model.created_at,
+        }))
     } else {
         Ok(None)
     }
@@ -329,25 +383,24 @@ pub async fn issue_auth_code(
     let code = random_id();
     let now = Utc::now().timestamp();
     let expires_at = now + ttl_secs;
-    db.execute(Statement::from_sql_and_values(
-        DbBackend::Sqlite,
-        r#"INSERT INTO auth_codes (code, client_id, redirect_uri, scope, subject, nonce, code_challenge, code_challenge_method, created_at, expires_at, consumed, auth_time)
-           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 0, ?)"#,
-        [
-            code.clone().into(),
-            client_id.into(),
-            redirect_uri.into(),
-            scope.into(),
-            subject.into(),
-            nonce.clone().into(),
-            code_challenge.into(),
-            code_challenge_method.into(),
-            now.into(),
-            expires_at.into(),
-            auth_time.into(),
-        ],
-    ))
-    .await?;
+
+    let auth_code = entities::auth_code::ActiveModel {
+        code: Set(code.clone()),
+        client_id: Set(client_id.to_string()),
+        redirect_uri: Set(redirect_uri.to_string()),
+        scope: Set(scope.to_string()),
+        subject: Set(subject.to_string()),
+        nonce: Set(nonce.clone()),
+        code_challenge: Set(code_challenge.to_string()),
+        code_challenge_method: Set(code_challenge_method.to_string()),
+        created_at: Set(now),
+        expires_at: Set(expires_at),
+        consumed: Set(0),
+        auth_time: Set(auth_time),
+    };
+
+    auth_code.insert(db).await?;

     Ok(AuthCode {
         code,
         client_id: client_id.to_string(),
@@ -368,42 +421,37 @@ pub async fn consume_auth_code(
     db: &DatabaseConnection,
     code: &str,
 ) -> Result<Option<AuthCode>, CrabError> {
-    if let Some(row) = db
-        .query_one(Statement::from_sql_and_values(
-            DbBackend::Sqlite,
-            r#"SELECT code, client_id, redirect_uri, scope, subject, nonce, code_challenge, code_challenge_method, created_at, expires_at, consumed, auth_time
-               FROM auth_codes WHERE code = ?"#,
-            [code.into()],
-        ))
-        .await?
-    {
-        let consumed: i64 = row.try_get("", "consumed").unwrap_or_default();
-        let expires_at: i64 = row.try_get("", "expires_at").unwrap_or_default();
+    use entities::auth_code::{Column, Entity};
+
+    if let Some(model) = Entity::find()
+        .filter(Column::Code.eq(code))
+        .one(db)
+        .await?
+    {
         let now = Utc::now().timestamp();
-        if consumed != 0 || now > expires_at {
+        if model.consumed != 0 || now > model.expires_at {
             return Ok(None);
         }
         // Mark as consumed
-        db.execute(Statement::from_sql_and_values(
-            DbBackend::Sqlite,
-            r#"UPDATE auth_codes SET consumed = ? WHERE code = ?"#,
-            [1.into(), code.into()],
-        ))
-        .await?;
-        let code_val: String = row.try_get("", "code").unwrap_or_default();
-        let client_id: String = row.try_get("", "client_id").unwrap_or_default();
-        let redirect_uri: String = row.try_get("", "redirect_uri").unwrap_or_default();
-        let scope: String = row.try_get("", "scope").unwrap_or_default();
-        let subject: String = row.try_get("", "subject").unwrap_or_default();
-        let nonce: Option<String> = row.try_get("", "nonce").ok();
-        let code_challenge: String = row.try_get("", "code_challenge").unwrap_or_default();
-        let code_challenge_method: String = row.try_get("", "code_challenge_method").unwrap_or_default();
-        let created_at: i64 = row.try_get("", "created_at").unwrap_or_default();
-        let expires_at: i64 = row.try_get("", "expires_at").unwrap_or_default();
-        let auth_time: Option<i64> = row.try_get("", "auth_time").ok();
-        Ok(Some(AuthCode { code: code_val, client_id, redirect_uri, scope, subject, nonce, code_challenge, code_challenge_method, created_at, expires_at, consumed: 1, auth_time }))
+        let mut active_model: entities::auth_code::ActiveModel = model.clone().into();
+        active_model.consumed = Set(1);
+        active_model.update(db).await?;
+
+        Ok(Some(AuthCode {
+            code: model.code,
+            client_id: model.client_id,
+            redirect_uri: model.redirect_uri,
+            scope: model.scope,
+            subject: model.subject,
+            nonce: model.nonce,
+            code_challenge: model.code_challenge,
+            code_challenge_method: model.code_challenge_method,
+            created_at: model.created_at,
+            expires_at: model.expires_at,
+            consumed: 1,
+            auth_time: model.auth_time,
+        }))
     } else {
         Ok(None)
@@ -419,13 +467,19 @@ pub async fn issue_access_token(
     let token = random_id();
     let now = Utc::now().timestamp();
     let expires_at = now + ttl_secs;
-    db.execute(Statement::from_sql_and_values(
-        DbBackend::Sqlite,
-        r#"INSERT INTO access_tokens (token, client_id, subject, scope, created_at, expires_at, revoked)
-           VALUES (?, ?, ?, ?, ?, ?, 0)"#,
-        [token.clone().into(), client_id.into(), subject.into(), scope.into(), now.into(), expires_at.into()],
-    ))
-    .await?;
+
+    let access_token = entities::access_token::ActiveModel {
+        token: Set(token.clone()),
+        client_id: Set(client_id.to_string()),
+        subject: Set(subject.to_string()),
+        scope: Set(scope.to_string()),
+        created_at: Set(now),
+        expires_at: Set(expires_at),
+        revoked: Set(0),
+    };
+
+    access_token.insert(db).await?;

     Ok(AccessToken {
         token,
         client_id: client_id.to_string(),
@@ -441,24 +495,27 @@ pub async fn get_access_token(
     db: &DatabaseConnection,
     token: &str,
 ) -> Result<Option<AccessToken>, CrabError> {
-    if let Some(row) = db
-        .query_one(Statement::from_sql_and_values(
-            DbBackend::Sqlite,
-            r#"SELECT token, client_id, subject, scope, created_at, expires_at, revoked FROM access_tokens WHERE token = ?"#,
-            [token.into()],
-        ))
-        .await?
-    {
-        let revoked: i64 = row.try_get("", "revoked").unwrap_or_default();
-        let expires_at: i64 = row.try_get("", "expires_at").unwrap_or_default();
+    use entities::access_token::{Column, Entity};
+
+    if let Some(model) = Entity::find()
+        .filter(Column::Token.eq(token))
+        .one(db)
+        .await?
+    {
         let now = Utc::now().timestamp();
-        if revoked != 0 || now > expires_at { return Ok(None); }
-        let token: String = row.try_get("", "token").unwrap_or_default();
-        let client_id: String = row.try_get("", "client_id").unwrap_or_default();
-        let subject: String = row.try_get("", "subject").unwrap_or_default();
-        let scope: String = row.try_get("", "scope").unwrap_or_default();
-        let created_at: i64 = row.try_get("", "created_at").unwrap_or_default();
-        Ok(Some(AccessToken { token, client_id, subject, scope, created_at, expires_at, revoked }))
+        if model.revoked != 0 || now > model.expires_at {
+            return Ok(None);
+        }
+
+        Ok(Some(AccessToken {
+            token: model.token,
+            client_id: model.client_id,
+            subject: model.subject,
+            scope: model.scope,
+            created_at: model.created_at,
+            expires_at: model.expires_at,
+            revoked: model.revoked,
+        }))
     } else {
         Ok(None)
     }
@@ -492,19 +549,17 @@ pub async fn create_user(
         .map_err(|e| CrabError::Other(format!("Password hashing failed: {}", e)))?
         .to_string();

-    db.execute(Statement::from_sql_and_values(
-        DbBackend::Sqlite,
-        r#"INSERT INTO users (subject, username, password_hash, email, email_verified, created_at, enabled)
-           VALUES (?, ?, ?, ?, 0, ?, 1)"#,
-        [
-            subject.clone().into(),
-            username.into(),
-            password_hash.clone().into(),
-            email.clone().into(),
-            created_at.into(),
-        ],
-    ))
-    .await?;
+    let user = entities::user::ActiveModel {
+        subject: Set(subject.clone()),
+        username: Set(username.to_string()),
+        password_hash: Set(password_hash.clone()),
+        email: Set(email.clone()),
+        email_verified: Set(0),
+        created_at: Set(created_at),
+        enabled: Set(1),
+    };
+
+    user.insert(db).await?;

     Ok(User {
         subject,
@@ -521,31 +576,21 @@ pub async fn get_user_by_username(
     db: &DatabaseConnection,
     username: &str,
 ) -> Result<Option<User>, CrabError> {
-    if let Some(row) = db
-        .query_one(Statement::from_sql_and_values(
-            DbBackend::Sqlite,
-            r#"SELECT subject, username, password_hash, email, email_verified, created_at, enabled
-               FROM users WHERE username = ?"#,
-            [username.into()],
-        ))
-        .await?
-    {
-        let subject: String = row.try_get("", "subject").unwrap_or_default();
-        let username: String = row.try_get("", "username").unwrap_or_default();
-        let password_hash: String = row.try_get("", "password_hash").unwrap_or_default();
-        let email: Option<String> = row.try_get("", "email").ok();
-        let email_verified: i64 = row.try_get("", "email_verified").unwrap_or_default();
-        let created_at: i64 = row.try_get("", "created_at").unwrap_or_default();
-        let enabled: i64 = row.try_get("", "enabled").unwrap_or_default();
+    use entities::user::{Column, Entity};
+
+    if let Some(model) = Entity::find()
+        .filter(Column::Username.eq(username))
+        .one(db)
+        .await?
+    {
         Ok(Some(User {
-            subject,
-            username,
-            password_hash,
-            email,
-            email_verified,
-            created_at,
-            enabled,
+            subject: model.subject,
+            username: model.username,
+            password_hash: model.password_hash,
+            email: model.email,
+            email_verified: model.email_verified,
+            created_at: model.created_at,
+            enabled: model.enabled,
         }))
     } else {
         Ok(None)
@@ -577,6 +622,54 @@ pub async fn verify_user_password(
     }
 }
 
+/// Update user enabled and email_verified flags
+pub async fn update_user(
+    db: &DatabaseConnection,
+    username: &str,
+    enabled: bool,
+    email_verified: bool,
+) -> Result<(), CrabError> {
+    use entities::user::{Column, Entity};
+
+    // Find the user
+    let user = Entity::find()
+        .filter(Column::Username.eq(username))
+        .one(db)
+        .await?
+        .ok_or_else(|| CrabError::Other(format!("User not found: {}", username)))?;
+
+    // Update the user
+    let mut active: entities::user::ActiveModel = user.into();
+    active.enabled = Set(if enabled { 1 } else { 0 });
+    active.email_verified = Set(if email_verified { 1 } else { 0 });
+    active.update(db).await?;
+
+    Ok(())
+}
+
+/// Update user email
+pub async fn update_user_email(
+    db: &DatabaseConnection,
+    username: &str,
+    email: Option<String>,
+) -> Result<(), CrabError> {
+    use entities::user::{Column, Entity};
+
+    // Find the user
+    let user = Entity::find()
+        .filter(Column::Username.eq(username))
+        .one(db)
+        .await?
+        .ok_or_else(|| CrabError::Other(format!("User not found: {}", username)))?;
+
+    // Update the user
+    let mut active: entities::user::ActiveModel = user.into();
+    active.email = Set(email);
+    active.update(db).await?;
+
+    Ok(())
+}
 // Session management functions
 pub async fn create_session(
@@ -590,21 +683,17 @@ pub async fn create_session(
     let now = Utc::now().timestamp();
     let expires_at = now + ttl_secs;
-    db.execute(Statement::from_sql_and_values(
-        DbBackend::Sqlite,
-        r#"INSERT INTO sessions (session_id, subject, auth_time, created_at, expires_at, user_agent, ip_address)
-           VALUES (?, ?, ?, ?, ?, ?, ?)"#,
-        [
-            session_id.clone().into(),
-            subject.into(),
-            now.into(),
-            now.into(),
-            expires_at.into(),
-            user_agent.clone().into(),
-            ip_address.clone().into(),
-        ],
-    ))
-    .await?;
+    let session = entities::session::ActiveModel {
+        session_id: Set(session_id.clone()),
+        subject: Set(subject.to_string()),
+        auth_time: Set(now),
+        created_at: Set(now),
+        expires_at: Set(expires_at),
+        user_agent: Set(user_agent.clone()),
+        ip_address: Set(ip_address.clone()),
+    };
+
+    session.insert(db).await?;
 
     Ok(Session {
         session_id,
@@ -621,37 +710,27 @@ pub async fn get_session(
     db: &DatabaseConnection,
     session_id: &str,
 ) -> Result<Option<Session>, CrabError> {
-    if let Some(row) = db
-        .query_one(Statement::from_sql_and_values(
-            DbBackend::Sqlite,
-            r#"SELECT session_id, subject, auth_time, created_at, expires_at, user_agent, ip_address
-               FROM sessions WHERE session_id = ?"#,
-            [session_id.into()],
-        ))
+    use entities::session::{Column, Entity};
+
+    if let Some(model) = Entity::find()
+        .filter(Column::SessionId.eq(session_id))
+        .one(db)
         .await?
     {
-        let session_id: String = row.try_get("", "session_id").unwrap_or_default();
-        let subject: String = row.try_get("", "subject").unwrap_or_default();
-        let auth_time: i64 = row.try_get("", "auth_time").unwrap_or_default();
-        let created_at: i64 = row.try_get("", "created_at").unwrap_or_default();
-        let expires_at: i64 = row.try_get("", "expires_at").unwrap_or_default();
-        let user_agent: Option<String> = row.try_get("", "user_agent").ok();
-        let ip_address: Option<String> = row.try_get("", "ip_address").ok();
-
         // Check if session is expired
         let now = Utc::now().timestamp();
-        if now > expires_at {
+        if now > model.expires_at {
             return Ok(None);
         }
 
         Ok(Some(Session {
-            session_id,
-            subject,
-            auth_time,
-            created_at,
-            expires_at,
-            user_agent,
-            ip_address,
+            session_id: model.session_id,
+            subject: model.subject,
+            auth_time: model.auth_time,
+            created_at: model.created_at,
+            expires_at: model.expires_at,
+            user_agent: model.user_agent,
+            ip_address: model.ip_address,
         }))
     } else {
         Ok(None)
@@ -659,25 +738,26 @@ pub async fn get_session(
 }
 
 pub async fn delete_session(db: &DatabaseConnection, session_id: &str) -> Result<(), CrabError> {
-    db.execute(Statement::from_sql_and_values(
-        DbBackend::Sqlite,
-        "DELETE FROM sessions WHERE session_id = ?",
-        [session_id.into()],
-    ))
-    .await?;
+    use entities::session::{Column, Entity};
+
+    Entity::delete_many()
+        .filter(Column::SessionId.eq(session_id))
+        .exec(db)
+        .await?;
     Ok(())
 }
 
 pub async fn cleanup_expired_sessions(db: &DatabaseConnection) -> Result<u64, CrabError> {
+    use entities::session::{Column, Entity};
+
     let now = Utc::now().timestamp();
-    let result = db
-        .execute(Statement::from_sql_and_values(
-            DbBackend::Sqlite,
-            "DELETE FROM sessions WHERE expires_at < ?",
-            [now.into()],
-        ))
+    let result = Entity::delete_many()
+        .filter(Column::ExpiresAt.lt(now))
+        .exec(db)
         .await?;
-    Ok(result.rows_affected())
+
+    Ok(result.rows_affected)
 }
 // Refresh Token Functions
@@ -694,21 +774,18 @@ pub async fn issue_refresh_token(
     let now = Utc::now().timestamp();
     let expires_at = now + ttl_secs;
-    db.execute(Statement::from_sql_and_values(
-        DbBackend::Sqlite,
-        r#"INSERT INTO refresh_tokens (token, client_id, subject, scope, created_at, expires_at, revoked, parent_token)
-           VALUES (?, ?, ?, ?, ?, ?, 0, ?)"#,
-        [
-            token.clone().into(),
-            client_id.into(),
-            subject.into(),
-            scope.into(),
-            now.into(),
-            expires_at.into(),
-            parent_token.clone().into(),
-        ],
-    ))
-    .await?;
+    let refresh_token = entities::refresh_token::ActiveModel {
+        token: Set(token.clone()),
+        client_id: Set(client_id.to_string()),
+        subject: Set(subject.to_string()),
+        scope: Set(scope.to_string()),
+        created_at: Set(now),
+        expires_at: Set(expires_at),
+        revoked: Set(0),
+        parent_token: Set(parent_token.clone()),
+    };
+
+    refresh_token.insert(db).await?;
 
     Ok(RefreshToken {
         token,
@@ -726,40 +803,28 @@ pub async fn get_refresh_token(
     db: &DatabaseConnection,
     token: &str,
 ) -> Result<Option<RefreshToken>, CrabError> {
-    let result = db
-        .query_one(Statement::from_sql_and_values(
-            DbBackend::Sqlite,
-            r#"SELECT token, client_id, subject, scope, created_at, expires_at, revoked, parent_token
-               FROM refresh_tokens WHERE token = ?"#,
-            [token.into()],
-        ))
-        .await?;
-
-    if let Some(row) = result {
-        let token: String = row.try_get("", "token")?;
-        let client_id: String = row.try_get("", "client_id")?;
-        let subject: String = row.try_get("", "subject")?;
-        let scope: String = row.try_get("", "scope")?;
-        let created_at: i64 = row.try_get("", "created_at")?;
-        let expires_at: i64 = row.try_get("", "expires_at")?;
-        let revoked: i64 = row.try_get("", "revoked")?;
-        let parent_token: Option<String> = row.try_get("", "parent_token").ok();
-
+    use entities::refresh_token::{Column, Entity};
+
+    if let Some(model) = Entity::find()
+        .filter(Column::Token.eq(token))
+        .one(db)
+        .await?
+    {
         // Check if token is expired or revoked
         let now = Utc::now().timestamp();
-        if revoked != 0 || now > expires_at {
+        if model.revoked != 0 || now > model.expires_at {
             return Ok(None);
         }
 
         Ok(Some(RefreshToken {
-            token,
-            client_id,
-            subject,
-            scope,
-            created_at,
-            expires_at,
-            revoked,
-            parent_token,
+            token: model.token,
+            client_id: model.client_id,
+            subject: model.subject,
+            scope: model.scope,
+            created_at: model.created_at,
+            expires_at: model.expires_at,
+            revoked: model.revoked,
+            parent_token: model.parent_token,
         }))
     } else {
         Ok(None)
@@ -767,12 +832,19 @@ pub async fn get_refresh_token(
 }
 
 pub async fn revoke_refresh_token(db: &DatabaseConnection, token: &str) -> Result<(), CrabError> {
-    db.execute(Statement::from_sql_and_values(
-        DbBackend::Sqlite,
-        "UPDATE refresh_tokens SET revoked = 1 WHERE token = ?",
-        [token.into()],
-    ))
-    .await?;
+    use entities::refresh_token::{Column, Entity};
+
+    // Find the token and update it
+    if let Some(model) = Entity::find()
+        .filter(Column::Token.eq(token))
+        .one(db)
+        .await?
+    {
+        let mut active_model: entities::refresh_token::ActiveModel = model.into();
+        active_model.revoked = Set(1);
+        active_model.update(db).await?;
+    }
+
     Ok(())
 }
@@ -800,13 +872,13 @@ pub async fn rotate_refresh_token(
 }
 
 pub async fn cleanup_expired_refresh_tokens(db: &DatabaseConnection) -> Result<u64, CrabError> {
+    use entities::refresh_token::{Column, Entity};
+
     let now = Utc::now().timestamp();
-    let result = db
-        .execute(Statement::from_sql_and_values(
-            DbBackend::Sqlite,
-            "DELETE FROM refresh_tokens WHERE expires_at < ?",
-            [now.into()],
-        ))
+    let result = Entity::delete_many()
+        .filter(Column::ExpiresAt.lt(now))
+        .exec(db)
         .await?;
-    Ok(result.rows_affected())
+
+    Ok(result.rows_affected)
 }

src/user_sync.rs (new file, 184 lines)

@@ -0,0 +1,184 @@
use crate::errors::CrabError;
use crate::storage;
use miette::{IntoDiagnostic, Result};
use sea_orm::DatabaseConnection;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
/// User definition from JSON file
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserDefinition {
/// Username (unique identifier)
pub username: String,
/// User email
#[serde(default)]
pub email: Option<String>,
/// Plain text password (will be hashed)
pub password: String,
/// Whether the user account is enabled
#[serde(default = "default_true")]
pub enabled: bool,
/// Whether the email is verified
#[serde(default)]
pub email_verified: bool,
/// Custom properties to attach to the user
#[serde(default)]
pub properties: HashMap<String, Value>,
}
fn default_true() -> bool {
true
}
/// Root structure of the users JSON file
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UsersFile {
pub users: Vec<UserDefinition>,
}
/// Sync users from a JSON file to the database (idempotent)
pub async fn sync_users_from_file(db: &DatabaseConnection, file_path: &str) -> Result<()> {
tracing::info!("Loading users from {}", file_path);
// Read and parse JSON file
let content = fs::read_to_string(file_path)
.into_diagnostic()
.map_err(|e| {
miette::miette!(
"Failed to read users file at '{}': {}",
file_path,
e
)
})?;
let users_file: UsersFile = serde_json::from_str(&content)
.into_diagnostic()
.map_err(|e| {
miette::miette!(
"Failed to parse users JSON file: {}\n\nExpected format:\n{{\n \"users\": [\n {{\n \"username\": \"alice\",\n \"email\": \"alice@example.com\",\n \"password\": \"secure-password\",\n \"enabled\": true,\n \"email_verified\": false,\n \"properties\": {{\n \"department\": \"Engineering\"\n }}\n }}\n ]\n}}",
e
)
})?;
tracing::info!("Found {} user(s) in file", users_file.users.len());
let mut created = 0;
let mut updated = 0;
let mut unchanged = 0;
for user_def in users_file.users {
match sync_user(db, &user_def).await? {
SyncResult::Created => created += 1,
SyncResult::Updated => updated += 1,
SyncResult::Unchanged => unchanged += 1,
}
}
tracing::info!(
"User sync complete: {} created, {} updated, {} unchanged",
created,
updated,
unchanged
);
Ok(())
}
#[derive(Debug)]
enum SyncResult {
Created,
Updated,
Unchanged,
}
/// Sync a single user (idempotent)
async fn sync_user(db: &DatabaseConnection, user_def: &UserDefinition) -> Result<SyncResult> {
// Check if user exists
let existing = storage::get_user_by_username(db, &user_def.username)
.await
.into_diagnostic()?;
let result = match existing {
None => {
// Create new user
tracing::info!("Creating user: {}", user_def.username);
storage::create_user(
db,
&user_def.username,
&user_def.password,
user_def.email.clone(),
)
.await
.into_diagnostic()?;
// Update enabled and email_verified flags if needed
if !user_def.enabled || user_def.email_verified {
storage::update_user(
db,
&user_def.username,
user_def.enabled,
user_def.email_verified,
)
.await
.into_diagnostic()?;
}
SyncResult::Created
}
Some(existing_user) => {
// User exists - check if update is needed
let enabled_matches = (existing_user.enabled == 1) == user_def.enabled;
let email_verified_matches =
(existing_user.email_verified == 1) == user_def.email_verified;
let email_matches = existing_user.email == user_def.email;
if !enabled_matches || !email_verified_matches || !email_matches {
tracing::info!("Updating user: {}", user_def.username);
storage::update_user(
db,
&user_def.username,
user_def.enabled,
user_def.email_verified,
)
.await
.into_diagnostic()?;
// Update email if it changed
if !email_matches {
storage::update_user_email(db, &user_def.username, user_def.email.clone())
.await
.into_diagnostic()?;
}
SyncResult::Updated
} else {
SyncResult::Unchanged
}
}
};
// Sync properties
for (key, value) in &user_def.properties {
// Get the user's subject to use as owner for properties
let user = storage::get_user_by_username(db, &user_def.username)
.await
.into_diagnostic()?
.ok_or_else(|| miette::miette!("User not found after creation: {}", user_def.username))?;
storage::set_property(db, &user.subject, key, value)
.await
.into_diagnostic()?;
}
if !user_def.properties.is_empty() {
tracing::debug!(
"Synced {} properties for user {}",
user_def.properties.len(),
user_def.username
);
}
Ok(result)
}


@@ -81,6 +81,12 @@ pub async fn serve(
     settings: Settings,
     db: DatabaseConnection,
     jwks: JwksManager,
+    seaography_schema: async_graphql::dynamic::Schema,
+    jobs_schema: async_graphql::Schema<
+        crate::admin_mutations::AdminQuery,
+        crate::admin_mutations::AdminMutation,
+        async_graphql::EmptySubscription,
+    >,
 ) -> miette::Result<()> {
     let state = AppState {
         settings: Arc::new(settings),
@@ -95,30 +101,66 @@ pub async fn serve(
     // - Login endpoint: 5 attempts/min per IP
     // - Authorize endpoint: 20 req/min per IP
-    let router = Router::new()
+    let mut router = Router::new()
         .route("/.well-known/openid-configuration", get(discovery))
         .route("/.well-known/jwks.json", get(jwks_handler))
         .route("/connect/register", post(register_client))
-        .route("/properties/{owner}/{key}", get(get_property))
+        .route("/properties/{owner}/{key}", get(get_property).put(set_property))
         .route("/federation/trust-anchors", get(trust_anchors))
-        .route("/register", post(register_user))
         .route("/login", get(login_page).post(login_submit))
         .route("/logout", get(logout))
         .route("/authorize", get(authorize))
         .route("/token", post(token))
-        .route("/userinfo", get(userinfo))
+        .route("/userinfo", get(userinfo));
+
+    // Conditionally add public registration route
+    if state.settings.server.allow_public_registration {
+        tracing::info!("Public user registration is ENABLED");
+        router = router.route("/register", post(register_user));
+    } else {
+        tracing::info!("Public user registration is DISABLED - use admin API");
+    }
+
+    let router = router
         .layer(middleware::from_fn(security_headers))
         .with_state(state.clone());
 
-    let addr: SocketAddr = format!(
+    let public_addr: SocketAddr = format!(
         "{}:{}",
         state.settings.server.host, state.settings.server.port
     )
     .parse()
     .map_err(|e| miette::miette!("bad listen addr: {e}"))?;
-    tracing::info!(%addr, "listening");
+
+    // Start admin GraphQL server on separate port
+    let admin_port = state
+        .settings
+        .server
+        .admin_port
+        .unwrap_or(state.settings.server.port + 1);
+    let admin_addr: SocketAddr = format!("{}:{}", state.settings.server.host, admin_port)
+        .parse()
+        .map_err(|e| miette::miette!("bad admin addr: {e}"))?;
+
+    let admin_router = crate::admin_graphql::router(seaography_schema, jobs_schema);
+
+    // Spawn admin server in background
+    let admin_listener = tokio::net::TcpListener::bind(admin_addr)
+        .await
+        .into_diagnostic()?;
+    tracing::info!(%admin_addr, "Admin GraphQL API listening");
+    tracing::info!("GraphQL Playground available at http://{}/admin/playground", admin_addr);
+    tokio::spawn(async move {
+        axum::serve(admin_listener, admin_router)
+            .await
+            .expect("Admin server failed");
+    });
+
+    // Start public server
+    tracing::info!(%public_addr, "Public API listening");
     tracing::warn!("Rate limiting should be configured at the reverse proxy level for production");
-    let listener = tokio::net::TcpListener::bind(addr)
+    let listener = tokio::net::TcpListener::bind(public_addr)
         .await
         .into_diagnostic()?;
     axum::serve(listener, router).await.into_diagnostic()?;
@@ -1134,13 +1176,80 @@ async fn get_property(
 async fn set_property(
     State(state): State<AppState>,
     Path((owner, key)): Path<(String, String)>,
-    Json(v): Json<Value>,
+    req: axum::http::Request<axum::body::Body>,
 ) -> impl IntoResponse {
+    // Extract bearer token
+    let token_opt = req
+        .headers()
+        .get(axum::http::header::AUTHORIZATION)
+        .and_then(|h| h.to_str().ok())
+        .and_then(|s| s.strip_prefix("Bearer "))
+        .map(|s| s.to_string());
+
+    let token = match token_opt {
+        Some(t) => t,
+        None => {
+            return (
+                StatusCode::UNAUTHORIZED,
+                Json(json!({"error": "missing_token", "error_description": "Bearer token required"})),
+            )
+                .into_response();
+        }
+    };
+
+    // Validate token and get subject
+    let token_row = match storage::get_access_token(&state.db, &token).await {
+        Ok(Some(t)) => t,
+        _ => {
+            return (
+                StatusCode::UNAUTHORIZED,
+                Json(json!({"error": "invalid_token", "error_description": "Invalid or expired token"})),
+            )
+                .into_response();
+        }
+    };
+
+    // Check if the authenticated user is trying to set their own property
+    if token_row.subject != owner {
+        return (
+            StatusCode::FORBIDDEN,
+            Json(json!({
+                "error": "forbidden",
+                "error_description": "You can only set your own properties"
+            })),
+        )
+            .into_response();
+    }
+
+    // Extract JSON body
+    let body_bytes = match axum::body::to_bytes(req.into_body(), usize::MAX).await {
+        Ok(b) => b,
+        Err(e) => {
+            return (
+                StatusCode::BAD_REQUEST,
+                Json(json!({"error": "invalid_body", "error_description": e.to_string()})),
+            )
+                .into_response();
+        }
+    };
+
+    let v: Value = match serde_json::from_slice(&body_bytes) {
+        Ok(v) => v,
+        Err(e) => {
+            return (
+                StatusCode::BAD_REQUEST,
+                Json(json!({"error": "invalid_json", "error_description": e.to_string()})),
+            )
+                .into_response();
+        }
+    };
+
+    // Set the property
     match storage::set_property(&state.db, &owner, &key, &v).await {
         Ok(_) => (StatusCode::NO_CONTENT, ()).into_response(),
         Err(e) => (
             StatusCode::INTERNAL_SERVER_ERROR,
-            Json(json!({"error": e.to_string()})),
+            Json(json!({"error": "internal_error", "error_description": e.to_string()})),
         )
         .into_response(),
     }

users.json.example (new file, 55 lines)

@ -0,0 +1,55 @@
{
"users": [
{
"username": "admin",
"email": "admin@example.com",
"password": "change-me-in-production",
"enabled": true,
"email_verified": true,
"properties": {
"department": "IT",
"role": "administrator",
"display_name": "System Administrator"
}
},
{
"username": "alice",
"email": "alice@example.com",
"password": "alice-secure-password",
"enabled": true,
"email_verified": false,
"properties": {
"department": "Engineering",
"role": "developer",
"display_name": "Alice Johnson",
"team": "Platform"
}
},
{
"username": "bob",
"email": "bob@example.com",
"password": "bob-secure-password",
"enabled": true,
"email_verified": true,
"properties": {
"department": "Product",
"role": "product_manager",
"display_name": "Bob Smith"
}
},
{
"username": "charlie",
"email": "charlie@example.com",
"password": "charlie-secure-password",
"enabled": false,
"email_verified": false,
"properties": {
"department": "Engineering",
"role": "developer",
"display_name": "Charlie Brown",
"team": "Backend",
"note": "Account disabled - pending onboarding"
}
}
]
}