fix: Deduplicate payload downloads to prevent parallel rename races

Multiple file actions in a manifest can reference the same payload hash.
When downloaded in parallel via rayon, multiple threads would write to
the same temp file path simultaneously, causing rename failures.

Now deduplicates payloads by digest before the parallel download, then
resolves each file action's temp-file path from its payload digest, so
every action that references a shared payload reuses the single
downloaded copy.
This commit is contained in:
Till Wegmueller 2026-03-15 21:04:03 +01:00
parent bcea795848
commit 6f5040978b

View file

@ -215,20 +215,25 @@ impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> {
.iter() .iter()
.filter(|f| f.payload.is_some()) .filter(|f| f.payload.is_some())
.collect(); .collect();
let total_files = payload_files.len() as u64; // Deduplicate payloads by digest — multiple file actions may reference the
// same payload, and parallel downloads to the same temp path cause races
let mut unique_digests: Vec<String> = payload_files
.iter()
.map(|f| f.payload.as_ref().unwrap().primary_identifier.hash.clone())
.collect();
unique_digests.sort();
unique_digests.dedup();
// Download all payloads in parallel // Download unique payloads in parallel
let files_done = Arc::new(Mutex::new(0u64)); let files_done = Arc::new(Mutex::new(0u64));
let publisher_str = publisher.to_string(); let publisher_str = publisher.to_string();
let fmri_name = fmri.name.clone(); let fmri_name = fmri.name.clone();
let temp_dir_path = temp_dir.path().to_path_buf(); let temp_dir_path = temp_dir.path().to_path_buf();
let unique_total = unique_digests.len() as u64;
let download_results: std::result::Result<Vec<_>, RepositoryError> = payload_files let download_results: std::result::Result<Vec<_>, RepositoryError> = unique_digests
.par_iter() .par_iter()
.map(|file| { .map(|digest| {
let payload = file.payload.as_ref().unwrap();
let digest = &payload.primary_identifier.hash;
let temp_file_path = temp_dir_path.join(digest); let temp_file_path = temp_dir_path.join(digest);
debug!( debug!(
@ -237,12 +242,9 @@ impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> {
temp_file_path.display() temp_file_path.display()
); );
// Fetch payload using the primary hash from the manifest.
// This is the hash the source repository knows the file by.
self.source self.source
.fetch_payload(&publisher_str, digest, &temp_file_path)?; .fetch_payload(&publisher_str, digest, &temp_file_path)?;
// Update progress atomically
let current_count = { let current_count = {
let mut count = files_done.lock() let mut count = files_done.lock()
.map_err(|e| RepositoryError::Other(format!("Failed to lock progress counter: {}", e)))?; .map_err(|e| RepositoryError::Other(format!("Failed to lock progress counter: {}", e)))?;
@ -252,19 +254,21 @@ impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> {
progress.update( progress.update(
&ProgressInfo::new(format!("Receiving payloads for {}", fmri_name)) &ProgressInfo::new(format!("Receiving payloads for {}", fmri_name))
.with_total(total_files) .with_total(unique_total)
.with_current(current_count) .with_current(current_count)
.with_context(format!("Payload: {}", digest)), .with_context(format!("Payload: {}", digest)),
); );
Ok((file, temp_file_path)) Ok(())
}) })
.collect(); .collect();
let download_info = download_results?; download_results?;
// Add all files to the transaction // Add all files to the transaction, referencing the downloaded payloads
for (file, temp_file_path) in download_info { for file in &payload_files {
let digest = &file.payload.as_ref().unwrap().primary_identifier.hash;
let temp_file_path = temp_dir_path.join(digest);
txn.add_file((*file).clone(), &temp_file_path)?; txn.add_file((*file).clone(), &temp_file_path)?;
} }