diff --git a/Cargo.lock b/Cargo.lock index edbb8bd..771cde1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -581,6 +581,25 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -1458,7 +1477,6 @@ checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" name = "libips" version = "0.5.3" dependencies = [ - "base64 0.22.1", "chrono", "diff-struct", "flate2", @@ -1469,6 +1487,7 @@ dependencies = [ "object", "pest", "pest_derive", + "rayon", "regex", "reqwest", "resolvo", @@ -2418,6 +2437,26 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "rayon" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.5.18" diff --git a/libips/Cargo.toml b/libips/Cargo.toml index 8d57fd0..cb7051e 100644 --- a/libips/Cargo.toml +++ b/libips/Cargo.toml @@ -34,7 +34,6 @@ serde = { version = "1.0.207", features = ["derive"] } serde_json = "1.0.124" flate2 = "1.0.28" lz4 = "1.24.0" -base64 = "0.22" semver = { version = "1.0.20", features = ["serde"] } 
diff-struct = "0.5.3" chrono = "0.4.41" @@ -44,6 +43,7 @@ rusqlite = { version = "0.31", default-features = false } rust-ini = "0.21" reqwest = { version = "0.12", features = ["blocking", "json", "gzip", "deflate"] } resolvo = "0.10" +rayon = "1.11" [features] default = ["bundled-sqlite"] diff --git a/libips/src/recv.rs b/libips/src/recv.rs index 25ac1b8..f1da74e 100644 --- a/libips/src/recv.rs +++ b/libips/src/recv.rs @@ -9,21 +9,23 @@ use crate::repository::{ FileBackend, NoopProgressReporter, ProgressInfo, ProgressReporter, ReadableRepository, RepositoryError, Result, WritableRepository, }; +use rayon::prelude::*; use std::collections::HashSet; +use std::sync::{Arc, Mutex}; use tempfile::tempdir; use tracing::{debug, info}; /// PackageReceiver handles downloading packages from a source repository /// and storing them in a destination repository. pub struct PackageReceiver<'a, S: ReadableRepository> { - source: &'a mut S, + source: &'a S, dest: FileBackend, progress: Option<&'a dyn ProgressReporter>, } -impl<'a, S: ReadableRepository> PackageReceiver<'a, S> { +impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> { /// Create a new PackageReceiver - pub fn new(source: &'a mut S, dest: FileBackend) -> Self { + pub fn new(source: &'a S, dest: FileBackend) -> Self { Self { source, dest, @@ -215,28 +217,53 @@ impl<'a, S: ReadableRepository> PackageReceiver<'a, S> { .collect(); let total_files = payload_files.len() as u64; - for (i, file) in payload_files.into_iter().enumerate() { - if let Some(payload) = &file.payload { - let files_done = (i + 1) as u64; + // Download all payloads in parallel + let files_done = Arc::new(Mutex::new(0u64)); + let publisher_str = publisher.to_string(); + let fmri_name = fmri.name.clone(); + let temp_dir_path = temp_dir.path().to_path_buf(); + + let download_results: std::result::Result<Vec<_>, RepositoryError> = payload_files + .par_iter() + .map(|file| { + let payload = file.payload.as_ref().unwrap(); let digest =
&payload.primary_identifier.hash; + let temp_file_path = temp_dir_path.join(digest); - progress.update( - &ProgressInfo::new(format!("Receiving payloads for {}", fmri.name)) - .with_total(total_files) - .with_current(files_done) - .with_context(format!("Payload: {}", digest)), - ); - - let temp_file_path = temp_dir.path().join(digest); debug!( "Fetching payload {} to {}", digest, temp_file_path.display() ); + + // Download the payload (now works with &self) self.source - .fetch_payload(publisher, digest, &temp_file_path)?; - txn.add_file(file.clone(), &temp_file_path)?; - } + .fetch_payload(&publisher_str, digest, &temp_file_path)?; + + // Update progress atomically + let current_count = { + let mut count = files_done.lock() + .map_err(|e| RepositoryError::Other(format!("Failed to lock progress counter: {}", e)))?; + *count += 1; + *count + }; + + progress.update( + &ProgressInfo::new(format!("Receiving payloads for {}", fmri_name)) + .with_total(total_files) + .with_current(current_count) + .with_context(format!("Payload: {}", digest)), + ); + + Ok((file, temp_file_path)) + }) + .collect(); + + let download_info = download_results?; + + // Add all files to the transaction + for (file, temp_file_path) in download_info { + txn.add_file((*file).clone(), &temp_file_path)?; } txn.update_manifest(manifest.clone()); @@ -279,7 +306,7 @@ mod tests { // Create dest repo let dest_repo = FileBackend::create(dest_dir.path(), RepositoryVersion::V4)?; - let mut receiver = PackageReceiver::new(&mut source_repo, dest_repo); + let mut receiver = PackageReceiver::new(&source_repo, dest_repo); receiver.receive(Some("test"), &[Fmri::new("pkgA")], false)?; // Verify dest repo has the package @@ -318,7 +345,7 @@ mod tests { // Create dest repo let dest_repo = FileBackend::create(dest_dir.path(), RepositoryVersion::V4)?; - let mut receiver = PackageReceiver::new(&mut source_repo, dest_repo); + let mut receiver = PackageReceiver::new(&source_repo, dest_repo); 
receiver.receive(Some("test"), &[Fmri::new("pkgA")], false)?; // Verify dest repo has the package and the manifest is in IPS format diff --git a/libips/src/repository/file_backend.rs b/libips/src/repository/file_backend.rs index 5dc186d..5ab40e7 100644 --- a/libips/src/repository/file_backend.rs +++ b/libips/src/repository/file_backend.rs @@ -16,6 +16,7 @@ use std::fs::File; use std::io::{Read, Write}; use std::path::{Path, PathBuf}; use std::str::FromStr; +use std::sync::Mutex; use std::time::{SystemTime, UNIX_EPOCH}; use tracing::{debug, error, info}; use walkdir::WalkDir; @@ -358,11 +359,11 @@ pub struct FileBackend { pub path: PathBuf, pub config: RepositoryConfig, /// Catalog manager for handling catalog operations - /// Uses RefCell for interior mutability to allow mutation through immutable references - catalog_manager: Option<std::cell::RefCell<crate::repository::catalog::CatalogManager>>, + /// Uses Mutex for interior mutability to allow mutation through immutable references (thread-safe) + catalog_manager: Option<Mutex<crate::repository::catalog::CatalogManager>>, /// Manager for obsoleted packages obsoleted_manager: - Option<std::cell::RefCell<crate::repository::obsoleted::ObsoletedPackageManager>>, + Option<Mutex<crate::repository::obsoleted::ObsoletedPackageManager>>, } /// Transaction for publishing packages @@ -1467,7 +1468,7 @@ impl ReadableRepository for FileBackend { Ok(package_contents) } - fn fetch_payload(&mut self, publisher: &str, digest: &str, dest: &Path) -> Result<()> { + fn fetch_payload(&self, publisher: &str, digest: &str, dest: &Path) -> Result<()> { // Parse digest; supports both raw hash and source:algorithm:hash let parsed = match Digest::from_str(digest) { Ok(d) => d, @@ -1549,7 +1550,7 @@ } fn fetch_manifest( - &mut self, + &self, publisher: &str, fmri: &crate::fmri::Fmri, ) -> Result<crate::actions::Manifest> { @@ -1596,7 +1597,7 @@ ))) } - fn fetch_manifest_text(&mut self, publisher: &str, fmri: &Fmri) -> Result<String> { + fn fetch_manifest_text(&self, publisher: &str, fmri: &Fmri) -> Result<String> { // Require a concrete version let version = fmri.version(); if version.is_empty() { @@ -2387,8 +2388,9 @@ impl FileBackend {
&self.obsoleted_manager { obsoleted_manager - .borrow() - .is_obsoleted(publisher, &final_fmri) + .lock() + .map(|mgr| mgr.is_obsoleted(publisher, &final_fmri)) + .unwrap_or(false) } else { false }; @@ -2853,40 +2855,46 @@ impl FileBackend { /// Get or initialize the catalog manager /// /// This method returns a mutable reference to the catalog manager. - /// It uses interior mutability with RefCell to allow mutation through an immutable reference. + /// It uses interior mutability with Mutex to allow mutation through an immutable reference. /// /// The catalog manager is specific to the given publisher. pub fn get_catalog_manager( &mut self, publisher: &str, - ) -> Result<std::cell::RefMut<'_, crate::repository::catalog::CatalogManager>> { + ) -> Result<std::sync::MutexGuard<'_, crate::repository::catalog::CatalogManager>> { if self.catalog_manager.is_none() { let publisher_dir = self.path.join("publisher"); let manager = crate::repository::catalog::CatalogManager::new(&publisher_dir, publisher)?; - let refcell = std::cell::RefCell::new(manager); - self.catalog_manager = Some(refcell); + let mutex = Mutex::new(manager); + self.catalog_manager = Some(mutex); } - // This is safe because we just checked that catalog_manager is Some - Ok(self.catalog_manager.as_ref().unwrap().borrow_mut()) + self.catalog_manager + .as_ref() + .ok_or_else(|| RepositoryError::Other("Catalog manager not initialized".to_string()))? + .lock() + .map_err(|e| RepositoryError::Other(format!("Failed to lock catalog manager: {}", e))) } /// Get or initialize the obsoleted package manager /// /// This method returns a mutable reference to the obsoleted package manager. - /// It uses interior mutability with RefCell to allow mutation through an immutable reference. + /// It uses interior mutability with Mutex to allow mutation through an immutable reference.
pub fn get_obsoleted_manager( &mut self, - ) -> Result<std::cell::RefMut<'_, crate::repository::obsoleted::ObsoletedPackageManager>> { + ) -> Result<std::sync::MutexGuard<'_, crate::repository::obsoleted::ObsoletedPackageManager>> { if self.obsoleted_manager.is_none() { let manager = crate::repository::obsoleted::ObsoletedPackageManager::new(&self.path); - let refcell = std::cell::RefCell::new(manager); - self.obsoleted_manager = Some(refcell); + let mutex = Mutex::new(manager); + self.obsoleted_manager = Some(mutex); } - // This is safe because we just checked that obsoleted_manager is Some - Ok(self.obsoleted_manager.as_ref().unwrap().borrow_mut()) + self.obsoleted_manager + .as_ref() + .ok_or_else(|| RepositoryError::Other("Obsoleted manager not initialized".to_string()))? + .lock() + .map_err(|e| RepositoryError::Other(format!("Failed to lock obsoleted manager: {}", e))) } /// URL encode a string for use in a filename diff --git a/libips/src/repository/mod.rs b/libips/src/repository/mod.rs index 7ce9657..40c67a9 100644 --- a/libips/src/repository/mod.rs +++ b/libips/src/repository/mod.rs @@ -334,19 +334,19 @@ pub trait ReadableRepository { /// Fetch a content payload identified by digest into the destination path. /// Implementations should download/copy the payload to a temporary path, /// verify integrity, and atomically move into `dest`. - fn fetch_payload(&mut self, publisher: &str, digest: &str, dest: &Path) -> Result<()>; + fn fetch_payload(&self, publisher: &str, digest: &str, dest: &Path) -> Result<()>; /// Fetch a package manifest by FMRI from the repository. /// Implementations should retrieve and parse the manifest for the given /// publisher and fully-qualified FMRI (name@version). fn fetch_manifest( - &mut self, + &self, publisher: &str, fmri: &crate::fmri::Fmri, ) -> Result<crate::actions::Manifest>; /// Fetch a package manifest as raw text by FMRI from the repository.
- fn fetch_manifest_text(&mut self, publisher: &str, fmri: &crate::fmri::Fmri) -> Result<String>; + fn fetch_manifest_text(&self, publisher: &str, fmri: &crate::fmri::Fmri) -> Result<String>; /// Search for packages in the repository /// diff --git a/libips/src/repository/progress.rs b/libips/src/repository/progress.rs index 579cf3d..65e65de 100644 --- a/libips/src/repository/progress.rs +++ b/libips/src/repository/progress.rs @@ -35,7 +35,7 @@ use std::fmt; /// } /// } /// ``` -pub trait ProgressReporter { +pub trait ProgressReporter: Send + Sync { /// Called when an operation starts. /// /// # Arguments diff --git a/libips/src/repository/rest_backend.rs b/libips/src/repository/rest_backend.rs index 1e9bada..02f283b 100644 --- a/libips/src/repository/rest_backend.rs +++ b/libips/src/repository/rest_backend.rs @@ -12,6 +12,7 @@ use tracing::{debug, info, warn}; use reqwest::blocking::Client; use serde_json::Value; +use std::time::Duration; use super::catalog::CatalogManager; use super::{ @@ -81,7 +82,7 @@ impl WritableRepository for RestBackend { uri: uri_str, config, local_cache_path: None, - client: Client::new(), + client: Self::create_optimized_client(), catalog_managers: HashMap::new(), }; @@ -263,7 +264,7 @@ impl WritableRepository for RestBackend { uri: self.uri.clone(), config: self.config.clone(), local_cache_path: self.local_cache_path.clone(), - client: Client::new(), + client: Self::create_optimized_client(), catalog_managers: HashMap::new(), }; @@ -334,7 +335,7 @@ impl ReadableRepository for RestBackend { .to_string(); // Create an HTTP client - let client = Client::new(); + let client = Self::create_optimized_client(); // Fetch the repository configuration from the remote server // We'll try to get the publisher information using the publisher endpoint @@ -602,7 +603,7 @@ impl ReadableRepository for RestBackend { Ok(package_contents) } - fn fetch_payload(&mut self, publisher: &str, digest: &str, dest: &Path) -> Result<()> { + fn fetch_payload(&self, publisher: &str,
digest: &str, dest: &Path) -> Result<()> { // Determine hash and algorithm from the provided digest string let mut hash = digest.to_string(); let mut algo: Option<String> = None; @@ -678,7 +679,7 @@ impl ReadableRepository for RestBackend { } fn fetch_manifest( - &mut self, + &self, publisher: &str, fmri: &crate::fmri::Fmri, ) -> Result<crate::actions::Manifest> { @@ -695,7 +696,7 @@ impl ReadableRepository for RestBackend { todo!() } - fn fetch_manifest_text(&mut self, publisher: &str, fmri: &crate::fmri::Fmri) -> Result<String> { + fn fetch_manifest_text(&self, publisher: &str, fmri: &crate::fmri::Fmri) -> Result<String> { // Require versioned FMRI let version = fmri.version(); if version.is_empty() { @@ -760,6 +761,19 @@ } impl RestBackend { + /// Create an optimized HTTP client with connection pooling and timeouts + fn create_optimized_client() -> Client { + Client::builder() + .pool_idle_timeout(Some(Duration::from_secs(90))) + .pool_max_idle_per_host(8) + .connect_timeout(Duration::from_secs(30)) + .timeout(Duration::from_secs(300)) + .tcp_keepalive(Some(Duration::from_secs(60))) + .http2_prior_knowledge() + .build() + .unwrap_or_else(|_| Client::new()) + } + /// Sets the local path where catalog files will be cached. /// /// This method creates the directory if it doesn't exist.
The local cache path diff --git a/pkg6recv/src/main.rs b/pkg6recv/src/main.rs index daa9c02..1411d25 100644 --- a/pkg6recv/src/main.rs +++ b/pkg6recv/src/main.rs @@ -69,17 +69,17 @@ fn main() -> Result<()> { // Determine if source is a URL or a path and receive packages if cli.source.starts_with("http://") || cli.source.starts_with("https://") { - let mut source_repo = RestBackend::open(&cli.source).into_diagnostic()?; + let source_repo = RestBackend::open(&cli.source).into_diagnostic()?; let dest_repo = FileBackend::open(&cli.dest).into_diagnostic()?; - let mut receiver = PackageReceiver::new(&mut source_repo, dest_repo); + let mut receiver = PackageReceiver::new(&source_repo, dest_repo); receiver = receiver.with_progress(&progress); receiver .receive(cli.publisher.as_deref(), &fmris, cli.recursive) .into_diagnostic()?; } else { - let mut source_repo = FileBackend::open(&cli.source).into_diagnostic()?; + let source_repo = FileBackend::open(&cli.source).into_diagnostic()?; let dest_repo = FileBackend::open(&cli.dest).into_diagnostic()?; - let mut receiver = PackageReceiver::new(&mut source_repo, dest_repo); + let mut receiver = PackageReceiver::new(&source_repo, dest_repo); receiver = receiver.with_progress(&progress); receiver .receive(cli.publisher.as_deref(), &fmris, cli.recursive)