From 750df8dcc7fc2c5aea2789c616d9402a23de555f Mon Sep 17 00:00:00 2001 From: Till Wegmueller Date: Sun, 15 Mar 2026 19:32:24 +0100 Subject: [PATCH] fix: Resolve pkg6recv multithreaded download race conditions and file loss - Use tempdir_in(repo_path) instead of system /tmp to keep temp files on the same filesystem, preventing cross-device rename failures - Replace fs::copy with fs::rename for atomic file moves in transaction commit and signature payload storage - Fix Digest construction to use correct SHA256 algorithm instead of defaulting to SHA1 for bare hex hashes - Try compressed hash (additional_identifiers) as fallback when primary hash lookup fails during payload fetch - Remove duplicate publisher resolution block in Transaction::commit() - Add integration test for multi-file parallel package receive Closes: https://codeberg.org/Toasterson/ips/issues/21 --- libips/src/recv.rs | 144 ++++++++++++++++++++++++-- libips/src/repository/file_backend.rs | 113 ++++++++------------ 2 files changed, 181 insertions(+), 76 deletions(-) diff --git a/libips/src/recv.rs b/libips/src/recv.rs index c5885fc..b8b2d7a 100644 --- a/libips/src/recv.rs +++ b/libips/src/recv.rs @@ -12,7 +12,7 @@ use crate::repository::{ use rayon::prelude::*; use std::collections::HashSet; use std::sync::{Arc, Mutex}; -use tempfile::tempdir; +use tempfile::tempdir_in; use tracing::{debug, info}; /// PackageReceiver handles downloading packages from a source repository @@ -208,7 +208,7 @@ impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> { txn.set_publisher(publisher); txn.set_legacy_manifest(manifest_text); - let temp_dir = tempdir().map_err(RepositoryError::IoError)?; + let temp_dir = tempdir_in(&self.dest.path).map_err(RepositoryError::IoError)?; let payload_files: Vec<_> = manifest .files @@ -227,6 +227,16 @@ impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> { .par_iter() .map(|file| { let payload = file.payload.as_ref().unwrap(); + + // Collect all candidate digests: primary first, then additional (compressed hash) + // Use Display format (source:algorithm:hash) so fetch_payload parses correctly + let mut digests = vec![payload.primary_identifier.to_string()]; + for additional in &payload.additional_identifiers { + if !additional.hash.is_empty() { + digests.push(additional.to_string()); + } + } + let digest = &payload.primary_identifier.hash; let temp_file_path = temp_dir_path.join(digest); @@ -236,9 +246,23 @@ impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> { temp_file_path.display() ); - // Download the payload (now works with &self) - self.source - .fetch_payload(&publisher_str, digest, &temp_file_path)?; + // Try each digest until one succeeds (primary hash may differ from storage hash) + let mut last_err = None; + for d in &digests { + match self.source.fetch_payload(&publisher_str, d, &temp_file_path) { + Ok(()) => { + last_err = None; + break; + } + Err(e) => { + debug!("Failed to fetch payload with digest {}: {}", d, e); + last_err = Some(e); + } + } + } + if let Some(e) = last_err { + return Err(e); + } // Update progress atomically let current_count = { @@ -315,7 +339,13 @@ impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> { digest, dest_path.display() ); - std::fs::copy(&temp_file_path, &dest_path).map_err(RepositoryError::IoError)?; + std::fs::rename(&temp_file_path, &dest_path).map_err(|e| { + RepositoryError::FileRenameError { + from: temp_file_path.clone(), + to: dest_path, + source: e, + } + })?; } } @@ -493,4 +523,106 @@ mod tests { Ok(()) } + + #[test] + fn test_receive_multi_file_package() -> Result<()> { + use crate::actions::File as FileAction; + + let source_dir = tempdir().map_err(RepositoryError::IoError)?; + let dest_dir = tempdir().map_err(RepositoryError::IoError)?; + + // Create source repo + let mut source_repo = FileBackend::create(source_dir.path(), RepositoryVersion::V4)?; + source_repo.add_publisher("test")?; + + // Create a prototype directory with multiple files + let proto_dir = source_dir.path().join("proto"); + std::fs::create_dir_all(&proto_dir).map_err(RepositoryError::IoError)?; + + let file_contents: Vec<(&str, &[u8])> = vec![ + ("etc/config.txt", b"config-content-1"), + ("usr/bin/tool", b"binary-content-2"), + ("usr/lib/libfoo.so", b"library-content-3"), + ("var/data/db.dat", b"data-content-4"), + ("opt/extra/readme", b"readme-content-5"), + ]; + + let mut file_actions = Vec::new(); + for (path, content) in &file_contents { + let full_path = proto_dir.join(path); + std::fs::create_dir_all(full_path.parent().unwrap()) + .map_err(RepositoryError::IoError)?; + std::fs::write(&full_path, content).map_err(RepositoryError::IoError)?; + + let mut fa = FileAction::default(); + fa.path = path.to_string(); + fa.payload = Some(crate::payload::Payload::compute_payload(&full_path).unwrap()); + file_actions.push((fa, full_path)); + } + + // Create the package with files via transaction + let fmri = Fmri::parse("pkg://test/multifile@1.0").unwrap(); + let mut manifest = Manifest::new(); + manifest.attributes.push(Attr { + key: "pkg.fmri".to_string(), + values: vec![fmri.to_string()], + ..Default::default() + }); + + let mut txn = source_repo.begin_transaction()?; + txn.set_publisher("test"); + for (fa, path) in file_actions { + txn.add_file(fa, &path)?; + } + txn.update_manifest(manifest); + txn.commit()?; + source_repo.rebuild(Some("test"), false, false)?; + + // Receive into destination repo + let dest_repo = FileBackend::create(dest_dir.path(), RepositoryVersion::V4)?; + let mut receiver = PackageReceiver::new(&source_repo, dest_repo); + receiver.receive(Some("test"), &[Fmri::new("multifile")], false)?; + + // Verify destination has the package + let dest_repo_check = FileBackend::open(dest_dir.path())?; + let pkgs = dest_repo_check.list_packages(Some("test"), Some("multifile"))?; + assert_eq!(pkgs.len(), 1, "Package should exist in destination"); + + // Verify the manifest contains all files + let dest_manifest = dest_repo_check.fetch_manifest( + "test", + &Fmri::parse("pkg://test/multifile@1.0").unwrap(), + )?; + assert_eq!( + dest_manifest.files.len(), + file_contents.len(), + "All {} files should be in the manifest", + file_contents.len() + ); + + // Verify payload files exist in the destination repo's file store. + // The dest re-compresses files so hashes differ from source. Count files in file store. + let file_store = dest_dir.path().join("publisher/test/file"); + let mut payload_count = 0; + if file_store.exists() { + for bucket in std::fs::read_dir(&file_store).unwrap() { + let bucket = bucket.unwrap(); + if bucket.path().is_dir() { + for entry in std::fs::read_dir(bucket.path()).unwrap() { + if entry.unwrap().path().is_file() { + payload_count += 1; + } + } + } + } + } + assert_eq!( + payload_count, + file_contents.len(), + "All {} payload files should exist in destination file store", + file_contents.len() + ); + + Ok(()) + } } diff --git a/libips/src/repository/file_backend.rs b/libips/src/repository/file_backend.rs index 16eb24e..0df2c0d 100644 --- a/libips/src/repository/file_backend.rs +++ b/libips/src/repository/file_backend.rs @@ -20,7 +20,7 @@ use std::sync::Mutex; use std::time::{SystemTime, UNIX_EPOCH}; use tracing::{debug, error, info}; use crate::actions::{File as FileAction, Manifest}; -use crate::digest::Digest; +use crate::digest::{Digest, DigestAlgorithm, DigestSource}; use crate::fmri::Fmri; use crate::payload::{Payload, PayloadCompressionAlgorithm}; @@ -280,15 +280,26 @@ impl Transaction { self.files .push((temp_file_path.clone(), compressed_hash.clone())); - // Set the primary identifier (uncompressed hash) - payload.primary_identifier = Digest::from_str(&hash)?; + // Set the primary identifier (uncompressed SHA256 hash) + payload.primary_identifier = Digest { + hash: hash.clone(), + algorithm: DigestAlgorithm::SHA256, + source: DigestSource::UncompressedFile, + }; // Set the compression algorithm payload.compression_algorithm = compression_algorithm; // Add the compressed hash as an additional identifier - let compressed_digest = Digest::from_str(&compressed_hash)?; - payload.additional_identifiers.push(compressed_digest); + let compressed_source = match payload.compression_algorithm { + PayloadCompressionAlgorithm::Gzip => DigestSource::GzipCompressed, + PayloadCompressionAlgorithm::LZ4 => DigestSource::GzipCompressed, // LZ4 shares file storage pattern + }; + payload.additional_identifiers.push(Digest { + hash: compressed_hash.clone(), + algorithm: DigestAlgorithm::SHA256, + source: compressed_source, + }); // Update the FileAction with the payload updated_file_action.payload = Some(payload); @@ -365,30 +376,6 @@ impl Transaction { } }; - // Copy files to their final location - for (source_path, hash) in self.files { - // Create the destination path using the helper function with publisher - let dest_path = - FileBackend::construct_file_path_with_publisher(&self.repo, &publisher, &hash); - - // Create parent directories if they don't exist - if let Some(parent) = dest_path.parent() { - fs::create_dir_all(parent).map_err(|e| RepositoryError::DirectoryCreateError { - path: parent.to_path_buf(), - source: e, - })?; - } - - // Copy the file if it doesn't already exist - if !dest_path.exists() { - fs::copy(&source_path, &dest_path).map_err(|e| RepositoryError::FileCopyError { - from: source_path.clone(), - to: dest_path, - source: e, - })?; - } - } - // Extract package information from manifest let mut package_stem = String::from("unknown"); let mut package_version = String::from(""); @@ -404,40 +391,26 @@ impl Transaction { } } - // Determine the publisher to use - let publisher = match &self.publisher { - Some(pub_name) => { - debug!("Using specified publisher: {}", pub_name); - pub_name.clone() + // Move files to their final location (atomic rename, same filesystem) + for (source_path, hash) in self.files { + let dest_path = + FileBackend::construct_file_path_with_publisher(&self.repo, &publisher, &hash); + + if let Some(parent) = dest_path.parent() { + fs::create_dir_all(parent).map_err(|e| RepositoryError::DirectoryCreateError { + path: parent.to_path_buf(), + source: e, + })?; } - None => { - debug!("No publisher specified, trying to use default publisher"); - // If no publisher is specified, use the default publisher from the repository config - let config_path = self.repo.join(REPOSITORY_CONFIG_FILENAME); - if config_path.exists() { - let config_content = fs::read_to_string(&config_path)?; - let config: RepositoryConfig = serde_json::from_str(&config_content)?; - match config.default_publisher { - Some(default_pub) => { - debug!("Using default publisher: {}", default_pub); - default_pub - } - None => { - debug!("No default publisher set in repository"); - return Err(RepositoryError::Other( - "No publisher specified and no default publisher set in repository" - .to_string(), - )); - } - } - } else { - debug!("Repository configuration not found"); - return Err(RepositoryError::Other( - "No publisher specified and repository configuration not found".to_string(), - )); - } + + if !dest_path.exists() { + fs::rename(&source_path, &dest_path).map_err(|e| RepositoryError::FileRenameError { + from: source_path.clone(), + to: dest_path, + source: e, + })?; } - }; + } // Create the package directory if it doesn't exist let pkg_dir = FileBackend::construct_package_dir(&self.repo, &publisher, &package_stem); @@ -474,30 +447,30 @@ impl Transaction { })?; } - // Copy to pkg directory - // 1. Copy JSON manifest + // Move manifests to pkg directory (atomic rename, same filesystem) + // 1. JSON manifest let pkg_manifest_json_path = PathBuf::from(format!("{}.json", pkg_manifest_path.display())); debug!( - "Copying JSON manifest from {} to {}", + "Moving JSON manifest from {} to {}", manifest_json_path.display(), pkg_manifest_json_path.display() ); - fs::copy(&manifest_json_path, &pkg_manifest_json_path).map_err(|e| { - RepositoryError::FileCopyError { + fs::rename(&manifest_json_path, &pkg_manifest_json_path).map_err(|e| { + RepositoryError::FileRenameError { from: manifest_json_path, to: pkg_manifest_json_path, source: e, } })?; - // 2. Copy legacy manifest + // 2. Legacy manifest debug!( - "Copying legacy manifest from {} to {}", + "Moving legacy manifest from {} to {}", manifest_legacy_path.display(), pkg_manifest_path.display() ); - fs::copy(&manifest_legacy_path, &pkg_manifest_path).map_err(|e| { - RepositoryError::FileCopyError { + fs::rename(&manifest_legacy_path, &pkg_manifest_path).map_err(|e| { + RepositoryError::FileRenameError { from: manifest_legacy_path, to: pkg_manifest_path, source: e,