diff --git a/src/bin/reencode.rs b/src/bin/reencode.rs index 6a5af96..4bd474e 100644 --- a/src/bin/reencode.rs +++ b/src/bin/reencode.rs @@ -1,4 +1,4 @@ -use rply_codec::{Frame, decode, encode}; +use rply_codec::{Counter, Frame, Timer, counts, decode, encode, stats}; fn main() { let args: Vec<_> = std::env::args().collect(); @@ -16,6 +16,7 @@ fn main() { println!("{header:?}"); let mut header_out = header.clone(); header_out.set_block_size(64); + header_out.set_superblock_size(32); let mut out = encode(header_out, &rply.initial_state, &mut outfile).unwrap(); let mut frame = Frame::default(); while let Ok(()) = rply @@ -42,4 +43,32 @@ fn main() { assert_eq!(out.frame_number, rply.frame_number); assert_eq!(out.header.frame_count(), rply.header.frame_count()); assert_eq!(out.header.frame_count(), Some(out.frame_number)); + for timer in [ + Timer::DecodeFrame, + Timer::DecodeCheckpoint, + Timer::DecodeStatestream, + Timer::EncodeStatestream, + Timer::EncodeCheckpoint, + Timer::EncodeStatestream, + ] { + let times = stats(timer); + println!( + "{timer:?}: {} ({:.8}ms avg)", + times.count, + ((times.micros as f64 / times.count as f64) / 1000.0) + ); + } + for counter in [ + Counter::EncReusedBlocks, + Counter::EncReusedSuperblocks, + Counter::EncSkippedBlocks, + Counter::EncMemCmps, + Counter::EncHashes, + Counter::EncTotalBlocks, + Counter::EncTotalSuperblocks, + Counter::EncTotalKBsIn, + Counter::EncTotalKBsOut, + ] { + println!("{counter:?}: {}", counts(counter)); + } } diff --git a/src/clock.rs b/src/clock.rs new file mode 100644 index 0000000..e26dcad --- /dev/null +++ b/src/clock.rs @@ -0,0 +1,90 @@ +use std::sync::atomic::{AtomicU64, Ordering}; + +#[repr(usize)] +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub enum Timer { + DecodeFrame = 0, + DecodeCheckpoint, + DecodeStatestream, + EncodeFrame, + EncodeCheckpoint, + EncodeStatestream, + Count, +} +#[repr(usize)] +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub enum Counter { + EncReusedBlocks = 0, + EncReusedSuperblocks, + EncSkippedBlocks, + EncMemCmps, + EncHashes, + EncTotalBlocks, + EncTotalSuperblocks, + EncTotalKBsIn, + EncTotalKBsOut, + Count, +} +static TIME_ACC: [AtomicU64; Timer::Count as usize] = [ + AtomicU64::new(0), + AtomicU64::new(0), + AtomicU64::new(0), + AtomicU64::new(0), + AtomicU64::new(0), + AtomicU64::new(0), +]; +static TIME_COUNTS: [AtomicU64; Timer::Count as usize] = [ + AtomicU64::new(0), + AtomicU64::new(0), + AtomicU64::new(0), + AtomicU64::new(0), + AtomicU64::new(0), + AtomicU64::new(0), +]; +static COUNTS: [AtomicU64; Counter::Count as usize] = [ + AtomicU64::new(0), + AtomicU64::new(0), + AtomicU64::new(0), + AtomicU64::new(0), + AtomicU64::new(0), + AtomicU64::new(0), + AtomicU64::new(0), + AtomicU64::new(0), + AtomicU64::new(0), +]; + +pub struct Stopwatch(Timer, std::time::Instant); +impl Stopwatch { + fn new(t: Timer) -> Self { + Self(t, std::time::Instant::now()) + } +} +impl Drop for Stopwatch { + fn drop(&mut self) { + TIME_ACC[self.0 as usize].fetch_add( + u64::try_from(self.1.elapsed().as_micros()).unwrap_or(u64::MAX), + Ordering::Relaxed, + ); + TIME_COUNTS[self.0 as usize].fetch_add(1, Ordering::Relaxed); + } +} + +pub fn time(t: Timer) -> Stopwatch { + Stopwatch::new(t) +} +pub fn count(c: Counter, amt: u64) -> u64 { + COUNTS[c as usize].fetch_add(amt, Ordering::Relaxed) + amt +} +pub struct Times { + pub count: u64, + pub micros: u64, +} +pub fn stats(t: Timer) -> Times { + Times { + count: TIME_COUNTS[t as usize].load(Ordering::Relaxed), + micros: TIME_ACC[t as usize].load(Ordering::Relaxed), + } +} +pub fn counts(c: Counter) -> u64 { + COUNTS[c as usize].load(Ordering::Relaxed) +} diff --git a/src/lib.rs b/src/lib.rs index dfaf4c5..5244ef6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,7 @@ +mod clock; mod rply; mod statestream; +pub use clock::{Counter, Timer, Times, counts, stats}; pub use rply::*; #[derive(Debug, thiserror::Error)] diff --git a/src/rply.rs b/src/rply.rs index 28abd1c..893a0c7 100644 --- a/src/rply.rs +++ b/src/rply.rs @@ -1,6 +1,10 @@ use std::io::Write; -use crate::{InvalidDeterminant, statestream}; +use crate::{ + InvalidDeterminant, + clock::{self, Timer}, + statestream, +}; use thiserror::Error; // #[repr(usize)] @@ -260,6 +264,7 @@ impl ReplayDecoder<'_, R> { #[allow(clippy::too_many_lines)] pub fn read_frame(&mut self, frame: &mut Frame) -> Result<()> { use byteorder::{LittleEndian, ReadBytesExt}; + let stopwatch = clock::time(Timer::DecodeFrame); let vsn = self.header.version(); let rply = &mut *self.rply; if vsn == 0 { @@ -331,6 +336,7 @@ impl ReplayDecoder<'_, R> { } } self.frame_number += 1; + drop(stopwatch); Ok(()) } @@ -343,6 +349,7 @@ impl ReplayDecoder<'_, R> { fn decode_checkpoint(&mut self, checkpoint_bytes: &mut Vec) -> Result<()> { use byteorder::{LittleEndian, ReadBytesExt}; + let stopwatch = clock::time(Timer::DecodeCheckpoint); let rply = &mut *self.rply; // read a 1 byte compression code let compression = @@ -408,6 +415,7 @@ impl ReplayDecoder<'_, R> { )?; } } + drop(stopwatch); Ok(()) } } @@ -494,6 +502,7 @@ impl<'w, W: std::io::Write + std::io::Seek> ReplayEncoder<'w, W> { } fn encode_checkpoint(&mut self, checkpoint: &[u8], frame: u64) -> Result<()> { use byteorder::{LittleEndian, WriteBytesExt}; + let stopwatch = clock::time(Timer::EncodeCheckpoint); let compression = self.header.checkpoint_compression(); let encoding = Encoding::Statestream; self.rply.write_u8(u8::from(compression))?; @@ -568,6 +577,7 @@ impl<'w, W: std::io::Write + std::io::Seek> ReplayEncoder<'w, W> { // write encoded compressed bytes self.rply.write_u32::(compressed_size)?; self.rply.seek(std::io::SeekFrom::Start(end_pos))?; + drop(stopwatch); Ok(()) } fn encode_initial_checkpoint(&mut self, checkpoint: &[u8]) -> Result<()> { @@ -592,6 +602,7 @@ impl<'w, W: std::io::Write + std::io::Seek> ReplayEncoder<'w, W> { /// [`ReplayError::CheckpointTooBig`]: Checkpoint data takes up more than 2^32 bytes pub fn write_frame(&mut self, frame: &Frame) -> Result<()> { use byteorder::{LittleEndian, WriteBytesExt}; + let stopwatch = clock::time(Timer::EncodeFrame); let start_pos = self.rply.stream_position()?; self.rply.write_u32::( u32::try_from(start_pos - self.last_pos).map_err(ReplayError::FrameTooLong)?, @@ -625,6 +636,7 @@ impl<'w, W: std::io::Write + std::io::Seek> ReplayEncoder<'w, W> { } self.frame_number += 1; self.last_pos = start_pos; + drop(stopwatch); Ok(()) } /// Finishes the encoding, writing the header in the process diff --git a/src/statestream.rs b/src/statestream.rs index 9f417d8..e2bc1f4 100644 --- a/src/statestream.rs +++ b/src/statestream.rs @@ -1,5 +1,8 @@ mod blockindex; -use crate::InvalidDeterminant; +use crate::{ + InvalidDeterminant, + clock::{self, Counter, Timer}, +}; use blockindex::BlockIndex; use std::io::Write; @@ -125,6 +128,7 @@ impl std::io::Read for Decoder<'_, '_, R> { } return self.readout(outbuf); } + let stopwatch = clock::time(Timer::DecodeStatestream); let mut frame = 0; let mut state = State::WaitForStart; let mut buf = vec![0_u8; self.ctx.block_size as usize]; @@ -148,6 +152,7 @@ impl std::io::Read for Decoder<'_, '_, R> { return Err(std::io::Error::other(SSError::BlockWrongSize(bin_len))); } self.reader.read_exact(&mut buf)?; + // hashes += 1; if !self .ctx .block_index @@ -166,6 +171,7 @@ impl std::io::Read for Decoder<'_, '_, R> { *superblock_elt = r::read_int(self.reader).map_err(std::io::Error::other)?; } + // hashes += 1; if !self.ctx.superblock_index.insert_exact( idx, Box::from(superblock.clone()), @@ -211,6 +217,7 @@ impl std::io::Read for Decoder<'_, '_, R> { } } assert_eq!(state, State::Finished); + drop(stopwatch); self.readout(outbuf) } } @@ -245,6 +252,8 @@ impl<'w, 'c, W: std::io::Write> Encoder<'w, 'c, W> { } pub fn encode_checkpoint(mut self, checkpoint: &[u8], frame: u64) -> std::io::Result { use rmp::encode as r; + let stopwatch = clock::time(Timer::EncodeStatestream); + clock::count(Counter::EncTotalKBsIn, (checkpoint.len() / 1024) as u64); let mut bytes_out = 0; bytes_out += rmp_size(r::write_uint( &mut self.writer, @@ -256,6 +265,14 @@ impl<'w, 'c, W: std::io::Write> Encoder<'w, 'c, W> { let superblock_size = self.ctx.superblock_size as usize; let superblock_size_bytes = block_size * superblock_size; let superblock_count = ((checkpoint.len() - 1) / superblock_size_bytes) + 1; + clock::count(Counter::EncTotalSuperblocks, superblock_count as u64); + clock::count( + Counter::EncTotalBlocks, + (((checkpoint.len() - 1) / block_size) + 1) as u64, + ); + let mut reused_blocks = 0; + let mut reused_superblocks = 0; + let mut hashes = 0; self.ctx .last_superseq .resize(superblock_count.max(self.ctx.last_superseq.len()), 0); @@ -272,8 +289,10 @@ impl<'w, 'c, W: std::io::Write> Encoder<'w, 'c, W> { let found_block = if block_bytes.len() < block_size { padded_block[block_bytes.len()..].fill(0); padded_block[..block_bytes.len()].copy_from_slice(block_bytes); + hashes += 1; self.ctx.block_index.insert(&padded_block, frame) } else { + hashes += 1; self.ctx.block_index.insert(block_bytes, frame) }; superblock_contents[block_i] = found_block.index; @@ -288,8 +307,11 @@ impl<'w, 'c, W: std::io::Write> Encoder<'w, 'c, W> { bytes_out += rmp_size(r::write_bin_len(self.writer, self.ctx.block_size)?); self.writer.write_all(block_out_bytes)?; bytes_out += block_out_bytes.len(); + } else { + reused_blocks += 1; } } + hashes += 1; let found_superblock = self .ctx .superblock_index @@ -308,8 +330,13 @@ impl<'w, 'c, W: std::io::Write> Encoder<'w, 'c, W> { for blkid in &superblock_contents { bytes_out += rmp_size(r::write_uint(self.writer, u64::from(*blkid))?); } + } else { + reused_superblocks += 1; } } + clock::count(Counter::EncReusedBlocks, reused_blocks); + clock::count(Counter::EncReusedSuperblocks, reused_superblocks); + clock::count(Counter::EncHashes, hashes); self.ctx.last_superseq.truncate(superblock_count); bytes_out += rmp_size(r::write_uint( self.writer, @@ -323,7 +350,8 @@ impl<'w, 'c, W: std::io::Write> Encoder<'w, 'c, W> { for super_id in &self.ctx.last_superseq { bytes_out += rmp_size(r::write_uint(self.writer, u64::from(*super_id))?); } - // commit + drop(stopwatch); + clock::count(Counter::EncTotalKBsOut, (bytes_out / 1024) as u64); u32::try_from(bytes_out) .map_err(|e| std::io::Error::other(crate::ReplayError::CheckpointTooBig(e))) }