refactor, read initial checkpoint

This commit is contained in:
2025-10-28 09:47:02 -07:00
parent 8da2da77f8
commit b02a4e9f35
+24 -13
View File
@@ -187,9 +187,7 @@ impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
let checkpoint_commit_threshold = ((cp_config >> 16) & 0xFF) as u8; let checkpoint_commit_threshold = ((cp_config >> 16) & 0xFF) as u8;
let checkpoint_compression = Compression::try_from(((cp_config >> 8) & 0xFF) as u8) let checkpoint_compression = Compression::try_from(((cp_config >> 8) & 0xFF) as u8)
.map_err(ReplayError::Compression)?; .map_err(ReplayError::Compression)?;
rply.read_exact(initial_state.as_mut_slice())?; let mut replay = ReplayDecoder {
// TODO: decode if version is 2
Ok(ReplayDecoder {
rply, rply,
initial_state, initial_state,
header: Header::V2(HeaderV2 { header: Header::V2(HeaderV2 {
@@ -203,7 +201,13 @@ impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
}), }),
frame_number: 0, frame_number: 0,
ss_state: statestream::Ctx::new(block_size, superblock_size), ss_state: statestream::Ctx::new(block_size, superblock_size),
}) };
if replay.header.version() == 1 {
replay.rply.read_exact(&mut replay.initial_state)?;
} else {
replay.decode_initial_checkpoint()?;
}
Ok(replay)
} }
/// Reads a single frame at the current decoder position. /// Reads a single frame at the current decoder position.
@@ -284,13 +288,20 @@ impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
rply.read_exact(frame.checkpoint_bytes.as_mut_slice())?; rply.read_exact(frame.checkpoint_bytes.as_mut_slice())?;
} }
FrameToken::Checkpoint2 => { FrameToken::Checkpoint2 => {
self.decode_checkpoint(frame)?; self.decode_checkpoint(&mut frame.checkpoint_bytes)?;
} }
} }
Ok(()) Ok(())
} }
fn decode_checkpoint(&mut self, frame: &mut Frame) -> Result<()> { fn decode_initial_checkpoint(&mut self) -> Result<()> {
let mut initial_state = std::mem::take(&mut self.initial_state);
self.decode_checkpoint(&mut initial_state)?;
self.initial_state = initial_state;
Ok(())
}
fn decode_checkpoint(&mut self, checkpoint_bytes: &mut Vec<u8>) -> Result<()> {
use byteorder::{LittleEndian, ReadBytesExt}; use byteorder::{LittleEndian, ReadBytesExt};
let rply = &mut *self.rply; let rply = &mut *self.rply;
// read a 1 byte compression code // read a 1 byte compression code
@@ -306,18 +317,18 @@ impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
// read a 4 byte compressed encoded size // read a 4 byte compressed encoded size
#[expect(unused)] #[expect(unused)]
let comp_enc_size = rply.read_u32::<LittleEndian>()? as usize; let comp_enc_size = rply.read_u32::<LittleEndian>()? as usize;
frame.checkpoint_bytes.resize(uc_ue_size, 0); checkpoint_bytes.resize(uc_ue_size, 0);
// maybe decompress // maybe decompress
match (compression, encoding) { match (compression, encoding) {
(Compression::None, Encoding::Raw) => { (Compression::None, Encoding::Raw) => {
rply.read_exact(frame.checkpoint_bytes.as_mut_slice())?; rply.read_exact(checkpoint_bytes.as_mut_slice())?;
} }
(Compression::None, Encoding::Statestream) => { (Compression::None, Encoding::Statestream) => {
let mut ss_decoder = let mut ss_decoder =
statestream::Decoder::new(rply, &mut self.ss_state, uc_ue_size); statestream::Decoder::new(rply, &mut self.ss_state, uc_ue_size);
std::io::copy( std::io::copy(
&mut ss_decoder, &mut ss_decoder,
&mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()), &mut std::io::Cursor::new(checkpoint_bytes.as_mut_slice()),
)?; )?;
} }
(Compression::Zlib, Encoding::Raw) => { (Compression::Zlib, Encoding::Raw) => {
@@ -325,7 +336,7 @@ impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
let mut decoder = ZlibDecoder::new(rply); let mut decoder = ZlibDecoder::new(rply);
std::io::copy( std::io::copy(
&mut decoder, &mut decoder,
&mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()), &mut std::io::Cursor::new(checkpoint_bytes.as_mut_slice()),
)?; )?;
} }
(Compression::Zlib, Encoding::Statestream) => { (Compression::Zlib, Encoding::Statestream) => {
@@ -335,7 +346,7 @@ impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
statestream::Decoder::new(&mut decoder, &mut self.ss_state, uc_ue_size); statestream::Decoder::new(&mut decoder, &mut self.ss_state, uc_ue_size);
std::io::copy( std::io::copy(
&mut ss_decoder, &mut ss_decoder,
&mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()), &mut std::io::Cursor::new(checkpoint_bytes.as_mut_slice()),
)?; )?;
} }
(Compression::Zstd, Encoding::Raw) => { (Compression::Zstd, Encoding::Raw) => {
@@ -343,7 +354,7 @@ impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
let mut decoder = Decoder::with_buffer(rply)?.single_frame(); let mut decoder = Decoder::with_buffer(rply)?.single_frame();
std::io::copy( std::io::copy(
&mut decoder, &mut decoder,
&mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()), &mut std::io::Cursor::new(checkpoint_bytes.as_mut_slice()),
)?; )?;
} }
(Compression::Zstd, Encoding::Statestream) => { (Compression::Zstd, Encoding::Statestream) => {
@@ -353,7 +364,7 @@ impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
statestream::Decoder::new(&mut decoder, &mut self.ss_state, uc_ue_size); statestream::Decoder::new(&mut decoder, &mut self.ss_state, uc_ue_size);
std::io::copy( std::io::copy(
&mut ss_decoder, &mut ss_decoder,
&mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()), &mut std::io::Cursor::new(checkpoint_bytes.as_mut_slice()),
)?; )?;
} }
} }