diff --git a/src/lib.rs b/src/lib.rs index 0ecdfec..dfaf4c5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,14 @@ mod rply; mod statestream; pub use rply::*; +#[derive(Debug, thiserror::Error)] +pub struct InvalidDeterminant(pub u8); +impl std::fmt::Display for InvalidDeterminant { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/rply.rs b/src/rply.rs index 72b6e24..027fff6 100644 --- a/src/rply.rs +++ b/src/rply.rs @@ -1,14 +1,6 @@ -use crate::statestream; +use crate::{InvalidDeterminant, statestream}; use thiserror::Error; -#[derive(Debug, Error)] -pub struct InvalidDeterminant(pub u8); -impl std::fmt::Display for InvalidDeterminant { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - // #[repr(usize)] // pub enum HeaderV0V1Part { // Magic = 0, @@ -52,29 +44,6 @@ impl From for FrameToken { } } -#[repr(u8)] -#[non_exhaustive] -#[derive(Debug)] -pub enum SSToken { - Start = 0, - NewBlock = 1, - NewSuperblock = 2, - SuperblockSeq = 3, -} -impl TryFrom for SSToken { - type Error = InvalidDeterminant; - - fn try_from(value: u8) -> std::result::Result { - match value { - 0 => Ok(SSToken::Start), - 1 => Ok(SSToken::NewBlock), - 2 => Ok(SSToken::NewSuperblock), - 3 => Ok(SSToken::SuperblockSeq), - _ => Err(InvalidDeterminant(value)), - } - } -} - #[repr(u8)] #[non_exhaustive] #[derive(Debug, PartialEq, Eq, Clone, Copy)] @@ -332,8 +301,10 @@ impl ReplayDecoder<'_, R> { // read a 4 byte uncompressed unencoded size let uc_ue_size = rply.read_u32::()? as usize; // read a 4 byte uncompressed encoded size + #[expect(unused)] let uc_enc_size = rply.read_u32::()? as usize; // read a 4 byte compressed encoded size + #[expect(unused)] let comp_enc_size = rply.read_u32::()? as usize; frame.checkpoint_bytes.resize(uc_ue_size, 0); // maybe decompress @@ -342,7 +313,8 @@ impl ReplayDecoder<'_, R> { rply.read_exact(frame.checkpoint_bytes.as_mut_slice())?; } (Compression::None, Encoding::Statestream) => { - let mut ss_decoder = statestream::Decoder::new(rply, &mut self.ss_state); + let mut ss_decoder = + statestream::Decoder::new(rply, &mut self.ss_state, uc_ue_size); std::io::copy( &mut ss_decoder, &mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()), @@ -359,7 +331,8 @@ impl ReplayDecoder<'_, R> { (Compression::Zlib, Encoding::Statestream) => { use flate2::bufread::ZlibDecoder; let mut decoder = ZlibDecoder::new(rply); - let mut ss_decoder = statestream::Decoder::new(&mut decoder, &mut self.ss_state); + let mut ss_decoder = + statestream::Decoder::new(&mut decoder, &mut self.ss_state, uc_ue_size); std::io::copy( &mut ss_decoder, &mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()), @@ -376,7 +349,8 @@ impl ReplayDecoder<'_, R> { (Compression::Zstd, Encoding::Statestream) => { use zstd::Decoder; let mut decoder = Decoder::with_buffer(rply)?.single_frame(); - let mut ss_decoder = statestream::Decoder::new(&mut decoder, &mut self.ss_state); + let mut ss_decoder = + statestream::Decoder::new(&mut decoder, &mut self.ss_state, uc_ue_size); std::io::copy( &mut ss_decoder, &mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()), diff --git a/src/statestream.rs b/src/statestream.rs index 53f1349..f515154 100644 --- a/src/statestream.rs +++ b/src/statestream.rs @@ -1,5 +1,30 @@ mod blockindex; +use crate::InvalidDeterminant; use blockindex::BlockIndex; +use std::io::Write; + +#[repr(u8)] +#[non_exhaustive] +#[derive(Debug)] +pub enum SSToken { + Start = 0, + NewBlock = 1, + NewSuperblock = 2, + SuperblockSeq = 3, +} +impl TryFrom for SSToken { + type Error = InvalidDeterminant; + + fn try_from(value: u8) -> std::result::Result { + match value { + 0 => Ok(SSToken::Start), + 1 => Ok(SSToken::NewBlock), + 2 => Ok(SSToken::NewSuperblock), + 3 => Ok(SSToken::SuperblockSeq), + _ => Err(InvalidDeterminant(value)), + } + } +} pub(crate) struct Ctx { block_size: u32, @@ -26,31 +51,152 @@ impl Ctx { pub(crate) struct Decoder<'r, 'c, R: std::io::Read> { reader: &'r mut R, ctx: &'c mut Ctx, + state_size: usize, finished: bool, + readout_cursor: usize, } impl<'r, 'c, R: std::io::Read> Decoder<'r, 'c, R> { - pub(crate) fn new(reader: &'r mut R, ctx: &'c mut Ctx) -> Self { + pub(crate) fn new(reader: &'r mut R, ctx: &'c mut Ctx, state_size: usize) -> Self { Self { reader, ctx, finished: false, + readout_cursor: 0, + state_size, + } + } + fn readout(&mut self, mut buf: &mut [u8]) -> std::io::Result { + match buf.write(&self.ctx.last_state[self.readout_cursor..]) { + Err(e) => Err(e), + Ok(sz) => { + self.readout_cursor += sz; + Ok(sz) + } } } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ParseState { + WaitForStart, + WaitForSuperblockSeq, + Finished, +} -impl<'r, 'c, R: std::io::Read> std::io::Read for Decoder<'r, 'c, R> { +#[derive(thiserror::Error, Debug)] +enum SSError { + #[error("Invalid token {0}")] + InvalidToken(#[from] InvalidDeterminant), + #[error("Too many start tokens in stream")] + TooManyStarts(), + #[error("Unexpected {1:?} during {0:?}")] + ParseError(ParseState, SSToken), + #[error("Block {0} is the wrong size")] + BlockWrongSize(u32), + #[error("Superblock {0} is the wrong size")] + SuperblockWrongSize(u32), + #[error("Couldn't insert block at {1} on frame {0}")] + BadBlockInsert(u64, u32), + #[error("Couldn't insert superblock at {1} on frame {0}")] + BadSuperblockInsert(u64, u32), +} + +impl std::io::Read for Decoder<'_, '_, R> { /* a slightly degenerate read implementation in that it will keep * calling read on the inner reader until a complete checkpoint is * read, then return 0 for subsequent reads */ - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + fn read(&mut self, outbuf: &mut [u8]) -> std::io::Result { + use ParseState as State; use rmp::decode as r; if self.finished { - return Ok(0); + if self.readout_cursor == self.state_size { + return Ok(0); + } + return self.readout(outbuf); } - - let sz = 0; - todo!(); - Ok(sz) + let mut frame = 0; + let mut state = State::WaitForStart; + let mut buf = vec![0_u8; self.ctx.block_size as usize]; + let mut superblock = vec![0_u32; self.ctx.superblock_size as usize]; + loop { + let tok: u8 = r::read_int(self.reader).map_err(std::io::Error::other)?; + match ( + state, + SSToken::try_from(tok) + .map_err(|e| std::io::Error::other(SSError::InvalidToken(e)))?, + ) { + (State::WaitForStart, SSToken::Start) => { + frame = r::read_int(self.reader).map_err(std::io::Error::other)?; + state = State::WaitForSuperblockSeq; + } + (_, SSToken::Start) => return Err(std::io::Error::other(SSError::TooManyStarts())), + (State::WaitForSuperblockSeq, SSToken::NewBlock) => { + let idx = r::read_int(self.reader).map_err(std::io::Error::other)?; + let bin_len = r::read_bin_len(self.reader).map_err(std::io::Error::other)?; + if bin_len != self.ctx.block_size { + return Err(std::io::Error::other(SSError::BlockWrongSize(bin_len))); + } + self.reader.read_exact(&mut buf)?; + if !self + .ctx + .block_index + .insert_exact(idx, Box::from(buf.clone()), frame) + { + return Err(std::io::Error::other(SSError::BadBlockInsert(frame, idx))); + } + } + (State::WaitForSuperblockSeq, SSToken::NewSuperblock) => { + let idx = r::read_int(self.reader).map_err(std::io::Error::other)?; + let arr_len = r::read_array_len(self.reader).map_err(std::io::Error::other)?; + if arr_len != self.ctx.superblock_size { + return Err(std::io::Error::other(SSError::SuperblockWrongSize(arr_len))); + } + for superblock_elt in &mut superblock { + *superblock_elt = + r::read_int(self.reader).map_err(std::io::Error::other)?; + } + if !self.ctx.superblock_index.insert_exact( + idx, + Box::from(superblock.clone()), + frame, + ) { + return Err(std::io::Error::other(SSError::BadSuperblockInsert( + frame, idx, + ))); + } + } + (State::WaitForSuperblockSeq, SSToken::SuperblockSeq) => { + let arr_len = + r::read_array_len(self.reader).map_err(std::io::Error::other)? as usize; + let block_byte_size = self.ctx.block_size as usize; + let superblock_byte_size = self.ctx.superblock_size as usize * block_byte_size; + self.ctx.last_state.resize(self.state_size, 0); + for superblock_i in 0..arr_len { + let superblock_idx = + r::read_int(self.reader).map_err(std::io::Error::other)?; + self.ctx.last_superseq[superblock_i] = superblock_idx; + let superblock_data = self.ctx.superblock_index.get(superblock_idx); + for (block_i, block_id) in superblock_data.iter().copied().enumerate() { + let block_start = (superblock_i * superblock_byte_size + + block_i * block_byte_size) + .min(self.state_size); + let block_end = (block_start + block_byte_size).min(self.state_size); + let block_bytes = self.ctx.block_index.get(block_id); + if block_end <= block_start { + // This can happen in the last superblock if it was padded with extra blocks + break; + } + self.ctx.last_state[block_start..block_end] + .copy_from_slice(&block_bytes[0..(block_end - block_start)]); + } + } + state = State::Finished; + break; + } + (s, tok) => return Err(std::io::Error::other(SSError::ParseError(s, tok))), + } + } + assert_eq!(state, State::Finished); + self.readout(outbuf) } } diff --git a/src/statestream/blockindex.rs b/src/statestream/blockindex.rs index 1a61125..1b7ab52 100644 --- a/src/statestream/blockindex.rs +++ b/src/statestream/blockindex.rs @@ -18,6 +18,7 @@ pub(crate) struct BlockIndex< object_size: usize, } +#[expect(unused)] pub(crate) struct Insertion { index: u32, is_new: bool, @@ -42,7 +43,9 @@ impl Insertion { + assert_eq!(obj.len(), self.object_size); let hash = hash(obj); match self.index.entry(hash) { std::collections::hash_map::Entry::Occupied(mut e) => { @@ -81,6 +84,7 @@ impl, _frame: u64) -> bool { + assert_eq!(obj.len(), self.object_size); if self.objects.len() != idx as usize { return false; } @@ -88,17 +92,19 @@ impl &[T] { &self.objects[which as usize] } + #[expect(unused)] pub fn clear(&mut self) { self.index.clear(); self.objects.truncate(1); self.hashes.truncate(1); self.index.insert(self.hashes[0], smallvec![0]); } + #[expect(unused)] pub fn len(&self) -> usize { self.objects.len() }