Remove non-functional internal ParallelPakWriter

This commit is contained in:
Truman Kilen 2025-01-21 13:29:08 -06:00
parent 1a80db37d5
commit 29bf6e7859
5 changed files with 31 additions and 152 deletions

59
Cargo.lock generated
View file

@ -232,28 +232,6 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "crossbeam"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471"
dependencies = [
"crossbeam-utils",
]
[[package]] [[package]]
name = "crossbeam-deque" name = "crossbeam-deque"
version = "0.8.6" version = "0.8.6"
@ -273,15 +251,6 @@ dependencies = [
"crossbeam-utils", "crossbeam-utils",
] ]
[[package]]
name = "crossbeam-queue"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115"
dependencies = [
"crossbeam-utils",
]
[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-utils"
version = "0.8.21" version = "0.8.21"
@ -420,12 +389,6 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hermit-abi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]] [[package]]
name = "hex" name = "hex"
version = "0.4.3" version = "0.4.3"
@ -692,16 +655,6 @@ dependencies = [
"adler2", "adler2",
] ]
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"libc",
]
[[package]] [[package]]
name = "number_prefix" name = "number_prefix"
version = "0.4.0" version = "0.4.0"
@ -725,17 +678,6 @@ dependencies = [
"ureq", "ureq",
] ]
[[package]]
name = "pariter"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "324a62b9e7b5f270c0acc92a2040f8028bb643f959f9c068f11a7864f327e3d9"
dependencies = [
"crossbeam",
"crossbeam-channel",
"num_cpus",
]
[[package]] [[package]]
name = "paste" name = "paste"
version = "1.0.15" version = "1.0.15"
@ -854,7 +796,6 @@ dependencies = [
"hex", "hex",
"lz4_flex", "lz4_flex",
"oodle_loader", "oodle_loader",
"pariter",
"paste", "paste",
"sha1", "sha1",
"strum", "strum",

View file

@ -23,7 +23,6 @@ oodle_loader = { path = "../oodle_loader", optional = true}
thiserror = "2.0" thiserror = "2.0"
sha1 = { workspace = true } sha1 = { workspace = true }
strum = { workspace = true } strum = { workspace = true }
pariter = "0.5.1"
hex.workspace = true hex.workspace = true
[dev-dependencies] [dev-dependencies]

View file

@ -322,65 +322,12 @@ impl<W: Write + Seek> PakWriter<W> {
Ok(()) Ok(())
} }
pub fn parallel<'scope, F, E>(&mut self, f: F) -> Result<&mut Self, E>
where
F: Send + Sync + FnOnce(ParallelPakWriter<'scope>) -> Result<(), E>,
E: From<Error> + Send,
{
use pariter::IteratorExt as _;
pariter::scope(|scope: &pariter::Scope<'_>| -> Result<(), E> {
let (tx, rx) = std::sync::mpsc::sync_channel(0);
let handle = scope.spawn(|_| f(ParallelPakWriter { tx }));
let entry_builder = self.entry_builder();
let result = rx
.into_iter()
.parallel_map_scoped(scope, move |(path, compress, data)| -> Result<_, Error> {
Ok((path, entry_builder.build_entry(compress, data)?))
})
.try_for_each(|message| -> Result<(), Error> {
let (path, partial_entry) = message?;
self.write_entry(path, partial_entry)
});
if let Err(err) = handle.join().unwrap() {
Err(err) // prioritize error from user code
} else if let Err(err) = result {
Err(err.into()) // user code was successful, check pak writer error
} else {
Ok(()) // neither returned error so return success
}
})
.unwrap()?;
Ok(self)
}
pub fn write_index(mut self) -> Result<W, super::Error> { pub fn write_index(mut self) -> Result<W, super::Error> {
self.pak.write(&mut self.writer, &self.key)?; self.pak.write(&mut self.writer, &self.key)?;
Ok(self.writer) Ok(self.writer)
} }
} }
pub struct ParallelPakWriter<'scope> {
tx: std::sync::mpsc::SyncSender<(String, bool, Data<'scope>)>,
}
impl<'scope> ParallelPakWriter<'scope> {
pub fn write_file<D: AsRef<[u8]> + Send + Sync + 'scope>(
&self,
path: String,
compress: bool,
data: D,
) -> Result<(), Error> {
self.tx
.send((path, compress, Data(Box::new(data))))
.unwrap();
Ok(())
}
}
struct Data<'d>(Box<dyn AsRef<[u8]> + Send + Sync + 'd>); struct Data<'d>(Box<dyn AsRef<[u8]> + Send + Sync + 'd>);
impl AsRef<[u8]> for Data<'_> { impl AsRef<[u8]> for Data<'_> {
fn as_ref(&self) -> &[u8] { fn as_ref(&self) -> &[u8] {

View file

@ -88,33 +88,6 @@ mod test {
} }
} }
#[test]
fn test_parallel_writer() -> Result<(), repak::Error> {
let mut cur = Cursor::new(vec![]);
let mut writer = repak::PakBuilder::new().writer(
&mut cur,
repak::Version::V11,
"../../../".to_string(),
Some(0x12345678),
);
let outside_scope1 = vec![1, 2, 3];
let outside_scope2 = vec![4, 5, 6];
writer.parallel(|writer| -> Result<(), repak::Error> {
let inside_scope = vec![7, 8, 9];
writer.write_file("pass/takes/ownership".to_string(), true, outside_scope1)?;
writer.write_file("pass/outlives/scope".to_string(), true, &outside_scope2)?;
writer.write_file("pass/takes/ownership".to_string(), true, inside_scope)?;
// writer.write_file("fail/doesnt/outlive/scope".to_string(), true, &inside_scope)?;
Ok(())
})?;
Ok(())
}
static AES_KEY: &str = "lNJbw660IOC+kU7cnVQ1oeqrXyhk4J6UAZrCBbcnp94="; static AES_KEY: &str = "lNJbw660IOC+kU7cnVQ1oeqrXyhk4J6UAZrCBbcnp94=";
fn test_read(version: repak::Version, _file_name: &str, bytes: &[u8]) { fn test_read(version: repak::Version, _file_name: &str, bytes: &[u8]) {

View file

@ -487,7 +487,7 @@ fn pack(args: ActionPack) -> Result<(), repak::Error> {
use indicatif::ProgressIterator; use indicatif::ProgressIterator;
let iter = paths.iter(); let iter = paths.iter();
let (log, mut iter) = if !args.quiet { let (log, iter) = if !args.quiet {
let iter = let iter =
iter.progress_with_style(indicatif::ProgressStyle::with_template(STYLE).unwrap()); iter.progress_with_style(indicatif::ProgressStyle::with_template(STYLE).unwrap());
( (
@ -498,8 +498,17 @@ fn pack(args: ActionPack) -> Result<(), repak::Error> {
(Output::Stdout, itertools::Either::Right(iter)) (Output::Stdout, itertools::Either::Right(iter))
}; };
let log = log.clone(); let log = log.clone();
pak.parallel(|writer| -> Result<(), repak::Error> {
for p in &mut iter { let mut result = None;
let result_ref = &mut result;
rayon::in_place_scope(|scope| -> Result<(), repak::Error> {
let (tx, rx) = std::sync::mpsc::sync_channel(0);
let entry_builder = pak.entry_builder();
scope.spawn(move |_| {
*result_ref = Some(
iter.par_bridge()
.try_for_each(|p| -> Result<(), repak::Error> {
let rel = &p let rel = &p
.strip_prefix(input_path) .strip_prefix(input_path)
.expect("file not in input directory") .expect("file not in input directory")
@ -508,10 +517,20 @@ fn pack(args: ActionPack) -> Result<(), repak::Error> {
if args.verbose { if args.verbose {
log.println(format!("packing {}", &rel)); log.println(format!("packing {}", &rel));
} }
writer.write_file(rel.to_string(), true, std::fs::read(p)?)?; let entry = entry_builder.build_entry(true, std::fs::read(p)?)?;
tx.send((rel.to_string(), entry)).unwrap();
Ok(())
}),
);
});
for (path, entry) in rx {
pak.write_entry(path, entry)?;
} }
Ok(()) Ok(())
})?; })?;
result.unwrap()?;
pak.write_index()?; pak.write_index()?;