From 66b3de433f76e8dfde34e83820ba164f594a24b1 Mon Sep 17 00:00:00 2001 From: Till Wegmueller Date: Fri, 3 Apr 2026 19:39:27 +0200 Subject: [PATCH] feat: add background TTL reaper with orphaned resource cleanup --- src/lib.rs | 1 + src/reaper.rs | 82 ++++++++++++++++++++++++++++++++++++++++ tests/test_reaper.rs | 90 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 173 insertions(+) create mode 100644 src/reaper.rs create mode 100644 tests/test_reaper.rs diff --git a/src/lib.rs b/src/lib.rs index 2e575d0..8c30282 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,4 +5,5 @@ pub mod config; pub mod entity; pub mod error; pub mod handler; +pub mod reaper; pub mod state; diff --git a/src/reaper.rs b/src/reaper.rs new file mode 100644 index 0000000..2473a6e --- /dev/null +++ b/src/reaper.rs @@ -0,0 +1,82 @@ +use sea_orm::*; +use std::time::Duration; +use tokio::time; + +use crate::cache::Cache; +use crate::entity::{links, resources}; + +/// Run a single reap cycle: delete expired links, clean up orphaned resources. +pub async fn reap_once(db: &DatabaseConnection, cache: &Cache) -> Result<(), DbErr> { + let now = chrono::Utc::now().naive_utc(); + + // Find expired links and their resource URIs + let expired_links = links::Entity::find() + .filter(links::Column::ExpiresAt.is_not_null()) + .filter(links::Column::ExpiresAt.lt(now)) + .find_also_related(resources::Entity) + .all(db) + .await?; + + let affected_resource_ids: std::collections::HashSet = expired_links + .iter() + .map(|(link, _)| link.resource_id.clone()) + .collect(); + + let affected_resource_uris: std::collections::HashMap = expired_links + .iter() + .filter_map(|(link, resource)| { + resource + .as_ref() + .map(|r| (link.resource_id.clone(), r.resource_uri.clone())) + }) + .collect(); + + if affected_resource_ids.is_empty() { + return Ok(()); + } + + // Delete expired links + let deleted = links::Entity::delete_many() + .filter(links::Column::ExpiresAt.is_not_null()) + .filter(links::Column::ExpiresAt.lt(now)) + .exec(db) + .await?; + + if deleted.rows_affected > 0 { + tracing::info!(count = deleted.rows_affected, "reaped expired links"); + } + + // Clean up orphaned resources (resources with no remaining links) + for resource_id in &affected_resource_ids { + let link_count = links::Entity::find() + .filter(links::Column::ResourceId.eq(resource_id.as_str())) + .count(db) + .await?; + + if link_count == 0 { + resources::Entity::delete_by_id(resource_id) + .exec(db) + .await?; + } + } + + // Refresh cache for affected resources + for (_, uri) in &affected_resource_uris { + cache.refresh_resource(db, uri).await?; + } + + Ok(()) +} + +/// Spawn the background reaper task. +pub fn spawn_reaper(db: DatabaseConnection, cache: Cache, interval_secs: u64) { + tokio::spawn(async move { + let mut interval = time::interval(Duration::from_secs(interval_secs)); + loop { + interval.tick().await; + if let Err(e) = reap_once(&db, &cache).await { + tracing::error!("reaper error: {e}"); + } + } + }); +} diff --git a/tests/test_reaper.rs b/tests/test_reaper.rs new file mode 100644 index 0000000..a607269 --- /dev/null +++ b/tests/test_reaper.rs @@ -0,0 +1,90 @@ +mod common; + +use webfingerd::reaper; + +#[tokio::test] +async fn test_reaper_expires_links() { + let state = common::test_state().await; + + // Insert a resource + link that expires immediately + use sea_orm::*; + use webfingerd::auth; + use webfingerd::entity::{domains, links, resources, service_tokens}; + + let now = chrono::Utc::now().naive_utc(); + let past = now - chrono::Duration::seconds(60); + + // Create domain + let domain = domains::ActiveModel { + id: Set("d1".into()), + domain: Set("example.com".into()), + owner_token_hash: Set(auth::hash_token("test").unwrap()), + registration_secret: Set(String::new()), + challenge_type: Set("dns-01".into()), + challenge_token: Set(None), + verified: Set(true), + created_at: Set(now), + verified_at: Set(Some(now)), + }; + domain.insert(&state.db).await.unwrap(); + + // Create service token + let token = service_tokens::ActiveModel { + id: Set("t1".into()), + domain_id: Set("d1".into()), + name: Set("test".into()), + token_hash: Set(auth::hash_token("test").unwrap()), + allowed_rels: Set(r#"["self"]"#.into()), + resource_pattern: Set("acct:*@example.com".into()), + created_at: Set(now), + revoked_at: Set(None), + }; + token.insert(&state.db).await.unwrap(); + + // Create resource + let resource = resources::ActiveModel { + id: Set("r1".into()), + domain_id: Set("d1".into()), + resource_uri: Set("acct:alice@example.com".into()), + aliases: Set(None), + properties: Set(None), + created_at: Set(now), + updated_at: Set(now), + }; + resource.insert(&state.db).await.unwrap(); + + // Create expired link + let link = links::ActiveModel { + id: Set("l1".into()), + resource_id: Set("r1".into()), + service_token_id: Set("t1".into()), + domain_id: Set("d1".into()), + rel: Set("self".into()), + href: Set(Some("https://example.com/users/alice".into())), + link_type: Set(None), + titles: Set(None), + properties: Set(None), + template: Set(None), + ttl_seconds: Set(Some(1)), + created_at: Set(past), + expires_at: Set(Some(past + chrono::Duration::seconds(1))), + }; + link.insert(&state.db).await.unwrap(); + + // Hydrate cache + state.cache.hydrate(&state.db).await.unwrap(); + + // Should NOT be in cache (already expired) + assert!(state.cache.get("acct:alice@example.com").is_none()); + + // Run reaper once + reaper::reap_once(&state.db, &state.cache).await.unwrap(); + + // Link should be deleted from DB + let remaining = links::Entity::find().all(&state.db).await.unwrap(); + assert!(remaining.is_empty()); + + // Orphaned resource should also be cleaned up + let remaining_resources = resources::Entity::find().all(&state.db).await.unwrap(); + assert!(remaining_resources.is_empty()); +}