Refactor catalog processing to merge parts by version

- Introduced `process_publisher_merged` to consolidate catalog parts per `stem@version` before processing.
- Replaced part-wise processing with merged version logic for efficiency and improved determinism.
- Added `process_catalog_part` dead code annotation and cleaned up outdated logic.
- Enhanced handling of merged actions and signature deduplication across catalog parts.
- Improved logging for import and obsolete package metrics.
This commit is contained in:
Till Wegmueller 2025-08-27 00:05:36 +02:00
parent 6ed8bb5b5b
commit 88e06b488d
No known key found for this signature in database

View file

@ -10,6 +10,7 @@ use thiserror::Error;
use tracing::{info, warn, trace};
use std::io::{Cursor, Read, Write};
use lz4::{Decoder as Lz4Decoder, EncoderBuilder as Lz4EncoderBuilder};
use std::collections::HashMap;
/// Table definition for the catalog database
/// Key: stem@version
@ -417,24 +418,14 @@ impl ImageCatalog {
.map_err(|e| CatalogError::Repository(crate::repository::RepositoryError::Other(format!("Failed to load catalog part: {}", e))))?;
}
// Process each catalog part in a deterministic order: base, dependency, summary, others
let mut part_names: Vec<String> = parts.keys().cloned().collect();
part_names.sort_by_key(|name| {
if name.contains(".base") { 1 }
else if name.contains(".dependency") { 0 }
else if name.contains(".summary") { 2 }
else { 3 }
});
for part_name in part_names {
trace!("Processing catalog part: {}", part_name);
if let Some(part) = catalog_manager.get_part(&part_name) {
trace!("Found catalog part: {}", part_name);
trace!("Packages in part: {:?}", part.packages.keys().collect::<Vec<_>>());
self.process_catalog_part(&mut catalog_table, &mut obsoleted_table, &part_name, part, publisher)?;
} else {
trace!("Catalog part not found: {}", part_name);
// New approach: Merge information across all catalog parts per stem@version, then process once
let mut loaded_parts: Vec<&CatalogPart> = Vec::new();
for part_name in parts.keys() {
if let Some(part) = catalog_manager.get_part(part_name) {
loaded_parts.push(part);
}
}
self.process_publisher_merged(&mut catalog_table, &mut obsoleted_table, publisher, &loaded_parts)?;
}
// Drop the tables to release the borrow on tx
@ -452,6 +443,7 @@ impl ImageCatalog {
}
/// Process a catalog part and add its packages to the catalog
#[allow(dead_code)]
fn process_catalog_part(
&self,
catalog_table: &mut redb::Table<&str, &[u8]>,
@ -586,6 +578,127 @@ impl ImageCatalog {
Ok(())
}
/// Process all catalog parts by merging entries per stem@version and deciding once per package
fn process_publisher_merged(
&self,
catalog_table: &mut redb::Table<&str, &[u8]>,
obsoleted_table: &mut redb::Table<&str, &[u8]>,
publisher: &str,
parts: &[&CatalogPart],
) -> Result<()> {
trace!("Processing merged catalog for publisher: {}", publisher);
// Build merged map: stem -> version -> PackageVersionEntry (with merged actions/signature)
let mut merged: HashMap<String, HashMap<String, PackageVersionEntry>> = HashMap::new();
for part in parts {
if let Some(publisher_packages) = part.packages.get(publisher) {
for (stem, versions) in publisher_packages {
let stem_map = merged.entry(stem.clone()).or_default();
for v in versions {
let entry = stem_map
.entry(v.version.clone())
.or_insert(PackageVersionEntry {
version: v.version.clone(),
actions: None,
signature_sha1: None,
});
// Merge signature if not yet set
if entry.signature_sha1.is_none() {
if let Some(sig) = &v.signature_sha1 { entry.signature_sha1 = Some(sig.clone()); }
}
// Merge actions, de-duplicating
if let Some(actions) = &v.actions {
let ea = entry.actions.get_or_insert_with(Vec::new);
for a in actions {
if !ea.contains(a) {
ea.push(a.clone());
}
}
}
}
}
}
}
// Compute totals for progress logging
let total_versions: usize = merged.values().map(|m| m.len()).sum();
let mut processed: usize = 0;
let mut obsolete_count: usize = 0;
let progress_step: usize = 500;
// Deterministic order: sort stems and versions
let mut stems: Vec<&String> = merged.keys().collect();
stems.sort();
for stem in stems {
if let Some(versions_map) = merged.get(stem) {
let mut versions: Vec<&String> = versions_map.keys().collect();
versions.sort();
for ver in versions {
let entry = versions_map.get(ver).expect("version entry exists");
// Keys
let catalog_key = format!("{}@{}", stem, entry.version);
// Read existing manifest if present
let existing_manifest = match catalog_table.get(catalog_key.as_str()) {
Ok(Some(bytes)) => Some(decode_manifest_bytes(bytes.value())?),
_ => None,
};
// Build/update manifest with merged actions
let manifest = self.create_or_update_manifest(existing_manifest, entry, stem, publisher)?;
// Obsolete decision based on merged actions in manifest
let is_obsolete = self.is_package_obsolete(&manifest);
if is_obsolete { obsolete_count += 1; }
// Serialize and write
if is_obsolete {
// Compute full FMRI for obsoleted key
let version_obj = if !entry.version.is_empty() {
match crate::fmri::Version::parse(&entry.version) { Ok(v) => Some(v), Err(_) => None }
} else { None };
let fmri = Fmri::with_publisher(publisher, stem, version_obj);
let obsoleted_key = fmri.to_string();
let empty_bytes: &[u8] = &[0u8; 0];
obsoleted_table
.insert(obsoleted_key.as_str(), empty_bytes)
.map_err(|e| CatalogError::Database(format!("Failed to insert into obsoleted table: {}", e)))?;
} else {
let manifest_bytes = serde_json::to_vec(&manifest)?;
let compressed = compress_json_lz4(&manifest_bytes)?;
catalog_table
.insert(catalog_key.as_str(), compressed.as_slice())
.map_err(|e| CatalogError::Database(format!("Failed to insert into catalog table: {}", e)))?;
}
processed += 1;
if processed % progress_step == 0 {
info!(
"Import progress (publisher {}, merged): {}/{} versions processed ({} obsolete)",
publisher,
processed,
total_versions,
obsolete_count
);
}
}
}
}
info!(
"Finished merged import for publisher {}: {} versions processed ({} obsolete)",
publisher,
processed,
obsolete_count
);
Ok(())
}
/// Create or update a manifest from a package version entry
fn create_or_update_manifest(
&self,