From 2fe31a6b34365c02c7d27f08de6e82ca0a3889e8 Mon Sep 17 00:00:00 2001 From: "Joseph C. Osborn" Date: Wed, 29 Oct 2025 11:31:28 -0700 Subject: [PATCH] complete statestream encoding --- src/rply.rs | 16 +++++- src/statestream.rs | 100 ++++++++++++++++++++++++++++++++-- src/statestream/blockindex.rs | 6 +- 3 files changed, 110 insertions(+), 12 deletions(-) diff --git a/src/rply.rs b/src/rply.rs index cff46ca..23ae64f 100644 --- a/src/rply.rs +++ b/src/rply.rs @@ -572,7 +572,9 @@ impl<'w, W: std::io::Write + std::io::Seek> ReplayEncoder<'w, W> { self.rply .seek(std::io::SeekFrom::Start(HEADERV2_LEN_BYTES as u64))?; self.encode_checkpoint(checkpoint, 0)?; - self.header.set_initial_state_size(initial.len() as u32); + self.header.set_initial_state_size( + u32::try_from(initial.len()).map_err(ReplayError::CheckpointTooBig)?, + ); self.initial_state = initial; // Have to rewrite header to account for initial state size self.write_header()?; @@ -580,7 +582,13 @@ impl<'w, W: std::io::Write + std::io::Seek> ReplayEncoder<'w, W> { self.rply.seek(std::io::SeekFrom::Start(old_pos))?; Ok(()) } + /// Writes a single frame at the current encoder position. + /// # Errors + /// [`ReplayError::FrameTooLong`]: Frame encoded to more than 2^32 bytes, backrefs invalid + /// [`ReplayError::TooManyKeyEvents`]: More key events than allowed by spec + /// [`ReplayError::TooManyInputEvents`]: More input events than allowed by spec + /// [`ReplayError::CheckpointTooBig`]: Checkpoint data takes up more than 2^32 bytes pub fn write_frame(&mut self, frame: &Frame) -> Result<()> { use byteorder::{LittleEndian, WriteBytesExt}; let start_pos = self.rply.stream_position()?; @@ -619,6 +627,8 @@ impl<'w, W: std::io::Write + std::io::Seek> ReplayEncoder<'w, W> { Ok(()) } /// Finishes the encoding, writing the header in the process + /// # Errors + /// [`ReplayError::IO`]: Underlying writer fails to write header pub fn finish(&mut self) -> Result<()> { if self.finished { return Ok(()); @@ -639,9 +649,9 @@ impl Drop for ReplayEncoder<'_, W> { /// /// # Errors /// See [`ReplayEncoder::new`]. -pub fn encode<'w, 's, W: std::io::Write + std::io::Seek>( +pub fn encode<'w, W: std::io::Write + std::io::Seek>( header: Header, - initial_state: &'s [u8], + initial_state: &[u8], rply: &'w mut W, ) -> Result> { ReplayEncoder::new(header, initial_state, rply) diff --git a/src/statestream.rs b/src/statestream.rs index a38424c..91df561 100644 --- a/src/statestream.rs +++ b/src/statestream.rs @@ -220,16 +220,106 @@ pub(crate) struct Encoder<'w, 'c, W: std::io::Write> { ctx: &'c mut Ctx, } +/* Does not include the size of the str,arr,map,ext contents */ +fn rmp_size(m: rmp::Marker) -> usize { + #[allow( + clippy::enum_glob_use, + reason = "If any new variants are added, the match will cease to be exhaustive" + )] + use rmp::Marker::*; + match m { + FixPos(_) | FixNeg(_) | Null | Reserved | False | True | FixMap(_) | FixArray(_) + | FixStr(_) => 1, + U8 | I8 | Bin8 | Str8 | FixExt1 | FixExt2 | FixExt4 | FixExt8 | FixExt16 => 2, + U16 | I16 | Bin16 | Ext8 | Str16 | Array16 | Map16 => 3, + Ext16 => 4, + U32 | I32 | F32 | Bin32 | Str32 | Array32 | Map32 => 5, + Ext32 => 6, + U64 | I64 | F64 => 9, + } +} + impl<'w, 'c, W: std::io::Write> Encoder<'w, 'c, W> { pub(crate) fn new(writer: &'w mut W, ctx: &'c mut Ctx) -> Self { Self { writer, ctx } } pub fn encode_checkpoint(mut self, checkpoint: &[u8], frame: u64) -> std::io::Result { use rmp::encode as r; - r::write_uint(&mut self.writer, u64::from(u8::from(SSToken::Start)))?; - r::write_uint(&mut self.writer, frame)?; - todo!(); - - Ok(0) + let mut bytes_out = 0; + bytes_out += rmp_size(r::write_uint( + &mut self.writer, + u64::from(u8::from(SSToken::Start)), + )?); + bytes_out += rmp_size(r::write_uint(&mut self.writer, frame)?); + let block_size = self.ctx.block_size as usize; + let mut padded_block = vec![0; block_size]; + let superblock_size = self.ctx.superblock_size as usize; + let superblock_size_bytes = block_size * superblock_size; + let superblock_count = ((checkpoint.len() - 1) / superblock_size_bytes) + 1; + let mut superblock_contents = vec![0_u32; superblock_size]; + for (superblock_i, superblock_bytes) in checkpoint.chunks(superblock_size_bytes).enumerate() + { + if superblock_bytes.len() < superblock_size_bytes { + let block_count = (superblock_bytes.len() - 1) / block_size + 1; + if block_count + 1 < superblock_size { + superblock_contents[(block_count + 1)..].fill(0); + } + } + for (block_i, block_bytes) in superblock_bytes.chunks(block_size).enumerate() { + let found_block = if block_bytes.len() < block_size { + padded_block[block_bytes.len()..].fill(0); + padded_block[..block_bytes.len()].copy_from_slice(block_bytes); + self.ctx.block_index.insert(&padded_block, frame) + } else { + self.ctx.block_index.insert(block_bytes, frame) + }; + superblock_contents[block_i] = found_block.index; + if found_block.is_new { + bytes_out += rmp_size(r::write_uint( + self.writer, + u64::from(u8::from(SSToken::NewBlock)), + )?); + bytes_out += + rmp_size(r::write_uint(self.writer, u64::from(found_block.index))?); + bytes_out += rmp_size(r::write_bin_len(self.writer, self.ctx.block_size)?); + self.writer.write_all(block_bytes)?; + bytes_out += block_bytes.len(); + } + } + let found_superblock = self + .ctx + .superblock_index + .insert(&superblock_contents, frame); + self.ctx.last_superseq[superblock_i] = found_superblock.index; + if found_superblock.is_new { + bytes_out += rmp_size(r::write_uint( + self.writer, + u64::from(u8::from(SSToken::NewSuperblock)), + )?); + bytes_out += rmp_size(r::write_uint( + self.writer, + u64::from(found_superblock.index), + )?); + bytes_out += rmp_size(r::write_array_len(self.writer, self.ctx.superblock_size)?); + for blkid in &superblock_contents { + bytes_out += rmp_size(r::write_uint(self.writer, u64::from(*blkid))?); + } + } + } + bytes_out += rmp_size(r::write_uint( + self.writer, + u64::from(u8::from(SSToken::SuperblockSeq)), + )?); + bytes_out += rmp_size(r::write_array_len( + self.writer, + u32::try_from(superblock_count) + .map_err(|e| std::io::Error::other(crate::ReplayError::CheckpointTooBig(e)))?, + )?); + for super_id in &self.ctx.last_superseq { + bytes_out += rmp_size(r::write_uint(self.writer, u64::from(*super_id))?); + } + // commit + u32::try_from(bytes_out) + .map_err(|e| std::io::Error::other(crate::ReplayError::CheckpointTooBig(e))) } } diff --git a/src/statestream/blockindex.rs b/src/statestream/blockindex.rs index 1b7ab52..f039831 100644 --- a/src/statestream/blockindex.rs +++ b/src/statestream/blockindex.rs @@ -18,10 +18,9 @@ pub(crate) struct BlockIndex< object_size: usize, } -#[expect(unused)] pub(crate) struct Insertion { - index: u32, - is_new: bool, + pub index: u32, + pub is_new: bool, } fn hash(val: &[T]) -> u64 { @@ -43,7 +42,6 @@ impl Insertion { assert_eq!(obj.len(), self.object_size); let hash = hash(obj);