fix: Resolve pkg6recv multithreaded download race conditions and file loss

- Use tempdir_in(repo_path) instead of system /tmp to keep temp files on
  the same filesystem, preventing cross-device rename failures
- Replace fs::copy with fs::rename for atomic file moves in transaction
  commit and signature payload storage
- Fix Digest construction to use correct SHA256 algorithm instead of
  defaulting to SHA1 for bare hex hashes
- Try compressed hash (additional_identifiers) as fallback when primary
  hash lookup fails during payload fetch
- Remove duplicate publisher resolution block in Transaction::commit()
- Add integration test for multi-file parallel package receive

Closes: https://codeberg.org/Toasterson/ips/issues/21
Author: Till Wegmueller, 2026-03-15 19:32:24 +01:00
Parent commit: ec9c55daf3
This commit: 750df8dcc7
2 changed files, 181 insertions(+), 76 deletions(-)

View file

@ -12,7 +12,7 @@ use crate::repository::{
use rayon::prelude::*; use rayon::prelude::*;
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use tempfile::tempdir; use tempfile::tempdir_in;
use tracing::{debug, info}; use tracing::{debug, info};
/// PackageReceiver handles downloading packages from a source repository /// PackageReceiver handles downloading packages from a source repository
@ -208,7 +208,7 @@ impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> {
txn.set_publisher(publisher); txn.set_publisher(publisher);
txn.set_legacy_manifest(manifest_text); txn.set_legacy_manifest(manifest_text);
let temp_dir = tempdir().map_err(RepositoryError::IoError)?; let temp_dir = tempdir_in(&self.dest.path).map_err(RepositoryError::IoError)?;
let payload_files: Vec<_> = manifest let payload_files: Vec<_> = manifest
.files .files
@ -227,6 +227,16 @@ impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> {
.par_iter() .par_iter()
.map(|file| { .map(|file| {
let payload = file.payload.as_ref().unwrap(); let payload = file.payload.as_ref().unwrap();
// Collect all candidate digests: primary first, then additional (compressed hash)
// Use Display format (source:algorithm:hash) so fetch_payload parses correctly
let mut digests = vec![payload.primary_identifier.to_string()];
for additional in &payload.additional_identifiers {
if !additional.hash.is_empty() {
digests.push(additional.to_string());
}
}
let digest = &payload.primary_identifier.hash; let digest = &payload.primary_identifier.hash;
let temp_file_path = temp_dir_path.join(digest); let temp_file_path = temp_dir_path.join(digest);
@ -236,9 +246,23 @@ impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> {
temp_file_path.display() temp_file_path.display()
); );
// Download the payload (now works with &self) // Try each digest until one succeeds (primary hash may differ from storage hash)
self.source let mut last_err = None;
.fetch_payload(&publisher_str, digest, &temp_file_path)?; for d in &digests {
match self.source.fetch_payload(&publisher_str, d, &temp_file_path) {
Ok(()) => {
last_err = None;
break;
}
Err(e) => {
debug!("Failed to fetch payload with digest {}: {}", d, e);
last_err = Some(e);
}
}
}
if let Some(e) = last_err {
return Err(e);
}
// Update progress atomically // Update progress atomically
let current_count = { let current_count = {
@ -315,7 +339,13 @@ impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> {
digest, digest,
dest_path.display() dest_path.display()
); );
std::fs::copy(&temp_file_path, &dest_path).map_err(RepositoryError::IoError)?; std::fs::rename(&temp_file_path, &dest_path).map_err(|e| {
RepositoryError::FileRenameError {
from: temp_file_path.clone(),
to: dest_path,
source: e,
}
})?;
} }
} }
@ -493,4 +523,106 @@ mod tests {
Ok(()) Ok(())
} }
#[test]
fn test_receive_multi_file_package() -> Result<()> {
    use crate::actions::File as FileAction;

    // Scratch directories for the source and destination repositories.
    let source_dir = tempdir().map_err(RepositoryError::IoError)?;
    let dest_dir = tempdir().map_err(RepositoryError::IoError)?;

    // Set up the source repository with a single publisher.
    let mut source_repo = FileBackend::create(source_dir.path(), RepositoryVersion::V4)?;
    source_repo.add_publisher("test")?;

    // Lay out a prototype tree containing several payload files so the
    // receive path exercises the parallel multi-file download code.
    let proto_dir = source_dir.path().join("proto");
    std::fs::create_dir_all(&proto_dir).map_err(RepositoryError::IoError)?;

    let fixtures: Vec<(&str, &[u8])> = vec![
        ("etc/config.txt", b"config-content-1"),
        ("usr/bin/tool", b"binary-content-2"),
        ("usr/lib/libfoo.so", b"library-content-3"),
        ("var/data/db.dat", b"data-content-4"),
        ("opt/extra/readme", b"readme-content-5"),
    ];

    // Materialize each fixture on disk and build its FileAction, keeping
    // the on-disk path alongside so it can be handed to the transaction.
    let mut staged = Vec::new();
    for (rel_path, bytes) in &fixtures {
        let abs_path = proto_dir.join(rel_path);
        std::fs::create_dir_all(abs_path.parent().unwrap())
            .map_err(RepositoryError::IoError)?;
        std::fs::write(&abs_path, bytes).map_err(RepositoryError::IoError)?;

        let mut action = FileAction::default();
        action.path = rel_path.to_string();
        action.payload = Some(crate::payload::Payload::compute_payload(&abs_path).unwrap());
        staged.push((action, abs_path));
    }

    // Publish the package into the source repository via a transaction.
    let fmri = Fmri::parse("pkg://test/multifile@1.0").unwrap();
    let mut manifest = Manifest::new();
    manifest.attributes.push(Attr {
        key: "pkg.fmri".to_string(),
        values: vec![fmri.to_string()],
        ..Default::default()
    });

    let mut txn = source_repo.begin_transaction()?;
    txn.set_publisher("test");
    for (action, abs_path) in staged {
        txn.add_file(action, &abs_path)?;
    }
    txn.update_manifest(manifest);
    txn.commit()?;
    source_repo.rebuild(Some("test"), false, false)?;

    // Pull the package into a freshly created destination repository.
    let dest_repo = FileBackend::create(dest_dir.path(), RepositoryVersion::V4)?;
    let mut receiver = PackageReceiver::new(&source_repo, dest_repo);
    receiver.receive(Some("test"), &[Fmri::new("multifile")], false)?;

    // The destination must now list exactly the one received package.
    let dest_repo_check = FileBackend::open(dest_dir.path())?;
    let pkgs = dest_repo_check.list_packages(Some("test"), Some("multifile"))?;
    assert_eq!(pkgs.len(), 1, "Package should exist in destination");

    // Its manifest must carry every file action we published.
    let dest_manifest = dest_repo_check.fetch_manifest(
        "test",
        &Fmri::parse("pkg://test/multifile@1.0").unwrap(),
    )?;
    assert_eq!(
        dest_manifest.files.len(),
        fixtures.len(),
        "All {} files should be in the manifest",
        fixtures.len()
    );

    // Verify the payload blobs landed in the destination's file store.
    // The destination re-compresses on ingest, so hashes differ from the
    // source; counting files per hash bucket is sufficient.
    let file_store = dest_dir.path().join("publisher/test/file");
    let mut payload_count = 0;
    if file_store.exists() {
        for bucket in std::fs::read_dir(&file_store).unwrap() {
            let bucket_path = bucket.unwrap().path();
            if !bucket_path.is_dir() {
                continue;
            }
            payload_count += std::fs::read_dir(&bucket_path)
                .unwrap()
                .filter(|entry| entry.as_ref().unwrap().path().is_file())
                .count();
        }
    }
    assert_eq!(
        payload_count,
        fixtures.len(),
        "All {} payload files should exist in destination file store",
        fixtures.len()
    );

    Ok(())
}
} }

View file

@ -20,7 +20,7 @@ use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{debug, error, info}; use tracing::{debug, error, info};
use crate::actions::{File as FileAction, Manifest}; use crate::actions::{File as FileAction, Manifest};
use crate::digest::Digest; use crate::digest::{Digest, DigestAlgorithm, DigestSource};
use crate::fmri::Fmri; use crate::fmri::Fmri;
use crate::payload::{Payload, PayloadCompressionAlgorithm}; use crate::payload::{Payload, PayloadCompressionAlgorithm};
@ -280,15 +280,26 @@ impl Transaction {
self.files self.files
.push((temp_file_path.clone(), compressed_hash.clone())); .push((temp_file_path.clone(), compressed_hash.clone()));
// Set the primary identifier (uncompressed hash) // Set the primary identifier (uncompressed SHA256 hash)
payload.primary_identifier = Digest::from_str(&hash)?; payload.primary_identifier = Digest {
hash: hash.clone(),
algorithm: DigestAlgorithm::SHA256,
source: DigestSource::UncompressedFile,
};
// Set the compression algorithm // Set the compression algorithm
payload.compression_algorithm = compression_algorithm; payload.compression_algorithm = compression_algorithm;
// Add the compressed hash as an additional identifier // Add the compressed hash as an additional identifier
let compressed_digest = Digest::from_str(&compressed_hash)?; let compressed_source = match payload.compression_algorithm {
payload.additional_identifiers.push(compressed_digest); PayloadCompressionAlgorithm::Gzip => DigestSource::GzipCompressed,
PayloadCompressionAlgorithm::LZ4 => DigestSource::GzipCompressed, // LZ4 shares file storage pattern
};
payload.additional_identifiers.push(Digest {
hash: compressed_hash.clone(),
algorithm: DigestAlgorithm::SHA256,
source: compressed_source,
});
// Update the FileAction with the payload // Update the FileAction with the payload
updated_file_action.payload = Some(payload); updated_file_action.payload = Some(payload);
@ -365,30 +376,6 @@ impl Transaction {
} }
}; };
// Copy files to their final location
for (source_path, hash) in self.files {
// Create the destination path using the helper function with publisher
let dest_path =
FileBackend::construct_file_path_with_publisher(&self.repo, &publisher, &hash);
// Create parent directories if they don't exist
if let Some(parent) = dest_path.parent() {
fs::create_dir_all(parent).map_err(|e| RepositoryError::DirectoryCreateError {
path: parent.to_path_buf(),
source: e,
})?;
}
// Copy the file if it doesn't already exist
if !dest_path.exists() {
fs::copy(&source_path, &dest_path).map_err(|e| RepositoryError::FileCopyError {
from: source_path.clone(),
to: dest_path,
source: e,
})?;
}
}
// Extract package information from manifest // Extract package information from manifest
let mut package_stem = String::from("unknown"); let mut package_stem = String::from("unknown");
let mut package_version = String::from(""); let mut package_version = String::from("");
@ -404,40 +391,26 @@ impl Transaction {
} }
} }
// Determine the publisher to use // Move files to their final location (atomic rename, same filesystem)
let publisher = match &self.publisher { for (source_path, hash) in self.files {
Some(pub_name) => { let dest_path =
debug!("Using specified publisher: {}", pub_name); FileBackend::construct_file_path_with_publisher(&self.repo, &publisher, &hash);
pub_name.clone()
if let Some(parent) = dest_path.parent() {
fs::create_dir_all(parent).map_err(|e| RepositoryError::DirectoryCreateError {
path: parent.to_path_buf(),
source: e,
})?;
} }
None => {
debug!("No publisher specified, trying to use default publisher"); if !dest_path.exists() {
// If no publisher is specified, use the default publisher from the repository config fs::rename(&source_path, &dest_path).map_err(|e| RepositoryError::FileRenameError {
let config_path = self.repo.join(REPOSITORY_CONFIG_FILENAME); from: source_path.clone(),
if config_path.exists() { to: dest_path,
let config_content = fs::read_to_string(&config_path)?; source: e,
let config: RepositoryConfig = serde_json::from_str(&config_content)?; })?;
match config.default_publisher {
Some(default_pub) => {
debug!("Using default publisher: {}", default_pub);
default_pub
}
None => {
debug!("No default publisher set in repository");
return Err(RepositoryError::Other(
"No publisher specified and no default publisher set in repository"
.to_string(),
));
} }
} }
} else {
debug!("Repository configuration not found");
return Err(RepositoryError::Other(
"No publisher specified and repository configuration not found".to_string(),
));
}
}
};
// Create the package directory if it doesn't exist // Create the package directory if it doesn't exist
let pkg_dir = FileBackend::construct_package_dir(&self.repo, &publisher, &package_stem); let pkg_dir = FileBackend::construct_package_dir(&self.repo, &publisher, &package_stem);
@ -474,30 +447,30 @@ impl Transaction {
})?; })?;
} }
// Copy to pkg directory // Move manifests to pkg directory (atomic rename, same filesystem)
// 1. Copy JSON manifest // 1. JSON manifest
let pkg_manifest_json_path = PathBuf::from(format!("{}.json", pkg_manifest_path.display())); let pkg_manifest_json_path = PathBuf::from(format!("{}.json", pkg_manifest_path.display()));
debug!( debug!(
"Copying JSON manifest from {} to {}", "Moving JSON manifest from {} to {}",
manifest_json_path.display(), manifest_json_path.display(),
pkg_manifest_json_path.display() pkg_manifest_json_path.display()
); );
fs::copy(&manifest_json_path, &pkg_manifest_json_path).map_err(|e| { fs::rename(&manifest_json_path, &pkg_manifest_json_path).map_err(|e| {
RepositoryError::FileCopyError { RepositoryError::FileRenameError {
from: manifest_json_path, from: manifest_json_path,
to: pkg_manifest_json_path, to: pkg_manifest_json_path,
source: e, source: e,
} }
})?; })?;
// 2. Copy legacy manifest // 2. Legacy manifest
debug!( debug!(
"Copying legacy manifest from {} to {}", "Moving legacy manifest from {} to {}",
manifest_legacy_path.display(), manifest_legacy_path.display(),
pkg_manifest_path.display() pkg_manifest_path.display()
); );
fs::copy(&manifest_legacy_path, &pkg_manifest_path).map_err(|e| { fs::rename(&manifest_legacy_path, &pkg_manifest_path).map_err(|e| {
RepositoryError::FileCopyError { RepositoryError::FileRenameError {
from: manifest_legacy_path, from: manifest_legacy_path,
to: pkg_manifest_path, to: pkg_manifest_path,
source: e, source: e,