From 88e06b488dd4f9ec80f400fd69416c1dcc190908 Mon Sep 17 00:00:00 2001 From: Till Wegmueller Date: Wed, 27 Aug 2025 00:05:36 +0200 Subject: [PATCH] Refactor catalog processing to merge parts by version - Introduced `process_publisher_merged` to consolidate catalog parts per `stem@version` before processing. - Replaced part-wise processing with merged version logic for efficiency and improved determinism. - Added `process_catalog_part` dead code annotation and cleaned up outdated logic. - Enhanced handling of merged actions and signature deduplication across catalog parts. - Improved logging for import and obsolete package metrics. --- libips/src/image/catalog.rs | 145 ++++++++++++++++++++++++++++++++---- 1 file changed, 129 insertions(+), 16 deletions(-) diff --git a/libips/src/image/catalog.rs b/libips/src/image/catalog.rs index 1a584ba..988dc6d 100644 --- a/libips/src/image/catalog.rs +++ b/libips/src/image/catalog.rs @@ -10,6 +10,7 @@ use thiserror::Error; use tracing::{info, warn, trace}; use std::io::{Cursor, Read, Write}; use lz4::{Decoder as Lz4Decoder, EncoderBuilder as Lz4EncoderBuilder}; +use std::collections::HashMap; /// Table definition for the catalog database /// Key: stem@version @@ -417,24 +418,14 @@ impl ImageCatalog { .map_err(|e| CatalogError::Repository(crate::repository::RepositoryError::Other(format!("Failed to load catalog part: {}", e))))?; } - // Process each catalog part in a deterministic order: base, dependency, summary, others - let mut part_names: Vec<String> = parts.keys().cloned().collect(); - part_names.sort_by_key(|name| { - if name.contains(".base") { 1 } - else if name.contains(".dependency") { 0 } - else if name.contains(".summary") { 2 } - else { 3 } - }); - for part_name in part_names { - trace!("Processing catalog part: {}", part_name); - if let Some(part) = catalog_manager.get_part(&part_name) { - trace!("Found catalog part: {}", part_name); - trace!("Packages in part: {:?}", part.packages.keys().collect::<Vec<_>>()); - 
self.process_catalog_part(&mut catalog_table, &mut obsoleted_table, &part_name, part, publisher)?; - } else { - trace!("Catalog part not found: {}", part_name); + // New approach: Merge information across all catalog parts per stem@version, then process once + let mut loaded_parts: Vec<&CatalogPart> = Vec::new(); + for part_name in parts.keys() { + if let Some(part) = catalog_manager.get_part(part_name) { + loaded_parts.push(part); } } + self.process_publisher_merged(&mut catalog_table, &mut obsoleted_table, publisher, &loaded_parts)?; } // Drop the tables to release the borrow on tx @@ -452,6 +443,7 @@ impl ImageCatalog { } /// Process a catalog part and add its packages to the catalog + #[allow(dead_code)] fn process_catalog_part( &self, catalog_table: &mut redb::Table<&str, &[u8]>, @@ -586,6 +578,127 @@ impl ImageCatalog { Ok(()) } + /// Process all catalog parts by merging entries per stem@version and deciding once per package + fn process_publisher_merged( + &self, + catalog_table: &mut redb::Table<&str, &[u8]>, + obsoleted_table: &mut redb::Table<&str, &[u8]>, + publisher: &str, + parts: &[&CatalogPart], + ) -> Result<()> { + trace!("Processing merged catalog for publisher: {}", publisher); + + // Build merged map: stem -> version -> PackageVersionEntry (with merged actions/signature) + let mut merged: HashMap<String, HashMap<String, PackageVersionEntry>> = HashMap::new(); + + for part in parts { + if let Some(publisher_packages) = part.packages.get(publisher) { + for (stem, versions) in publisher_packages { + let stem_map = merged.entry(stem.clone()).or_default(); + for v in versions { + let entry = stem_map + .entry(v.version.clone()) + .or_insert(PackageVersionEntry { + version: v.version.clone(), + actions: None, + signature_sha1: None, + }); + // Merge signature if not yet set + if entry.signature_sha1.is_none() { + if let Some(sig) = &v.signature_sha1 { entry.signature_sha1 = Some(sig.clone()); } + } + // Merge actions, de-duplicating + if let Some(actions) = &v.actions { + let ea = 
entry.actions.get_or_insert_with(Vec::new); + for a in actions { + if !ea.contains(a) { + ea.push(a.clone()); + } + } + } + } + } + } + } + + // Compute totals for progress logging + let total_versions: usize = merged.values().map(|m| m.len()).sum(); + let mut processed: usize = 0; + let mut obsolete_count: usize = 0; + let progress_step: usize = 500; + + // Deterministic order: sort stems and versions + let mut stems: Vec<&String> = merged.keys().collect(); + stems.sort(); + + for stem in stems { + if let Some(versions_map) = merged.get(stem) { + let mut versions: Vec<&String> = versions_map.keys().collect(); + versions.sort(); + + for ver in versions { + let entry = versions_map.get(ver).expect("version entry exists"); + + // Keys + let catalog_key = format!("{}@{}", stem, entry.version); + + // Read existing manifest if present + let existing_manifest = match catalog_table.get(catalog_key.as_str()) { + Ok(Some(bytes)) => Some(decode_manifest_bytes(bytes.value())?), + _ => None, + }; + + // Build/update manifest with merged actions + let manifest = self.create_or_update_manifest(existing_manifest, entry, stem, publisher)?; + + // Obsolete decision based on merged actions in manifest + let is_obsolete = self.is_package_obsolete(&manifest); + if is_obsolete { obsolete_count += 1; } + + // Serialize and write + if is_obsolete { + // Compute full FMRI for obsoleted key + let version_obj = if !entry.version.is_empty() { + match crate::fmri::Version::parse(&entry.version) { Ok(v) => Some(v), Err(_) => None } + } else { None }; + let fmri = Fmri::with_publisher(publisher, stem, version_obj); + let obsoleted_key = fmri.to_string(); + let empty_bytes: &[u8] = &[0u8; 0]; + obsoleted_table + .insert(obsoleted_key.as_str(), empty_bytes) + .map_err(|e| CatalogError::Database(format!("Failed to insert into obsoleted table: {}", e)))?; + } else { + let manifest_bytes = serde_json::to_vec(&manifest)?; + let compressed = compress_json_lz4(&manifest_bytes)?; + catalog_table + 
.insert(catalog_key.as_str(), compressed.as_slice()) + .map_err(|e| CatalogError::Database(format!("Failed to insert into catalog table: {}", e)))?; + } + + processed += 1; + if processed % progress_step == 0 { + info!( + "Import progress (publisher {}, merged): {}/{} versions processed ({} obsolete)", + publisher, + processed, + total_versions, + obsolete_count + ); + } + } + } + } + + info!( + "Finished merged import for publisher {}: {} versions processed ({} obsolete)", + publisher, + processed, + obsolete_count + ); + + Ok(()) + } + /// Create or update a manifest from a package version entry fn create_or_update_manifest( &self,