From c2b5461f25ac1ecf98fe36d3b49a2219c19f6601 Mon Sep 17 00:00:00 2001
From: Truman Kilen
Date: Sat, 18 Jan 2025 11:20:45 -0600
Subject: [PATCH] Add support for parallel file compression

---
 Cargo.lock            |  60 +++++++++++++
 repak/Cargo.toml      |   2 +
 repak/src/data.rs     | 190 ++++++++++++++++++++++++++++++++++++++++++
 repak/src/entry.rs    | 139 ++++-------------------------
 repak/src/footer.rs   |  11 ++-
 repak/src/lib.rs      |   1 +
 repak/src/pak.rs      | 101 ++++++++++++++++++++--
 repak/tests/test.rs   |  10 ++-
 repak_cli/src/main.rs |  21 +++--
 9 files changed, 389 insertions(+), 146 deletions(-)
 create mode 100644 repak/src/data.rs

diff --git a/Cargo.lock b/Cargo.lock
index c246f27..f6995a9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -232,6 +232,28 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "crossbeam"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
+dependencies = [
+ "crossbeam-channel",
+ "crossbeam-deque",
+ "crossbeam-epoch",
+ "crossbeam-queue",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-channel"
+version = "0.5.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-deque"
 version = "0.8.6"
@@ -251,6 +273,15 @@ dependencies = [
  "crossbeam-utils",
 ]
 
+[[package]]
+name = "crossbeam-queue"
+version = "0.3.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-utils"
 version = "0.8.21"
@@ -389,6 +420,12 @@ version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
 
+[[package]]
+name = "hermit-abi"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
+
 [[package]]
 name = "hex"
 version = "0.4.3"
@@ -646,6 +683,16 @@ dependencies = [
  "adler2",
 ]
 
+[[package]]
+name = "num_cpus"
+version = "1.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
+dependencies = [
+ "hermit-abi",
+ "libc",
+]
+
 [[package]]
 name = "number_prefix"
 version = "0.4.0"
@@ -669,6 +716,17 @@ dependencies = [
  "ureq",
 ]
 
+[[package]]
+name = "pariter"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "324a62b9e7b5f270c0acc92a2040f8028bb643f959f9c068f11a7864f327e3d9"
+dependencies = [
+ "crossbeam",
+ "crossbeam-channel",
+ "num_cpus",
+]
+
 [[package]]
 name = "paste"
 version = "1.0.15"
@@ -784,7 +842,9 @@ dependencies = [
  "base64",
  "byteorder",
  "flate2",
+ "hex",
  "oodle_loader",
+ "pariter",
  "paste",
  "sha1",
  "strum",
diff --git a/repak/Cargo.toml b/repak/Cargo.toml
index c9e412f..07fea7e 100644
--- a/repak/Cargo.toml
+++ b/repak/Cargo.toml
@@ -25,6 +25,8 @@ oodle_loader = { path = "../oodle_loader", optional = true}
 thiserror = "2.0"
 sha1 = { workspace = true }
 strum = { workspace = true }
+pariter = "0.5.1"
+hex.workspace = true
 
 [dev-dependencies]
 base64 = { workspace = true }
diff --git a/repak/src/data.rs b/repak/src/data.rs
new file mode 100644
index 0000000..d6a1cca
--- /dev/null
+++ b/repak/src/data.rs
@@ -0,0 +1,190 @@
+use crate::{
+    entry::{Block, Entry},
+    Compression, Error, Hash, Version, VersionMajor,
+};
+
+type Result<T, E = Error> = std::result::Result<T, E>;
+
+pub(crate) struct PartialEntry {
+    compression: Option<Compression>,
+    compressed_size: u64,
+    uncompressed_size: u64,
+    compression_block_size: u32,
+    pub(crate) blocks: Vec<PartialBlock>,
+    hash: Hash,
+}
+pub(crate) struct PartialBlock {
+    uncompressed_size: usize,
+    pub(crate) data: Vec<u8>,
+}
+
+fn get_compression_slot(
+    version: Version,
+    compression_slots: &mut Vec<Option<Compression>>,
+    compression: Compression,
+) -> Result<u32> {
+    let slot = compression_slots
+        .iter()
+        .enumerate()
+        .find(|(_, s)| **s == Some(compression));
+    Ok(if let Some((i, _)) = slot {
+        // existing found
+        i
+    } else {
+        if version.version_major() < VersionMajor::FNameBasedCompression {
+            return Err(Error::Other(format!(
+                "cannot use {compression:?} prior to FNameBasedCompression (pak version 8)"
+            )));
+        }
+
+        // find empty slot
+        if let Some((i, empty_slot)) = compression_slots
+            .iter_mut()
+            .enumerate()
+            .find(|(_, s)| s.is_none())
+        {
+            // empty found, set it to used compression type
+            *empty_slot = Some(compression);
+            i
+        } else {
+            // no empty slot found, add a new one
+            compression_slots.push(Some(compression));
+            compression_slots.len() - 1
+        }
+    } as u32)
+}
+
+impl PartialEntry {
+    pub(crate) fn into_entry(
+        &self,
+        version: Version,
+        compression_slots: &mut Vec<Option<Compression>>,
+        file_offset: u64,
+    ) -> Result<Entry> {
+        let compression_slot = self
+            .compression
+            .map(|c| get_compression_slot(version, compression_slots, c))
+            .transpose()?;
+
+        let blocks = (!self.blocks.is_empty()).then(|| {
+            let entry_size =
+                Entry::get_serialized_size(version, compression_slot, self.blocks.len() as u32);
+
+            let mut offset = entry_size;
+            if version.version_major() < VersionMajor::RelativeChunkOffsets {
+                offset += file_offset;
+            };
+
+            self.blocks
+                .iter()
+                .map(|block| {
+                    let start = offset;
+                    offset += block.data.len() as u64;
+                    let end = offset;
+                    Block { start, end }
+                })
+                .collect()
+        });
+
+        Ok(Entry {
+            offset: file_offset,
+            compressed: self.compressed_size,
+            uncompressed: self.uncompressed_size,
+            compression_slot,
+            timestamp: None,
+            hash: Some(self.hash.clone()),
+            blocks,
+            flags: 0,
+            compression_block_size: self.compression_block_size,
+        })
+    }
+}
+
+pub(crate) fn build_partial_entry(
+    //version: Version,
+    allowed_compression: &[Compression],
+    data: &[u8],
+) -> Result<PartialEntry> {
+    // TODO hash needs to be post-compression/encryption
+    use sha1::{Digest, Sha1};
+    let mut hasher = Sha1::new();
+
+    // TODO possibly select best compression based on some criteria instead of picking first
+    let compression = allowed_compression.first().cloned();
+    let uncompressed_size = data.len() as u64;
+    let compression_block_size;
+
+    let (blocks, compressed_size) = match compression {
+        #[cfg(not(feature = "compression"))]
+        Some(_) => {
+            unreachable!("should not be able to reach this point without compression feature")
+        }
+        #[cfg(feature = "compression")]
+        Some(compression) => {
+            compression_block_size = 0x10000;
+            let mut compressed_size = 0;
+            let mut blocks = vec![];
+            for chunk in data.chunks(compression_block_size as usize) {
+                let data = compress(compression, chunk)?;
+                compressed_size += data.len() as u64;
+                hasher.update(&data);
+                blocks.push(PartialBlock {
+                    uncompressed_size: chunk.len(),
+                    data,
+                })
+            }
+
+            (blocks, compressed_size)
+        }
+        None => {
+            compression_block_size = 0;
+            hasher.update(&data);
+            (vec![], uncompressed_size)
+        }
+    };
+
+    Ok(PartialEntry {
+        compression,
+        compressed_size,
+        uncompressed_size,
+        compression_block_size,
+        blocks,
+        hash: Hash(hasher.finalize().into()),
+    })
+}
+
+fn compress(compression: Compression, data: &[u8]) -> Result<Vec<u8>> {
+    use std::io::Write;
+
+    let compressed = match compression {
+        Compression::Zlib => {
+            let mut compress =
+                flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::fast());
+            compress.write_all(data.as_ref())?;
+            compress.finish()?
+        }
+        Compression::Gzip => {
+            let mut compress =
+                flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::fast());
+            compress.write_all(data.as_ref())?;
+            compress.finish()?
+        }
+        Compression::Zstd => zstd::stream::encode_all(data, 0)?,
+        Compression::Oodle => {
+            let mut output = vec![];
+            oodle_loader::oodle()
+                .unwrap()
+                .compress(
+                    data.as_ref(),
+                    &mut output,
+                    oodle_loader::Compressor::Mermaid,
+                    oodle_loader::CompressionLevel::Normal,
+                )
+                .unwrap();
+            output
+            //return Err(Error::Other("writing Oodle compression unsupported".into()))
+        }
+    };
+
+    Ok(compressed)
+}
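Note on the new module: `build_partial_entry` above compresses a file into independent 0x10000-byte blocks and hashes the result, while `into_entry` only assigns the compression slot and block offsets afterwards. That split is what allows compression to run away from the writer thread later in this patch. As a rough standalone illustration of the per-block scheme (a sketch only, using flate2's Zlib at the same `fast` level as the code above; `compress_blocks` is a hypothetical helper, not part of the patch):

    use std::io::Write;

    fn compress_blocks(data: &[u8]) -> std::io::Result<Vec<Vec<u8>>> {
        // Mirror the 0x10000 (64 KiB) compression_block_size used by build_partial_entry.
        const BLOCK_SIZE: usize = 0x10000;
        data.chunks(BLOCK_SIZE)
            .map(|chunk| {
                let mut enc =
                    flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::fast());
                enc.write_all(chunk)?;
                enc.finish()
            })
            .collect()
    }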
diff --git a/repak/src/entry.rs b/repak/src/entry.rs
index a361eab..6e353e4 100644
--- a/repak/src/entry.rs
+++ b/repak/src/entry.rs
@@ -1,7 +1,8 @@
-use crate::Error;
+use crate::{data::build_partial_entry, Error, Hash};
 
 use super::{ext::BoolExt, ext::ReadExt, Compression, Version, VersionMajor};
 use byteorder::{ReadBytesExt, WriteBytesExt, LE};
+use oodle_loader::oodle;
 use std::io;
 
 #[derive(Debug, PartialEq, Clone, Copy)]
@@ -10,7 +11,7 @@ pub(crate) enum EntryLocation {
     Index,
 }
 
-#[derive(Debug)]
+#[derive(Debug, Default, Clone)]
 pub(crate) struct Block {
     pub start: u64,
     pub end: u64,
@@ -55,7 +56,7 @@ pub(crate) struct Entry {
     pub uncompressed: u64,
     pub compression_slot: Option<u32>,
     pub timestamp: Option<u64>,
-    pub hash: Option<[u8; 20]>,
+    pub hash: Option<Hash>,
     pub blocks: Option<Vec<Block>>,
     pub flags: u8,
     pub compression_block_size: u32,
@@ -103,127 +104,19 @@ impl Entry {
         version: Version,
         compression_slots: &mut Vec<Option<Compression>>,
         allowed_compression: &[Compression],
-        data: impl AsRef<[u8]>,
-    ) -> Result<Self, super::Error> {
-        // TODO hash needs to be post-compression
-        use sha1::{Digest, Sha1};
-        let mut hasher = Sha1::new();
-        hasher.update(&data);
-
-        let offset = writer.stream_position()?;
-        let len = data.as_ref().len() as u64;
-
-        // TODO possibly select best compression based on some criteria instead of picking first
-        let compression = allowed_compression.first().cloned();
-
-        let compression_slot = if let Some(compression) = compression {
-            // find existing
-            let slot = compression_slots
-                .iter()
-                .enumerate()
-                .find(|(_, s)| **s == Some(compression));
-            Some(if let Some((i, _)) = slot {
-                // existing found
-                i
-            } else {
-                if version.version_major() < VersionMajor::FNameBasedCompression {
-                    return Err(Error::Other(format!(
-                        "cannot use {compression:?} prior to FNameBasedCompression (pak version 8)"
-                    )));
-                }
-
-                // find empty slot
-                if let Some((i, empty_slot)) = compression_slots
-                    .iter_mut()
-                    .enumerate()
-                    .find(|(_, s)| s.is_none())
-                {
-                    // empty found, set it to used compression type
-                    *empty_slot = Some(compression);
-                    i
-                } else {
-                    // no empty slot found, add a new one
-                    compression_slots.push(Some(compression));
-                    compression_slots.len() - 1
-                }
-            } as u32)
+        data: &[u8],
+    ) -> Result<Self, Error> {
+        let partial_entry = build_partial_entry(allowed_compression, data)?;
+        let stream_position = writer.stream_position()?;
+        let entry = partial_entry.into_entry(version, compression_slots, stream_position)?;
+        entry.write(writer, version, crate::entry::EntryLocation::Data)?;
+        if partial_entry.blocks.is_empty() {
+            writer.write_all(&data)?;
         } else {
-            None
-        };
-
-        let (blocks, compressed) = match compression {
-            #[cfg(not(feature = "compression"))]
-            Some(_) => {
-                unreachable!("should not be able to reach this point without compression feature")
+            for block in partial_entry.blocks {
+                writer.write_all(&block.data)?;
             }
-            #[cfg(feature = "compression")]
-            Some(compression) => {
-                use std::io::Write;
-
-                let entry_size = Entry::get_serialized_size(version, compression_slot, 1);
-                let data_offset = offset + entry_size;
-
-                let compressed = match compression {
-                    Compression::Zlib => {
-                        let mut compress = flate2::write::ZlibEncoder::new(
-                            Vec::new(),
-                            flate2::Compression::fast(),
-                        );
-                        compress.write_all(data.as_ref())?;
-                        compress.finish()?
-                    }
-                    Compression::Gzip => {
-                        let mut compress =
-                            flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::fast());
-                        compress.write_all(data.as_ref())?;
-                        compress.finish()?
-                    }
-                    Compression::Zstd => zstd::stream::encode_all(data.as_ref(), 0)?,
-                    Compression::Oodle => {
-                        return Err(Error::Other("writing Oodle compression unsupported".into()))
-                    }
-                };
-
-                let compute_offset = |index: usize| -> u64 {
-                    match version.version_major() >= VersionMajor::RelativeChunkOffsets {
-                        true => index as u64 + (data_offset - offset),
-                        false => index as u64 + data_offset,
-                    }
-                };
-
-                let blocks = vec![Block {
-                    start: compute_offset(0),
-                    end: compute_offset(compressed.len()),
-                }];
-
-                (Some(blocks), Some(compressed))
-            }
-            None => (None, None),
-        };
-
-        let entry = super::entry::Entry {
-            offset,
-            compressed: compressed
-                .as_ref()
-                .map(|c: &Vec<u8>| c.len() as u64)
-                .unwrap_or(len),
-            uncompressed: len,
-            compression_slot,
-            timestamp: None,
-            hash: Some(hasher.finalize().into()),
-            blocks,
-            flags: 0,
-            compression_block_size: compressed.as_ref().map(|_| len as u32).unwrap_or_default(),
-        };
-
-        entry.write(writer, version, EntryLocation::Data)?;
-
-        if let Some(compressed) = compressed {
-            writer.write_all(&compressed)?;
-        } else {
-            writer.write_all(data.as_ref())?;
         }
-
         Ok(entry)
     }
 
@@ -243,7 +136,7 @@ impl Entry {
             n => Some(n - 1),
         };
         let timestamp = (ver == VersionMajor::Initial).then_try(|| reader.read_u64::<LE>())?;
-        let hash = Some(reader.read_guid()?);
+        let hash = Some(Hash(reader.read_guid()?));
         let blocks = (ver >= VersionMajor::CompressionEncryption && compression.is_some())
             .then_try(|| reader.read_array(Block::read))?;
         let flags = (ver >= VersionMajor::CompressionEncryption)
@@ -287,7 +180,7 @@ impl Entry {
             writer.write_u64::<LE>(self.timestamp.unwrap_or_default())?;
         }
         if let Some(hash) = self.hash {
-            writer.write_all(&hash)?;
+            writer.write_all(&hash.0)?;
         } else {
             panic!("hash missing");
         }
diff --git a/repak/src/footer.rs b/repak/src/footer.rs
index 0bc35d5..7e3c1c4 100644
--- a/repak/src/footer.rs
+++ b/repak/src/footer.rs
@@ -1,4 +1,7 @@
-use crate::ext::{BoolExt, WriteExt};
+use crate::{
+    ext::{BoolExt, WriteExt},
+    Hash,
+};
 
 use super::{ext::ReadExt, Compression, Version, VersionMajor};
 use byteorder::{ReadBytesExt, WriteBytesExt, LE};
@@ -13,7 +16,7 @@ pub struct Footer {
     pub version_major: VersionMajor,
     pub index_offset: u64,
     pub index_size: u64,
-    pub hash: [u8; 20],
+    pub hash: Hash,
     pub frozen: bool,
     pub compression: Vec<Option<Compression>>,
 }
@@ -29,7 +32,7 @@ impl Footer {
             VersionMajor::from_repr(reader.read_u32::<LE>()?).unwrap_or(version.version_major());
         let index_offset = reader.read_u64::<LE>()?;
         let index_size = reader.read_u64::<LE>()?;
-        let hash = reader.read_guid()?;
+        let hash = Hash(reader.read_guid()?);
         let frozen = version.version_major() == VersionMajor::FrozenIndex && reader.read_bool()?;
         let compression = {
             let mut compression = Vec::with_capacity(match version {
@@ -91,7 +94,7 @@ impl Footer {
         writer.write_u32::<LE>(self.version_major as u32)?;
         writer.write_u64::<LE>(self.index_offset)?;
         writer.write_u64::<LE>(self.index_size)?;
-        writer.write_all(&self.hash)?;
+        writer.write_all(&self.hash.0)?;
         if self.version_major == VersionMajor::FrozenIndex {
             writer.write_bool(self.frozen)?;
         }
diff --git a/repak/src/lib.rs b/repak/src/lib.rs
index fd98618..5ff71a8 100644
--- a/repak/src/lib.rs
+++ b/repak/src/lib.rs
@@ -1,4 +1,5 @@
 #![allow(dead_code)]
+mod data;
 mod entry;
 mod error;
 mod ext;
diff --git a/repak/src/pak.rs b/repak/src/pak.rs
index 2fdc8d7..3b4d728 100644
--- a/repak/src/pak.rs
+++ b/repak/src/pak.rs
@@ -1,11 +1,21 @@
+use crate::data::build_partial_entry;
 use crate::entry::Entry;
-use crate::Compression;
+use crate::{Compression, Error};
 
 use super::ext::{ReadExt, WriteExt};
 use super::{Version, VersionMajor};
 use byteorder::{ReadBytesExt, WriteBytesExt, LE};
 use std::collections::BTreeMap;
 use std::io::{self, Read, Seek, Write};
+use std::sync::Arc;
+
+#[derive(Default, Clone, Copy)]
+pub(crate) struct Hash(pub(crate) [u8; 20]);
+impl std::fmt::Debug for Hash {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "Hash({})", hex::encode(self.0))
+    }
+}
 
 #[derive(Debug)]
 pub struct PakBuilder {
@@ -89,6 +99,10 @@ pub struct PakWriter<W: Write + Seek> {
     allowed_compression: Vec<Compression>,
 }
 
+pub struct ParallelPakWriter {
+    tx: std::sync::mpsc::SyncSender<(String, Arc<Vec<u8>>)>,
+}
+
 #[derive(Debug)]
 pub(crate) struct Pak {
     version: Version,
@@ -281,19 +295,94 @@ impl<W: Write + Seek> PakWriter<W> {
                 self.pak.version,
                 &mut self.pak.compression,
                 &self.allowed_compression,
-                data,
+                data.as_ref(),
             )?,
         );
 
         Ok(())
     }
 
+    pub fn parallel<F, E>(&mut self, mut f: F) -> Result<&mut Self, E>
+    where
+        F: Send + Sync + FnMut(&mut ParallelPakWriter) -> Result<(), E>,
+        E: From<Error> + Send,
+    {
+        {
+            use pariter::IteratorExt as _;
+
+            let (tx, rx) = std::sync::mpsc::sync_channel(0);
+
+            pariter::scope(|scope| -> Result<(), E> {
+                let handle = scope.spawn(|_| -> Result<(), E> {
+                    f(&mut ParallelPakWriter { tx })?;
+                    Ok(())
+                });
+
+                let result = rx
+                    .into_iter()
+                    .parallel_map_scoped(
+                        scope,
+                        |(path, data): (String, Arc<Vec<u8>>)| -> Result<_, Error> {
+                            let partial_entry =
+                                build_partial_entry(&self.allowed_compression, &data)?;
+                            let data = partial_entry.blocks.is_empty().then(|| Arc::new(data));
+                            Ok((path, data, partial_entry))
+                        },
+                    )
+                    .try_for_each(|message| -> Result<(), Error> {
+                        let stream_position = self.writer.stream_position()?;
+                        let (path, data, partial_entry) = message?;
+
+                        let entry = partial_entry.into_entry(
+                            self.pak.version,
+                            &mut self.pak.compression,
+                            stream_position,
+                        )?;
+
+                        entry.write(
+                            &mut self.writer,
+                            self.pak.version,
+                            crate::entry::EntryLocation::Data,
+                        )?;
+
+                        self.pak.index.add_entry(&path, entry);
+
+                        if let Some(data) = data {
+                            self.writer.write_all(&data)?;
+                        } else {
+                            for block in partial_entry.blocks {
+                                self.writer.write_all(&block.data)?;
+                            }
+                        }
+                        Ok(())
+                    });
+
+                if let Err(err) = handle.join().unwrap() {
+                    Err(err.into()) // prioritize error from user code
+                } else if let Err(err) = result {
+                    Err(err.into()) // user code was successful, check pak writer error
+                } else {
+                    Ok(()) // neither returned error so return success
+                }
+            })
+            .unwrap()?;
+        }
+        Ok(self)
+    }
+
     pub fn write_index(mut self) -> Result<W, super::Error> {
         self.pak.write(&mut self.writer, &self.key)?;
         Ok(self.writer)
     }
 }
 
+impl ParallelPakWriter {
+    pub fn write_file(&mut self, path: String, data: Vec<u8>) -> Result<(), Error> {
+        self.tx.send((path, Arc::new(data))).unwrap();
+        Ok(())
+    }
+}
+
 impl Pak {
     fn read<R: Read + Seek>(
         reader: &mut R,
@@ -541,12 +630,12 @@ impl Pak {
         index_writer.write_u32::<LE>(1)?; // we have path hash index
         index_writer.write_u64::<LE>(path_hash_index_offset)?;
         index_writer.write_u64::<LE>(phi_buf.len() as u64)?; // path hash index size
-        index_writer.write_all(&hash(&phi_buf))?;
+        index_writer.write_all(&hash(&phi_buf).0)?;
 
         index_writer.write_u32::<LE>(1)?; // we have full directory index
         index_writer.write_u64::<LE>(full_directory_index_offset)?;
         index_writer.write_u64::<LE>(fdi_buf.len() as u64)?; // path hash index size
-        index_writer.write_all(&hash(&fdi_buf))?;
+        index_writer.write_all(&hash(&fdi_buf).0)?;
 
         index_writer.write_u32::<LE>(encoded_entries.len() as u32)?;
         index_writer.write_all(&encoded_entries)?;
@@ -584,11 +673,11 @@ impl Pak {
     }
 }
 
-fn hash(data: &[u8]) -> [u8; 20] {
+fn hash(data: &[u8]) -> Hash {
     use sha1::{Digest, Sha1};
     let mut hasher = Sha1::new();
     hasher.update(data);
-    hasher.finalize().into()
+    Hash(hasher.finalize().into())
 }
 
 fn generate_path_hash_index(
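The `parallel` method above is generic over the closure's error type: any `E: From<repak::Error> + Send` works, so a caller can thread its own error through `?` inside the closure. A hypothetical caller-side error type (`PackError` is illustrative only, not part of this patch) could look like:

    #[derive(Debug)]
    enum PackError {
        Repak(repak::Error),
        Io(std::io::Error),
    }

    impl From<repak::Error> for PackError {
        fn from(e: repak::Error) -> Self {
            PackError::Repak(e)
        }
    }

    impl From<std::io::Error> for PackError {
        fn from(e: std::io::Error) -> Self {
            PackError::Io(e)
        }
    }

    // The closure passed to `parallel` can then use `?` for both pak and filesystem errors:
    //     writer.parallel(|w| -> Result<(), PackError> {
    //         w.write_file("a/b.txt".to_string(), std::fs::read("b.txt")?)?;
    //         Ok(())
    //     })?;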
diff --git a/repak/tests/test.rs b/repak/tests/test.rs
index 5c63f8c..51e6d22 100644
--- a/repak/tests/test.rs
+++ b/repak/tests/test.rs
@@ -183,10 +183,12 @@ fn test_write(_version: repak::Version, _file_name: &str, bytes: &[u8]) {
         Some(0x205C5A7D),
     );
 
-    for path in pak_reader.files() {
-        let data = pak_reader.get(&path, &mut reader).unwrap();
-        pak_writer.write_file(&path, data).unwrap();
-    }
+    pak_writer.parallel(|writer| {
+        for path in pak_reader.files() {
+            let data = pak_reader.get(&path, &mut reader).unwrap();
+            writer.write_file(path, data).unwrap();
+        }
+    }).unwrap();
 
     assert!(pak_writer.write_index().unwrap().into_inner() == reader.into_inner());
 }
diff --git a/repak_cli/src/main.rs b/repak_cli/src/main.rs
index 1c13b51..b2ae887 100644
--- a/repak_cli/src/main.rs
+++ b/repak_cli/src/main.rs
@@ -498,16 +498,19 @@ fn pack(args: ActionPack) -> Result<(), repak::Error> {
         (Output::Stdout, itertools::Either::Right(iter))
     };
     let log = log.clone();
-    iter.try_for_each(|p| {
-        let rel = &p
-            .strip_prefix(input_path)
-            .expect("file not in input directory")
-            .to_slash()
-            .expect("failed to convert to slash path");
-        if args.verbose {
-            log.println(format!("packing {}", &rel));
+    pak.parallel(|writer| -> Result<(), repak::Error> {
+        for p in &mut iter {
+            let rel = &p
+                .strip_prefix(input_path)
+                .expect("file not in input directory")
+                .to_slash()
+                .expect("failed to convert to slash path");
+            if args.verbose {
+                log.println(format!("packing {}", &rel));
+            }
+            writer.write_file(rel.to_string(), std::fs::read(p)?)?;
         }
-        pak.write_file(rel, std::fs::read(p)?)
+        Ok(())
     })?;
 
     pak.write_index()?;
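Taken together, the test and CLI hunks above reduce to the following caller-side pattern; this is a condensed sketch against the new API (`pack_all` is a hypothetical helper, and the `PakWriter` is assumed to have been built via `PakBuilder` as in the test):

    fn pack_all<W: std::io::Write + std::io::Seek>(
        writer: &mut repak::PakWriter<W>,
        mut entries: Vec<(String, Vec<u8>)>,
    ) -> Result<(), repak::Error> {
        writer.parallel(|w| -> Result<(), repak::Error> {
            // drain() lets the FnMut closure hand each buffer to the writer by value.
            for (path, data) in entries.drain(..) {
                // Compression runs on worker threads; entry and data writes stay
                // ordered on the thread driving parallel().
                w.write_file(path, data)?;
            }
            Ok(())
        })?;
        Ok(())
    }

The caller still finishes with `write_index()` to emit the pak index and footer, exactly as before.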