diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..8a926b7 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,305 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "adler2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" + +[[package]] +name = "autocfg" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" + +[[package]] +name = "bytemuck" +version = "1.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fbdf580320f38b612e485521afda1ee26d10cc9884efaaa750d383e13e3c5f4" + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + +[[package]] +name = "cc" +version = "1.2.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "739eb0f94557554b3ca9a86d2d37bebd49c5e6d0c1d2bda35ba5bdac830befc2" +dependencies = [ + "find-msvc-tools", + "jobserver", + "libc", + "shlex", +] + +[[package]] +name = "cfg-if" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" + +[[package]] +name = "crc32fast" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "find-msvc-tools" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127" + +[[package]] +name = "flate2" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfe33edd8e85a12a67454e37f8c75e730830d83e313556ab9ebf9ee7fbeb3bfb" +dependencies = [ + "crc32fast", + "libz-rs-sys", + "miniz_oxide", +] + +[[package]] +name = "getrandom" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasip2", +] + +[[package]] +name = "jobserver" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" +dependencies = [ + "getrandom", + "libc", +] + +[[package]] +name = "libc" +version = "0.2.177" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" + +[[package]] +name = "libz-rs-sys" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "840db8cf39d9ec4dd794376f38acc40d0fc65eec2a8f484f7fd375b84602becd" +dependencies = [ + "zlib-rs", +] + +[[package]] +name = "miniz_oxide" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" +dependencies = [ + "adler2", + "simd-adler32", +] + +[[package]] +name = "nohash-hasher" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451" + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + +[[package]] +name = "proc-macro2" +version = "1.0.103" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce25767e7b499d1b604768e7cde645d14cc8584231ea6b295e9c9eb22c02e1d1" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + +[[package]] +name = "rmp" +version = "0.8.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "228ed7c16fa39782c3b3468e974aec2795e9089153cd08ee2e9aefb3613334c4" +dependencies = [ + "byteorder", + "num-traits", + "paste", +] + +[[package]] +name = "rply-codec" +version = "0.1.0" +dependencies = [ + "bytemuck", + "byteorder", + "flate2", + "nohash-hasher", + "rmp", + "smallvec", + "thiserror", + "xxhash-rust", + "zstd", +] + +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + +[[package]] +name = "simd-adler32" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "syn" +version = "2.0.108" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da58917d35242480a05c2897064da0a80589a2a0476c9a3f2fdc83b53502e917" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "thiserror" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "462eeb75aeb73aea900253ce739c8e18a67423fadf006037cd3ff27e82748a06" + +[[package]] +name = "wasip2" +version = "1.0.1+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" +dependencies = [ + "wit-bindgen", +] + +[[package]] +name = "wit-bindgen" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" + +[[package]] +name = "xxhash-rust" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" + +[[package]] +name = "zlib-rs" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f06ae92f42f5e5c42443fd094f245eb656abf56dd7cce9b8b263236565e00f2" + +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.16+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index 25c11f0..b599b36 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,8 +4,12 @@ version = "0.1.0" edition = "2024" [dependencies] +bytemuck = { version = "1.24.0", features = ["const_zeroed"] } byteorder = "1.5.0" flate2 = { version = "1.1.5", features = ["zlib-rs"] } +nohash-hasher = "0.2.0" rmp = "0.8.14" +smallvec = "1.15.1" thiserror = "2.0.17" +xxhash-rust = { version = "0.8.15", features = ["xxh3"] } zstd = "0.13.3" diff --git a/src/lib.rs b/src/lib.rs index c2cb4e5..0ecdfec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ mod rply; +mod statestream; pub use rply::*; #[cfg(test)] diff --git a/src/rply.rs b/src/rply.rs index 12572d0..72b6e24 100644 --- a/src/rply.rs +++ b/src/rply.rs @@ -1,3 +1,4 @@ +use crate::statestream; use thiserror::Error; #[derive(Debug, Error)] @@ -168,9 +169,74 @@ pub struct ReplayDecoder<'a, R: std::io::BufRead> { pub header: Header, pub initial_state: Vec, pub frame_number: usize, + ss_state: statestream::Ctx, } impl ReplayDecoder<'_, R> { + /// Creates a [`ReplayDecoder`] for the given buffered readable stream. + /// + /// # Errors + /// [`ReplayError::IO`]: Some issue with the read stream, e.g. insufficient length or unexpected end + /// [`ReplayError::Magic`]: Invalid magic number at beginning of file + /// [`ReplayError::Version`]: Version identifier not recognized by parser + /// [`ReplayError::Compression`]: Unsupported compression scheme for checkpoints + pub fn new(rply: &mut R) -> Result> { + use byteorder::{LittleEndian, ReadBytesExt}; + let magic = rply.read_u32::()?; + if magic != MAGIC { + return Err(ReplayError::Magic(magic)); + } + let version = rply.read_u32::()?; + if version > 2 { + return Err(ReplayError::Version(version)); + } + let content_crc = rply.read_u32::()?; + let initial_state_size = rply.read_u32::()?; + let identifier = rply.read_u64::()?; + let base = HeaderBase { + version, + content_crc, + initial_state_size, + identifier, + }; + let mut initial_state = vec![0; initial_state_size as usize]; + if version < 2 { + rply.read_exact(initial_state.as_mut_slice())?; + return Ok(ReplayDecoder { + header: Header::V0V1(base), + rply, + initial_state, + frame_number: 0, + ss_state: statestream::Ctx::new(1, 1), + }); + } + let frame_count = rply.read_u32::()?; + let block_size = rply.read_u32::()?; + let superblock_size = rply.read_u32::()?; + let cp_config = rply.read_u32::()?; + let checkpoint_commit_interval = (cp_config >> 24) as u8; + let checkpoint_commit_threshold = ((cp_config >> 16) & 0xFF) as u8; + let checkpoint_compression = Compression::try_from(((cp_config >> 8) & 0xFF) as u8) + .map_err(ReplayError::Compression)?; + rply.read_exact(initial_state.as_mut_slice())?; + // TODO: decode if version is 2 + Ok(ReplayDecoder { + rply, + initial_state, + header: Header::V2(HeaderV2 { + base, + frame_count, + block_size, + superblock_size, + checkpoint_commit_interval, + checkpoint_commit_threshold, + checkpoint_compression, + }), + frame_number: 0, + ss_state: statestream::Ctx::new(block_size, superblock_size), + }) + } + /// Reads a single frame at the current decoder position. /// # Errors /// [`ReplayError::IO`]: Unexpected end of stream or other I/O error @@ -238,75 +304,83 @@ impl ReplayDecoder<'_, R> { FrameToken::Regular => { frame.checkpoint_compression = Compression::None; frame.checkpoint_encoding = Encoding::Raw; - frame.checkpoint_raw_bytes.clear(); - frame.checkpoint_uncompressed_raw_bytes.clear(); - frame.checkpoint_uncompressed_unencoded_bytes.clear(); + frame.checkpoint_bytes.clear(); } FrameToken::Checkpoint => { frame.checkpoint_compression = Compression::None; frame.checkpoint_encoding = Encoding::Raw; let cp_size = usize::try_from(rply.read_u64::()?) .map_err(ReplayError::CheckpointTooBig)?; - frame.checkpoint_raw_bytes.resize(cp_size, 0); - rply.read_exact(frame.checkpoint_raw_bytes.as_mut_slice())?; + frame.checkpoint_bytes.resize(cp_size, 0); + rply.read_exact(frame.checkpoint_bytes.as_mut_slice())?; } FrameToken::Checkpoint2 => { - // read a 1 byte compression - let compression = - Compression::try_from(rply.read_u8()?).map_err(ReplayError::Compression)?; - // read a 1 byte encoding - let encoding = - Encoding::try_from(rply.read_u8()?).map_err(ReplayError::Encoding)?; - // read a 4 byte uncompressed unencoded size - let uc_ue_size = rply.read_u32::()? as usize; - // read a 4 byte uncompressed encoded size - let uc_enc_size = rply.read_u32::()? as usize; - // read a 4 byte compressed encoded size - let comp_enc_size = rply.read_u32::()? as usize; - // read the compressed encoded data (todo, make a reader instead) - frame.checkpoint_raw_bytes.resize(comp_enc_size, 0); - rply.read_exact(frame.checkpoint_raw_bytes.as_mut_slice())?; - // maybe decompress - match compression { - Compression::None => {} - Compression::Zlib => { - use flate2::bufread::ZlibDecoder; - frame - .checkpoint_uncompressed_raw_bytes - .resize(uc_enc_size, 0); - let mut decoder = ZlibDecoder::new(rply); - std::io::copy( - &mut decoder, - &mut std::io::Cursor::new( - frame.checkpoint_uncompressed_raw_bytes.as_mut_slice(), - ), - )?; - } - Compression::Zstd => { - use zstd::Decoder; - frame - .checkpoint_uncompressed_raw_bytes - .resize(uc_enc_size, 0); - let mut decoder = Decoder::with_buffer(rply)?.single_frame(); - std::io::copy( - &mut decoder, - &mut std::io::Cursor::new( - frame.checkpoint_uncompressed_raw_bytes.as_mut_slice(), - ), - )?; - decoder.finish(); - } - } - // maybe decode - match encoding { - Encoding::Raw => {} - Encoding::Statestream => { - frame - .checkpoint_uncompressed_unencoded_bytes - .resize(uc_ue_size, 0); - // statestream_decode(frame.checkpoint_decompressed_data().unwrap(), &mut frame.checkpoint_uncompressed_unencoded_bytes); - } - } + self.decode_checkpoint(frame)?; + } + } + Ok(()) + } + + fn decode_checkpoint(&mut self, frame: &mut Frame) -> Result<()> { + use byteorder::{LittleEndian, ReadBytesExt}; + let rply = &mut *self.rply; + // read a 1 byte compression code + let compression = + Compression::try_from(rply.read_u8()?).map_err(ReplayError::Compression)?; + // read a 1 byte encoding code + let encoding = Encoding::try_from(rply.read_u8()?).map_err(ReplayError::Encoding)?; + // read a 4 byte uncompressed unencoded size + let uc_ue_size = rply.read_u32::()? as usize; + // read a 4 byte uncompressed encoded size + let uc_enc_size = rply.read_u32::()? as usize; + // read a 4 byte compressed encoded size + let comp_enc_size = rply.read_u32::()? as usize; + frame.checkpoint_bytes.resize(uc_ue_size, 0); + // maybe decompress + match (compression, encoding) { + (Compression::None, Encoding::Raw) => { + rply.read_exact(frame.checkpoint_bytes.as_mut_slice())?; + } + (Compression::None, Encoding::Statestream) => { + let mut ss_decoder = statestream::Decoder::new(rply, &mut self.ss_state); + std::io::copy( + &mut ss_decoder, + &mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()), + )?; + } + (Compression::Zlib, Encoding::Raw) => { + use flate2::bufread::ZlibDecoder; + let mut decoder = ZlibDecoder::new(rply); + std::io::copy( + &mut decoder, + &mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()), + )?; + } + (Compression::Zlib, Encoding::Statestream) => { + use flate2::bufread::ZlibDecoder; + let mut decoder = ZlibDecoder::new(rply); + let mut ss_decoder = statestream::Decoder::new(&mut decoder, &mut self.ss_state); + std::io::copy( + &mut ss_decoder, + &mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()), + )?; + } + (Compression::Zstd, Encoding::Raw) => { + use zstd::Decoder; + let mut decoder = Decoder::with_buffer(rply)?.single_frame(); + std::io::copy( + &mut decoder, + &mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()), + )?; + } + (Compression::Zstd, Encoding::Statestream) => { + use zstd::Decoder; + let mut decoder = Decoder::with_buffer(rply)?.single_frame(); + let mut ss_decoder = statestream::Decoder::new(&mut decoder, &mut self.ss_state); + std::io::copy( + &mut ss_decoder, + &mut std::io::Cursor::new(frame.checkpoint_bytes.as_mut_slice()), + )?; } } Ok(()) @@ -316,64 +390,11 @@ impl ReplayDecoder<'_, R> { /// Creates a [`ReplayDecoder`] for the given buffered readable stream. /// /// # Errors -/// [`ReplayError::IO`]: Some issue with the read stream, e.g. insufficient length or unexpected end -/// [`ReplayError::Magic`]: Invalid magic number at beginning of file -/// [`ReplayError::Version`]: Version identifier not recognized by parser -/// [`ReplayError::Compression`]: Unsupported compression scheme for checkpoints +/// See [`ReplayDecoder::new`]. pub fn decode(rply: &mut R) -> Result> { - use byteorder::{LittleEndian, ReadBytesExt}; - let magic = rply.read_u32::()?; - if magic != MAGIC { - return Err(ReplayError::Magic(magic)); - } - let version = rply.read_u32::()?; - if version > 2 { - return Err(ReplayError::Version(version)); - } - let content_crc = rply.read_u32::()?; - let initial_state_size = rply.read_u32::()?; - let identifier = rply.read_u64::()?; - let base = HeaderBase { - version, - content_crc, - initial_state_size, - identifier, - }; - let mut initial_state = vec![0; initial_state_size as usize]; - if version < 2 { - rply.read_exact(initial_state.as_mut_slice())?; - return Ok(ReplayDecoder { - header: Header::V0V1(base), - rply, - initial_state, - frame_number: 0, - }); - } - let frame_count = rply.read_u32::()?; - let block_size = rply.read_u32::()?; - let superblock_size = rply.read_u32::()?; - let cp_config = rply.read_u32::()?; - let checkpoint_commit_interval = (cp_config >> 24) as u8; - let checkpoint_commit_threshold = ((cp_config >> 16) & 0xFF) as u8; - let checkpoint_compression = - Compression::try_from(((cp_config >> 8) & 0xFF) as u8).map_err(ReplayError::Compression)?; - rply.read_exact(initial_state.as_mut_slice())?; - // TODO: decode if version is 2 - Ok(ReplayDecoder { - rply, - initial_state, - header: Header::V2(HeaderV2 { - base, - frame_count, - block_size, - superblock_size, - checkpoint_commit_interval, - checkpoint_commit_threshold, - checkpoint_compression, - }), - frame_number: 0, - }) + ReplayDecoder::new(rply) } + impl Header { #[must_use] pub fn version(&self) -> u32 { @@ -403,47 +424,17 @@ pub struct InputData { pub struct Frame { pub key_events: Vec, pub input_events: Vec, - checkpoint_raw_bytes: Vec, - checkpoint_uncompressed_raw_bytes: Vec, - checkpoint_uncompressed_unencoded_bytes: Vec, + checkpoint_bytes: Vec, pub checkpoint_compression: Compression, pub checkpoint_encoding: Encoding, } -impl Frame { - #[must_use] - pub fn checkpoint_decompressed_data(&self) -> Option<&[u8]> { - if self.checkpoint_raw_bytes.is_empty() { - return None; - } - Some(match self.checkpoint_compression { - Compression::None => self.checkpoint_raw_bytes.as_slice(), - _ => self.checkpoint_uncompressed_raw_bytes.as_slice(), - }) - } - #[must_use] - pub fn checkpoint_data(&self) -> Option<&[u8]> { - if self.checkpoint_raw_bytes.is_empty() { - return None; - } - Some( - match (self.checkpoint_compression, self.checkpoint_encoding) { - (Compression::None, Encoding::Raw) => self.checkpoint_raw_bytes.as_slice(), - (_, Encoding::Raw) => self.checkpoint_uncompressed_raw_bytes.as_slice(), - (_, _) => self.checkpoint_uncompressed_unencoded_bytes.as_slice(), - }, - ) - } -} - impl Default for Frame { fn default() -> Self { Self { key_events: Vec::default(), input_events: Vec::default(), - checkpoint_raw_bytes: Vec::default(), - checkpoint_uncompressed_raw_bytes: Vec::default(), - checkpoint_uncompressed_unencoded_bytes: Vec::default(), + checkpoint_bytes: Vec::default(), checkpoint_compression: Compression::None, checkpoint_encoding: Encoding::Raw, } diff --git a/src/statestream.rs b/src/statestream.rs new file mode 100644 index 0000000..53f1349 --- /dev/null +++ b/src/statestream.rs @@ -0,0 +1,56 @@ +mod blockindex; +use blockindex::BlockIndex; + +pub(crate) struct Ctx { + block_size: u32, + superblock_size: u32, + last_state: Vec, + last_superseq: Vec, + block_index: BlockIndex, + superblock_index: BlockIndex, +} + +impl Ctx { + pub fn new(block_size: u32, superblock_size: u32) -> Self { + Self { + block_size, + superblock_size, + last_state: vec![], + last_superseq: vec![], + block_index: BlockIndex::new(block_size as usize), + superblock_index: BlockIndex::new(superblock_size as usize), + } + } +} + +pub(crate) struct Decoder<'r, 'c, R: std::io::Read> { + reader: &'r mut R, + ctx: &'c mut Ctx, + finished: bool, +} + +impl<'r, 'c, R: std::io::Read> Decoder<'r, 'c, R> { + pub(crate) fn new(reader: &'r mut R, ctx: &'c mut Ctx) -> Self { + Self { + reader, + ctx, + finished: false, + } + } +} + +impl<'r, 'c, R: std::io::Read> std::io::Read for Decoder<'r, 'c, R> { + /* a slightly degenerate read implementation in that it will keep + * calling read on the inner reader until a complete checkpoint is + * read, then return 0 for subsequent reads */ + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + use rmp::decode as r; + if self.finished { + return Ok(0); + } + + let sz = 0; + todo!(); + Ok(sz) + } +} diff --git a/src/statestream/blockindex.rs b/src/statestream/blockindex.rs new file mode 100644 index 0000000..1a61125 --- /dev/null +++ b/src/statestream/blockindex.rs @@ -0,0 +1,106 @@ +use nohash_hasher::NoHashHasher; +use smallvec::{SmallVec, smallvec}; +use std::{collections::HashMap, hash::BuildHasherDefault}; +use xxhash_rust::xxh3::xxh3_64 as xxh; + +// struct Addition { +// when:u64, // Frame on which some objects were added +// index:u32, // Lowest index added on this frame +// } + +pub(crate) struct BlockIndex< + T: bytemuck::Zeroable + bytemuck::AnyBitPattern + bytemuck::NoUninit + PartialEq, +> { + index: HashMap, BuildHasherDefault>>, + objects: Vec>, + hashes: Vec, + //additions: Vec, + object_size: usize, +} + +pub(crate) struct Insertion { + index: u32, + is_new: bool, +} + +fn hash(val: &[T]) -> u64 { + xxh(bytemuck::cast_slice(val)) +} + +impl + BlockIndex +{ + pub fn new(object_size: usize) -> Self { + let mut index = HashMap::with_capacity_and_hasher(4096, BuildHasherDefault::default()); + let zeros = (vec![T::zeroed(); object_size]).into_boxed_slice(); + let zero_hash = hash(&zeros); + index.insert(zero_hash, smallvec![0]); + Self { + index, + object_size, + objects: vec![zeros], + hashes: vec![zero_hash], + } + } + pub fn insert(&mut self, obj: &[T], _frame: u64) -> Insertion { + let hash = hash(obj); + match self.index.entry(hash) { + std::collections::hash_map::Entry::Occupied(mut e) => { + if let Some(found) = e + .get() + .iter() + .find(|o| obj == &*self.objects[(**o) as usize]) + { + Insertion { + index: *found, + is_new: false, + } + } else { + let copy = Box::from(obj); + let idx = u32::try_from(self.objects.len()).unwrap(); + self.objects.push(copy); + self.hashes.push(hash); + e.get_mut().push(idx); + Insertion { + index: idx, + is_new: true, + } + } + } + std::collections::hash_map::Entry::Vacant(e) => { + let copy = Box::from(obj); + let idx = u32::try_from(self.objects.len()).unwrap(); + self.objects.push(copy); + self.hashes.push(hash); + e.insert(smallvec![idx]); + Insertion { + index: idx, + is_new: true, + } + } + } + } + pub fn insert_exact(&mut self, idx: u32, obj: Box<[T]>, _frame: u64) -> bool { + if self.objects.len() != idx as usize { + return false; + } + let hash = hash(&obj); + self.index.entry(hash).or_default().push(idx); + self.objects.push(obj); + self.hashes.push(hash); + return true; + } + pub fn get(&self, which: u32) -> &[T] { + &self.objects[which as usize] + } + pub fn clear(&mut self) { + self.index.clear(); + self.objects.truncate(1); + self.hashes.truncate(1); + self.index.insert(self.hashes[0], smallvec![0]); + } + pub fn len(&self) -> usize { + self.objects.len() + } + // remove_after, commit? +}