2025-10-27 14:38:10 -07:00
|
|
|
mod blockindex;
|
2025-10-28 09:40:49 -07:00
|
|
|
use crate::InvalidDeterminant;
|
2025-10-27 14:38:10 -07:00
|
|
|
use blockindex::BlockIndex;
|
2025-10-28 09:40:49 -07:00
|
|
|
use std::io::Write;
|
|
|
|
|
|
|
|
|
|
/// Tokens that introduce each record of an encoded checkpoint stream.
///
/// Each token is written as a MessagePack unsigned integer whose value is the
/// variant's discriminant.
#[repr(u8)]
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SSToken {
    /// Begins a checkpoint; followed by the frame number.
    Start = 0,
    /// Followed by a block index and a binary payload of exactly one block.
    NewBlock = 1,
    /// Followed by a superblock index and an array of block indices.
    NewSuperblock = 2,
    /// Followed by the array of superblock indices that make up the
    /// checkpoint; ends the checkpoint.
    SuperblockSeq = 3,
}
|
|
|
|
|
impl TryFrom<u8> for SSToken {
|
|
|
|
|
type Error = InvalidDeterminant;
|
|
|
|
|
|
|
|
|
|
fn try_from(value: u8) -> std::result::Result<Self, Self::Error> {
|
|
|
|
|
match value {
|
|
|
|
|
0 => Ok(SSToken::Start),
|
|
|
|
|
1 => Ok(SSToken::NewBlock),
|
|
|
|
|
2 => Ok(SSToken::NewSuperblock),
|
|
|
|
|
3 => Ok(SSToken::SuperblockSeq),
|
|
|
|
|
_ => Err(InvalidDeterminant(value)),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2025-10-27 14:38:10 -07:00
|
|
|
|
2025-10-28 15:38:44 -07:00
|
|
|
impl From<SSToken> for u8 {
    /// Returns the token's on-the-wire discriminant.
    fn from(value: SSToken) -> Self {
        // `SSToken` is `#[repr(u8)]` with explicit discriminants 0..=3, so the
        // cast produces exactly the byte written to the stream.
        value as u8
    }
}
|
|
|
|
|
|
2025-10-27 14:38:10 -07:00
|
|
|
pub(crate) struct Ctx {
|
|
|
|
|
block_size: u32,
|
|
|
|
|
superblock_size: u32,
|
|
|
|
|
last_state: Vec<u8>,
|
|
|
|
|
last_superseq: Vec<u32>,
|
|
|
|
|
block_index: BlockIndex<u8>,
|
|
|
|
|
superblock_index: BlockIndex<u32>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Ctx {
|
|
|
|
|
pub fn new(block_size: u32, superblock_size: u32) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
block_size,
|
|
|
|
|
superblock_size,
|
|
|
|
|
last_state: vec![],
|
|
|
|
|
last_superseq: vec![],
|
|
|
|
|
block_index: BlockIndex::new(block_size as usize),
|
|
|
|
|
superblock_index: BlockIndex::new(superblock_size as usize),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub(crate) struct Decoder<'r, 'c, R: std::io::Read> {
|
|
|
|
|
reader: &'r mut R,
|
|
|
|
|
ctx: &'c mut Ctx,
|
2025-10-28 09:40:49 -07:00
|
|
|
state_size: usize,
|
2025-10-27 14:38:10 -07:00
|
|
|
finished: bool,
|
2025-10-28 09:40:49 -07:00
|
|
|
readout_cursor: usize,
|
2025-10-27 14:38:10 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<'r, 'c, R: std::io::Read> Decoder<'r, 'c, R> {
|
2025-10-28 09:40:49 -07:00
|
|
|
pub(crate) fn new(reader: &'r mut R, ctx: &'c mut Ctx, state_size: usize) -> Self {
|
2025-10-27 14:38:10 -07:00
|
|
|
Self {
|
|
|
|
|
reader,
|
|
|
|
|
ctx,
|
|
|
|
|
finished: false,
|
2025-10-28 09:40:49 -07:00
|
|
|
readout_cursor: 0,
|
|
|
|
|
state_size,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
fn readout(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
|
|
|
|
|
match buf.write(&self.ctx.last_state[self.readout_cursor..]) {
|
|
|
|
|
Err(e) => Err(e),
|
|
|
|
|
Ok(sz) => {
|
|
|
|
|
self.readout_cursor += sz;
|
|
|
|
|
Ok(sz)
|
|
|
|
|
}
|
2025-10-27 14:38:10 -07:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2025-10-28 09:40:49 -07:00
|
|
|
/// Position of the decoder within one checkpoint record sequence.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParseState {
    /// No `Start` token has been seen yet.
    WaitForStart,
    /// Inside a checkpoint: accepting new blocks/superblocks until the
    /// closing superblock sequence arrives.
    WaitForSuperblockSeq,
    /// The superblock sequence has been read; the checkpoint is complete.
    Finished,
}
|
2025-10-27 14:38:10 -07:00
|
|
|
|
2025-10-28 09:40:49 -07:00
|
|
|
#[derive(thiserror::Error, Debug)]
|
|
|
|
|
enum SSError {
|
|
|
|
|
#[error("Invalid token {0}")]
|
|
|
|
|
InvalidToken(#[from] InvalidDeterminant),
|
|
|
|
|
#[error("Too many start tokens in stream")]
|
|
|
|
|
TooManyStarts(),
|
|
|
|
|
#[error("Unexpected {1:?} during {0:?}")]
|
|
|
|
|
ParseError(ParseState, SSToken),
|
|
|
|
|
#[error("Block {0} is the wrong size")]
|
|
|
|
|
BlockWrongSize(u32),
|
|
|
|
|
#[error("Superblock {0} is the wrong size")]
|
|
|
|
|
SuperblockWrongSize(u32),
|
|
|
|
|
#[error("Couldn't insert block at {1} on frame {0}")]
|
|
|
|
|
BadBlockInsert(u64, u32),
|
|
|
|
|
#[error("Couldn't insert superblock at {1} on frame {0}")]
|
|
|
|
|
BadSuperblockInsert(u64, u32),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<R: std::io::Read> std::io::Read for Decoder<'_, '_, R> {
|
2025-10-27 14:38:10 -07:00
|
|
|
/* a slightly degenerate read implementation in that it will keep
|
|
|
|
|
* calling read on the inner reader until a complete checkpoint is
|
|
|
|
|
* read, then return 0 for subsequent reads */
|
2025-10-28 09:40:49 -07:00
|
|
|
fn read(&mut self, outbuf: &mut [u8]) -> std::io::Result<usize> {
|
|
|
|
|
use ParseState as State;
|
2025-10-27 14:38:10 -07:00
|
|
|
use rmp::decode as r;
|
|
|
|
|
if self.finished {
|
2025-10-28 09:40:49 -07:00
|
|
|
if self.readout_cursor == self.state_size {
|
|
|
|
|
return Ok(0);
|
|
|
|
|
}
|
|
|
|
|
return self.readout(outbuf);
|
2025-10-27 14:38:10 -07:00
|
|
|
}
|
2025-10-28 09:40:49 -07:00
|
|
|
let mut frame = 0;
|
|
|
|
|
let mut state = State::WaitForStart;
|
|
|
|
|
let mut buf = vec![0_u8; self.ctx.block_size as usize];
|
|
|
|
|
let mut superblock = vec![0_u32; self.ctx.superblock_size as usize];
|
|
|
|
|
loop {
|
|
|
|
|
let tok: u8 = r::read_int(self.reader).map_err(std::io::Error::other)?;
|
|
|
|
|
match (
|
|
|
|
|
state,
|
|
|
|
|
SSToken::try_from(tok)
|
|
|
|
|
.map_err(|e| std::io::Error::other(SSError::InvalidToken(e)))?,
|
|
|
|
|
) {
|
|
|
|
|
(State::WaitForStart, SSToken::Start) => {
|
|
|
|
|
frame = r::read_int(self.reader).map_err(std::io::Error::other)?;
|
|
|
|
|
state = State::WaitForSuperblockSeq;
|
|
|
|
|
}
|
|
|
|
|
(_, SSToken::Start) => return Err(std::io::Error::other(SSError::TooManyStarts())),
|
|
|
|
|
(State::WaitForSuperblockSeq, SSToken::NewBlock) => {
|
|
|
|
|
let idx = r::read_int(self.reader).map_err(std::io::Error::other)?;
|
|
|
|
|
let bin_len = r::read_bin_len(self.reader).map_err(std::io::Error::other)?;
|
|
|
|
|
if bin_len != self.ctx.block_size {
|
|
|
|
|
return Err(std::io::Error::other(SSError::BlockWrongSize(bin_len)));
|
|
|
|
|
}
|
|
|
|
|
self.reader.read_exact(&mut buf)?;
|
|
|
|
|
if !self
|
|
|
|
|
.ctx
|
|
|
|
|
.block_index
|
|
|
|
|
.insert_exact(idx, Box::from(buf.clone()), frame)
|
|
|
|
|
{
|
|
|
|
|
return Err(std::io::Error::other(SSError::BadBlockInsert(frame, idx)));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
(State::WaitForSuperblockSeq, SSToken::NewSuperblock) => {
|
|
|
|
|
let idx = r::read_int(self.reader).map_err(std::io::Error::other)?;
|
|
|
|
|
let arr_len = r::read_array_len(self.reader).map_err(std::io::Error::other)?;
|
|
|
|
|
if arr_len != self.ctx.superblock_size {
|
|
|
|
|
return Err(std::io::Error::other(SSError::SuperblockWrongSize(arr_len)));
|
|
|
|
|
}
|
|
|
|
|
for superblock_elt in &mut superblock {
|
|
|
|
|
*superblock_elt =
|
|
|
|
|
r::read_int(self.reader).map_err(std::io::Error::other)?;
|
|
|
|
|
}
|
|
|
|
|
if !self.ctx.superblock_index.insert_exact(
|
|
|
|
|
idx,
|
|
|
|
|
Box::from(superblock.clone()),
|
|
|
|
|
frame,
|
|
|
|
|
) {
|
|
|
|
|
return Err(std::io::Error::other(SSError::BadSuperblockInsert(
|
|
|
|
|
frame, idx,
|
|
|
|
|
)));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
(State::WaitForSuperblockSeq, SSToken::SuperblockSeq) => {
|
|
|
|
|
let arr_len =
|
|
|
|
|
r::read_array_len(self.reader).map_err(std::io::Error::other)? as usize;
|
|
|
|
|
let block_byte_size = self.ctx.block_size as usize;
|
|
|
|
|
let superblock_byte_size = self.ctx.superblock_size as usize * block_byte_size;
|
2025-10-28 10:59:47 -07:00
|
|
|
let mut superseq = vec![0; arr_len];
|
2025-10-28 09:40:49 -07:00
|
|
|
self.ctx.last_state.resize(self.state_size, 0);
|
2025-10-28 10:59:47 -07:00
|
|
|
for (superblock_i, superseq_sblk) in superseq.iter_mut().enumerate() {
|
2025-10-28 09:40:49 -07:00
|
|
|
let superblock_idx =
|
|
|
|
|
r::read_int(self.reader).map_err(std::io::Error::other)?;
|
2025-10-28 10:59:47 -07:00
|
|
|
*superseq_sblk = superblock_idx;
|
2025-10-28 09:40:49 -07:00
|
|
|
let superblock_data = self.ctx.superblock_index.get(superblock_idx);
|
|
|
|
|
for (block_i, block_id) in superblock_data.iter().copied().enumerate() {
|
|
|
|
|
let block_start = (superblock_i * superblock_byte_size
|
|
|
|
|
+ block_i * block_byte_size)
|
|
|
|
|
.min(self.state_size);
|
|
|
|
|
let block_end = (block_start + block_byte_size).min(self.state_size);
|
|
|
|
|
let block_bytes = self.ctx.block_index.get(block_id);
|
|
|
|
|
if block_end <= block_start {
|
|
|
|
|
// This can happen in the last superblock if it was padded with extra blocks
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
self.ctx.last_state[block_start..block_end]
|
|
|
|
|
.copy_from_slice(&block_bytes[0..(block_end - block_start)]);
|
|
|
|
|
}
|
|
|
|
|
}
|
2025-10-28 10:59:47 -07:00
|
|
|
self.ctx.last_superseq = superseq;
|
2025-10-28 09:40:49 -07:00
|
|
|
state = State::Finished;
|
2025-10-28 10:59:47 -07:00
|
|
|
self.finished = true;
|
2025-10-28 09:40:49 -07:00
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
(s, tok) => return Err(std::io::Error::other(SSError::ParseError(s, tok))),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
assert_eq!(state, State::Finished);
|
|
|
|
|
self.readout(outbuf)
|
2025-10-27 14:38:10 -07:00
|
|
|
}
|
|
|
|
|
}
|
2025-10-28 15:38:00 -07:00
|
|
|
|
|
|
|
|
pub(crate) struct Encoder<'w, 'c, W: std::io::Write> {
|
|
|
|
|
writer: &'w mut W,
|
|
|
|
|
ctx: &'c mut Ctx,
|
|
|
|
|
}
|
|
|
|
|
|
2025-10-29 11:31:28 -07:00
|
|
|
/* Does not include the size of the str,arr,map,ext contents */
|
|
|
|
|
fn rmp_size(m: rmp::Marker) -> usize {
|
|
|
|
|
#[allow(
|
|
|
|
|
clippy::enum_glob_use,
|
|
|
|
|
reason = "If any new variants are added, the match will cease to be exhaustive"
|
|
|
|
|
)]
|
|
|
|
|
use rmp::Marker::*;
|
|
|
|
|
match m {
|
|
|
|
|
FixPos(_) | FixNeg(_) | Null | Reserved | False | True | FixMap(_) | FixArray(_)
|
|
|
|
|
| FixStr(_) => 1,
|
|
|
|
|
U8 | I8 | Bin8 | Str8 | FixExt1 | FixExt2 | FixExt4 | FixExt8 | FixExt16 => 2,
|
|
|
|
|
U16 | I16 | Bin16 | Ext8 | Str16 | Array16 | Map16 => 3,
|
|
|
|
|
Ext16 => 4,
|
|
|
|
|
U32 | I32 | F32 | Bin32 | Str32 | Array32 | Map32 => 5,
|
|
|
|
|
Ext32 => 6,
|
|
|
|
|
U64 | I64 | F64 => 9,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2025-10-28 15:38:00 -07:00
|
|
|
impl<'w, 'c, W: std::io::Write> Encoder<'w, 'c, W> {
|
|
|
|
|
pub(crate) fn new(writer: &'w mut W, ctx: &'c mut Ctx) -> Self {
|
|
|
|
|
Self { writer, ctx }
|
|
|
|
|
}
|
|
|
|
|
pub fn encode_checkpoint(mut self, checkpoint: &[u8], frame: u64) -> std::io::Result<u32> {
|
|
|
|
|
use rmp::encode as r;
|
2025-10-29 11:31:28 -07:00
|
|
|
let mut bytes_out = 0;
|
|
|
|
|
bytes_out += rmp_size(r::write_uint(
|
|
|
|
|
&mut self.writer,
|
|
|
|
|
u64::from(u8::from(SSToken::Start)),
|
|
|
|
|
)?);
|
|
|
|
|
bytes_out += rmp_size(r::write_uint(&mut self.writer, frame)?);
|
|
|
|
|
let block_size = self.ctx.block_size as usize;
|
|
|
|
|
let mut padded_block = vec![0; block_size];
|
|
|
|
|
let superblock_size = self.ctx.superblock_size as usize;
|
|
|
|
|
let superblock_size_bytes = block_size * superblock_size;
|
|
|
|
|
let superblock_count = ((checkpoint.len() - 1) / superblock_size_bytes) + 1;
|
2025-10-29 12:37:33 -07:00
|
|
|
self.ctx
|
|
|
|
|
.last_superseq
|
|
|
|
|
.resize(superblock_count.max(self.ctx.last_superseq.len()), 0);
|
2025-10-29 11:31:28 -07:00
|
|
|
let mut superblock_contents = vec![0_u32; superblock_size];
|
|
|
|
|
for (superblock_i, superblock_bytes) in checkpoint.chunks(superblock_size_bytes).enumerate()
|
|
|
|
|
{
|
|
|
|
|
if superblock_bytes.len() < superblock_size_bytes {
|
|
|
|
|
let block_count = (superblock_bytes.len() - 1) / block_size + 1;
|
|
|
|
|
if block_count + 1 < superblock_size {
|
|
|
|
|
superblock_contents[(block_count + 1)..].fill(0);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for (block_i, block_bytes) in superblock_bytes.chunks(block_size).enumerate() {
|
|
|
|
|
let found_block = if block_bytes.len() < block_size {
|
|
|
|
|
padded_block[block_bytes.len()..].fill(0);
|
|
|
|
|
padded_block[..block_bytes.len()].copy_from_slice(block_bytes);
|
|
|
|
|
self.ctx.block_index.insert(&padded_block, frame)
|
|
|
|
|
} else {
|
|
|
|
|
self.ctx.block_index.insert(block_bytes, frame)
|
|
|
|
|
};
|
|
|
|
|
superblock_contents[block_i] = found_block.index;
|
|
|
|
|
if found_block.is_new {
|
2025-10-30 10:24:02 -07:00
|
|
|
let block_out_bytes = self.ctx.block_index.get(found_block.index);
|
2025-10-29 11:31:28 -07:00
|
|
|
bytes_out += rmp_size(r::write_uint(
|
|
|
|
|
self.writer,
|
|
|
|
|
u64::from(u8::from(SSToken::NewBlock)),
|
|
|
|
|
)?);
|
|
|
|
|
bytes_out +=
|
|
|
|
|
rmp_size(r::write_uint(self.writer, u64::from(found_block.index))?);
|
|
|
|
|
bytes_out += rmp_size(r::write_bin_len(self.writer, self.ctx.block_size)?);
|
2025-10-30 10:24:02 -07:00
|
|
|
self.writer.write_all(block_out_bytes)?;
|
|
|
|
|
bytes_out += block_out_bytes.len();
|
2025-10-29 11:31:28 -07:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
let found_superblock = self
|
|
|
|
|
.ctx
|
|
|
|
|
.superblock_index
|
|
|
|
|
.insert(&superblock_contents, frame);
|
|
|
|
|
self.ctx.last_superseq[superblock_i] = found_superblock.index;
|
|
|
|
|
if found_superblock.is_new {
|
|
|
|
|
bytes_out += rmp_size(r::write_uint(
|
|
|
|
|
self.writer,
|
|
|
|
|
u64::from(u8::from(SSToken::NewSuperblock)),
|
|
|
|
|
)?);
|
|
|
|
|
bytes_out += rmp_size(r::write_uint(
|
|
|
|
|
self.writer,
|
|
|
|
|
u64::from(found_superblock.index),
|
|
|
|
|
)?);
|
|
|
|
|
bytes_out += rmp_size(r::write_array_len(self.writer, self.ctx.superblock_size)?);
|
|
|
|
|
for blkid in &superblock_contents {
|
|
|
|
|
bytes_out += rmp_size(r::write_uint(self.writer, u64::from(*blkid))?);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2025-10-29 12:37:33 -07:00
|
|
|
self.ctx.last_superseq.truncate(superblock_count);
|
2025-10-29 11:31:28 -07:00
|
|
|
bytes_out += rmp_size(r::write_uint(
|
|
|
|
|
self.writer,
|
|
|
|
|
u64::from(u8::from(SSToken::SuperblockSeq)),
|
|
|
|
|
)?);
|
|
|
|
|
bytes_out += rmp_size(r::write_array_len(
|
|
|
|
|
self.writer,
|
|
|
|
|
u32::try_from(superblock_count)
|
|
|
|
|
.map_err(|e| std::io::Error::other(crate::ReplayError::CheckpointTooBig(e)))?,
|
|
|
|
|
)?);
|
|
|
|
|
for super_id in &self.ctx.last_superseq {
|
|
|
|
|
bytes_out += rmp_size(r::write_uint(self.writer, u64::from(*super_id))?);
|
|
|
|
|
}
|
|
|
|
|
// commit
|
|
|
|
|
u32::try_from(bytes_out)
|
|
|
|
|
.map_err(|e| std::io::Error::other(crate::ReplayError::CheckpointTooBig(e)))
|
2025-10-28 15:38:00 -07:00
|
|
|
}
|
|
|
|
|
}
|