draft of statestream reading
This commit is contained in:
@@ -2,6 +2,14 @@ mod rply;
|
|||||||
mod statestream;
|
mod statestream;
|
||||||
pub use rply::*;
|
pub use rply::*;
|
||||||
|
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
pub struct InvalidDeterminant(pub u8);
|
||||||
|
impl std::fmt::Display for InvalidDeterminant {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(f, "{}", self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|||||||
+9
-35
@@ -1,14 +1,6 @@
|
|||||||
use crate::statestream;
|
use crate::{InvalidDeterminant, statestream};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
|
||||||
pub struct InvalidDeterminant(pub u8);
|
|
||||||
impl std::fmt::Display for InvalidDeterminant {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
write!(f, "{}", self.0)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// #[repr(usize)]
|
// #[repr(usize)]
|
||||||
// pub enum HeaderV0V1Part {
|
// pub enum HeaderV0V1Part {
|
||||||
// Magic = 0,
|
// Magic = 0,
|
||||||
@@ -52,29 +44,6 @@ impl From<u8> for FrameToken {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[repr(u8)]
|
|
||||||
#[non_exhaustive]
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum SSToken {
|
|
||||||
Start = 0,
|
|
||||||
NewBlock = 1,
|
|
||||||
NewSuperblock = 2,
|
|
||||||
SuperblockSeq = 3,
|
|
||||||
}
|
|
||||||
impl TryFrom<u8> for SSToken {
|
|
||||||
type Error = InvalidDeterminant;
|
|
||||||
|
|
||||||
fn try_from(value: u8) -> std::result::Result<Self, Self::Error> {
|
|
||||||
match value {
|
|
||||||
0 => Ok(SSToken::Start),
|
|
||||||
1 => Ok(SSToken::NewBlock),
|
|
||||||
2 => Ok(SSToken::NewSuperblock),
|
|
||||||
3 => Ok(SSToken::SuperblockSeq),
|
|
||||||
_ => Err(InvalidDeterminant(value)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[repr(u8)]
|
#[repr(u8)]
|
||||||
#[non_exhaustive]
|
#[non_exhaustive]
|
||||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||||
@@ -332,8 +301,10 @@ impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
|
|||||||
// read a 4 byte uncompressed unencoded size
|
// read a 4 byte uncompressed unencoded size
|
||||||
let uc_ue_size = rply.read_u32::<LittleEndian>()? as usize;
|
let uc_ue_size = rply.read_u32::<LittleEndian>()? as usize;
|
||||||
// read a 4 byte uncompressed encoded size
|
// read a 4 byte uncompressed encoded size
|
||||||
|
#[expect(unused)]
|
||||||
let uc_enc_size = rply.read_u32::<LittleEndian>()? as usize;
|
let uc_enc_size = rply.read_u32::<LittleEndian>()? as usize;
|
||||||
// read a 4 byte compressed encoded size
|
// read a 4 byte compressed encoded size
|
||||||
|
#[expect(unused)]
|
||||||
let comp_enc_size = rply.read_u32::<LittleEndian>()? as usize;
|
let comp_enc_size = rply.read_u32::<LittleEndian>()? as usize;
|
||||||
frame.checkpoint_bytes.resize(uc_ue_size, 0);
|
frame.checkpoint_bytes.resize(uc_ue_size, 0);
|
||||||
// maybe decompress
|
// maybe decompress
|
||||||
@@ -342,7 +313,8 @@ impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
|
|||||||
rply.read_exact(frame.checkpoint_bytes.as_mut_slice())?;
|
rply.read_exact(frame.checkpoint_bytes.as_mut_slice())?;
|
||||||
}
|
}
|
||||||
(Compression::None, Encoding::Statestream) => {
|
(Compression::None, Encoding::Statestream) => {
|
||||||
let mut ss_decoder = statestream::Decoder::new(rply, &mut self.ss_state);
|
let mut ss_decoder =
|
||||||
|
statestream::Decoder::new(rply, &mut self.ss_state, uc_ue_size);
|
||||||
std::io::copy(
|
std::io::copy(
|
||||||
&mut ss_decoder,
|
&mut ss_decoder,
|
||||||
&mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()),
|
&mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()),
|
||||||
@@ -359,7 +331,8 @@ impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
|
|||||||
(Compression::Zlib, Encoding::Statestream) => {
|
(Compression::Zlib, Encoding::Statestream) => {
|
||||||
use flate2::bufread::ZlibDecoder;
|
use flate2::bufread::ZlibDecoder;
|
||||||
let mut decoder = ZlibDecoder::new(rply);
|
let mut decoder = ZlibDecoder::new(rply);
|
||||||
let mut ss_decoder = statestream::Decoder::new(&mut decoder, &mut self.ss_state);
|
let mut ss_decoder =
|
||||||
|
statestream::Decoder::new(&mut decoder, &mut self.ss_state, uc_ue_size);
|
||||||
std::io::copy(
|
std::io::copy(
|
||||||
&mut ss_decoder,
|
&mut ss_decoder,
|
||||||
&mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()),
|
&mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()),
|
||||||
@@ -376,7 +349,8 @@ impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
|
|||||||
(Compression::Zstd, Encoding::Statestream) => {
|
(Compression::Zstd, Encoding::Statestream) => {
|
||||||
use zstd::Decoder;
|
use zstd::Decoder;
|
||||||
let mut decoder = Decoder::with_buffer(rply)?.single_frame();
|
let mut decoder = Decoder::with_buffer(rply)?.single_frame();
|
||||||
let mut ss_decoder = statestream::Decoder::new(&mut decoder, &mut self.ss_state);
|
let mut ss_decoder =
|
||||||
|
statestream::Decoder::new(&mut decoder, &mut self.ss_state, uc_ue_size);
|
||||||
std::io::copy(
|
std::io::copy(
|
||||||
&mut ss_decoder,
|
&mut ss_decoder,
|
||||||
&mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()),
|
&mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()),
|
||||||
|
|||||||
+161
-15
@@ -1,5 +1,30 @@
|
|||||||
mod blockindex;
|
mod blockindex;
|
||||||
|
use crate::InvalidDeterminant;
|
||||||
use blockindex::BlockIndex;
|
use blockindex::BlockIndex;
|
||||||
|
use std::io::Write;
|
||||||
|
|
||||||
|
#[repr(u8)]
|
||||||
|
#[non_exhaustive]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum SSToken {
|
||||||
|
Start = 0,
|
||||||
|
NewBlock = 1,
|
||||||
|
NewSuperblock = 2,
|
||||||
|
SuperblockSeq = 3,
|
||||||
|
}
|
||||||
|
impl TryFrom<u8> for SSToken {
|
||||||
|
type Error = InvalidDeterminant;
|
||||||
|
|
||||||
|
fn try_from(value: u8) -> std::result::Result<Self, Self::Error> {
|
||||||
|
match value {
|
||||||
|
0 => Ok(SSToken::Start),
|
||||||
|
1 => Ok(SSToken::NewBlock),
|
||||||
|
2 => Ok(SSToken::NewSuperblock),
|
||||||
|
3 => Ok(SSToken::SuperblockSeq),
|
||||||
|
_ => Err(InvalidDeterminant(value)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) struct Ctx {
|
pub(crate) struct Ctx {
|
||||||
block_size: u32,
|
block_size: u32,
|
||||||
@@ -26,31 +51,152 @@ impl Ctx {
|
|||||||
pub(crate) struct Decoder<'r, 'c, R: std::io::Read> {
|
pub(crate) struct Decoder<'r, 'c, R: std::io::Read> {
|
||||||
reader: &'r mut R,
|
reader: &'r mut R,
|
||||||
ctx: &'c mut Ctx,
|
ctx: &'c mut Ctx,
|
||||||
|
state_size: usize,
|
||||||
finished: bool,
|
finished: bool,
|
||||||
|
readout_cursor: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'r, 'c, R: std::io::Read> Decoder<'r, 'c, R> {
|
impl<'r, 'c, R: std::io::Read> Decoder<'r, 'c, R> {
|
||||||
pub(crate) fn new(reader: &'r mut R, ctx: &'c mut Ctx) -> Self {
|
pub(crate) fn new(reader: &'r mut R, ctx: &'c mut Ctx, state_size: usize) -> Self {
|
||||||
Self {
|
Self {
|
||||||
reader,
|
reader,
|
||||||
ctx,
|
ctx,
|
||||||
finished: false,
|
finished: false,
|
||||||
|
readout_cursor: 0,
|
||||||
|
state_size,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
fn readout(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
|
||||||
|
match buf.write(&self.ctx.last_state[self.readout_cursor..]) {
|
||||||
impl<'r, 'c, R: std::io::Read> std::io::Read for Decoder<'r, 'c, R> {
|
Err(e) => Err(e),
|
||||||
/* a slightly degenerate read implementation in that it will keep
|
Ok(sz) => {
|
||||||
* calling read on the inner reader until a complete checkpoint is
|
self.readout_cursor += sz;
|
||||||
* read, then return 0 for subsequent reads */
|
|
||||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
|
||||||
use rmp::decode as r;
|
|
||||||
if self.finished {
|
|
||||||
return Ok(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
let sz = 0;
|
|
||||||
todo!();
|
|
||||||
Ok(sz)
|
Ok(sz)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
pub enum ParseState {
|
||||||
|
WaitForStart,
|
||||||
|
WaitForSuperblockSeq,
|
||||||
|
Finished,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(thiserror::Error, Debug)]
|
||||||
|
enum SSError {
|
||||||
|
#[error("Invalid token {0}")]
|
||||||
|
InvalidToken(#[from] InvalidDeterminant),
|
||||||
|
#[error("Too many start tokens in stream")]
|
||||||
|
TooManyStarts(),
|
||||||
|
#[error("Unexpected {1:?} during {0:?}")]
|
||||||
|
ParseError(ParseState, SSToken),
|
||||||
|
#[error("Block {0} is the wrong size")]
|
||||||
|
BlockWrongSize(u32),
|
||||||
|
#[error("Superblock {0} is the wrong size")]
|
||||||
|
SuperblockWrongSize(u32),
|
||||||
|
#[error("Couldn't insert block at {1} on frame {0}")]
|
||||||
|
BadBlockInsert(u64, u32),
|
||||||
|
#[error("Couldn't insert superblock at {1} on frame {0}")]
|
||||||
|
BadSuperblockInsert(u64, u32),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R: std::io::Read> std::io::Read for Decoder<'_, '_, R> {
|
||||||
|
/* a slightly degenerate read implementation in that it will keep
|
||||||
|
* calling read on the inner reader until a complete checkpoint is
|
||||||
|
* read, then return 0 for subsequent reads */
|
||||||
|
fn read(&mut self, outbuf: &mut [u8]) -> std::io::Result<usize> {
|
||||||
|
use ParseState as State;
|
||||||
|
use rmp::decode as r;
|
||||||
|
if self.finished {
|
||||||
|
if self.readout_cursor == self.state_size {
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
return self.readout(outbuf);
|
||||||
|
}
|
||||||
|
let mut frame = 0;
|
||||||
|
let mut state = State::WaitForStart;
|
||||||
|
let mut buf = vec![0_u8; self.ctx.block_size as usize];
|
||||||
|
let mut superblock = vec![0_u32; self.ctx.superblock_size as usize];
|
||||||
|
loop {
|
||||||
|
let tok: u8 = r::read_int(self.reader).map_err(std::io::Error::other)?;
|
||||||
|
match (
|
||||||
|
state,
|
||||||
|
SSToken::try_from(tok)
|
||||||
|
.map_err(|e| std::io::Error::other(SSError::InvalidToken(e)))?,
|
||||||
|
) {
|
||||||
|
(State::WaitForStart, SSToken::Start) => {
|
||||||
|
frame = r::read_int(self.reader).map_err(std::io::Error::other)?;
|
||||||
|
state = State::WaitForSuperblockSeq;
|
||||||
|
}
|
||||||
|
(_, SSToken::Start) => return Err(std::io::Error::other(SSError::TooManyStarts())),
|
||||||
|
(State::WaitForSuperblockSeq, SSToken::NewBlock) => {
|
||||||
|
let idx = r::read_int(self.reader).map_err(std::io::Error::other)?;
|
||||||
|
let bin_len = r::read_bin_len(self.reader).map_err(std::io::Error::other)?;
|
||||||
|
if bin_len != self.ctx.block_size {
|
||||||
|
return Err(std::io::Error::other(SSError::BlockWrongSize(bin_len)));
|
||||||
|
}
|
||||||
|
self.reader.read_exact(&mut buf)?;
|
||||||
|
if !self
|
||||||
|
.ctx
|
||||||
|
.block_index
|
||||||
|
.insert_exact(idx, Box::from(buf.clone()), frame)
|
||||||
|
{
|
||||||
|
return Err(std::io::Error::other(SSError::BadBlockInsert(frame, idx)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(State::WaitForSuperblockSeq, SSToken::NewSuperblock) => {
|
||||||
|
let idx = r::read_int(self.reader).map_err(std::io::Error::other)?;
|
||||||
|
let arr_len = r::read_array_len(self.reader).map_err(std::io::Error::other)?;
|
||||||
|
if arr_len != self.ctx.superblock_size {
|
||||||
|
return Err(std::io::Error::other(SSError::SuperblockWrongSize(arr_len)));
|
||||||
|
}
|
||||||
|
for superblock_elt in &mut superblock {
|
||||||
|
*superblock_elt =
|
||||||
|
r::read_int(self.reader).map_err(std::io::Error::other)?;
|
||||||
|
}
|
||||||
|
if !self.ctx.superblock_index.insert_exact(
|
||||||
|
idx,
|
||||||
|
Box::from(superblock.clone()),
|
||||||
|
frame,
|
||||||
|
) {
|
||||||
|
return Err(std::io::Error::other(SSError::BadSuperblockInsert(
|
||||||
|
frame, idx,
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(State::WaitForSuperblockSeq, SSToken::SuperblockSeq) => {
|
||||||
|
let arr_len =
|
||||||
|
r::read_array_len(self.reader).map_err(std::io::Error::other)? as usize;
|
||||||
|
let block_byte_size = self.ctx.block_size as usize;
|
||||||
|
let superblock_byte_size = self.ctx.superblock_size as usize * block_byte_size;
|
||||||
|
self.ctx.last_state.resize(self.state_size, 0);
|
||||||
|
for superblock_i in 0..arr_len {
|
||||||
|
let superblock_idx =
|
||||||
|
r::read_int(self.reader).map_err(std::io::Error::other)?;
|
||||||
|
self.ctx.last_superseq[superblock_i] = superblock_idx;
|
||||||
|
let superblock_data = self.ctx.superblock_index.get(superblock_idx);
|
||||||
|
for (block_i, block_id) in superblock_data.iter().copied().enumerate() {
|
||||||
|
let block_start = (superblock_i * superblock_byte_size
|
||||||
|
+ block_i * block_byte_size)
|
||||||
|
.min(self.state_size);
|
||||||
|
let block_end = (block_start + block_byte_size).min(self.state_size);
|
||||||
|
let block_bytes = self.ctx.block_index.get(block_id);
|
||||||
|
if block_end <= block_start {
|
||||||
|
// This can happen in the last superblock if it was padded with extra blocks
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
self.ctx.last_state[block_start..block_end]
|
||||||
|
.copy_from_slice(&block_bytes[0..(block_end - block_start)]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
state = State::Finished;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
(s, tok) => return Err(std::io::Error::other(SSError::ParseError(s, tok))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert_eq!(state, State::Finished);
|
||||||
|
self.readout(outbuf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ pub(crate) struct BlockIndex<
|
|||||||
object_size: usize,
|
object_size: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[expect(unused)]
|
||||||
pub(crate) struct Insertion {
|
pub(crate) struct Insertion {
|
||||||
index: u32,
|
index: u32,
|
||||||
is_new: bool,
|
is_new: bool,
|
||||||
@@ -42,7 +43,9 @@ impl<T: bytemuck::Zeroable + bytemuck::AnyBitPattern + bytemuck::NoUninit + Part
|
|||||||
hashes: vec![zero_hash],
|
hashes: vec![zero_hash],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#[expect(unused)]
|
||||||
pub fn insert(&mut self, obj: &[T], _frame: u64) -> Insertion {
|
pub fn insert(&mut self, obj: &[T], _frame: u64) -> Insertion {
|
||||||
|
assert_eq!(obj.len(), self.object_size);
|
||||||
let hash = hash(obj);
|
let hash = hash(obj);
|
||||||
match self.index.entry(hash) {
|
match self.index.entry(hash) {
|
||||||
std::collections::hash_map::Entry::Occupied(mut e) => {
|
std::collections::hash_map::Entry::Occupied(mut e) => {
|
||||||
@@ -81,6 +84,7 @@ impl<T: bytemuck::Zeroable + bytemuck::AnyBitPattern + bytemuck::NoUninit + Part
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn insert_exact(&mut self, idx: u32, obj: Box<[T]>, _frame: u64) -> bool {
|
pub fn insert_exact(&mut self, idx: u32, obj: Box<[T]>, _frame: u64) -> bool {
|
||||||
|
assert_eq!(obj.len(), self.object_size);
|
||||||
if self.objects.len() != idx as usize {
|
if self.objects.len() != idx as usize {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@@ -88,17 +92,19 @@ impl<T: bytemuck::Zeroable + bytemuck::AnyBitPattern + bytemuck::NoUninit + Part
|
|||||||
self.index.entry(hash).or_default().push(idx);
|
self.index.entry(hash).or_default().push(idx);
|
||||||
self.objects.push(obj);
|
self.objects.push(obj);
|
||||||
self.hashes.push(hash);
|
self.hashes.push(hash);
|
||||||
return true;
|
true
|
||||||
}
|
}
|
||||||
pub fn get(&self, which: u32) -> &[T] {
|
pub fn get(&self, which: u32) -> &[T] {
|
||||||
&self.objects[which as usize]
|
&self.objects[which as usize]
|
||||||
}
|
}
|
||||||
|
#[expect(unused)]
|
||||||
pub fn clear(&mut self) {
|
pub fn clear(&mut self) {
|
||||||
self.index.clear();
|
self.index.clear();
|
||||||
self.objects.truncate(1);
|
self.objects.truncate(1);
|
||||||
self.hashes.truncate(1);
|
self.hashes.truncate(1);
|
||||||
self.index.insert(self.hashes[0], smallvec![0]);
|
self.index.insert(self.hashes[0], smallvec![0]);
|
||||||
}
|
}
|
||||||
|
#[expect(unused)]
|
||||||
pub fn len(&self) -> usize {
|
pub fn len(&self) -> usize {
|
||||||
self.objects.len()
|
self.objects.len()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user