diff --git a/libips/src/recv.rs b/libips/src/recv.rs index f71fb51..a0b1e7c 100644 --- a/libips/src/recv.rs +++ b/libips/src/recv.rs @@ -215,20 +215,25 @@ impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> { .iter() .filter(|f| f.payload.is_some()) .collect(); - let total_files = payload_files.len() as u64; + // Deduplicate payloads by digest — multiple file actions may reference the + // same payload, and parallel downloads to the same temp path cause races + let mut unique_digests: Vec = payload_files + .iter() + .map(|f| f.payload.as_ref().unwrap().primary_identifier.hash.clone()) + .collect(); + unique_digests.sort(); + unique_digests.dedup(); - // Download all payloads in parallel + // Download unique payloads in parallel let files_done = Arc::new(Mutex::new(0u64)); let publisher_str = publisher.to_string(); let fmri_name = fmri.name.clone(); let temp_dir_path = temp_dir.path().to_path_buf(); + let unique_total = unique_digests.len() as u64; - let download_results: std::result::Result, RepositoryError> = payload_files + let download_results: std::result::Result, RepositoryError> = unique_digests .par_iter() - .map(|file| { - let payload = file.payload.as_ref().unwrap(); - - let digest = &payload.primary_identifier.hash; + .map(|digest| { let temp_file_path = temp_dir_path.join(digest); debug!( @@ -237,12 +242,9 @@ impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> { temp_file_path.display() ); - // Fetch payload using the primary hash from the manifest. - // This is the hash the source repository knows the file by. self.source .fetch_payload(&publisher_str, digest, &temp_file_path)?; - // Update progress atomically let current_count = { let mut count = files_done.lock() .map_err(|e| RepositoryError::Other(format!("Failed to lock progress counter: {}", e)))?; @@ -252,19 +254,21 @@ impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> { progress.update( &ProgressInfo::new(format!("Receiving payloads for {}", fmri_name)) - .with_total(total_files) + .with_total(unique_total) .with_current(current_count) .with_context(format!("Payload: {}", digest)), ); - Ok((file, temp_file_path)) + Ok(()) }) .collect(); - let download_info = download_results?; + download_results?; - // Add all files to the transaction - for (file, temp_file_path) in download_info { + // Add all files to the transaction, referencing the downloaded payloads + for file in &payload_files { + let digest = &file.payload.as_ref().unwrap().primary_identifier.hash; + let temp_file_path = temp_dir_path.join(digest); txn.add_file((*file).clone(), &temp_file_path)?; }