From 64a027481c48ec95d663f80fbdb52f2156292778 Mon Sep 17 00:00:00 2001 From: "Joseph C. Osborn" Date: Thu, 30 Oct 2025 13:35:42 -0700 Subject: [PATCH] optional memcmp, last-superseq optimizations --- src/bin/reencode.rs | 8 +++--- src/clock.rs | 4 +++ src/statestream.rs | 65 ++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 71 insertions(+), 6 deletions(-) diff --git a/src/bin/reencode.rs b/src/bin/reencode.rs index 2de3cf7..4aeb519 100644 --- a/src/bin/reencode.rs +++ b/src/bin/reencode.rs @@ -15,8 +15,8 @@ fn main() { let header = &rply.header; println!("{header:?}"); let mut header_out = header.clone(); - header_out.set_block_size(64); - header_out.set_superblock_size(32); + header_out.set_block_size(128); + header_out.set_superblock_size(128); let mut out = encode(header_out, &rply.initial_state, &mut outfile).unwrap(); let mut frame = Frame::default(); while let Ok(()) = rply @@ -47,7 +47,7 @@ fn main() { Timer::DecodeFrame, Timer::DecodeCheckpoint, Timer::DecodeStatestream, - Timer::EncodeStatestream, + Timer::EncodeFrame, Timer::EncodeCheckpoint, Timer::EncodeStatestream, ] { @@ -57,6 +57,8 @@ fn main() { println!("{timer:?}: {} ({avg_time:.8}ms avg)", times.count,); } for counter in [ + Counter::DecSkippedSuperblocks, + Counter::DecSkippedBlocks, Counter::EncReusedBlocks, Counter::EncReusedSuperblocks, Counter::EncSkippedBlocks, diff --git a/src/clock.rs b/src/clock.rs index e26dcad..86e1b99 100644 --- a/src/clock.rs +++ b/src/clock.rs @@ -23,6 +23,8 @@ pub enum Counter { EncTotalSuperblocks, EncTotalKBsIn, EncTotalKBsOut, + DecSkippedSuperblocks, + DecSkippedBlocks, Count, } static TIME_ACC: [AtomicU64; Timer::Count as usize] = [ @@ -51,6 +53,8 @@ static COUNTS: [AtomicU64; Counter::Count as usize] = [ AtomicU64::new(0), AtomicU64::new(0), AtomicU64::new(0), + AtomicU64::new(0), + AtomicU64::new(0), ]; pub struct Stopwatch(Timer, std::time::Instant); diff --git a/src/statestream.rs b/src/statestream.rs index 80e8a90..b8fcb12 100644 --- a/src/statestream.rs +++ b/src/statestream.rs @@ -47,6 +47,7 @@ pub(crate) struct Ctx { last_superseq: Vec, block_index: BlockIndex, superblock_index: BlockIndex, + use_encode_state_comparisons: bool, } impl Ctx { @@ -58,6 +59,7 @@ impl Ctx { last_superseq: vec![], block_index: BlockIndex::new(block_size as usize), superblock_index: BlockIndex::new(superblock_size as usize), + use_encode_state_comparisons: true, } } } @@ -119,6 +121,7 @@ impl std::io::Read for Decoder<'_, '_, R> { /* a slightly degenerate read implementation in that it will keep * calling read on the inner reader until a complete checkpoint is * read, then return 0 for subsequent reads */ + #[allow(clippy::too_many_lines)] fn read(&mut self, outbuf: &mut [u8]) -> std::io::Result { use ParseState as State; use rmp::decode as r; @@ -185,16 +188,38 @@ impl std::io::Read for Decoder<'_, '_, R> { (State::WaitForSuperblockSeq, SSToken::SuperblockSeq) => { let arr_len = r::read_array_len(self.reader).map_err(std::io::Error::other)? as usize; + let last_state_valid = self.ctx.last_superseq.len() >= arr_len + && self.ctx.last_state.len() >= self.state_size; let block_byte_size = self.ctx.block_size as usize; let superblock_byte_size = self.ctx.superblock_size as usize * block_byte_size; let mut superseq = vec![0; arr_len]; self.ctx.last_state.resize(self.state_size, 0); + let mut skipped_superblocks = 0; + let mut skipped_blocks = 0; for (superblock_i, superseq_sblk) in superseq.iter_mut().enumerate() { let superblock_idx = r::read_int(self.reader).map_err(std::io::Error::other)?; *superseq_sblk = superblock_idx; + if last_state_valid + && self.ctx.last_superseq[superblock_i] == superblock_idx + { + // no need to copy bytes + skipped_superblocks += 1; + continue; + } let superblock_data = self.ctx.superblock_index.get(superblock_idx); for (block_i, block_id) in superblock_data.iter().copied().enumerate() { + if last_state_valid + && self + .ctx + .superblock_index + .get(self.ctx.last_superseq[superblock_i])[block_i] + == block_id + { + // no need to copy bytes + skipped_blocks += 1; + continue; + } let block_start = (superblock_i * superblock_byte_size + block_i * block_byte_size) .min(self.state_size); @@ -208,6 +233,8 @@ impl std::io::Read for Decoder<'_, '_, R> { .copy_from_slice(&block_bytes[0..(block_end - block_start)]); } } + clock::count(Counter::DecSkippedSuperblocks, skipped_superblocks); + clock::count(Counter::DecSkippedBlocks, skipped_blocks); self.ctx.last_superseq = superseq; state = State::Finished; self.finished = true; @@ -274,20 +301,50 @@ impl<'w, 'c, W: std::io::Write> Encoder<'w, 'c, W> { let mut reused_blocks = 0; let mut reused_superblocks = 0; let mut hashes = 0; + let mut skipped_blocks = 0; + let mut memcmps = 0; self.ctx .last_superseq .resize(superblock_count.max(self.ctx.last_superseq.len()), 0); let mut superblock_contents = vec![0_u32; superblock_size]; - for (superblock_i, superblock_bytes) in checkpoint.chunks(superblock_size_bytes).enumerate() + let can_compare_saves = if self.ctx.last_state.len() < checkpoint.len() { + self.ctx.last_state.clear(); + self.ctx.last_state.extend_from_slice(checkpoint); + false + } else { + self.ctx.last_state.truncate(checkpoint.len()); + self.ctx.use_encode_state_comparisons + }; + for (superblock_i, (superblock_bytes, last_state_superblock_bytes)) in (checkpoint + .chunks(superblock_size_bytes) + .zip(self.ctx.last_state.chunks(superblock_size_bytes))) + .enumerate() { + /* maybe: skip superblocks */ if superblock_bytes.len() < superblock_size_bytes { let block_count = (superblock_bytes.len() - 1) / block_size + 1; if block_count + 1 < superblock_size { superblock_contents[(block_count + 1)..].fill(0); } } - for (block_i, block_bytes) in superblock_bytes.chunks(block_size).enumerate() { - let found_block = if block_bytes.len() < block_size { + for (block_i, (block_bytes, last_state_block_bytes)) in (superblock_bytes + .chunks(block_size) + .zip(last_state_superblock_bytes.chunks(block_size))) + .enumerate() + { + memcmps += u64::from(can_compare_saves); + let found_block = if can_compare_saves + && block_bytes[..] == last_state_block_bytes[..block_bytes.len()] + { + skipped_blocks += 1; + blockindex::Insertion { + index: self + .ctx + .superblock_index + .get(self.ctx.last_superseq[superblock_i])[block_i], + is_new: false, + } + } else if block_bytes.len() < block_size { padded_block[block_bytes.len()..].fill(0); padded_block[..block_bytes.len()].copy_from_slice(block_bytes); hashes += 1; @@ -337,6 +394,8 @@ impl<'w, 'c, W: std::io::Write> Encoder<'w, 'c, W> { } clock::count(Counter::EncReusedBlocks, reused_blocks); clock::count(Counter::EncReusedSuperblocks, reused_superblocks); + clock::count(Counter::EncSkippedBlocks, skipped_blocks); + clock::count(Counter::EncMemCmps, memcmps); clock::count(Counter::EncHashes, hashes); self.ctx.last_superseq.truncate(superblock_count); bytes_out += rmp_size(r::write_uint(