Add support for parallel file compression

Author: Truman Kilen, 2025-01-18 11:20:45 -06:00
Parent: dbe16c9001
Commit: c2b5461f25
9 changed files with 389 additions and 146 deletions

Cargo.lock (generated, 60 lines changed)

@@ -232,6 +232,28 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.6"
@@ -251,6 +273,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
@@ -389,6 +420,12 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hermit-abi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]]
name = "hex"
version = "0.4.3"
@@ -646,6 +683,16 @@ dependencies = [
"adler2",
]
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "number_prefix"
version = "0.4.0"
@@ -669,6 +716,17 @@ dependencies = [
"ureq",
]
[[package]]
name = "pariter"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "324a62b9e7b5f270c0acc92a2040f8028bb643f959f9c068f11a7864f327e3d9"
dependencies = [
"crossbeam",
"crossbeam-channel",
"num_cpus",
]
[[package]]
name = "paste"
version = "1.0.15"
@@ -784,7 +842,9 @@ dependencies = [
"base64",
"byteorder",
"flate2",
"hex",
"oodle_loader",
"pariter",
"paste",
"sha1",
"strum",

repak/Cargo.toml

@@ -25,6 +25,8 @@ oodle_loader = { path = "../oodle_loader", optional = true}
thiserror = "2.0"
sha1 = { workspace = true }
strum = { workspace = true }
pariter = "0.5.1"
hex.workspace = true
[dev-dependencies]
base64 = { workspace = true }

repak/src/data.rs (new file, 190 lines)

@@ -0,0 +1,190 @@
use crate::{
entry::{Block, Entry},
Compression, Error, Hash, Version, VersionMajor,
};
type Result<T, E = Error> = std::result::Result<T, E>;
pub(crate) struct PartialEntry {
compression: Option<Compression>,
compressed_size: u64,
uncompressed_size: u64,
compression_block_size: u32,
pub(crate) blocks: Vec<PartialBlock>,
hash: Hash,
}
pub(crate) struct PartialBlock {
uncompressed_size: usize,
pub(crate) data: Vec<u8>,
}
fn get_compression_slot(
version: Version,
compression_slots: &mut Vec<Option<Compression>>,
compression: Compression,
) -> Result<u32> {
let slot = compression_slots
.iter()
.enumerate()
.find(|(_, s)| **s == Some(compression));
Ok(if let Some((i, _)) = slot {
// existing found
i
} else {
if version.version_major() < VersionMajor::FNameBasedCompression {
return Err(Error::Other(format!(
"cannot use {compression:?} prior to FNameBasedCompression (pak version 8)"
)));
}
// find empty slot
if let Some((i, empty_slot)) = compression_slots
.iter_mut()
.enumerate()
.find(|(_, s)| s.is_none())
{
// empty found, set it to used compression type
*empty_slot = Some(compression);
i
} else {
// no empty slot found, add a new one
compression_slots.push(Some(compression));
compression_slots.len() - 1
}
} as u32)
}
impl PartialEntry {
pub(crate) fn into_entry(
&self,
version: Version,
compression_slots: &mut Vec<Option<Compression>>,
file_offset: u64,
) -> Result<Entry> {
let compression_slot = self
.compression
.map(|c| get_compression_slot(version, compression_slots, c))
.transpose()?;
let blocks = (!self.blocks.is_empty()).then(|| {
let entry_size =
Entry::get_serialized_size(version, compression_slot, self.blocks.len() as u32);
let mut offset = entry_size;
if version.version_major() < VersionMajor::RelativeChunkOffsets {
offset += file_offset;
};
self.blocks
.iter()
.map(|block| {
let start = offset;
offset += block.data.len() as u64;
let end = offset;
Block { start, end }
})
.collect()
});
Ok(Entry {
offset: file_offset,
compressed: self.compressed_size,
uncompressed: self.uncompressed_size,
compression_slot,
timestamp: None,
hash: Some(self.hash.clone()),
blocks,
flags: 0,
compression_block_size: self.compression_block_size,
})
}
}
pub(crate) fn build_partial_entry(
//version: Version,
allowed_compression: &[Compression],
data: &[u8],
) -> Result<PartialEntry> {
// TODO hash needs to be post-compression/encryption
use sha1::{Digest, Sha1};
let mut hasher = Sha1::new();
// TODO possibly select best compression based on some criteria instead of picking first
let compression = allowed_compression.first().cloned();
let uncompressed_size = data.len() as u64;
let compression_block_size;
let (blocks, compressed_size) = match compression {
#[cfg(not(feature = "compression"))]
Some(_) => {
unreachable!("should not be able to reach this point without compression feature")
}
#[cfg(feature = "compression")]
Some(compression) => {
compression_block_size = 0x10000;
let mut compressed_size = 0;
let mut blocks = vec![];
for chunk in data.chunks(compression_block_size as usize) {
let data = compress(compression, chunk)?;
compressed_size += data.len() as u64;
hasher.update(&data);
blocks.push(PartialBlock {
uncompressed_size: chunk.len(),
data,
})
}
(blocks, compressed_size)
}
None => {
compression_block_size = 0;
hasher.update(&data);
(vec![], uncompressed_size)
}
};
Ok(PartialEntry {
compression,
compressed_size,
uncompressed_size,
compression_block_size,
blocks,
hash: Hash(hasher.finalize().into()),
})
}
fn compress(compression: Compression, data: &[u8]) -> Result<Vec<u8>> {
use std::io::Write;
let compressed = match compression {
Compression::Zlib => {
let mut compress =
flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::fast());
compress.write_all(data.as_ref())?;
compress.finish()?
}
Compression::Gzip => {
let mut compress =
flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::fast());
compress.write_all(data.as_ref())?;
compress.finish()?
}
Compression::Zstd => zstd::stream::encode_all(data, 0)?,
Compression::Oodle => {
let mut output = vec![];
oodle_loader::oodle()
.unwrap()
.compress(
data.as_ref(),
&mut output,
oodle_loader::Compressor::Mermaid,
oodle_loader::CompressionLevel::Normal,
)
.unwrap();
output
//return Err(Error::Other("writing Oodle compression unsupported".into()))
}
};
Ok(compressed)
}
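For context on the new file above: build_partial_entry splits the input into fixed 0x10000-byte (64 KiB) blocks, compresses each block independently, and hashes the compressed stream, so an entry can be fully prepared before anything touches the output writer. Below is a minimal standalone sketch of that per-block scheme using flate2 directly; compress_blocks is a hypothetical helper for illustration, not part of repak's API.

use std::io::Write;

// Hypothetical helper: Zlib-compress `data` in 64 KiB chunks and return the
// compressed blocks plus the total compressed size, mirroring PartialEntry.
fn compress_blocks(data: &[u8]) -> std::io::Result<(Vec<Vec<u8>>, u64)> {
    const BLOCK_SIZE: usize = 0x10000; // same block size as build_partial_entry
    let mut blocks = Vec::new();
    let mut compressed_size = 0u64;
    for chunk in data.chunks(BLOCK_SIZE) {
        let mut encoder =
            flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::fast());
        encoder.write_all(chunk)?;
        let block = encoder.finish()?;
        compressed_size += block.len() as u64;
        blocks.push(block);
    }
    Ok((blocks, compressed_size))
}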

repak/src/entry.rs

@@ -1,7 +1,8 @@
use crate::Error;
use crate::{data::build_partial_entry, Error, Hash};
use super::{ext::BoolExt, ext::ReadExt, Compression, Version, VersionMajor};
use byteorder::{ReadBytesExt, WriteBytesExt, LE};
use oodle_loader::oodle;
use std::io;
#[derive(Debug, PartialEq, Clone, Copy)]
@@ -10,7 +11,7 @@ pub(crate) enum EntryLocation {
Index,
}
#[derive(Debug)]
#[derive(Debug, Default, Clone)]
pub(crate) struct Block {
pub start: u64,
pub end: u64,
@@ -55,7 +56,7 @@ pub(crate) struct Entry {
pub uncompressed: u64,
pub compression_slot: Option<u32>,
pub timestamp: Option<u64>,
pub hash: Option<[u8; 20]>,
pub hash: Option<Hash>,
pub blocks: Option<Vec<Block>>,
pub flags: u8,
pub compression_block_size: u32,
@@ -103,127 +104,19 @@ impl Entry {
version: Version,
compression_slots: &mut Vec<Option<Compression>>,
allowed_compression: &[Compression],
data: impl AsRef<[u8]>,
) -> Result<Self, super::Error> {
// TODO hash needs to be post-compression
use sha1::{Digest, Sha1};
let mut hasher = Sha1::new();
hasher.update(&data);
let offset = writer.stream_position()?;
let len = data.as_ref().len() as u64;
// TODO possibly select best compression based on some criteria instead of picking first
let compression = allowed_compression.first().cloned();
let compression_slot = if let Some(compression) = compression {
// find existing
let slot = compression_slots
.iter()
.enumerate()
.find(|(_, s)| **s == Some(compression));
Some(if let Some((i, _)) = slot {
// existing found
i
data: &[u8],
) -> Result<Self, Error> {
let partial_entry = build_partial_entry(allowed_compression, data)?;
let stream_position = writer.stream_position()?;
let entry = partial_entry.into_entry(version, compression_slots, stream_position)?;
entry.write(writer, version, crate::entry::EntryLocation::Data)?;
if partial_entry.blocks.is_empty() {
writer.write_all(&data)?;
} else {
if version.version_major() < VersionMajor::FNameBasedCompression {
return Err(Error::Other(format!(
"cannot use {compression:?} prior to FNameBasedCompression (pak version 8)"
)));
for block in partial_entry.blocks {
writer.write_all(&block.data)?;
}
// find empty slot
if let Some((i, empty_slot)) = compression_slots
.iter_mut()
.enumerate()
.find(|(_, s)| s.is_none())
{
// empty found, set it to used compression type
*empty_slot = Some(compression);
i
} else {
// no empty slot found, add a new one
compression_slots.push(Some(compression));
compression_slots.len() - 1
}
} as u32)
} else {
None
};
let (blocks, compressed) = match compression {
#[cfg(not(feature = "compression"))]
Some(_) => {
unreachable!("should not be able to reach this point without compression feature")
}
#[cfg(feature = "compression")]
Some(compression) => {
use std::io::Write;
let entry_size = Entry::get_serialized_size(version, compression_slot, 1);
let data_offset = offset + entry_size;
let compressed = match compression {
Compression::Zlib => {
let mut compress = flate2::write::ZlibEncoder::new(
Vec::new(),
flate2::Compression::fast(),
);
compress.write_all(data.as_ref())?;
compress.finish()?
}
Compression::Gzip => {
let mut compress =
flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::fast());
compress.write_all(data.as_ref())?;
compress.finish()?
}
Compression::Zstd => zstd::stream::encode_all(data.as_ref(), 0)?,
Compression::Oodle => {
return Err(Error::Other("writing Oodle compression unsupported".into()))
}
};
let compute_offset = |index: usize| -> u64 {
match version.version_major() >= VersionMajor::RelativeChunkOffsets {
true => index as u64 + (data_offset - offset),
false => index as u64 + data_offset,
}
};
let blocks = vec![Block {
start: compute_offset(0),
end: compute_offset(compressed.len()),
}];
(Some(blocks), Some(compressed))
}
None => (None, None),
};
let entry = super::entry::Entry {
offset,
compressed: compressed
.as_ref()
.map(|c: &Vec<u8>| c.len() as u64)
.unwrap_or(len),
uncompressed: len,
compression_slot,
timestamp: None,
hash: Some(hasher.finalize().into()),
blocks,
flags: 0,
compression_block_size: compressed.as_ref().map(|_| len as u32).unwrap_or_default(),
};
entry.write(writer, version, EntryLocation::Data)?;
if let Some(compressed) = compressed {
writer.write_all(&compressed)?;
} else {
writer.write_all(data.as_ref())?;
}
Ok(entry)
}
@@ -243,7 +136,7 @@ impl Entry {
n => Some(n - 1),
};
let timestamp = (ver == VersionMajor::Initial).then_try(|| reader.read_u64::<LE>())?;
let hash = Some(reader.read_guid()?);
let hash = Some(Hash(reader.read_guid()?));
let blocks = (ver >= VersionMajor::CompressionEncryption && compression.is_some())
.then_try(|| reader.read_array(Block::read))?;
let flags = (ver >= VersionMajor::CompressionEncryption)
@@ -287,7 +180,7 @@ impl Entry {
writer.write_u64::<LE>(self.timestamp.unwrap_or_default())?;
}
if let Some(hash) = self.hash {
writer.write_all(&hash)?;
writer.write_all(&hash.0)?;
} else {
panic!("hash missing");
}

repak/src/footer.rs

@@ -1,4 +1,7 @@
use crate::ext::{BoolExt, WriteExt};
use crate::{
ext::{BoolExt, WriteExt},
Hash,
};
use super::{ext::ReadExt, Compression, Version, VersionMajor};
use byteorder::{ReadBytesExt, WriteBytesExt, LE};
@@ -13,7 +16,7 @@ pub struct Footer {
pub version_major: VersionMajor,
pub index_offset: u64,
pub index_size: u64,
pub hash: [u8; 20],
pub hash: Hash,
pub frozen: bool,
pub compression: Vec<Option<Compression>>,
}
@@ -29,7 +32,7 @@ impl Footer {
VersionMajor::from_repr(reader.read_u32::<LE>()?).unwrap_or(version.version_major());
let index_offset = reader.read_u64::<LE>()?;
let index_size = reader.read_u64::<LE>()?;
let hash = reader.read_guid()?;
let hash = Hash(reader.read_guid()?);
let frozen = version.version_major() == VersionMajor::FrozenIndex && reader.read_bool()?;
let compression = {
let mut compression = Vec::with_capacity(match version {
@@ -91,7 +94,7 @@ impl Footer {
writer.write_u32::<LE>(self.version_major as u32)?;
writer.write_u64::<LE>(self.index_offset)?;
writer.write_u64::<LE>(self.index_size)?;
writer.write_all(&self.hash)?;
writer.write_all(&self.hash.0)?;
if self.version_major == VersionMajor::FrozenIndex {
writer.write_bool(self.frozen)?;
}
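The raw [u8; 20] SHA-1 digests used here are now wrapped in the Hash newtype introduced in pak.rs below, mainly so Debug output prints readable hex (via the newly added hex dependency) instead of a 20-element byte array. A small illustrative sketch of that behaviour, not the crate's own code:

// Illustrative only: mirrors the Hash newtype and its Debug impl from pak.rs.
struct Hash([u8; 20]);

impl std::fmt::Debug for Hash {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Hash({})", hex::encode(self.0))
    }
}

fn main() {
    let digest = Hash([0xab; 20]);
    println!("{digest:?}"); // prints Hash(abab...ab), 40 hex characters
}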

repak/src/lib.rs

@@ -1,4 +1,5 @@
#![allow(dead_code)]
mod data;
mod entry;
mod error;
mod ext;

repak/src/pak.rs

@@ -1,11 +1,21 @@
use crate::data::build_partial_entry;
use crate::entry::Entry;
use crate::Compression;
use crate::{Compression, Error};
use super::ext::{ReadExt, WriteExt};
use super::{Version, VersionMajor};
use byteorder::{ReadBytesExt, WriteBytesExt, LE};
use std::collections::BTreeMap;
use std::io::{self, Read, Seek, Write};
use std::sync::Arc;
#[derive(Default, Clone, Copy)]
pub(crate) struct Hash(pub(crate) [u8; 20]);
impl std::fmt::Debug for Hash {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Hash({})", hex::encode(self.0))
}
}
#[derive(Debug)]
pub struct PakBuilder {
@@ -89,6 +99,10 @@ pub struct PakWriter<W: Write + Seek> {
allowed_compression: Vec<Compression>,
}
pub struct ParallelPakWriter {
tx: std::sync::mpsc::SyncSender<(String, Arc<Vec<u8>>)>,
}
#[derive(Debug)]
pub(crate) struct Pak {
version: Version,
@@ -281,19 +295,94 @@ impl<W: Write + Seek> PakWriter<W> {
self.pak.version,
&mut self.pak.compression,
&self.allowed_compression,
data,
data.as_ref(),
)?,
);
Ok(())
}
pub fn parallel<F, E>(&mut self, mut f: F) -> Result<&mut Self, E>
where
F: Send + Sync + FnMut(&mut ParallelPakWriter) -> Result<(), E>,
E: From<Error> + Send,
{
{
use pariter::IteratorExt as _;
let (tx, rx) = std::sync::mpsc::sync_channel(0);
pariter::scope(|scope| -> Result<(), E> {
let handle = scope.spawn(|_| -> Result<(), E> {
f(&mut ParallelPakWriter { tx })?;
Ok(())
});
let result = rx
.into_iter()
.parallel_map_scoped(
scope,
|(path, data): (String, Arc<Vec<u8>>)| -> Result<_, Error> {
let partial_entry =
build_partial_entry(&self.allowed_compression, &data)?;
let data = partial_entry.blocks.is_empty().then(|| Arc::new(data));
Ok((path, data, partial_entry))
},
)
.try_for_each(|message| -> Result<(), Error> {
let stream_position = self.writer.stream_position()?;
let (path, data, partial_entry) = message?;
let entry = partial_entry.into_entry(
self.pak.version,
&mut self.pak.compression,
stream_position,
)?;
entry.write(
&mut self.writer,
self.pak.version,
crate::entry::EntryLocation::Data,
)?;
self.pak.index.add_entry(&path, entry);
if let Some(data) = data {
self.writer.write_all(&data)?;
} else {
for block in partial_entry.blocks {
self.writer.write_all(&block.data)?;
}
}
Ok(())
});
if let Err(err) = handle.join().unwrap() {
Err(err.into()) // prioritize error from user code
} else if let Err(err) = result {
Err(err.into()) // user code was successful, check pak writer error
} else {
Ok(()) // neither returned error so return success
}
})
.unwrap()?;
}
Ok(self)
}
pub fn write_index(mut self) -> Result<W, super::Error> {
self.pak.write(&mut self.writer, &self.key)?;
Ok(self.writer)
}
}
impl ParallelPakWriter {
pub fn write_file(&mut self, path: String, data: Vec<u8>) -> Result<(), Error> {
self.tx.send((path, Arc::new(data))).unwrap();
Ok(())
}
}
impl Pak {
fn read<R: Read + Seek>(
reader: &mut R,
@@ -541,12 +630,12 @@
index_writer.write_u32::<LE>(1)?; // we have path hash index
index_writer.write_u64::<LE>(path_hash_index_offset)?;
index_writer.write_u64::<LE>(phi_buf.len() as u64)?; // path hash index size
index_writer.write_all(&hash(&phi_buf))?;
index_writer.write_all(&hash(&phi_buf).0)?;
index_writer.write_u32::<LE>(1)?; // we have full directory index
index_writer.write_u64::<LE>(full_directory_index_offset)?;
index_writer.write_u64::<LE>(fdi_buf.len() as u64)?; // full directory index size
index_writer.write_all(&hash(&fdi_buf))?;
index_writer.write_all(&hash(&fdi_buf).0)?;
index_writer.write_u32::<LE>(encoded_entries.len() as u32)?;
index_writer.write_all(&encoded_entries)?;
@@ -584,11 +673,11 @@
}
}
fn hash(data: &[u8]) -> [u8; 20] {
fn hash(data: &[u8]) -> Hash {
use sha1::{Digest, Sha1};
let mut hasher = Sha1::new();
hasher.update(data);
hasher.finalize().into()
Hash(hasher.finalize().into())
}
fn generate_path_hash_index<W: Write>(

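The parallel method above follows pariter's scoped parallel map pattern, which preserves input order: a producer thread pushes (path, data) pairs through a bounded channel, worker threads turn them into PartialEntrys, and the consumer writes results back in submission order so the archive layout stays deterministic. A small self-contained sketch of that pattern; the doubling closure is just a stand-in for per-entry compression:

use pariter::IteratorExt as _;

fn main() {
    // Items are mapped on worker threads but yielded in their original order,
    // which is what lets the pak writer emit entries sequentially.
    let doubled = pariter::scope(|scope| {
        (0u64..8)
            .parallel_map_scoped(scope, |n| n * 2)
            .collect::<Vec<_>>()
    })
    .unwrap();
    assert_eq!(doubled, vec![0, 2, 4, 6, 8, 10, 12, 14]);
}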
repak/tests/test.rs

@@ -183,10 +183,12 @@ fn test_write(_version: repak::Version, _file_name: &str, bytes: &[u8]) {
Some(0x205C5A7D),
);
pak_writer.parallel(|writer| {
for path in pak_reader.files() {
let data = pak_reader.get(&path, &mut reader).unwrap();
pak_writer.write_file(&path, data).unwrap();
writer.write_file(path, data).unwrap();
}
}).unwrap();
assert!(pak_writer.write_index().unwrap().into_inner() == reader.into_inner());
}
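Putting the pieces together, the new API is used the same way in the test above and in the CLI change below: queue (path, data) pairs through the ParallelPakWriter handed to the closure, then finish the archive with write_index. A hedged end-to-end sketch follows; pack_parallel and its files argument are illustrative and not part of the crate.

use std::io::{Seek, Write};

// Illustrative helper: pack a list of in-memory files through the parallel
// writer. Compression runs on worker threads while this thread writes output.
fn pack_parallel<W: Write + Seek>(
    mut pak: repak::PakWriter<W>,
    files: &[(String, Vec<u8>)],
) -> Result<W, repak::Error> {
    pak.parallel(|writer| -> Result<(), repak::Error> {
        for (path, data) in files {
            writer.write_file(path.clone(), data.clone())?;
        }
        Ok(())
    })?;
    // write_index consumes the PakWriter and returns the underlying stream.
    pak.write_index()
}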

repak_cli/src/main.rs

@@ -498,7 +498,8 @@ fn pack(args: ActionPack) -> Result<(), repak::Error> {
(Output::Stdout, itertools::Either::Right(iter))
};
let log = log.clone();
iter.try_for_each(|p| {
pak.parallel(|writer| -> Result<(), repak::Error> {
for p in &mut iter {
let rel = &p
.strip_prefix(input_path)
.expect("file not in input directory")
@@ -507,7 +508,9 @@
if args.verbose {
log.println(format!("packing {}", &rel));
}
pak.write_file(rel, std::fs::read(p)?)
writer.write_file(rel.to_string(), std::fs::read(p)?)?;
}
Ok(())
})?;
pak.write_index()?;