complete statestream encoding

This commit is contained in:
2025-10-29 11:31:28 -07:00
parent 67c88683ab
commit 2fe31a6b34
3 changed files with 110 additions and 12 deletions
+95 -5
View File
@@ -220,16 +220,106 @@ pub(crate) struct Encoder<'w, 'c, W: std::io::Write> {
ctx: &'c mut Ctx,
}
/* Does not include the size of the str,arr,map,ext contents */
fn rmp_size(m: rmp::Marker) -> usize {
#[allow(
clippy::enum_glob_use,
reason = "If any new variants are added, the match will cease to be exhaustive"
)]
use rmp::Marker::*;
match m {
FixPos(_) | FixNeg(_) | Null | Reserved | False | True | FixMap(_) | FixArray(_)
| FixStr(_) => 1,
U8 | I8 | Bin8 | Str8 | FixExt1 | FixExt2 | FixExt4 | FixExt8 | FixExt16 => 2,
U16 | I16 | Bin16 | Ext8 | Str16 | Array16 | Map16 => 3,
Ext16 => 4,
U32 | I32 | F32 | Bin32 | Str32 | Array32 | Map32 => 5,
Ext32 => 6,
U64 | I64 | F64 => 9,
}
}
impl<'w, 'c, W: std::io::Write> Encoder<'w, 'c, W> {
pub(crate) fn new(writer: &'w mut W, ctx: &'c mut Ctx) -> Self {
Self { writer, ctx }
}
pub fn encode_checkpoint(mut self, checkpoint: &[u8], frame: u64) -> std::io::Result<u32> {
use rmp::encode as r;
r::write_uint(&mut self.writer, u64::from(u8::from(SSToken::Start)))?;
r::write_uint(&mut self.writer, frame)?;
todo!();
Ok(0)
let mut bytes_out = 0;
bytes_out += rmp_size(r::write_uint(
&mut self.writer,
u64::from(u8::from(SSToken::Start)),
)?);
bytes_out += rmp_size(r::write_uint(&mut self.writer, frame)?);
let block_size = self.ctx.block_size as usize;
let mut padded_block = vec![0; block_size];
let superblock_size = self.ctx.superblock_size as usize;
let superblock_size_bytes = block_size * superblock_size;
let superblock_count = ((checkpoint.len() - 1) / superblock_size_bytes) + 1;
let mut superblock_contents = vec![0_u32; superblock_size];
for (superblock_i, superblock_bytes) in checkpoint.chunks(superblock_size_bytes).enumerate()
{
if superblock_bytes.len() < superblock_size_bytes {
let block_count = (superblock_bytes.len() - 1) / block_size + 1;
if block_count + 1 < superblock_size {
superblock_contents[(block_count + 1)..].fill(0);
}
}
for (block_i, block_bytes) in superblock_bytes.chunks(block_size).enumerate() {
let found_block = if block_bytes.len() < block_size {
padded_block[block_bytes.len()..].fill(0);
padded_block[..block_bytes.len()].copy_from_slice(block_bytes);
self.ctx.block_index.insert(&padded_block, frame)
} else {
self.ctx.block_index.insert(block_bytes, frame)
};
superblock_contents[block_i] = found_block.index;
if found_block.is_new {
bytes_out += rmp_size(r::write_uint(
self.writer,
u64::from(u8::from(SSToken::NewBlock)),
)?);
bytes_out +=
rmp_size(r::write_uint(self.writer, u64::from(found_block.index))?);
bytes_out += rmp_size(r::write_bin_len(self.writer, self.ctx.block_size)?);
self.writer.write_all(block_bytes)?;
bytes_out += block_bytes.len();
}
}
let found_superblock = self
.ctx
.superblock_index
.insert(&superblock_contents, frame);
self.ctx.last_superseq[superblock_i] = found_superblock.index;
if found_superblock.is_new {
bytes_out += rmp_size(r::write_uint(
self.writer,
u64::from(u8::from(SSToken::NewSuperblock)),
)?);
bytes_out += rmp_size(r::write_uint(
self.writer,
u64::from(found_superblock.index),
)?);
bytes_out += rmp_size(r::write_array_len(self.writer, self.ctx.superblock_size)?);
for blkid in &superblock_contents {
bytes_out += rmp_size(r::write_uint(self.writer, u64::from(*blkid))?);
}
}
}
bytes_out += rmp_size(r::write_uint(
self.writer,
u64::from(u8::from(SSToken::SuperblockSeq)),
)?);
bytes_out += rmp_size(r::write_array_len(
self.writer,
u32::try_from(superblock_count)
.map_err(|e| std::io::Error::other(crate::ReplayError::CheckpointTooBig(e)))?,
)?);
for super_id in &self.ctx.last_superseq {
bytes_out += rmp_size(r::write_uint(self.writer, u64::from(*super_id))?);
}
// commit
u32::try_from(bytes_out)
.map_err(|e| std::io::Error::other(crate::ReplayError::CheckpointTooBig(e)))
}
}