From 6f5040978b18867fead9ceaba2d71705d1117da5 Mon Sep 17 00:00:00 2001 From: Till Wegmueller Date: Sun, 15 Mar 2026 21:04:03 +0100 Subject: [PATCH] fix: Deduplicate payload downloads to prevent parallel rename races Multiple file actions in a manifest can reference the same payload hash. When downloaded in parallel via rayon, multiple threads would write to the same temp file path simultaneously, causing rename failures. Now deduplicates payloads by digest before parallel download, then maps results back to all file actions that reference each payload. --- libips/src/recv.rs | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/libips/src/recv.rs b/libips/src/recv.rs index f71fb51..a0b1e7c 100644 --- a/libips/src/recv.rs +++ b/libips/src/recv.rs @@ -215,20 +215,25 @@ impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> { .iter() .filter(|f| f.payload.is_some()) .collect(); - let total_files = payload_files.len() as u64; + // Deduplicate payloads by digest — multiple file actions may reference the + // same payload, and parallel downloads to the same temp path cause races + let mut unique_digests: Vec<String> = payload_files + .iter() + .map(|f| f.payload.as_ref().unwrap().primary_identifier.hash.clone()) + .collect(); + unique_digests.sort(); + unique_digests.dedup(); - // Download all payloads in parallel + // Download unique payloads in parallel let files_done = Arc::new(Mutex::new(0u64)); let publisher_str = publisher.to_string(); let fmri_name = fmri.name.clone(); let temp_dir_path = temp_dir.path().to_path_buf(); + let unique_total = unique_digests.len() as u64; - let download_results: std::result::Result<Vec<_>, RepositoryError> = payload_files + let download_results: std::result::Result<Vec<_>, RepositoryError> = unique_digests .par_iter() - .map(|file| { - let payload = file.payload.as_ref().unwrap(); - - let digest = &payload.primary_identifier.hash; + .map(|digest| { let temp_file_path = temp_dir_path.join(digest);
debug!( @@ -237,12 +242,9 @@ impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> { temp_file_path.display() ); - // Fetch payload using the primary hash from the manifest. - // This is the hash the source repository knows the file by. self.source .fetch_payload(&publisher_str, digest, &temp_file_path)?; - // Update progress atomically let current_count = { let mut count = files_done.lock() .map_err(|e| RepositoryError::Other(format!("Failed to lock progress counter: {}", e)))?; @@ -252,19 +254,21 @@ impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> { progress.update( &ProgressInfo::new(format!("Receiving payloads for {}", fmri_name)) - .with_total(total_files) + .with_total(unique_total) .with_current(current_count) .with_context(format!("Payload: {}", digest)), ); - Ok((file, temp_file_path)) + Ok(()) }) .collect(); - let download_info = download_results?; + download_results?; - // Add all files to the transaction - for (file, temp_file_path) in download_info { + // Add all files to the transaction, referencing the downloaded payloads + for file in &payload_files { + let digest = &file.payload.as_ref().unwrap().primary_identifier.hash; + let temp_file_path = temp_dir_path.join(digest); txn.add_file((*file).clone(), &temp_file_path)?; }