This commit is contained in:
2025-10-27 14:38:10 -07:00
parent 1ad81c5ba1
commit 0f3aa51427
6 changed files with 612 additions and 149 deletions
+140 -149
View File
@@ -1,3 +1,4 @@
use crate::statestream;
use thiserror::Error;
#[derive(Debug, Error)]
@@ -168,9 +169,74 @@ pub struct ReplayDecoder<'a, R: std::io::BufRead> {
pub header: Header,
pub initial_state: Vec<u8>,
pub frame_number: usize,
ss_state: statestream::Ctx,
}
impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
/// Creates a [`ReplayDecoder`] for the given buffered readable stream.
///
/// # Errors
/// [`ReplayError::IO`]: Some issue with the read stream, e.g. insufficient length or unexpected end
/// [`ReplayError::Magic`]: Invalid magic number at beginning of file
/// [`ReplayError::Version`]: Version identifier not recognized by parser
/// [`ReplayError::Compression`]: Unsupported compression scheme for checkpoints
pub fn new(rply: &mut R) -> Result<ReplayDecoder<'_, R>> {
use byteorder::{LittleEndian, ReadBytesExt};
let magic = rply.read_u32::<LittleEndian>()?;
if magic != MAGIC {
return Err(ReplayError::Magic(magic));
}
let version = rply.read_u32::<LittleEndian>()?;
if version > 2 {
return Err(ReplayError::Version(version));
}
let content_crc = rply.read_u32::<LittleEndian>()?;
let initial_state_size = rply.read_u32::<LittleEndian>()?;
let identifier = rply.read_u64::<LittleEndian>()?;
let base = HeaderBase {
version,
content_crc,
initial_state_size,
identifier,
};
let mut initial_state = vec![0; initial_state_size as usize];
if version < 2 {
rply.read_exact(initial_state.as_mut_slice())?;
return Ok(ReplayDecoder {
header: Header::V0V1(base),
rply,
initial_state,
frame_number: 0,
ss_state: statestream::Ctx::new(1, 1),
});
}
let frame_count = rply.read_u32::<LittleEndian>()?;
let block_size = rply.read_u32::<LittleEndian>()?;
let superblock_size = rply.read_u32::<LittleEndian>()?;
let cp_config = rply.read_u32::<LittleEndian>()?;
let checkpoint_commit_interval = (cp_config >> 24) as u8;
let checkpoint_commit_threshold = ((cp_config >> 16) & 0xFF) as u8;
let checkpoint_compression = Compression::try_from(((cp_config >> 8) & 0xFF) as u8)
.map_err(ReplayError::Compression)?;
rply.read_exact(initial_state.as_mut_slice())?;
// TODO: decode if version is 2
Ok(ReplayDecoder {
rply,
initial_state,
header: Header::V2(HeaderV2 {
base,
frame_count,
block_size,
superblock_size,
checkpoint_commit_interval,
checkpoint_commit_threshold,
checkpoint_compression,
}),
frame_number: 0,
ss_state: statestream::Ctx::new(block_size, superblock_size),
})
}
/// Reads a single frame at the current decoder position.
/// # Errors
/// [`ReplayError::IO`]: Unexpected end of stream or other I/O error
@@ -238,75 +304,83 @@ impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
FrameToken::Regular => {
frame.checkpoint_compression = Compression::None;
frame.checkpoint_encoding = Encoding::Raw;
frame.checkpoint_raw_bytes.clear();
frame.checkpoint_uncompressed_raw_bytes.clear();
frame.checkpoint_uncompressed_unencoded_bytes.clear();
frame.checkpoint_bytes.clear();
}
FrameToken::Checkpoint => {
frame.checkpoint_compression = Compression::None;
frame.checkpoint_encoding = Encoding::Raw;
let cp_size = usize::try_from(rply.read_u64::<LittleEndian>()?)
.map_err(ReplayError::CheckpointTooBig)?;
frame.checkpoint_raw_bytes.resize(cp_size, 0);
rply.read_exact(frame.checkpoint_raw_bytes.as_mut_slice())?;
frame.checkpoint_bytes.resize(cp_size, 0);
rply.read_exact(frame.checkpoint_bytes.as_mut_slice())?;
}
FrameToken::Checkpoint2 => {
// read a 1 byte compression
let compression =
Compression::try_from(rply.read_u8()?).map_err(ReplayError::Compression)?;
// read a 1 byte encoding
let encoding =
Encoding::try_from(rply.read_u8()?).map_err(ReplayError::Encoding)?;
// read a 4 byte uncompressed unencoded size
let uc_ue_size = rply.read_u32::<LittleEndian>()? as usize;
// read a 4 byte uncompressed encoded size
let uc_enc_size = rply.read_u32::<LittleEndian>()? as usize;
// read a 4 byte compressed encoded size
let comp_enc_size = rply.read_u32::<LittleEndian>()? as usize;
// read the compressed encoded data (todo, make a reader instead)
frame.checkpoint_raw_bytes.resize(comp_enc_size, 0);
rply.read_exact(frame.checkpoint_raw_bytes.as_mut_slice())?;
// maybe decompress
match compression {
Compression::None => {}
Compression::Zlib => {
use flate2::bufread::ZlibDecoder;
frame
.checkpoint_uncompressed_raw_bytes
.resize(uc_enc_size, 0);
let mut decoder = ZlibDecoder::new(rply);
std::io::copy(
&mut decoder,
&mut std::io::Cursor::new(
frame.checkpoint_uncompressed_raw_bytes.as_mut_slice(),
),
)?;
}
Compression::Zstd => {
use zstd::Decoder;
frame
.checkpoint_uncompressed_raw_bytes
.resize(uc_enc_size, 0);
let mut decoder = Decoder::with_buffer(rply)?.single_frame();
std::io::copy(
&mut decoder,
&mut std::io::Cursor::new(
frame.checkpoint_uncompressed_raw_bytes.as_mut_slice(),
),
)?;
decoder.finish();
}
}
// maybe decode
match encoding {
Encoding::Raw => {}
Encoding::Statestream => {
frame
.checkpoint_uncompressed_unencoded_bytes
.resize(uc_ue_size, 0);
// statestream_decode(frame.checkpoint_decompressed_data().unwrap(), &mut frame.checkpoint_uncompressed_unencoded_bytes);
}
}
self.decode_checkpoint(frame)?;
}
}
Ok(())
}
fn decode_checkpoint(&mut self, frame: &mut Frame) -> Result<()> {
use byteorder::{LittleEndian, ReadBytesExt};
let rply = &mut *self.rply;
// read a 1 byte compression code
let compression =
Compression::try_from(rply.read_u8()?).map_err(ReplayError::Compression)?;
// read a 1 byte encoding code
let encoding = Encoding::try_from(rply.read_u8()?).map_err(ReplayError::Encoding)?;
// read a 4 byte uncompressed unencoded size
let uc_ue_size = rply.read_u32::<LittleEndian>()? as usize;
// read a 4 byte uncompressed encoded size
let uc_enc_size = rply.read_u32::<LittleEndian>()? as usize;
// read a 4 byte compressed encoded size
let comp_enc_size = rply.read_u32::<LittleEndian>()? as usize;
frame.checkpoint_bytes.resize(uc_ue_size, 0);
// maybe decompress
match (compression, encoding) {
(Compression::None, Encoding::Raw) => {
rply.read_exact(frame.checkpoint_bytes.as_mut_slice())?;
}
(Compression::None, Encoding::Statestream) => {
let mut ss_decoder = statestream::Decoder::new(rply, &mut self.ss_state);
std::io::copy(
&mut ss_decoder,
&mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()),
)?;
}
(Compression::Zlib, Encoding::Raw) => {
use flate2::bufread::ZlibDecoder;
let mut decoder = ZlibDecoder::new(rply);
std::io::copy(
&mut decoder,
&mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()),
)?;
}
(Compression::Zlib, Encoding::Statestream) => {
use flate2::bufread::ZlibDecoder;
let mut decoder = ZlibDecoder::new(rply);
let mut ss_decoder = statestream::Decoder::new(&mut decoder, &mut self.ss_state);
std::io::copy(
&mut ss_decoder,
&mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()),
)?;
}
(Compression::Zstd, Encoding::Raw) => {
use zstd::Decoder;
let mut decoder = Decoder::with_buffer(rply)?.single_frame();
std::io::copy(
&mut decoder,
&mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()),
)?;
}
(Compression::Zstd, Encoding::Statestream) => {
use zstd::Decoder;
let mut decoder = Decoder::with_buffer(rply)?.single_frame();
let mut ss_decoder = statestream::Decoder::new(&mut decoder, &mut self.ss_state);
std::io::copy(
&mut ss_decoder,
&mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()),
)?;
}
}
Ok(())
@@ -316,64 +390,11 @@ impl<R: std::io::BufRead> ReplayDecoder<'_, R> {
/// Creates a [`ReplayDecoder`] for the given buffered readable stream.
///
/// # Errors
/// [`ReplayError::IO`]: Some issue with the read stream, e.g. insufficient length or unexpected end
/// [`ReplayError::Magic`]: Invalid magic number at beginning of file
/// [`ReplayError::Version`]: Version identifier not recognized by parser
/// [`ReplayError::Compression`]: Unsupported compression scheme for checkpoints
/// See [`ReplayDecoder::new`].
pub fn decode<R: std::io::BufRead>(rply: &mut R) -> Result<ReplayDecoder<'_, R>> {
use byteorder::{LittleEndian, ReadBytesExt};
let magic = rply.read_u32::<LittleEndian>()?;
if magic != MAGIC {
return Err(ReplayError::Magic(magic));
}
let version = rply.read_u32::<LittleEndian>()?;
if version > 2 {
return Err(ReplayError::Version(version));
}
let content_crc = rply.read_u32::<LittleEndian>()?;
let initial_state_size = rply.read_u32::<LittleEndian>()?;
let identifier = rply.read_u64::<LittleEndian>()?;
let base = HeaderBase {
version,
content_crc,
initial_state_size,
identifier,
};
let mut initial_state = vec![0; initial_state_size as usize];
if version < 2 {
rply.read_exact(initial_state.as_mut_slice())?;
return Ok(ReplayDecoder {
header: Header::V0V1(base),
rply,
initial_state,
frame_number: 0,
});
}
let frame_count = rply.read_u32::<LittleEndian>()?;
let block_size = rply.read_u32::<LittleEndian>()?;
let superblock_size = rply.read_u32::<LittleEndian>()?;
let cp_config = rply.read_u32::<LittleEndian>()?;
let checkpoint_commit_interval = (cp_config >> 24) as u8;
let checkpoint_commit_threshold = ((cp_config >> 16) & 0xFF) as u8;
let checkpoint_compression =
Compression::try_from(((cp_config >> 8) & 0xFF) as u8).map_err(ReplayError::Compression)?;
rply.read_exact(initial_state.as_mut_slice())?;
// TODO: decode if version is 2
Ok(ReplayDecoder {
rply,
initial_state,
header: Header::V2(HeaderV2 {
base,
frame_count,
block_size,
superblock_size,
checkpoint_commit_interval,
checkpoint_commit_threshold,
checkpoint_compression,
}),
frame_number: 0,
})
ReplayDecoder::new(rply)
}
impl Header {
#[must_use]
pub fn version(&self) -> u32 {
@@ -403,47 +424,17 @@ pub struct InputData {
pub struct Frame {
pub key_events: Vec<KeyData>,
pub input_events: Vec<InputData>,
checkpoint_raw_bytes: Vec<u8>,
checkpoint_uncompressed_raw_bytes: Vec<u8>,
checkpoint_uncompressed_unencoded_bytes: Vec<u8>,
checkpoint_bytes: Vec<u8>,
pub checkpoint_compression: Compression,
pub checkpoint_encoding: Encoding,
}
impl Frame {
#[must_use]
pub fn checkpoint_decompressed_data(&self) -> Option<&[u8]> {
if self.checkpoint_raw_bytes.is_empty() {
return None;
}
Some(match self.checkpoint_compression {
Compression::None => self.checkpoint_raw_bytes.as_slice(),
_ => self.checkpoint_uncompressed_raw_bytes.as_slice(),
})
}
#[must_use]
pub fn checkpoint_data(&self) -> Option<&[u8]> {
if self.checkpoint_raw_bytes.is_empty() {
return None;
}
Some(
match (self.checkpoint_compression, self.checkpoint_encoding) {
(Compression::None, Encoding::Raw) => self.checkpoint_raw_bytes.as_slice(),
(_, Encoding::Raw) => self.checkpoint_uncompressed_raw_bytes.as_slice(),
(_, _) => self.checkpoint_uncompressed_unencoded_bytes.as_slice(),
},
)
}
}
impl Default for Frame {
fn default() -> Self {
Self {
key_events: Vec::default(),
input_events: Vec::default(),
checkpoint_raw_bytes: Vec::default(),
checkpoint_uncompressed_raw_bytes: Vec::default(),
checkpoint_uncompressed_unencoded_bytes: Vec::default(),
checkpoint_bytes: Vec::default(),
checkpoint_compression: Compression::None,
checkpoint_encoding: Encoding::Raw,
}