mirror of
https://github.com/CloudNebulaProject/webfingerd.git
synced 2026-04-10 21:20:41 +00:00
feat: add background TTL reaper with orphaned resource cleanup
This commit is contained in:
parent
9464e2692e
commit
66b3de433f
3 changed files with 173 additions and 0 deletions
|
|
@ -5,4 +5,5 @@ pub mod config;
|
||||||
pub mod entity;
|
pub mod entity;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
pub mod handler;
|
pub mod handler;
|
||||||
|
pub mod reaper;
|
||||||
pub mod state;
|
pub mod state;
|
||||||
|
|
|
||||||
82
src/reaper.rs
Normal file
82
src/reaper.rs
Normal file
|
|
@ -0,0 +1,82 @@
|
||||||
|
use sea_orm::*;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::time;
|
||||||
|
|
||||||
|
use crate::cache::Cache;
|
||||||
|
use crate::entity::{links, resources};
|
||||||
|
|
||||||
|
/// Run a single reap cycle: delete expired links, clean up orphaned resources.
|
||||||
|
pub async fn reap_once(db: &DatabaseConnection, cache: &Cache) -> Result<(), DbErr> {
|
||||||
|
let now = chrono::Utc::now().naive_utc();
|
||||||
|
|
||||||
|
// Find expired links and their resource URIs
|
||||||
|
let expired_links = links::Entity::find()
|
||||||
|
.filter(links::Column::ExpiresAt.is_not_null())
|
||||||
|
.filter(links::Column::ExpiresAt.lt(now))
|
||||||
|
.find_also_related(resources::Entity)
|
||||||
|
.all(db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let affected_resource_ids: std::collections::HashSet<String> = expired_links
|
||||||
|
.iter()
|
||||||
|
.map(|(link, _)| link.resource_id.clone())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let affected_resource_uris: std::collections::HashMap<String, String> = expired_links
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(link, resource)| {
|
||||||
|
resource
|
||||||
|
.as_ref()
|
||||||
|
.map(|r| (link.resource_id.clone(), r.resource_uri.clone()))
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if affected_resource_ids.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete expired links
|
||||||
|
let deleted = links::Entity::delete_many()
|
||||||
|
.filter(links::Column::ExpiresAt.is_not_null())
|
||||||
|
.filter(links::Column::ExpiresAt.lt(now))
|
||||||
|
.exec(db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if deleted.rows_affected > 0 {
|
||||||
|
tracing::info!(count = deleted.rows_affected, "reaped expired links");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up orphaned resources (resources with no remaining links)
|
||||||
|
for resource_id in &affected_resource_ids {
|
||||||
|
let link_count = links::Entity::find()
|
||||||
|
.filter(links::Column::ResourceId.eq(resource_id.as_str()))
|
||||||
|
.count(db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if link_count == 0 {
|
||||||
|
resources::Entity::delete_by_id(resource_id)
|
||||||
|
.exec(db)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Refresh cache for affected resources
|
||||||
|
for (_, uri) in &affected_resource_uris {
|
||||||
|
cache.refresh_resource(db, uri).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spawn the background reaper task.
|
||||||
|
pub fn spawn_reaper(db: DatabaseConnection, cache: Cache, interval_secs: u64) {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut interval = time::interval(Duration::from_secs(interval_secs));
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
if let Err(e) = reap_once(&db, &cache).await {
|
||||||
|
tracing::error!("reaper error: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
90
tests/test_reaper.rs
Normal file
90
tests/test_reaper.rs
Normal file
|
|
@ -0,0 +1,90 @@
|
||||||
|
mod common;
|
||||||
|
|
||||||
|
use webfingerd::reaper;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_reaper_expires_links() {
|
||||||
|
let state = common::test_state().await;
|
||||||
|
|
||||||
|
// Insert a resource + link that expires immediately
|
||||||
|
use sea_orm::*;
|
||||||
|
use webfingerd::auth;
|
||||||
|
use webfingerd::entity::{domains, links, resources, service_tokens};
|
||||||
|
|
||||||
|
let now = chrono::Utc::now().naive_utc();
|
||||||
|
let past = now - chrono::Duration::seconds(60);
|
||||||
|
|
||||||
|
// Create domain
|
||||||
|
let domain = domains::ActiveModel {
|
||||||
|
id: Set("d1".into()),
|
||||||
|
domain: Set("example.com".into()),
|
||||||
|
owner_token_hash: Set(auth::hash_token("test").unwrap()),
|
||||||
|
registration_secret: Set(String::new()),
|
||||||
|
challenge_type: Set("dns-01".into()),
|
||||||
|
challenge_token: Set(None),
|
||||||
|
verified: Set(true),
|
||||||
|
created_at: Set(now),
|
||||||
|
verified_at: Set(Some(now)),
|
||||||
|
};
|
||||||
|
domain.insert(&state.db).await.unwrap();
|
||||||
|
|
||||||
|
// Create service token
|
||||||
|
let token = service_tokens::ActiveModel {
|
||||||
|
id: Set("t1".into()),
|
||||||
|
domain_id: Set("d1".into()),
|
||||||
|
name: Set("test".into()),
|
||||||
|
token_hash: Set(auth::hash_token("test").unwrap()),
|
||||||
|
allowed_rels: Set(r#"["self"]"#.into()),
|
||||||
|
resource_pattern: Set("acct:*@example.com".into()),
|
||||||
|
created_at: Set(now),
|
||||||
|
revoked_at: Set(None),
|
||||||
|
};
|
||||||
|
token.insert(&state.db).await.unwrap();
|
||||||
|
|
||||||
|
// Create resource
|
||||||
|
let resource = resources::ActiveModel {
|
||||||
|
id: Set("r1".into()),
|
||||||
|
domain_id: Set("d1".into()),
|
||||||
|
resource_uri: Set("acct:alice@example.com".into()),
|
||||||
|
aliases: Set(None),
|
||||||
|
properties: Set(None),
|
||||||
|
created_at: Set(now),
|
||||||
|
updated_at: Set(now),
|
||||||
|
};
|
||||||
|
resource.insert(&state.db).await.unwrap();
|
||||||
|
|
||||||
|
// Create expired link
|
||||||
|
let link = links::ActiveModel {
|
||||||
|
id: Set("l1".into()),
|
||||||
|
resource_id: Set("r1".into()),
|
||||||
|
service_token_id: Set("t1".into()),
|
||||||
|
domain_id: Set("d1".into()),
|
||||||
|
rel: Set("self".into()),
|
||||||
|
href: Set(Some("https://example.com/users/alice".into())),
|
||||||
|
link_type: Set(None),
|
||||||
|
titles: Set(None),
|
||||||
|
properties: Set(None),
|
||||||
|
template: Set(None),
|
||||||
|
ttl_seconds: Set(Some(1)),
|
||||||
|
created_at: Set(past),
|
||||||
|
expires_at: Set(Some(past + chrono::Duration::seconds(1))),
|
||||||
|
};
|
||||||
|
link.insert(&state.db).await.unwrap();
|
||||||
|
|
||||||
|
// Hydrate cache
|
||||||
|
state.cache.hydrate(&state.db).await.unwrap();
|
||||||
|
|
||||||
|
// Should NOT be in cache (already expired)
|
||||||
|
assert!(state.cache.get("acct:alice@example.com").is_none());
|
||||||
|
|
||||||
|
// Run reaper once
|
||||||
|
reaper::reap_once(&state.db, &state.cache).await.unwrap();
|
||||||
|
|
||||||
|
// Link should be deleted from DB
|
||||||
|
let remaining = links::Entity::find().all(&state.db).await.unwrap();
|
||||||
|
assert!(remaining.is_empty());
|
||||||
|
|
||||||
|
// Orphaned resource should also be cleaned up
|
||||||
|
let remaining_resources = resources::Entity::find().all(&state.db).await.unwrap();
|
||||||
|
assert!(remaining_resources.is_empty());
|
||||||
|
}
|
||||||
Loading…
Add table
Reference in a new issue