workspacify
This commit is contained in:
@@ -0,0 +1,94 @@
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
#[repr(usize)]
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
|
||||
pub enum Timer {
|
||||
DecodeFrame = 0,
|
||||
DecodeCheckpoint,
|
||||
DecodeStatestream,
|
||||
EncodeFrame,
|
||||
EncodeCheckpoint,
|
||||
EncodeStatestream,
|
||||
Count,
|
||||
}
|
||||
#[repr(usize)]
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
|
||||
pub enum Counter {
|
||||
EncReusedBlocks = 0,
|
||||
EncReusedSuperblocks,
|
||||
EncSkippedBlocks,
|
||||
EncMemCmps,
|
||||
EncHashes,
|
||||
EncTotalBlocks,
|
||||
EncTotalSuperblocks,
|
||||
EncTotalKBsIn,
|
||||
EncTotalKBsOut,
|
||||
DecSkippedSuperblocks,
|
||||
DecSkippedBlocks,
|
||||
Count,
|
||||
}
|
||||
static TIME_ACC: [AtomicU64; Timer::Count as usize] = [
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
];
|
||||
static TIME_COUNTS: [AtomicU64; Timer::Count as usize] = [
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
];
|
||||
static COUNTS: [AtomicU64; Counter::Count as usize] = [
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
AtomicU64::new(0),
|
||||
];
|
||||
|
||||
pub struct Stopwatch(Timer, std::time::Instant);
|
||||
impl Stopwatch {
|
||||
fn new(t: Timer) -> Self {
|
||||
Self(t, std::time::Instant::now())
|
||||
}
|
||||
}
|
||||
impl Drop for Stopwatch {
|
||||
fn drop(&mut self) {
|
||||
TIME_ACC[self.0 as usize].fetch_add(
|
||||
u64::try_from(self.1.elapsed().as_micros()).unwrap_or(u64::MAX),
|
||||
Ordering::Relaxed,
|
||||
);
|
||||
TIME_COUNTS[self.0 as usize].fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn time(t: Timer) -> Stopwatch {
|
||||
Stopwatch::new(t)
|
||||
}
|
||||
pub fn count(c: Counter, amt: u64) -> u64 {
|
||||
COUNTS[c as usize].fetch_add(amt, Ordering::Relaxed) + amt
|
||||
}
|
||||
pub struct Times {
|
||||
pub count: u64,
|
||||
pub micros: u64,
|
||||
}
|
||||
pub fn stats(t: Timer) -> Times {
|
||||
Times {
|
||||
count: TIME_COUNTS[t as usize].load(Ordering::Relaxed),
|
||||
micros: TIME_ACC[t as usize].load(Ordering::Relaxed),
|
||||
}
|
||||
}
|
||||
pub fn counts(c: Counter) -> u64 {
|
||||
COUNTS[c as usize].load(Ordering::Relaxed)
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
mod clock;
|
||||
mod rply;
|
||||
mod statestream;
|
||||
pub use clock::{Counter, Timer, Times, counts, stats};
|
||||
pub use rply::*;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub struct InvalidDeterminant(pub u8);
|
||||
impl std::fmt::Display for InvalidDeterminant {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn v2_header() {
|
||||
let mut file =
|
||||
std::io::BufReader::new(std::fs::File::open("examples/bobl.replay").unwrap());
|
||||
let header = match rply::decode(&mut file).unwrap().header {
|
||||
rply::Header::V0V1(_) => panic!("Version too low"),
|
||||
rply::Header::V2(h) => h,
|
||||
};
|
||||
assert_eq!(header.base.version, 2);
|
||||
assert_eq!(header.base.content_crc, 2_199_475_946);
|
||||
assert_eq!(header.base.initial_state_size, 2531);
|
||||
assert_eq!(header.base.identifier, 1_761_326_589);
|
||||
assert_eq!(header.frame_count, 6383);
|
||||
assert_eq!(header.block_size, 128);
|
||||
assert_eq!(header.superblock_size, 16);
|
||||
assert_eq!(header.checkpoint_commit_interval, 4);
|
||||
assert_eq!(header.checkpoint_commit_threshold, 2);
|
||||
assert_eq!(header.checkpoint_compression, rply::Compression::None);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,885 @@
|
||||
use std::io::Write;
|
||||
|
||||
use crate::{
|
||||
InvalidDeterminant,
|
||||
clock::{self, Timer},
|
||||
statestream,
|
||||
};
|
||||
use thiserror::Error;
|
||||
|
||||
// #[repr(usize)]
|
||||
// pub enum HeaderV0V1Part {
|
||||
// Magic = 0,
|
||||
// Version = 4,
|
||||
// CRC = 8,
|
||||
// StateSize = 12,
|
||||
// Identifier = 16,
|
||||
// HeaderLen = 24,
|
||||
// }
|
||||
// #[repr(usize)]
|
||||
// pub enum HeaderV2Part {
|
||||
// FrameCount = 24,
|
||||
// BlockSize = 28,
|
||||
// SuperblockSize = 32,
|
||||
// CheckpointConfig = 36,
|
||||
// HeaderLen = 40,
|
||||
// }
|
||||
// const HEADER_V0V1_LEN_BYTES: usize = HeaderV0V1Part::HeaderLen as usize;
|
||||
const HEADERV2_LEN_BYTES: usize = 40;
|
||||
|
||||
// const VERSION: u32 = 2;
|
||||
const MAGIC: u32 = 0x4253_5632;
|
||||
|
||||
#[repr(u8)]
|
||||
#[non_exhaustive]
|
||||
#[derive(Debug)]
|
||||
pub enum FrameToken {
|
||||
Invalid = 0,
|
||||
Regular = b'f',
|
||||
Checkpoint = b'c',
|
||||
Checkpoint2 = b'C',
|
||||
}
|
||||
impl From<u8> for FrameToken {
|
||||
fn from(value: u8) -> Self {
|
||||
match value {
|
||||
b'f' => FrameToken::Regular,
|
||||
b'c' => FrameToken::Checkpoint,
|
||||
b'C' => FrameToken::Checkpoint2,
|
||||
_ => FrameToken::Invalid,
|
||||
}
|
||||
}
|
||||
}
|
||||
impl From<FrameToken> for u8 {
|
||||
fn from(value: FrameToken) -> Self {
|
||||
match value {
|
||||
FrameToken::Invalid => 0,
|
||||
FrameToken::Regular => b'f',
|
||||
FrameToken::Checkpoint => b'c',
|
||||
FrameToken::Checkpoint2 => b'C',
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(u8)]
|
||||
#[non_exhaustive]
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub enum Compression {
|
||||
None = 0,
|
||||
Zlib = 1,
|
||||
Zstd = 2,
|
||||
}
|
||||
|
||||
impl TryFrom<u8> for Compression {
|
||||
type Error = InvalidDeterminant;
|
||||
|
||||
fn try_from(value: u8) -> std::result::Result<Self, Self::Error> {
|
||||
match value {
|
||||
0 => Ok(Compression::None),
|
||||
1 => Ok(Compression::Zlib),
|
||||
2 => Ok(Compression::Zstd),
|
||||
_ => Err(InvalidDeterminant(value)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Compression> for u8 {
|
||||
fn from(value: Compression) -> Self {
|
||||
match value {
|
||||
Compression::None => 0,
|
||||
Compression::Zlib => 1,
|
||||
Compression::Zstd => 2,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(u8)]
|
||||
#[non_exhaustive]
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub enum Encoding {
|
||||
Raw = 0,
|
||||
Statestream = 1,
|
||||
}
|
||||
|
||||
impl TryFrom<u8> for Encoding {
|
||||
type Error = InvalidDeterminant;
|
||||
|
||||
fn try_from(value: u8) -> std::result::Result<Self, Self::Error> {
|
||||
match value {
|
||||
0 => Ok(Encoding::Raw),
|
||||
1 => Ok(Encoding::Statestream),
|
||||
_ => Err(InvalidDeterminant(value)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Encoding> for u8 {
|
||||
fn from(value: Encoding) -> Self {
|
||||
match value {
|
||||
Encoding::Raw => 0,
|
||||
Encoding::Statestream => 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct HeaderBase {
|
||||
pub version: u32,
|
||||
pub content_crc: u32,
|
||||
pub initial_state_size: u32,
|
||||
pub identifier: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct HeaderV2 {
|
||||
pub base: HeaderBase,
|
||||
pub frame_count: u32,
|
||||
pub block_size: u32,
|
||||
pub superblock_size: u32,
|
||||
pub checkpoint_commit_interval: u8,
|
||||
pub checkpoint_commit_threshold: u8,
|
||||
pub checkpoint_compression: Compression,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum Header {
|
||||
V0V1(HeaderBase),
|
||||
V2(HeaderV2),
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum ReplayError {
|
||||
#[error("Invalid replay magic {0}")]
|
||||
Magic(u32),
|
||||
#[error("Unsupported version {0}")]
|
||||
Version(u32),
|
||||
#[error("Unsupported compression scheme {0}")]
|
||||
Compression(InvalidDeterminant),
|
||||
#[error("Unsupported encoding scheme {0}")]
|
||||
Encoding(InvalidDeterminant),
|
||||
#[error("I/O Error")]
|
||||
IO(#[from] std::io::Error),
|
||||
#[error("Too many frames to {0} fit framecount header")]
|
||||
TooManyFrames(std::num::TryFromIntError),
|
||||
#[error("Coreless frame read for version 0 not possible")]
|
||||
NoCoreRead(),
|
||||
#[error("Checkpoint too big {0}")]
|
||||
CheckpointTooBig(std::num::TryFromIntError),
|
||||
#[error("Frame too long {0}")]
|
||||
FrameTooLong(std::num::TryFromIntError),
|
||||
#[error("Frame has too many key events {0}")]
|
||||
TooManyKeyEvents(std::num::TryFromIntError),
|
||||
#[error("Frame has too many input events {0}")]
|
||||
TooManyInputEvents(std::num::TryFromIntError),
|
||||
#[error("Invalid frame token {0}")]
|
||||
BadFrameToken(u8),
|
||||
}
|
||||
|
||||
type Result<T> = std::result::Result<T, ReplayError>;
|
||||
|
||||
pub struct ReplayDecoder<R: std::io::BufRead> {
|
||||
rply: R,
|
||||
pub header: Header,
|
||||
pub initial_state: Vec<u8>,
|
||||
pub frame_number: u64,
|
||||
ss_state: statestream::Ctx,
|
||||
}
|
||||
|
||||
impl<R: std::io::BufRead> ReplayDecoder<R> {
|
||||
/// Creates a [`ReplayDecoder`] for the given buffered readable stream.
|
||||
///
|
||||
/// # Errors
|
||||
/// [`ReplayError::IO`]: Some issue with the read stream, e.g. insufficient length or unexpected end
|
||||
/// [`ReplayError::Magic`]: Invalid magic number at beginning of file
|
||||
/// [`ReplayError::Version`]: Version identifier not recognized by parser
|
||||
/// [`ReplayError::Compression`]: Unsupported compression scheme for checkpoints
|
||||
pub fn new(mut rply: R) -> Result<ReplayDecoder<R>> {
|
||||
use byteorder::{LittleEndian, ReadBytesExt};
|
||||
let magic = rply.read_u32::<LittleEndian>()?;
|
||||
if magic != MAGIC {
|
||||
return Err(ReplayError::Magic(magic));
|
||||
}
|
||||
let version = rply.read_u32::<LittleEndian>()?;
|
||||
if version > 2 {
|
||||
return Err(ReplayError::Version(version));
|
||||
}
|
||||
let content_crc = rply.read_u32::<LittleEndian>()?;
|
||||
let initial_state_size = rply.read_u32::<LittleEndian>()?;
|
||||
let identifier = rply.read_u64::<LittleEndian>()?;
|
||||
let base = HeaderBase {
|
||||
version,
|
||||
content_crc,
|
||||
initial_state_size,
|
||||
identifier,
|
||||
};
|
||||
let mut initial_state = vec![0; initial_state_size as usize];
|
||||
if version < 2 {
|
||||
rply.read_exact(initial_state.as_mut_slice())?;
|
||||
return Ok(ReplayDecoder {
|
||||
header: Header::V0V1(base),
|
||||
rply,
|
||||
initial_state,
|
||||
frame_number: 0,
|
||||
ss_state: statestream::Ctx::new(1, 1),
|
||||
});
|
||||
}
|
||||
let frame_count = rply.read_u32::<LittleEndian>()?;
|
||||
let block_size = rply.read_u32::<LittleEndian>()?;
|
||||
let superblock_size = rply.read_u32::<LittleEndian>()?;
|
||||
let cp_config = rply.read_u32::<LittleEndian>()?;
|
||||
let checkpoint_commit_interval = (cp_config >> 24) as u8;
|
||||
let checkpoint_commit_threshold = ((cp_config >> 16) & 0xFF) as u8;
|
||||
let checkpoint_compression = Compression::try_from(((cp_config >> 8) & 0xFF) as u8)
|
||||
.map_err(ReplayError::Compression)?;
|
||||
let mut replay = ReplayDecoder {
|
||||
rply,
|
||||
initial_state,
|
||||
header: Header::V2(HeaderV2 {
|
||||
base,
|
||||
frame_count,
|
||||
block_size,
|
||||
superblock_size,
|
||||
checkpoint_commit_interval,
|
||||
checkpoint_commit_threshold,
|
||||
checkpoint_compression,
|
||||
}),
|
||||
frame_number: 0,
|
||||
ss_state: statestream::Ctx::new(block_size, superblock_size),
|
||||
};
|
||||
replay.decode_initial_checkpoint()?;
|
||||
Ok(replay)
|
||||
}
|
||||
|
||||
pub fn inner(&mut self) -> &mut R {
|
||||
&mut self.rply
|
||||
}
|
||||
|
||||
/// Reads keyboard event records at the current input position. Only really appropriate to explicitly call for v0 replays.
|
||||
/// # Errors
|
||||
/// [`ReplayError::IO`]: Unexpected end of stream or other I/O error
|
||||
pub fn read_key_events(&mut self, frame: &mut Frame) -> Result<()> {
|
||||
use byteorder::{LittleEndian, ReadBytesExt};
|
||||
let rply = &mut self.rply;
|
||||
let key_count = rply.read_u8()? as usize;
|
||||
frame.key_events.resize_with(key_count, Default::default);
|
||||
for ki in 0..key_count {
|
||||
/*
|
||||
down, padding, mod_x2, code_x4, char_x4
|
||||
*/
|
||||
let down = rply.read_u8()?;
|
||||
let _ = rply.read_u8()?; // padding
|
||||
let modf = rply.read_u16::<LittleEndian>()?;
|
||||
let code = rply.read_u32::<LittleEndian>()?;
|
||||
let chr = rply.read_u32::<LittleEndian>()?;
|
||||
let key_data = KeyData {
|
||||
down,
|
||||
/* buf[1] is padding */
|
||||
modf,
|
||||
code,
|
||||
chr,
|
||||
};
|
||||
frame.key_events[ki] = key_data;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reads an end of frame marker at the current input position. Only really appropriate to explicitly call for v0 replays.
|
||||
/// # Errors
|
||||
/// [`ReplayError::IO`]: Unexpected end of stream or other I/O error
|
||||
/// [`ReplayError::Compression`]: Unsupported compression scheme
|
||||
/// [`ReplayError::Encoding`]: Unsupported encoding scheme
|
||||
/// [`ReplayError::BadFrameToken`]: Frame token not recognized or misaligned
|
||||
/// [`ReplayError::CheckpointTooBig`]: Tried to read a checkpoint bigger than the address space
|
||||
pub fn read_end_of_frame(&mut self, frame: &mut Frame) -> Result<()> {
|
||||
use byteorder::{LittleEndian, ReadBytesExt};
|
||||
let rply = &mut self.rply;
|
||||
let tok = rply.read_u8()?;
|
||||
match FrameToken::from(tok) {
|
||||
FrameToken::Regular => {
|
||||
frame.checkpoint_compression = Compression::None;
|
||||
frame.checkpoint_encoding = Encoding::Raw;
|
||||
frame.checkpoint_bytes.clear();
|
||||
}
|
||||
FrameToken::Checkpoint => {
|
||||
frame.checkpoint_compression = Compression::None;
|
||||
frame.checkpoint_encoding = Encoding::Raw;
|
||||
let cp_size = usize::try_from(rply.read_u64::<LittleEndian>()?)
|
||||
.map_err(ReplayError::CheckpointTooBig)?;
|
||||
frame.checkpoint_bytes.resize(cp_size, 0);
|
||||
rply.read_exact(frame.checkpoint_bytes.as_mut_slice())?;
|
||||
}
|
||||
FrameToken::Checkpoint2 => {
|
||||
self.decode_checkpoint(&mut frame.checkpoint_bytes)?;
|
||||
}
|
||||
_ => return Err(ReplayError::BadFrameToken(tok)),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reads a single button value at the current input position. Only appropriate for v0 replays and only if you are implementing an input callback for a core.
|
||||
/// # Errors
|
||||
/// [`ReplayError::IO`]: Unexpected end of stream or other I/O error
|
||||
pub fn read_v0_button(&mut self) -> Result<i16> {
|
||||
use byteorder::{LittleEndian, ReadBytesExt};
|
||||
self.rply
|
||||
.read_i16::<LittleEndian>()
|
||||
.map_err(ReplayError::IO)
|
||||
}
|
||||
|
||||
/// Reads a single frame at the current decoder position.
|
||||
/// # Errors
|
||||
/// [`ReplayError::IO`]: Unexpected end of stream or other I/O error
|
||||
/// [`ReplayError::Compression`]: Unsupported compression scheme
|
||||
/// [`ReplayError::Encoding`]: Unsupported encoding scheme
|
||||
/// [`ReplayError::BadFrameToken`]: Frame token not recognized or misaligned
|
||||
/// [`ReplayError::NoCoreRead`]: Tried to read a frame on a version 0 replay without a loaded core
|
||||
/// [`ReplayError::CheckpointTooBig`]: Tried to read a checkpoint bigger than the address space
|
||||
#[allow(clippy::too_many_lines)]
|
||||
pub fn read_frame(&mut self, frame: &mut Frame) -> Result<()> {
|
||||
use byteorder::{LittleEndian, ReadBytesExt};
|
||||
let stopwatch = clock::time(Timer::DecodeFrame);
|
||||
let vsn = self.header.version();
|
||||
if vsn == 0 {
|
||||
return Err(ReplayError::NoCoreRead());
|
||||
}
|
||||
if vsn > 1 {
|
||||
/* skip over the backref */
|
||||
let _ = self.rply.read_u32::<LittleEndian>()?;
|
||||
}
|
||||
self.read_key_events(frame)?;
|
||||
let rply = &mut self.rply;
|
||||
let input_count = rply.read_u16::<LittleEndian>()? as usize;
|
||||
frame
|
||||
.input_events
|
||||
.resize_with(input_count, Default::default);
|
||||
for ii in 0..input_count {
|
||||
/* port, device, idx, padding, id_x2, value_x2 */
|
||||
let port = rply.read_u8()?;
|
||||
let device = rply.read_u8()?;
|
||||
let idx = rply.read_u8()?;
|
||||
let _ = rply.read_u8()?;
|
||||
let id = rply.read_u16::<LittleEndian>()?;
|
||||
let val = rply.read_i16::<LittleEndian>()?;
|
||||
let inp_data = InputData {
|
||||
port,
|
||||
device,
|
||||
idx,
|
||||
id,
|
||||
val,
|
||||
};
|
||||
frame.input_events[ii] = inp_data;
|
||||
}
|
||||
self.read_end_of_frame(frame)?;
|
||||
self.frame_number += 1;
|
||||
drop(stopwatch);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn decode_initial_checkpoint(&mut self) -> Result<()> {
|
||||
let mut initial_state = std::mem::take(&mut self.initial_state);
|
||||
self.decode_checkpoint(&mut initial_state)?;
|
||||
self.initial_state = initial_state;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn decode_checkpoint(&mut self, checkpoint_bytes: &mut Vec<u8>) -> Result<()> {
|
||||
use byteorder::{LittleEndian, ReadBytesExt};
|
||||
let stopwatch = clock::time(Timer::DecodeCheckpoint);
|
||||
let rply = &mut self.rply;
|
||||
// read a 1 byte compression code
|
||||
let compression =
|
||||
Compression::try_from(rply.read_u8()?).map_err(ReplayError::Compression)?;
|
||||
// read a 1 byte encoding code
|
||||
let encoding = Encoding::try_from(rply.read_u8()?).map_err(ReplayError::Encoding)?;
|
||||
// read a 4 byte uncompressed unencoded size
|
||||
let uc_ue_size = rply.read_u32::<LittleEndian>()? as usize;
|
||||
// read a 4 byte uncompressed encoded size
|
||||
#[expect(unused)]
|
||||
let uc_enc_size = rply.read_u32::<LittleEndian>()? as usize;
|
||||
// read a 4 byte compressed encoded size
|
||||
#[expect(unused)]
|
||||
let comp_enc_size = rply.read_u32::<LittleEndian>()? as usize;
|
||||
checkpoint_bytes.resize(uc_ue_size, 0);
|
||||
// maybe decompress
|
||||
match (compression, encoding) {
|
||||
(Compression::None, Encoding::Raw) => {
|
||||
rply.read_exact(checkpoint_bytes.as_mut_slice())?;
|
||||
}
|
||||
(Compression::None, Encoding::Statestream) => {
|
||||
let mut ss_decoder =
|
||||
statestream::Decoder::new(rply, &mut self.ss_state, uc_ue_size);
|
||||
std::io::copy(
|
||||
&mut ss_decoder,
|
||||
&mut std::io::Cursor::new(checkpoint_bytes.as_mut_slice()),
|
||||
)?;
|
||||
}
|
||||
(Compression::Zlib, Encoding::Raw) => {
|
||||
use flate2::bufread::ZlibDecoder;
|
||||
let mut decoder = ZlibDecoder::new(rply);
|
||||
std::io::copy(
|
||||
&mut decoder,
|
||||
&mut std::io::Cursor::new(checkpoint_bytes.as_mut_slice()),
|
||||
)?;
|
||||
}
|
||||
(Compression::Zlib, Encoding::Statestream) => {
|
||||
use flate2::bufread::ZlibDecoder;
|
||||
let mut decoder = ZlibDecoder::new(rply);
|
||||
let mut ss_decoder =
|
||||
statestream::Decoder::new(&mut decoder, &mut self.ss_state, uc_ue_size);
|
||||
std::io::copy(
|
||||
&mut ss_decoder,
|
||||
&mut std::io::Cursor::new(checkpoint_bytes.as_mut_slice()),
|
||||
)?;
|
||||
}
|
||||
(Compression::Zstd, Encoding::Raw) => {
|
||||
use zstd::Decoder;
|
||||
let mut decoder = Decoder::with_buffer(rply)?.single_frame();
|
||||
std::io::copy(
|
||||
&mut decoder,
|
||||
&mut std::io::Cursor::new(checkpoint_bytes.as_mut_slice()),
|
||||
)?;
|
||||
}
|
||||
(Compression::Zstd, Encoding::Statestream) => {
|
||||
use zstd::Decoder;
|
||||
let mut decoder = Decoder::with_buffer(rply)?.single_frame();
|
||||
let mut ss_decoder =
|
||||
statestream::Decoder::new(&mut decoder, &mut self.ss_state, uc_ue_size);
|
||||
std::io::copy(
|
||||
&mut ss_decoder,
|
||||
&mut std::io::Cursor::new(checkpoint_bytes.as_mut_slice()),
|
||||
)?;
|
||||
}
|
||||
}
|
||||
drop(stopwatch);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a [`ReplayDecoder`] for the given buffered readable stream.
|
||||
///
|
||||
/// # Errors
|
||||
/// See [`ReplayDecoder::new`].
|
||||
pub fn decode<R: std::io::BufRead>(rply: R) -> Result<ReplayDecoder<R>> {
|
||||
ReplayDecoder::new(rply)
|
||||
}
|
||||
|
||||
pub struct ReplayEncoder<'a, W: std::io::Write + std::io::Seek> {
|
||||
rply: &'a mut W,
|
||||
pub header: Header,
|
||||
pub frame_number: u64,
|
||||
last_pos: u64,
|
||||
ss_state: statestream::Ctx,
|
||||
finished: bool,
|
||||
}
|
||||
|
||||
impl<'w, W: std::io::Write + std::io::Seek> ReplayEncoder<'w, W> {
|
||||
/// Creates a [`ReplayEncoder`] for the given writable and seekable stream.
|
||||
///
|
||||
/// # Errors
|
||||
/// [`ReplayError::IO`]: Some issue with the write stream, e.g. unexpected end
|
||||
/// [`ReplayError::Version`]: Version identifier not supported by writer
|
||||
/// [`ReplayError::Compression`]: Unsupported compression scheme for checkpoints
|
||||
pub fn new<'s>(
|
||||
header: Header,
|
||||
initial_state: &'s [u8],
|
||||
rply: &'w mut W,
|
||||
) -> Result<ReplayEncoder<'w, W>> {
|
||||
if header.version() != 2 {
|
||||
return Err(ReplayError::Version(header.version()));
|
||||
}
|
||||
let ss_state = statestream::Ctx::new(header.block_size(), header.superblock_size());
|
||||
let mut replay = ReplayEncoder {
|
||||
rply,
|
||||
header,
|
||||
frame_number: 0,
|
||||
last_pos: 0,
|
||||
ss_state,
|
||||
finished: false,
|
||||
};
|
||||
replay.write_header()?;
|
||||
if !initial_state.is_empty() {
|
||||
replay.encode_initial_checkpoint(initial_state)?;
|
||||
}
|
||||
replay.last_pos = replay.rply.stream_position()?;
|
||||
Ok(replay)
|
||||
}
|
||||
fn write_header(&mut self) -> Result<()> {
|
||||
use byteorder::{LittleEndian, WriteBytesExt};
|
||||
self.header
|
||||
.set_frame_count(u32::try_from(self.frame_number).unwrap_or_default());
|
||||
let old_pos = self.rply.stream_position()?;
|
||||
self.rply.seek(std::io::SeekFrom::Start(0))?;
|
||||
self.rply.write_u32::<LittleEndian>(MAGIC)?;
|
||||
self.rply.write_u32::<LittleEndian>(2)?;
|
||||
self.rply
|
||||
.write_u32::<LittleEndian>(self.header.content_crc())?;
|
||||
// state size
|
||||
self.rply
|
||||
.write_u32::<LittleEndian>(self.header.initial_state_size())?;
|
||||
self.rply
|
||||
.write_u64::<LittleEndian>(self.header.identifier())?;
|
||||
self.rply.write_u32::<LittleEndian>(
|
||||
u32::try_from(self.header.frame_count().unwrap())
|
||||
.map_err(ReplayError::TooManyFrames)?,
|
||||
)?;
|
||||
self.rply
|
||||
.write_u32::<LittleEndian>(self.header.block_size())?;
|
||||
self.rply
|
||||
.write_u32::<LittleEndian>(self.header.superblock_size())?;
|
||||
let cp_interval = u32::from(self.header.checkpoint_commit_interval());
|
||||
let cp_threshold = u32::from(self.header.checkpoint_commit_threshold());
|
||||
let cp_compression = u32::from(u8::from(self.header.checkpoint_compression()));
|
||||
self.rply.write_u32::<LittleEndian>(
|
||||
(cp_interval << 24) | (cp_threshold << 16) | (cp_compression << 8),
|
||||
)?;
|
||||
self.rply.seek(std::io::SeekFrom::Start(old_pos))?;
|
||||
Ok(())
|
||||
}
|
||||
fn encode_checkpoint(&mut self, checkpoint: &[u8], frame: u64) -> Result<()> {
|
||||
use byteorder::{LittleEndian, WriteBytesExt};
|
||||
let stopwatch = clock::time(Timer::EncodeCheckpoint);
|
||||
let compression = self.header.checkpoint_compression();
|
||||
let encoding = Encoding::Statestream;
|
||||
self.rply.write_u8(u8::from(compression))?;
|
||||
self.rply.write_u8(u8::from(encoding))?;
|
||||
// write unencoded uncompressed size
|
||||
let full_size = u32::try_from(checkpoint.len()).map_err(ReplayError::CheckpointTooBig)?;
|
||||
self.rply.write_u32::<LittleEndian>(full_size)?;
|
||||
let size_pos = self.rply.stream_position()?;
|
||||
// can't yet write encoded uncompressed size, just write zeros for now
|
||||
// write encoded compressed size
|
||||
self.rply.write_u32::<LittleEndian>(0)?;
|
||||
// write encoded compressed bytes
|
||||
self.rply.write_u32::<LittleEndian>(0)?;
|
||||
let (encoded_size, compressed_size) = match (compression, encoding) {
|
||||
(Compression::None, Encoding::Raw) => {
|
||||
self.rply.write_all(checkpoint)?;
|
||||
(full_size, full_size)
|
||||
}
|
||||
(Compression::None, Encoding::Statestream) => {
|
||||
let encoder = statestream::Encoder::new(&mut self.rply, &mut self.ss_state);
|
||||
let encoded_size = encoder.encode_checkpoint(checkpoint, frame)?;
|
||||
(encoded_size, encoded_size)
|
||||
}
|
||||
(Compression::Zlib, Encoding::Raw) => {
|
||||
use flate2::write::ZlibEncoder;
|
||||
let here_pos = self.rply.stream_position()?;
|
||||
let mut encoder = ZlibEncoder::new(&mut self.rply, flate2::Compression::default());
|
||||
let encoded_size = full_size;
|
||||
encoder.write_all(checkpoint)?;
|
||||
encoder.finish()?;
|
||||
let compressed_size = u32::try_from(self.rply.stream_position()? - here_pos)
|
||||
.map_err(ReplayError::CheckpointTooBig)?;
|
||||
(encoded_size, compressed_size)
|
||||
}
|
||||
(Compression::Zlib, Encoding::Statestream) => {
|
||||
use flate2::write::ZlibEncoder;
|
||||
let here_pos = self.rply.stream_position()?;
|
||||
let mut compressor =
|
||||
ZlibEncoder::new(&mut self.rply, flate2::Compression::default());
|
||||
let encoder = statestream::Encoder::new(&mut compressor, &mut self.ss_state);
|
||||
let encoded_size = encoder.encode_checkpoint(checkpoint, frame)?;
|
||||
compressor.finish()?;
|
||||
let compressed_size = u32::try_from(self.rply.stream_position()? - here_pos)
|
||||
.map_err(ReplayError::CheckpointTooBig)?;
|
||||
(encoded_size, compressed_size)
|
||||
}
|
||||
(Compression::Zstd, Encoding::Raw) => {
|
||||
let here_pos = self.rply.stream_position()?;
|
||||
let mut encoder = zstd::Encoder::new(&mut self.rply, 16)?;
|
||||
encoder.write_all(checkpoint)?;
|
||||
encoder.finish()?;
|
||||
let encoded_size = full_size;
|
||||
let compressed_size = u32::try_from(self.rply.stream_position()? - here_pos)
|
||||
.map_err(ReplayError::CheckpointTooBig)?;
|
||||
(encoded_size, compressed_size)
|
||||
}
|
||||
(Compression::Zstd, Encoding::Statestream) => {
|
||||
let here_pos = self.rply.stream_position()?;
|
||||
let mut compressor = zstd::Encoder::new(&mut self.rply, 16)?;
|
||||
let encoder = statestream::Encoder::new(&mut compressor, &mut self.ss_state);
|
||||
let encoded_size = encoder.encode_checkpoint(checkpoint, frame)?;
|
||||
compressor.finish()?;
|
||||
let compressed_size = u32::try_from(self.rply.stream_position()? - here_pos)
|
||||
.map_err(ReplayError::CheckpointTooBig)?;
|
||||
(encoded_size, compressed_size)
|
||||
}
|
||||
};
|
||||
let end_pos = self.rply.stream_position()?;
|
||||
self.rply.seek(std::io::SeekFrom::Start(size_pos))?;
|
||||
// write encoded compressed size
|
||||
self.rply.write_u32::<LittleEndian>(encoded_size)?;
|
||||
// write encoded compressed bytes
|
||||
self.rply.write_u32::<LittleEndian>(compressed_size)?;
|
||||
self.rply.seek(std::io::SeekFrom::Start(end_pos))?;
|
||||
drop(stopwatch);
|
||||
Ok(())
|
||||
}
|
||||
fn encode_initial_checkpoint(&mut self, checkpoint: &[u8]) -> Result<()> {
|
||||
self.rply
|
||||
.seek(std::io::SeekFrom::Start(HEADERV2_LEN_BYTES as u64))?;
|
||||
self.encode_checkpoint(checkpoint, 0)?;
|
||||
let encoded_size = self.rply.stream_position()? - HEADERV2_LEN_BYTES as u64;
|
||||
self.header.set_initial_state_size(
|
||||
u32::try_from(encoded_size).map_err(ReplayError::CheckpointTooBig)?,
|
||||
);
|
||||
// Have to rewrite header to account for initial state size
|
||||
self.write_header()?;
|
||||
self.last_pos = self.rply.stream_position()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes a single frame at the current encoder position.
|
||||
/// # Errors
|
||||
/// [`ReplayError::FrameTooLong`]: Frame encoded to more than 2^32 bytes, backrefs invalid
|
||||
/// [`ReplayError::TooManyKeyEvents`]: More key events than allowed by spec
|
||||
/// [`ReplayError::TooManyInputEvents`]: More input events than allowed by spec
|
||||
/// [`ReplayError::CheckpointTooBig`]: Checkpoint data takes up more than 2^32 bytes
|
||||
pub fn write_frame(&mut self, frame: &Frame) -> Result<()> {
|
||||
use byteorder::{LittleEndian, WriteBytesExt};
|
||||
let stopwatch = clock::time(Timer::EncodeFrame);
|
||||
let start_pos = self.rply.stream_position()?;
|
||||
self.rply.write_u32::<LittleEndian>(
|
||||
u32::try_from(start_pos - self.last_pos).map_err(ReplayError::FrameTooLong)?,
|
||||
)?;
|
||||
self.rply.write_u8(
|
||||
u8::try_from(frame.key_events.len()).map_err(ReplayError::TooManyKeyEvents)?,
|
||||
)?;
|
||||
for evt in &frame.key_events {
|
||||
self.rply.write_u8(evt.down)?;
|
||||
self.rply.write_u8(0)?; // padding
|
||||
self.rply.write_u16::<LittleEndian>(evt.modf)?;
|
||||
self.rply.write_u32::<LittleEndian>(evt.code)?;
|
||||
self.rply.write_u32::<LittleEndian>(evt.chr)?;
|
||||
}
|
||||
self.rply.write_u16::<LittleEndian>(
|
||||
u16::try_from(frame.input_events.len()).map_err(ReplayError::TooManyInputEvents)?,
|
||||
)?;
|
||||
for evt in &frame.input_events {
|
||||
self.rply.write_u8(evt.port)?;
|
||||
self.rply.write_u8(evt.device)?;
|
||||
self.rply.write_u8(evt.idx)?;
|
||||
self.rply.write_u8(0)?; // padding
|
||||
self.rply.write_u16::<LittleEndian>(evt.id)?;
|
||||
self.rply.write_i16::<LittleEndian>(evt.val)?;
|
||||
}
|
||||
if frame.checkpoint_bytes.is_empty() {
|
||||
self.rply.write_u8(u8::from(FrameToken::Regular))?;
|
||||
} else {
|
||||
self.rply.write_u8(u8::from(FrameToken::Checkpoint2))?;
|
||||
self.encode_checkpoint(&frame.checkpoint_bytes, self.frame_number)?;
|
||||
}
|
||||
self.frame_number += 1;
|
||||
self.last_pos = start_pos;
|
||||
drop(stopwatch);
|
||||
Ok(())
|
||||
}
|
||||
/// Finishes the encoding, writing the header in the process
|
||||
/// # Errors
|
||||
/// [`ReplayError::IO`]: Underlying writer fails to write header
|
||||
pub fn finish(&mut self) -> Result<()> {
|
||||
if self.finished {
|
||||
return Ok(());
|
||||
}
|
||||
self.write_header()?;
|
||||
self.finished = true;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<W: std::io::Write + std::io::Seek> Drop for ReplayEncoder<'_, W> {
|
||||
fn drop(&mut self) {
|
||||
self.finish().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a [`ReplayEncoder`] for the given writable & seekable stream.
|
||||
///
|
||||
/// # Errors
|
||||
/// See [`ReplayEncoder::new`].
|
||||
pub fn encode<'w, W: std::io::Write + std::io::Seek>(
|
||||
header: Header,
|
||||
initial_state: &[u8],
|
||||
rply: &'w mut W,
|
||||
) -> Result<ReplayEncoder<'w, W>> {
|
||||
ReplayEncoder::new(header, initial_state, rply)
|
||||
}
|
||||
|
||||
impl Header {
|
||||
fn base(&self) -> &HeaderBase {
|
||||
match self {
|
||||
Header::V0V1(header_base) => header_base,
|
||||
Header::V2(header_v2) => &header_v2.base,
|
||||
}
|
||||
}
|
||||
fn base_mut(&mut self) -> &mut HeaderBase {
|
||||
match self {
|
||||
Header::V0V1(header_base) => header_base,
|
||||
Header::V2(header_v2) => &mut header_v2.base,
|
||||
}
|
||||
}
|
||||
#[must_use]
|
||||
pub fn version(&self) -> u32 {
|
||||
self.base().version
|
||||
}
|
||||
#[must_use]
|
||||
pub fn content_crc(&self) -> u32 {
|
||||
self.base().content_crc
|
||||
}
|
||||
pub fn set_content_crc(&mut self, crc: u32) {
|
||||
self.base_mut().content_crc = crc;
|
||||
}
|
||||
#[must_use]
|
||||
pub fn identifier(&self) -> u64 {
|
||||
self.base().identifier
|
||||
}
|
||||
pub fn set_identifier(&mut self, id: u64) {
|
||||
self.base_mut().identifier = id;
|
||||
}
|
||||
#[must_use]
|
||||
pub fn initial_state_size(&self) -> u32 {
|
||||
self.base().initial_state_size
|
||||
}
|
||||
pub fn set_initial_state_size(&mut self, sz: u32) {
|
||||
self.base_mut().initial_state_size = sz;
|
||||
}
|
||||
#[must_use]
|
||||
pub fn frame_count(&self) -> Option<u64> {
|
||||
match self {
|
||||
Header::V0V1(_) => None,
|
||||
Header::V2(header_v2) => Some(u64::from(header_v2.frame_count)),
|
||||
}
|
||||
}
|
||||
pub fn set_frame_count(&mut self, frames: u32) {
|
||||
self.upgrade().frame_count = frames;
|
||||
}
|
||||
pub fn upgrade(&mut self) -> &mut HeaderV2 {
|
||||
if let Header::V0V1(base) = self {
|
||||
*self = Header::V2(HeaderV2 {
|
||||
base: base.clone(),
|
||||
frame_count: 0,
|
||||
block_size: 256,
|
||||
superblock_size: 256,
|
||||
checkpoint_commit_interval: 8,
|
||||
checkpoint_commit_threshold: 4,
|
||||
checkpoint_compression: Compression::None,
|
||||
});
|
||||
}
|
||||
let Header::V2(v2) = self else { unreachable!() };
|
||||
v2.base.version = 2;
|
||||
v2
|
||||
}
|
||||
#[must_use]
|
||||
pub fn block_size(&self) -> u32 {
|
||||
match self {
|
||||
Header::V0V1(_) => 0,
|
||||
Header::V2(header_v2) => header_v2.block_size,
|
||||
}
|
||||
}
|
||||
pub fn set_block_size(&mut self, sz: u32) {
|
||||
let v2 = self.upgrade();
|
||||
v2.block_size = sz;
|
||||
}
|
||||
#[must_use]
|
||||
pub fn superblock_size(&self) -> u32 {
|
||||
match self {
|
||||
Header::V0V1(_) => 0,
|
||||
Header::V2(header_v2) => header_v2.superblock_size,
|
||||
}
|
||||
}
|
||||
pub fn set_superblock_size(&mut self, sz: u32) {
|
||||
let v2 = self.upgrade();
|
||||
v2.superblock_size = sz;
|
||||
}
|
||||
#[must_use]
|
||||
pub fn checkpoint_commit_interval(&self) -> u8 {
|
||||
match self {
|
||||
Header::V0V1(_) => 0,
|
||||
Header::V2(header_v2) => header_v2.checkpoint_commit_interval,
|
||||
}
|
||||
}
|
||||
#[must_use]
|
||||
pub fn checkpoint_commit_threshold(&self) -> u8 {
|
||||
match self {
|
||||
Header::V0V1(_) => 0,
|
||||
Header::V2(header_v2) => header_v2.checkpoint_commit_threshold,
|
||||
}
|
||||
}
|
||||
pub fn set_checkpoint_commit_settings(&mut self, interval: u8, threshold: u8) {
|
||||
let v2 = self.upgrade();
|
||||
v2.checkpoint_commit_interval = interval;
|
||||
v2.checkpoint_commit_threshold = threshold;
|
||||
}
|
||||
#[must_use]
|
||||
pub fn checkpoint_compression(&self) -> Compression {
|
||||
match self {
|
||||
Header::V0V1(_) => Compression::None,
|
||||
Header::V2(header_v2) => header_v2.checkpoint_compression,
|
||||
}
|
||||
}
|
||||
pub fn set_checkpoint_compression(&mut self, compression: Compression) {
|
||||
let v2 = self.upgrade();
|
||||
v2.checkpoint_compression = compression;
|
||||
}
|
||||
}
|
||||
#[derive(Debug, Default)]
|
||||
pub struct KeyData {
|
||||
pub down: u8,
|
||||
pub modf: u16,
|
||||
pub code: u32,
|
||||
pub chr: u32,
|
||||
}
|
||||
#[derive(Debug, Default)]
|
||||
pub struct InputData {
|
||||
pub port: u8,
|
||||
pub device: u8,
|
||||
pub idx: u8,
|
||||
pub id: u16,
|
||||
pub val: i16,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Frame {
|
||||
pub key_events: Vec<KeyData>,
|
||||
pub input_events: Vec<InputData>,
|
||||
pub checkpoint_bytes: Vec<u8>,
|
||||
pub checkpoint_compression: Compression,
|
||||
pub checkpoint_encoding: Encoding,
|
||||
}
|
||||
|
||||
impl Frame {
|
||||
#[must_use]
|
||||
pub fn inputs(&self) -> String {
|
||||
use std::fmt::Write;
|
||||
let mut output = String::new();
|
||||
for i in 0..self.input_events.len() {
|
||||
let evt = &self.input_events[i];
|
||||
write!(output, "{:03}:{:016b}", evt.id, evt.val).unwrap();
|
||||
if i + 1 < self.input_events.len() {
|
||||
write!(output, "--").unwrap();
|
||||
}
|
||||
}
|
||||
output
|
||||
}
|
||||
pub fn drop_checkpoint(&mut self) {
|
||||
self.checkpoint_bytes.clear();
|
||||
self.checkpoint_compression = Compression::None;
|
||||
self.checkpoint_encoding = Encoding::Raw;
|
||||
}
|
||||
pub fn clear(&mut self) {
|
||||
self.key_events.clear();
|
||||
self.input_events.clear();
|
||||
self.drop_checkpoint();
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Frame {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
key_events: Vec::default(),
|
||||
input_events: Vec::default(),
|
||||
checkpoint_bytes: Vec::default(),
|
||||
checkpoint_compression: Compression::None,
|
||||
checkpoint_encoding: Encoding::Raw,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,418 @@
|
||||
mod blockindex;
|
||||
use crate::{
|
||||
InvalidDeterminant,
|
||||
clock::{self, Counter, Timer},
|
||||
};
|
||||
use blockindex::BlockIndex;
|
||||
use std::io::Write;
|
||||
|
||||
#[repr(u8)]
|
||||
#[non_exhaustive]
|
||||
#[derive(Debug)]
|
||||
pub enum SSToken {
|
||||
Start = 0,
|
||||
NewBlock = 1,
|
||||
NewSuperblock = 2,
|
||||
SuperblockSeq = 3,
|
||||
}
|
||||
impl TryFrom<u8> for SSToken {
|
||||
type Error = InvalidDeterminant;
|
||||
|
||||
fn try_from(value: u8) -> std::result::Result<Self, Self::Error> {
|
||||
match value {
|
||||
0 => Ok(SSToken::Start),
|
||||
1 => Ok(SSToken::NewBlock),
|
||||
2 => Ok(SSToken::NewSuperblock),
|
||||
3 => Ok(SSToken::SuperblockSeq),
|
||||
_ => Err(InvalidDeterminant(value)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SSToken> for u8 {
|
||||
fn from(value: SSToken) -> Self {
|
||||
match value {
|
||||
SSToken::Start => 0,
|
||||
SSToken::NewBlock => 1,
|
||||
SSToken::NewSuperblock => 2,
|
||||
SSToken::SuperblockSeq => 3,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct Ctx {
|
||||
block_size: u32,
|
||||
superblock_size: u32,
|
||||
last_state: Vec<u8>,
|
||||
last_superseq: Vec<u32>,
|
||||
block_index: BlockIndex<u8>,
|
||||
superblock_index: BlockIndex<u32>,
|
||||
use_encode_state_comparisons: bool,
|
||||
}
|
||||
|
||||
impl Ctx {
|
||||
pub fn new(block_size: u32, superblock_size: u32) -> Self {
|
||||
Self {
|
||||
block_size,
|
||||
superblock_size,
|
||||
last_state: vec![],
|
||||
last_superseq: vec![],
|
||||
block_index: BlockIndex::new(block_size as usize),
|
||||
superblock_index: BlockIndex::new(superblock_size as usize),
|
||||
use_encode_state_comparisons: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct Decoder<'r, 'c, R: std::io::Read> {
|
||||
reader: &'r mut R,
|
||||
ctx: &'c mut Ctx,
|
||||
state_size: usize,
|
||||
finished: bool,
|
||||
readout_cursor: usize,
|
||||
}
|
||||
|
||||
impl<'r, 'c, R: std::io::Read> Decoder<'r, 'c, R> {
|
||||
pub(crate) fn new(reader: &'r mut R, ctx: &'c mut Ctx, state_size: usize) -> Self {
|
||||
Self {
|
||||
reader,
|
||||
ctx,
|
||||
finished: false,
|
||||
readout_cursor: 0,
|
||||
state_size,
|
||||
}
|
||||
}
|
||||
fn readout(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
match buf.write(&self.ctx.last_state[self.readout_cursor..]) {
|
||||
Err(e) => Err(e),
|
||||
Ok(sz) => {
|
||||
self.readout_cursor += sz;
|
||||
Ok(sz)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum ParseState {
|
||||
WaitForStart,
|
||||
WaitForSuperblockSeq,
|
||||
Finished,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum SSError {
|
||||
#[error("Invalid token {0}")]
|
||||
InvalidToken(#[from] InvalidDeterminant),
|
||||
#[error("Too many start tokens in stream")]
|
||||
TooManyStarts(),
|
||||
#[error("Unexpected {1:?} during {0:?}")]
|
||||
ParseError(ParseState, SSToken),
|
||||
#[error("Block {0} is the wrong size")]
|
||||
BlockWrongSize(u32),
|
||||
#[error("Superblock {0} is the wrong size")]
|
||||
SuperblockWrongSize(u32),
|
||||
#[error("Couldn't insert block at {1} on frame {0}")]
|
||||
BadBlockInsert(u64, u32),
|
||||
#[error("Couldn't insert superblock at {1} on frame {0}")]
|
||||
BadSuperblockInsert(u64, u32),
|
||||
}
|
||||
|
||||
impl<R: std::io::Read> std::io::Read for Decoder<'_, '_, R> {
|
||||
/* a slightly degenerate read implementation in that it will keep
|
||||
* calling read on the inner reader until a complete checkpoint is
|
||||
* read, then return 0 for subsequent reads */
|
||||
#[allow(clippy::too_many_lines)]
|
||||
fn read(&mut self, outbuf: &mut [u8]) -> std::io::Result<usize> {
|
||||
use ParseState as State;
|
||||
use rmp::decode as r;
|
||||
if self.finished {
|
||||
if self.readout_cursor == self.state_size {
|
||||
return Ok(0);
|
||||
}
|
||||
return self.readout(outbuf);
|
||||
}
|
||||
let stopwatch = clock::time(Timer::DecodeStatestream);
|
||||
let mut frame = 0;
|
||||
let mut state = State::WaitForStart;
|
||||
let mut buf = vec![0_u8; self.ctx.block_size as usize];
|
||||
let mut superblock = vec![0_u32; self.ctx.superblock_size as usize];
|
||||
loop {
|
||||
let tok: u8 = r::read_int(self.reader).map_err(std::io::Error::other)?;
|
||||
match (
|
||||
state,
|
||||
SSToken::try_from(tok)
|
||||
.map_err(|e| std::io::Error::other(SSError::InvalidToken(e)))?,
|
||||
) {
|
||||
(State::WaitForStart, SSToken::Start) => {
|
||||
frame = r::read_int(self.reader).map_err(std::io::Error::other)?;
|
||||
state = State::WaitForSuperblockSeq;
|
||||
}
|
||||
(_, SSToken::Start) => return Err(std::io::Error::other(SSError::TooManyStarts())),
|
||||
(State::WaitForSuperblockSeq, SSToken::NewBlock) => {
|
||||
let idx = r::read_int(self.reader).map_err(std::io::Error::other)?;
|
||||
let bin_len = r::read_bin_len(self.reader).map_err(std::io::Error::other)?;
|
||||
if bin_len != self.ctx.block_size {
|
||||
return Err(std::io::Error::other(SSError::BlockWrongSize(bin_len)));
|
||||
}
|
||||
self.reader.read_exact(&mut buf)?;
|
||||
// hashes += 1;
|
||||
if !self
|
||||
.ctx
|
||||
.block_index
|
||||
.insert_exact(idx, Box::from(buf.clone()), frame)
|
||||
{
|
||||
return Err(std::io::Error::other(SSError::BadBlockInsert(frame, idx)));
|
||||
}
|
||||
}
|
||||
(State::WaitForSuperblockSeq, SSToken::NewSuperblock) => {
|
||||
let idx = r::read_int(self.reader).map_err(std::io::Error::other)?;
|
||||
let arr_len = r::read_array_len(self.reader).map_err(std::io::Error::other)?;
|
||||
if arr_len != self.ctx.superblock_size {
|
||||
return Err(std::io::Error::other(SSError::SuperblockWrongSize(arr_len)));
|
||||
}
|
||||
for superblock_elt in &mut superblock {
|
||||
*superblock_elt =
|
||||
r::read_int(self.reader).map_err(std::io::Error::other)?;
|
||||
}
|
||||
// hashes += 1;
|
||||
if !self.ctx.superblock_index.insert_exact(
|
||||
idx,
|
||||
Box::from(superblock.clone()),
|
||||
frame,
|
||||
) {
|
||||
return Err(std::io::Error::other(SSError::BadSuperblockInsert(
|
||||
frame, idx,
|
||||
)));
|
||||
}
|
||||
}
|
||||
(State::WaitForSuperblockSeq, SSToken::SuperblockSeq) => {
|
||||
let arr_len =
|
||||
r::read_array_len(self.reader).map_err(std::io::Error::other)? as usize;
|
||||
let last_state_valid = self.ctx.last_superseq.len() >= arr_len
|
||||
&& self.ctx.last_state.len() >= self.state_size;
|
||||
let block_byte_size = self.ctx.block_size as usize;
|
||||
let superblock_byte_size = self.ctx.superblock_size as usize * block_byte_size;
|
||||
let mut superseq = vec![0; arr_len];
|
||||
self.ctx.last_state.resize(self.state_size, 0);
|
||||
let mut skipped_superblocks = 0;
|
||||
let mut skipped_blocks = 0;
|
||||
for (superblock_i, superseq_sblk) in superseq.iter_mut().enumerate() {
|
||||
let superblock_idx =
|
||||
r::read_int(self.reader).map_err(std::io::Error::other)?;
|
||||
*superseq_sblk = superblock_idx;
|
||||
if last_state_valid
|
||||
&& self.ctx.last_superseq[superblock_i] == superblock_idx
|
||||
{
|
||||
// no need to copy bytes
|
||||
skipped_superblocks += 1;
|
||||
continue;
|
||||
}
|
||||
let superblock_data = self.ctx.superblock_index.get(superblock_idx);
|
||||
for (block_i, block_id) in superblock_data.iter().copied().enumerate() {
|
||||
if last_state_valid
|
||||
&& self
|
||||
.ctx
|
||||
.superblock_index
|
||||
.get(self.ctx.last_superseq[superblock_i])[block_i]
|
||||
== block_id
|
||||
{
|
||||
// no need to copy bytes
|
||||
skipped_blocks += 1;
|
||||
continue;
|
||||
}
|
||||
let block_start = (superblock_i * superblock_byte_size
|
||||
+ block_i * block_byte_size)
|
||||
.min(self.state_size);
|
||||
let block_end = (block_start + block_byte_size).min(self.state_size);
|
||||
let block_bytes = self.ctx.block_index.get(block_id);
|
||||
if block_end <= block_start {
|
||||
// This can happen in the last superblock if it was padded with extra blocks
|
||||
break;
|
||||
}
|
||||
self.ctx.last_state[block_start..block_end]
|
||||
.copy_from_slice(&block_bytes[0..(block_end - block_start)]);
|
||||
}
|
||||
}
|
||||
clock::count(Counter::DecSkippedSuperblocks, skipped_superblocks);
|
||||
clock::count(Counter::DecSkippedBlocks, skipped_blocks);
|
||||
self.ctx.last_superseq = superseq;
|
||||
state = State::Finished;
|
||||
self.finished = true;
|
||||
break;
|
||||
}
|
||||
(s, tok) => return Err(std::io::Error::other(SSError::ParseError(s, tok))),
|
||||
}
|
||||
}
|
||||
assert_eq!(state, State::Finished);
|
||||
drop(stopwatch);
|
||||
self.readout(outbuf)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct Encoder<'w, 'c, W: std::io::Write> {
|
||||
writer: &'w mut W,
|
||||
ctx: &'c mut Ctx,
|
||||
}
|
||||
|
||||
/* Does not include the size of the str,arr,map,ext contents */
|
||||
fn rmp_size(m: rmp::Marker) -> usize {
|
||||
#[allow(
|
||||
clippy::enum_glob_use,
|
||||
reason = "If any new variants are added, the match will cease to be exhaustive"
|
||||
)]
|
||||
use rmp::Marker::*;
|
||||
match m {
|
||||
FixPos(_) | FixNeg(_) | Null | Reserved | False | True | FixMap(_) | FixArray(_)
|
||||
| FixStr(_) => 1,
|
||||
U8 | I8 | Bin8 | Str8 | FixExt1 | FixExt2 | FixExt4 | FixExt8 | FixExt16 => 2,
|
||||
U16 | I16 | Bin16 | Ext8 | Str16 | Array16 | Map16 => 3,
|
||||
Ext16 => 4,
|
||||
U32 | I32 | F32 | Bin32 | Str32 | Array32 | Map32 => 5,
|
||||
Ext32 => 6,
|
||||
U64 | I64 | F64 => 9,
|
||||
}
|
||||
}
|
||||
|
||||
impl<'w, 'c, W: std::io::Write> Encoder<'w, 'c, W> {
|
||||
pub(crate) fn new(writer: &'w mut W, ctx: &'c mut Ctx) -> Self {
|
||||
Self { writer, ctx }
|
||||
}
|
||||
#[allow(clippy::too_many_lines)]
|
||||
pub fn encode_checkpoint(mut self, checkpoint: &[u8], frame: u64) -> std::io::Result<u32> {
|
||||
use rmp::encode as r;
|
||||
let stopwatch = clock::time(Timer::EncodeStatestream);
|
||||
clock::count(Counter::EncTotalKBsIn, (checkpoint.len() / 1024) as u64);
|
||||
let mut bytes_out = 0;
|
||||
bytes_out += rmp_size(r::write_uint(
|
||||
&mut self.writer,
|
||||
u64::from(u8::from(SSToken::Start)),
|
||||
)?);
|
||||
bytes_out += rmp_size(r::write_uint(&mut self.writer, frame)?);
|
||||
let block_size = self.ctx.block_size as usize;
|
||||
let mut padded_block = vec![0; block_size];
|
||||
let superblock_size = self.ctx.superblock_size as usize;
|
||||
let superblock_size_bytes = block_size * superblock_size;
|
||||
let superblock_count = ((checkpoint.len() - 1) / superblock_size_bytes) + 1;
|
||||
clock::count(Counter::EncTotalSuperblocks, superblock_count as u64);
|
||||
clock::count(
|
||||
Counter::EncTotalBlocks,
|
||||
(((checkpoint.len() - 1) / block_size) + 1) as u64,
|
||||
);
|
||||
let mut reused_blocks = 0;
|
||||
let mut reused_superblocks = 0;
|
||||
let mut hashes = 0;
|
||||
let mut skipped_blocks = 0;
|
||||
let mut memcmps = 0;
|
||||
self.ctx
|
||||
.last_superseq
|
||||
.resize(superblock_count.max(self.ctx.last_superseq.len()), 0);
|
||||
let mut superblock_contents = vec![0_u32; superblock_size];
|
||||
let can_compare_saves = if self.ctx.last_state.len() < checkpoint.len() {
|
||||
self.ctx.last_state.clear();
|
||||
self.ctx.last_state.extend_from_slice(checkpoint);
|
||||
false
|
||||
} else {
|
||||
self.ctx.last_state.truncate(checkpoint.len());
|
||||
self.ctx.use_encode_state_comparisons
|
||||
};
|
||||
for (superblock_i, (superblock_bytes, last_state_superblock_bytes)) in (checkpoint
|
||||
.chunks(superblock_size_bytes)
|
||||
.zip(self.ctx.last_state.chunks(superblock_size_bytes)))
|
||||
.enumerate()
|
||||
{
|
||||
/* maybe: skip superblocks */
|
||||
if superblock_bytes.len() < superblock_size_bytes {
|
||||
let block_count = (superblock_bytes.len() - 1) / block_size + 1;
|
||||
if block_count + 1 < superblock_size {
|
||||
superblock_contents[(block_count + 1)..].fill(0);
|
||||
}
|
||||
}
|
||||
for (block_i, (block_bytes, last_state_block_bytes)) in (superblock_bytes
|
||||
.chunks(block_size)
|
||||
.zip(last_state_superblock_bytes.chunks(block_size)))
|
||||
.enumerate()
|
||||
{
|
||||
memcmps += u64::from(can_compare_saves);
|
||||
let found_block = if can_compare_saves
|
||||
&& block_bytes[..] == last_state_block_bytes[..block_bytes.len()]
|
||||
{
|
||||
skipped_blocks += 1;
|
||||
blockindex::Insertion {
|
||||
index: self
|
||||
.ctx
|
||||
.superblock_index
|
||||
.get(self.ctx.last_superseq[superblock_i])[block_i],
|
||||
is_new: false,
|
||||
}
|
||||
} else if block_bytes.len() < block_size {
|
||||
padded_block[block_bytes.len()..].fill(0);
|
||||
padded_block[..block_bytes.len()].copy_from_slice(block_bytes);
|
||||
hashes += 1;
|
||||
self.ctx.block_index.insert(&padded_block, frame)
|
||||
} else {
|
||||
hashes += 1;
|
||||
self.ctx.block_index.insert(block_bytes, frame)
|
||||
};
|
||||
superblock_contents[block_i] = found_block.index;
|
||||
if found_block.is_new {
|
||||
let block_out_bytes = self.ctx.block_index.get(found_block.index);
|
||||
bytes_out += rmp_size(r::write_uint(
|
||||
self.writer,
|
||||
u64::from(u8::from(SSToken::NewBlock)),
|
||||
)?);
|
||||
bytes_out +=
|
||||
rmp_size(r::write_uint(self.writer, u64::from(found_block.index))?);
|
||||
bytes_out += rmp_size(r::write_bin_len(self.writer, self.ctx.block_size)?);
|
||||
self.writer.write_all(block_out_bytes)?;
|
||||
bytes_out += block_out_bytes.len();
|
||||
} else {
|
||||
reused_blocks += 1;
|
||||
}
|
||||
}
|
||||
hashes += 1;
|
||||
let found_superblock = self
|
||||
.ctx
|
||||
.superblock_index
|
||||
.insert(&superblock_contents, frame);
|
||||
self.ctx.last_superseq[superblock_i] = found_superblock.index;
|
||||
if found_superblock.is_new {
|
||||
bytes_out += rmp_size(r::write_uint(
|
||||
self.writer,
|
||||
u64::from(u8::from(SSToken::NewSuperblock)),
|
||||
)?);
|
||||
bytes_out += rmp_size(r::write_uint(
|
||||
self.writer,
|
||||
u64::from(found_superblock.index),
|
||||
)?);
|
||||
bytes_out += rmp_size(r::write_array_len(self.writer, self.ctx.superblock_size)?);
|
||||
for blkid in &superblock_contents {
|
||||
bytes_out += rmp_size(r::write_uint(self.writer, u64::from(*blkid))?);
|
||||
}
|
||||
} else {
|
||||
reused_superblocks += 1;
|
||||
}
|
||||
}
|
||||
clock::count(Counter::EncReusedBlocks, reused_blocks);
|
||||
clock::count(Counter::EncReusedSuperblocks, reused_superblocks);
|
||||
clock::count(Counter::EncSkippedBlocks, skipped_blocks);
|
||||
clock::count(Counter::EncMemCmps, memcmps);
|
||||
clock::count(Counter::EncHashes, hashes);
|
||||
self.ctx.last_superseq.truncate(superblock_count);
|
||||
bytes_out += rmp_size(r::write_uint(
|
||||
self.writer,
|
||||
u64::from(u8::from(SSToken::SuperblockSeq)),
|
||||
)?);
|
||||
bytes_out += rmp_size(r::write_array_len(
|
||||
self.writer,
|
||||
u32::try_from(superblock_count)
|
||||
.map_err(|e| std::io::Error::other(crate::ReplayError::CheckpointTooBig(e)))?,
|
||||
)?);
|
||||
for super_id in &self.ctx.last_superseq {
|
||||
bytes_out += rmp_size(r::write_uint(self.writer, u64::from(*super_id))?);
|
||||
}
|
||||
drop(stopwatch);
|
||||
clock::count(Counter::EncTotalKBsOut, (bytes_out / 1024) as u64);
|
||||
u32::try_from(bytes_out)
|
||||
.map_err(|e| std::io::Error::other(crate::ReplayError::CheckpointTooBig(e)))
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,110 @@
|
||||
use nohash_hasher::NoHashHasher;
|
||||
use smallvec::{SmallVec, smallvec};
|
||||
use std::{collections::HashMap, hash::BuildHasherDefault};
|
||||
use xxhash_rust::xxh3::xxh3_64 as xxh;
|
||||
|
||||
// struct Addition {
|
||||
// when:u64, // Frame on which some objects were added
|
||||
// index:u32, // Lowest index added on this frame
|
||||
// }
|
||||
|
||||
pub(crate) struct BlockIndex<
|
||||
T: bytemuck::Zeroable + bytemuck::AnyBitPattern + bytemuck::NoUninit + PartialEq,
|
||||
> {
|
||||
index: HashMap<u64, SmallVec<[u32; 4]>, BuildHasherDefault<NoHashHasher<u64>>>,
|
||||
objects: Vec<Box<[T]>>,
|
||||
hashes: Vec<u64>,
|
||||
//additions: Vec<Addition>,
|
||||
object_size: usize,
|
||||
}
|
||||
|
||||
pub(crate) struct Insertion {
|
||||
pub index: u32,
|
||||
pub is_new: bool,
|
||||
}
|
||||
|
||||
fn hash<T: bytemuck::AnyBitPattern + bytemuck::NoUninit>(val: &[T]) -> u64 {
|
||||
xxh(bytemuck::cast_slice(val))
|
||||
}
|
||||
|
||||
impl<T: bytemuck::Zeroable + bytemuck::AnyBitPattern + bytemuck::NoUninit + PartialEq>
|
||||
BlockIndex<T>
|
||||
{
|
||||
pub fn new(object_size: usize) -> Self {
|
||||
let mut index = HashMap::with_capacity_and_hasher(4096, BuildHasherDefault::default());
|
||||
let zeros = (vec![T::zeroed(); object_size]).into_boxed_slice();
|
||||
let zero_hash = hash(&zeros);
|
||||
index.insert(zero_hash, smallvec![0]);
|
||||
Self {
|
||||
index,
|
||||
object_size,
|
||||
objects: vec![zeros],
|
||||
hashes: vec![zero_hash],
|
||||
}
|
||||
}
|
||||
pub fn insert(&mut self, obj: &[T], _frame: u64) -> Insertion {
|
||||
assert_eq!(obj.len(), self.object_size);
|
||||
let hash = hash(obj);
|
||||
match self.index.entry(hash) {
|
||||
std::collections::hash_map::Entry::Occupied(mut e) => {
|
||||
if let Some(found) = e
|
||||
.get()
|
||||
.iter()
|
||||
.find(|o| obj == &*self.objects[(**o) as usize])
|
||||
{
|
||||
Insertion {
|
||||
index: *found,
|
||||
is_new: false,
|
||||
}
|
||||
} else {
|
||||
let copy = Box::from(obj);
|
||||
let idx = u32::try_from(self.objects.len()).unwrap();
|
||||
self.objects.push(copy);
|
||||
self.hashes.push(hash);
|
||||
e.get_mut().push(idx);
|
||||
Insertion {
|
||||
index: idx,
|
||||
is_new: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
std::collections::hash_map::Entry::Vacant(e) => {
|
||||
let copy = Box::from(obj);
|
||||
let idx = u32::try_from(self.objects.len()).unwrap();
|
||||
self.objects.push(copy);
|
||||
self.hashes.push(hash);
|
||||
e.insert(smallvec![idx]);
|
||||
Insertion {
|
||||
index: idx,
|
||||
is_new: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn insert_exact(&mut self, idx: u32, obj: Box<[T]>, _frame: u64) -> bool {
|
||||
assert_eq!(obj.len(), self.object_size);
|
||||
if self.objects.len() != idx as usize {
|
||||
return false;
|
||||
}
|
||||
let hash = hash(&obj);
|
||||
self.index.entry(hash).or_default().push(idx);
|
||||
self.objects.push(obj);
|
||||
self.hashes.push(hash);
|
||||
true
|
||||
}
|
||||
pub fn get(&self, which: u32) -> &[T] {
|
||||
&self.objects[which as usize]
|
||||
}
|
||||
#[expect(unused)]
|
||||
pub fn clear(&mut self) {
|
||||
self.index.clear();
|
||||
self.objects.truncate(1);
|
||||
self.hashes.truncate(1);
|
||||
self.index.insert(self.hashes[0], smallvec![0]);
|
||||
}
|
||||
#[expect(unused)]
|
||||
pub fn len(&self) -> usize {
|
||||
self.objects.len()
|
||||
}
|
||||
// remove_after, commit?
|
||||
}
|
||||
Reference in New Issue
Block a user