Convert trait methods to use &self instead of &mut self, introduce Mutex for interior mutability, optimize HTTP client creation, and implement parallel payload processing using Rayon.

This commit is contained in:
Till Wegmueller 2026-02-05 15:57:56 +01:00
parent 0de84b80c8
commit e236f30f6e
No known key found for this signature in database
8 changed files with 143 additions and 55 deletions

41
Cargo.lock generated
View file

@ -581,6 +581,25 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
@ -1458,7 +1477,6 @@ checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091"
name = "libips"
version = "0.5.3"
dependencies = [
"base64 0.22.1",
"chrono",
"diff-struct",
"flate2",
@ -1469,6 +1487,7 @@ dependencies = [
"object",
"pest",
"pest_derive",
"rayon",
"regex",
"reqwest",
"resolvo",
@ -2418,6 +2437,26 @@ dependencies = [
"getrandom 0.3.4",
]
[[package]]
name = "rayon"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91"
dependencies = [
"crossbeam-deque",
"crossbeam-utils",
]
[[package]]
name = "redox_syscall"
version = "0.5.18"

View file

@ -34,7 +34,6 @@ serde = { version = "1.0.207", features = ["derive"] }
serde_json = "1.0.124"
flate2 = "1.0.28"
lz4 = "1.24.0"
base64 = "0.22"
semver = { version = "1.0.20", features = ["serde"] }
diff-struct = "0.5.3"
chrono = "0.4.41"
@ -44,6 +43,7 @@ rusqlite = { version = "0.31", default-features = false }
rust-ini = "0.21"
reqwest = { version = "0.12", features = ["blocking", "json", "gzip", "deflate"] }
resolvo = "0.10"
rayon = "1.11"
[features]
default = ["bundled-sqlite"]

View file

@ -9,21 +9,23 @@ use crate::repository::{
FileBackend, NoopProgressReporter, ProgressInfo, ProgressReporter, ReadableRepository,
RepositoryError, Result, WritableRepository,
};
use rayon::prelude::*;
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use tempfile::tempdir;
use tracing::{debug, info};
/// PackageReceiver handles downloading packages from a source repository
/// and storing them in a destination repository.
pub struct PackageReceiver<'a, S: ReadableRepository> {
source: &'a mut S,
source: &'a S,
dest: FileBackend,
progress: Option<&'a dyn ProgressReporter>,
}
impl<'a, S: ReadableRepository> PackageReceiver<'a, S> {
impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> {
/// Create a new PackageReceiver
pub fn new(source: &'a mut S, dest: FileBackend) -> Self {
pub fn new(source: &'a S, dest: FileBackend) -> Self {
Self {
source,
dest,
@ -215,28 +217,53 @@ impl<'a, S: ReadableRepository> PackageReceiver<'a, S> {
.collect();
let total_files = payload_files.len() as u64;
for (i, file) in payload_files.into_iter().enumerate() {
if let Some(payload) = &file.payload {
let files_done = (i + 1) as u64;
// Download all payloads in parallel
let files_done = Arc::new(Mutex::new(0u64));
let publisher_str = publisher.to_string();
let fmri_name = fmri.name.clone();
let temp_dir_path = temp_dir.path().to_path_buf();
let download_results: std::result::Result<Vec<_>, RepositoryError> = payload_files
.par_iter()
.map(|file| {
let payload = file.payload.as_ref().unwrap();
let digest = &payload.primary_identifier.hash;
let temp_file_path = temp_dir_path.join(digest);
progress.update(
&ProgressInfo::new(format!("Receiving payloads for {}", fmri.name))
.with_total(total_files)
.with_current(files_done)
.with_context(format!("Payload: {}", digest)),
);
let temp_file_path = temp_dir.path().join(digest);
debug!(
"Fetching payload {} to {}",
digest,
temp_file_path.display()
);
// Download the payload (now works with &self)
self.source
.fetch_payload(publisher, digest, &temp_file_path)?;
txn.add_file(file.clone(), &temp_file_path)?;
}
.fetch_payload(&publisher_str, digest, &temp_file_path)?;
// Update progress atomically
let current_count = {
let mut count = files_done.lock()
.map_err(|e| RepositoryError::Other(format!("Failed to lock progress counter: {}", e)))?;
*count += 1;
*count
};
progress.update(
&ProgressInfo::new(format!("Receiving payloads for {}", fmri_name))
.with_total(total_files)
.with_current(current_count)
.with_context(format!("Payload: {}", digest)),
);
Ok((file, temp_file_path))
})
.collect();
let download_info = download_results?;
// Add all files to the transaction
for (file, temp_file_path) in download_info {
txn.add_file((*file).clone(), &temp_file_path)?;
}
txn.update_manifest(manifest.clone());
@ -279,7 +306,7 @@ mod tests {
// Create dest repo
let dest_repo = FileBackend::create(dest_dir.path(), RepositoryVersion::V4)?;
let mut receiver = PackageReceiver::new(&mut source_repo, dest_repo);
let mut receiver = PackageReceiver::new(&source_repo, dest_repo);
receiver.receive(Some("test"), &[Fmri::new("pkgA")], false)?;
// Verify dest repo has the package
@ -318,7 +345,7 @@ mod tests {
// Create dest repo
let dest_repo = FileBackend::create(dest_dir.path(), RepositoryVersion::V4)?;
let mut receiver = PackageReceiver::new(&mut source_repo, dest_repo);
let mut receiver = PackageReceiver::new(&source_repo, dest_repo);
receiver.receive(Some("test"), &[Fmri::new("pkgA")], false)?;
// Verify dest repo has the package and the manifest is in IPS format

View file

@ -16,6 +16,7 @@ use std::fs::File;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{debug, error, info};
use walkdir::WalkDir;
@ -358,11 +359,11 @@ pub struct FileBackend {
pub path: PathBuf,
pub config: RepositoryConfig,
/// Catalog manager for handling catalog operations
/// Uses RefCell for interior mutability to allow mutation through immutable references
catalog_manager: Option<std::cell::RefCell<crate::repository::catalog::CatalogManager>>,
/// Uses Mutex for interior mutability to allow mutation through immutable references (thread-safe)
catalog_manager: Option<Mutex<crate::repository::catalog::CatalogManager>>,
/// Manager for obsoleted packages
obsoleted_manager:
Option<std::cell::RefCell<crate::repository::obsoleted::ObsoletedPackageManager>>,
Option<Mutex<crate::repository::obsoleted::ObsoletedPackageManager>>,
}
/// Transaction for publishing packages
@ -1467,7 +1468,7 @@ impl ReadableRepository for FileBackend {
Ok(package_contents)
}
fn fetch_payload(&mut self, publisher: &str, digest: &str, dest: &Path) -> Result<()> {
fn fetch_payload(&self, publisher: &str, digest: &str, dest: &Path) -> Result<()> {
// Parse digest; supports both raw hash and source:algorithm:hash
let parsed = match Digest::from_str(digest) {
Ok(d) => d,
@ -1549,7 +1550,7 @@ impl ReadableRepository for FileBackend {
}
fn fetch_manifest(
&mut self,
&self,
publisher: &str,
fmri: &crate::fmri::Fmri,
) -> Result<crate::actions::Manifest> {
@ -1596,7 +1597,7 @@ impl ReadableRepository for FileBackend {
)))
}
fn fetch_manifest_text(&mut self, publisher: &str, fmri: &Fmri) -> Result<String> {
fn fetch_manifest_text(&self, publisher: &str, fmri: &Fmri) -> Result<String> {
// Require a concrete version
let version = fmri.version();
if version.is_empty() {
@ -2387,8 +2388,9 @@ impl FileBackend {
&self.obsoleted_manager
{
obsoleted_manager
.borrow()
.is_obsoleted(publisher, &final_fmri)
.lock()
.map(|mgr| mgr.is_obsoleted(publisher, &final_fmri))
.unwrap_or(false)
} else {
false
};
@ -2853,40 +2855,46 @@ impl FileBackend {
/// Get or initialize the catalog manager
///
/// This method returns a lock guard that grants mutable access to the catalog manager.
/// It uses interior mutability with RefCell to allow mutation through an immutable reference.
/// It uses interior mutability with Mutex to allow mutation through an immutable reference.
///
/// The catalog manager is specific to the given publisher.
pub fn get_catalog_manager(
&mut self,
publisher: &str,
) -> Result<std::cell::RefMut<'_, crate::repository::catalog::CatalogManager>> {
) -> Result<std::sync::MutexGuard<'_, crate::repository::catalog::CatalogManager>> {
if self.catalog_manager.is_none() {
let publisher_dir = self.path.join("publisher");
let manager =
crate::repository::catalog::CatalogManager::new(&publisher_dir, publisher)?;
let refcell = std::cell::RefCell::new(manager);
self.catalog_manager = Some(refcell);
let mutex = Mutex::new(manager);
self.catalog_manager = Some(mutex);
}
// catalog_manager was populated above; handle the (impossible) None defensively instead of unwrapping
Ok(self.catalog_manager.as_ref().unwrap().borrow_mut())
self.catalog_manager
.as_ref()
.ok_or_else(|| RepositoryError::Other("Catalog manager not initialized".to_string()))?
.lock()
.map_err(|e| RepositoryError::Other(format!("Failed to lock catalog manager: {}", e)))
}
/// Get or initialize the obsoleted package manager
///
/// This method returns a lock guard that grants mutable access to the obsoleted package manager.
/// It uses interior mutability with RefCell to allow mutation through an immutable reference.
/// It uses interior mutability with Mutex to allow mutation through an immutable reference.
pub fn get_obsoleted_manager(
&mut self,
) -> Result<std::cell::RefMut<'_, crate::repository::obsoleted::ObsoletedPackageManager>> {
) -> Result<std::sync::MutexGuard<'_, crate::repository::obsoleted::ObsoletedPackageManager>> {
if self.obsoleted_manager.is_none() {
let manager = crate::repository::obsoleted::ObsoletedPackageManager::new(&self.path);
let refcell = std::cell::RefCell::new(manager);
self.obsoleted_manager = Some(refcell);
let mutex = Mutex::new(manager);
self.obsoleted_manager = Some(mutex);
}
// obsoleted_manager was populated above; handle the (impossible) None defensively instead of unwrapping
Ok(self.obsoleted_manager.as_ref().unwrap().borrow_mut())
self.obsoleted_manager
.as_ref()
.ok_or_else(|| RepositoryError::Other("Obsoleted manager not initialized".to_string()))?
.lock()
.map_err(|e| RepositoryError::Other(format!("Failed to lock obsoleted manager: {}", e)))
}
/// URL encode a string for use in a filename

View file

@ -334,19 +334,19 @@ pub trait ReadableRepository {
/// Fetch a content payload identified by digest into the destination path.
/// Implementations should download/copy the payload to a temporary path,
/// verify integrity, and atomically move into `dest`.
fn fetch_payload(&mut self, publisher: &str, digest: &str, dest: &Path) -> Result<()>;
fn fetch_payload(&self, publisher: &str, digest: &str, dest: &Path) -> Result<()>;
/// Fetch a package manifest by FMRI from the repository.
/// Implementations should retrieve and parse the manifest for the given
/// publisher and fully-qualified FMRI (name@version).
fn fetch_manifest(
&mut self,
&self,
publisher: &str,
fmri: &crate::fmri::Fmri,
) -> Result<crate::actions::Manifest>;
/// Fetch a package manifest as raw text by FMRI from the repository.
fn fetch_manifest_text(&mut self, publisher: &str, fmri: &crate::fmri::Fmri) -> Result<String>;
fn fetch_manifest_text(&self, publisher: &str, fmri: &crate::fmri::Fmri) -> Result<String>;
/// Search for packages in the repository
///

View file

@ -35,7 +35,7 @@ use std::fmt;
/// }
/// }
/// ```
pub trait ProgressReporter {
pub trait ProgressReporter: Send + Sync {
/// Called when an operation starts.
///
/// # Arguments

View file

@ -12,6 +12,7 @@ use tracing::{debug, info, warn};
use reqwest::blocking::Client;
use serde_json::Value;
use std::time::Duration;
use super::catalog::CatalogManager;
use super::{
@ -81,7 +82,7 @@ impl WritableRepository for RestBackend {
uri: uri_str,
config,
local_cache_path: None,
client: Client::new(),
client: Self::create_optimized_client(),
catalog_managers: HashMap::new(),
};
@ -263,7 +264,7 @@ impl WritableRepository for RestBackend {
uri: self.uri.clone(),
config: self.config.clone(),
local_cache_path: self.local_cache_path.clone(),
client: Client::new(),
client: Self::create_optimized_client(),
catalog_managers: HashMap::new(),
};
@ -334,7 +335,7 @@ impl ReadableRepository for RestBackend {
.to_string();
// Create an HTTP client
let client = Client::new();
let client = Self::create_optimized_client();
// Fetch the repository configuration from the remote server
// We'll try to get the publisher information using the publisher endpoint
@ -602,7 +603,7 @@ impl ReadableRepository for RestBackend {
Ok(package_contents)
}
fn fetch_payload(&mut self, publisher: &str, digest: &str, dest: &Path) -> Result<()> {
fn fetch_payload(&self, publisher: &str, digest: &str, dest: &Path) -> Result<()> {
// Determine hash and algorithm from the provided digest string
let mut hash = digest.to_string();
let mut algo: Option<crate::digest::DigestAlgorithm> = None;
@ -678,7 +679,7 @@ impl ReadableRepository for RestBackend {
}
fn fetch_manifest(
&mut self,
&self,
publisher: &str,
fmri: &crate::fmri::Fmri,
) -> Result<crate::actions::Manifest> {
@ -695,7 +696,7 @@ impl ReadableRepository for RestBackend {
todo!()
}
fn fetch_manifest_text(&mut self, publisher: &str, fmri: &crate::fmri::Fmri) -> Result<String> {
fn fetch_manifest_text(&self, publisher: &str, fmri: &crate::fmri::Fmri) -> Result<String> {
// Require versioned FMRI
let version = fmri.version();
if version.is_empty() {
@ -760,6 +761,19 @@ impl ReadableRepository for RestBackend {
}
impl RestBackend {
/// Create an optimized HTTP client with connection pooling and timeouts.
///
/// Pooling keeps up to 8 idle connections per host alive for 90 seconds so
/// that parallel payload downloads reuse sockets instead of paying the
/// TCP/TLS handshake cost on every request. The generous request timeout
/// (300 s total, 30 s connect) accommodates large payload transfers, and
/// TCP keepalive detects half-dead connections in the pool.
fn create_optimized_client() -> Client {
    Client::builder()
        .pool_idle_timeout(Some(Duration::from_secs(90)))
        .pool_max_idle_per_host(8)
        .connect_timeout(Duration::from_secs(30))
        .timeout(Duration::from_secs(300))
        .tcp_keepalive(Some(Duration::from_secs(60)))
        // NOTE: deliberately NOT calling `.http2_prior_knowledge()`. That
        // option restricts the client to HTTP/2 and sends h2 frames without
        // any negotiation, so every request against an HTTP/1.1-only server
        // (common for package depots) fails outright. Leaving it unset lets
        // reqwest negotiate the protocol version (ALPN over TLS, HTTP/1.1
        // otherwise), which works against both protocol generations.
        .build()
        // ClientBuilder::build only fails on broken TLS/system configuration;
        // fall back to a default client rather than turning an effectively
        // infallible constructor into a fallible API.
        .unwrap_or_else(|_| Client::new())
}
/// Sets the local path where catalog files will be cached.
///
/// This method creates the directory if it doesn't exist. The local cache path

View file

@ -69,17 +69,17 @@ fn main() -> Result<()> {
// Determine if source is a URL or a path and receive packages
if cli.source.starts_with("http://") || cli.source.starts_with("https://") {
let mut source_repo = RestBackend::open(&cli.source).into_diagnostic()?;
let source_repo = RestBackend::open(&cli.source).into_diagnostic()?;
let dest_repo = FileBackend::open(&cli.dest).into_diagnostic()?;
let mut receiver = PackageReceiver::new(&mut source_repo, dest_repo);
let mut receiver = PackageReceiver::new(&source_repo, dest_repo);
receiver = receiver.with_progress(&progress);
receiver
.receive(cli.publisher.as_deref(), &fmris, cli.recursive)
.into_diagnostic()?;
} else {
let mut source_repo = FileBackend::open(&cli.source).into_diagnostic()?;
let source_repo = FileBackend::open(&cli.source).into_diagnostic()?;
let dest_repo = FileBackend::open(&cli.dest).into_diagnostic()?;
let mut receiver = PackageReceiver::new(&mut source_repo, dest_repo);
let mut receiver = PackageReceiver::new(&source_repo, dest_repo);
receiver = receiver.with_progress(&progress);
receiver
.receive(cli.publisher.as_deref(), &fmris, cli.recursive)