add timing code to record stats

This commit is contained in:
2025-10-30 11:17:20 -07:00
parent 44cab660bc
commit 2fc777848a
5 changed files with 165 additions and 4 deletions
+30 -1
View File
@@ -1,4 +1,4 @@
use rply_codec::{Frame, decode, encode};
use rply_codec::{Counter, Frame, Timer, counts, decode, encode, stats};
fn main() {
let args: Vec<_> = std::env::args().collect();
@@ -16,6 +16,7 @@ fn main() {
println!("{header:?}");
let mut header_out = header.clone();
header_out.set_block_size(64);
header_out.set_superblock_size(32);
let mut out = encode(header_out, &rply.initial_state, &mut outfile).unwrap();
let mut frame = Frame::default();
while let Ok(()) = rply
@@ -42,4 +43,32 @@ fn main() {
assert_eq!(out.frame_number, rply.frame_number);
assert_eq!(out.header.frame_count(), rply.header.frame_count());
assert_eq!(out.header.frame_count(), Some(out.frame_number));
for timer in [
Timer::DecodeFrame,
Timer::DecodeCheckpoint,
Timer::DecodeStatestream,
Timer::EncodeStatestream,
Timer::EncodeCheckpoint,
Timer::EncodeStatestream,
] {
let times = stats(timer);
println!(
"{timer:?}: {} ({:.8}ms avg)",
times.count,
((times.micros as f64 / times.count as f64) / 1000.0)
);
}
for counter in [
Counter::EncReusedBlocks,
Counter::EncReusedSuperblocks,
Counter::EncSkippedBlocks,
Counter::EncMemCmps,
Counter::EncHashes,
Counter::EncTotalBlocks,
Counter::EncTotalSuperblocks,
Counter::EncTotalKBsIn,
Counter::EncTotalKBsOut,
] {
println!("{counter:?}: {}", counts(counter));
}
}
+90
View File
@@ -0,0 +1,90 @@
use std::sync::atomic::{AtomicU64, Ordering};
#[repr(usize)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Timer {
DecodeFrame = 0,
DecodeCheckpoint,
DecodeStatestream,
EncodeFrame,
EncodeCheckpoint,
EncodeStatestream,
Count,
}
#[repr(usize)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Counter {
EncReusedBlocks = 0,
EncReusedSuperblocks,
EncSkippedBlocks,
EncMemCmps,
EncHashes,
EncTotalBlocks,
EncTotalSuperblocks,
EncTotalKBsIn,
EncTotalKBsOut,
Count,
}
static TIME_ACC: [AtomicU64; Timer::Count as usize] = [
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
];
static TIME_COUNTS: [AtomicU64; Timer::Count as usize] = [
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
];
static COUNTS: [AtomicU64; Counter::Count as usize] = [
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
];
pub struct Stopwatch(Timer, std::time::Instant);
impl Stopwatch {
fn new(t: Timer) -> Self {
Self(t, std::time::Instant::now())
}
}
impl Drop for Stopwatch {
fn drop(&mut self) {
TIME_ACC[self.0 as usize].fetch_add(
u64::try_from(self.1.elapsed().as_micros()).unwrap_or(u64::MAX),
Ordering::Relaxed,
);
TIME_COUNTS[self.0 as usize].fetch_add(1, Ordering::Relaxed);
}
}
pub fn time(t: Timer) -> Stopwatch {
Stopwatch::new(t)
}
pub fn count(c: Counter, amt: u64) -> u64 {
COUNTS[c as usize].fetch_add(amt, Ordering::Relaxed) + amt
}
pub struct Times {
pub count: u64,
pub micros: u64,
}
pub fn stats(t: Timer) -> Times {
Times {
count: TIME_COUNTS[t as usize].load(Ordering::Relaxed),
micros: TIME_ACC[t as usize].load(Ordering::Relaxed),
}
}
pub fn counts(c: Counter) -> u64 {
COUNTS[c as usize].load(Ordering::Relaxed)
}
+2
View File
@@ -1,5 +1,7 @@
mod clock;
mod rply;
mod statestream;
pub use clock::{Counter, Timer, Times, counts, stats};
pub use rply::*;
#[derive(Debug, thiserror::Error)]
+13 -1
View File
@@ -1,6 +1,10 @@
use std::io::Write;
use crate::{InvalidDeterminant, statestream};
use crate::{
InvalidDeterminant,
clock::{self, Timer},
statestream,
};
use thiserror::Error;
// #[repr(usize)]
@@ -260,6 +264,7 @@ impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
#[allow(clippy::too_many_lines)]
pub fn read_frame(&mut self, frame: &mut Frame) -> Result<()> {
use byteorder::{LittleEndian, ReadBytesExt};
let stopwatch = clock::time(Timer::DecodeFrame);
let vsn = self.header.version();
let rply = &mut *self.rply;
if vsn == 0 {
@@ -331,6 +336,7 @@ impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
}
}
self.frame_number += 1;
drop(stopwatch);
Ok(())
}
@@ -343,6 +349,7 @@ impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
fn decode_checkpoint(&mut self, checkpoint_bytes: &mut Vec<u8>) -> Result<()> {
use byteorder::{LittleEndian, ReadBytesExt};
let stopwatch = clock::time(Timer::DecodeCheckpoint);
let rply = &mut *self.rply;
// read a 1 byte compression code
let compression =
@@ -408,6 +415,7 @@ impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
)?;
}
}
drop(stopwatch);
Ok(())
}
}
@@ -494,6 +502,7 @@ impl<'w, W: std::io::Write + std::io::Seek> ReplayEncoder<'w, W> {
}
fn encode_checkpoint(&mut self, checkpoint: &[u8], frame: u64) -> Result<()> {
use byteorder::{LittleEndian, WriteBytesExt};
let stopwatch = clock::time(Timer::EncodeCheckpoint);
let compression = self.header.checkpoint_compression();
let encoding = Encoding::Statestream;
self.rply.write_u8(u8::from(compression))?;
@@ -568,6 +577,7 @@ impl<'w, W: std::io::Write + std::io::Seek> ReplayEncoder<'w, W> {
// write encoded compressed bytes
self.rply.write_u32::<LittleEndian>(compressed_size)?;
self.rply.seek(std::io::SeekFrom::Start(end_pos))?;
drop(stopwatch);
Ok(())
}
fn encode_initial_checkpoint(&mut self, checkpoint: &[u8]) -> Result<()> {
@@ -592,6 +602,7 @@ impl<'w, W: std::io::Write + std::io::Seek> ReplayEncoder<'w, W> {
/// [`ReplayError::CheckpointTooBig`]: Checkpoint data takes up more than 2^32 bytes
pub fn write_frame(&mut self, frame: &Frame) -> Result<()> {
use byteorder::{LittleEndian, WriteBytesExt};
let stopwatch = clock::time(Timer::EncodeFrame);
let start_pos = self.rply.stream_position()?;
self.rply.write_u32::<LittleEndian>(
u32::try_from(start_pos - self.last_pos).map_err(ReplayError::FrameTooLong)?,
@@ -625,6 +636,7 @@ impl<'w, W: std::io::Write + std::io::Seek> ReplayEncoder<'w, W> {
}
self.frame_number += 1;
self.last_pos = start_pos;
drop(stopwatch);
Ok(())
}
/// Finishes the encoding, writing the header in the process
+30 -2
View File
@@ -1,5 +1,8 @@
mod blockindex;
use crate::InvalidDeterminant;
use crate::{
InvalidDeterminant,
clock::{self, Counter, Timer},
};
use blockindex::BlockIndex;
use std::io::Write;
@@ -125,6 +128,7 @@ impl<R: std::io::Read> std::io::Read for Decoder<'_, '_, R> {
}
return self.readout(outbuf);
}
let stopwatch = clock::time(Timer::DecodeStatestream);
let mut frame = 0;
let mut state = State::WaitForStart;
let mut buf = vec![0_u8; self.ctx.block_size as usize];
@@ -148,6 +152,7 @@ impl<R: std::io::Read> std::io::Read for Decoder<'_, '_, R> {
return Err(std::io::Error::other(SSError::BlockWrongSize(bin_len)));
}
self.reader.read_exact(&mut buf)?;
// hashes += 1;
if !self
.ctx
.block_index
@@ -166,6 +171,7 @@ impl<R: std::io::Read> std::io::Read for Decoder<'_, '_, R> {
*superblock_elt =
r::read_int(self.reader).map_err(std::io::Error::other)?;
}
// hashes += 1;
if !self.ctx.superblock_index.insert_exact(
idx,
Box::from(superblock.clone()),
@@ -211,6 +217,7 @@ impl<R: std::io::Read> std::io::Read for Decoder<'_, '_, R> {
}
}
assert_eq!(state, State::Finished);
drop(stopwatch);
self.readout(outbuf)
}
}
@@ -245,6 +252,8 @@ impl<'w, 'c, W: std::io::Write> Encoder<'w, 'c, W> {
}
pub fn encode_checkpoint(mut self, checkpoint: &[u8], frame: u64) -> std::io::Result<u32> {
use rmp::encode as r;
let stopwatch = clock::time(Timer::EncodeStatestream);
clock::count(Counter::EncTotalKBsIn, (checkpoint.len() / 1024) as u64);
let mut bytes_out = 0;
bytes_out += rmp_size(r::write_uint(
&mut self.writer,
@@ -256,6 +265,14 @@ impl<'w, 'c, W: std::io::Write> Encoder<'w, 'c, W> {
let superblock_size = self.ctx.superblock_size as usize;
let superblock_size_bytes = block_size * superblock_size;
let superblock_count = ((checkpoint.len() - 1) / superblock_size_bytes) + 1;
clock::count(Counter::EncTotalSuperblocks, superblock_count as u64);
clock::count(
Counter::EncTotalBlocks,
(((checkpoint.len() - 1) / block_size) + 1) as u64,
);
let mut reused_blocks = 0;
let mut reused_superblocks = 0;
let mut hashes = 0;
self.ctx
.last_superseq
.resize(superblock_count.max(self.ctx.last_superseq.len()), 0);
@@ -272,8 +289,10 @@ impl<'w, 'c, W: std::io::Write> Encoder<'w, 'c, W> {
let found_block = if block_bytes.len() < block_size {
padded_block[block_bytes.len()..].fill(0);
padded_block[..block_bytes.len()].copy_from_slice(block_bytes);
hashes += 1;
self.ctx.block_index.insert(&padded_block, frame)
} else {
hashes += 1;
self.ctx.block_index.insert(block_bytes, frame)
};
superblock_contents[block_i] = found_block.index;
@@ -288,8 +307,11 @@ impl<'w, 'c, W: std::io::Write> Encoder<'w, 'c, W> {
bytes_out += rmp_size(r::write_bin_len(self.writer, self.ctx.block_size)?);
self.writer.write_all(block_out_bytes)?;
bytes_out += block_out_bytes.len();
} else {
reused_blocks += 1;
}
}
hashes += 1;
let found_superblock = self
.ctx
.superblock_index
@@ -308,8 +330,13 @@ impl<'w, 'c, W: std::io::Write> Encoder<'w, 'c, W> {
for blkid in &superblock_contents {
bytes_out += rmp_size(r::write_uint(self.writer, u64::from(*blkid))?);
}
} else {
reused_superblocks += 1;
}
}
clock::count(Counter::EncReusedBlocks, reused_blocks);
clock::count(Counter::EncReusedSuperblocks, reused_superblocks);
clock::count(Counter::EncHashes, hashes);
self.ctx.last_superseq.truncate(superblock_count);
bytes_out += rmp_size(r::write_uint(
self.writer,
@@ -323,7 +350,8 @@ impl<'w, 'c, W: std::io::Write> Encoder<'w, 'c, W> {
for super_id in &self.ctx.last_superseq {
bytes_out += rmp_size(r::write_uint(self.writer, u64::from(*super_id))?);
}
// commit
drop(stopwatch);
clock::count(Counter::EncTotalKBsOut, (bytes_out / 1024) as u64);
u32::try_from(bytes_out)
.map_err(|e| std::io::Error::other(crate::ReplayError::CheckpointTooBig(e)))
}