Skip to content

Commit 3d2bf5d

Browse files
committed
apply review
1 parent c3f951b commit 3d2bf5d

File tree

6 files changed

+48
-14
lines changed

6 files changed

+48
-14
lines changed

crates/codec/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ derive_more = { version = "2.0", default-features = false }
2020
eyre = { workspace = true, optional = true }
2121
thiserror = { version = "2.0", default-features = false }
2222
zstd = { version = "=0.13.3", optional = true }
23-
ruzstd = "0.8"
23+
ruzstd = { version = "0.8", optional = true }
2424

2525
[dev-dependencies]
2626
eyre.workspace = true
@@ -30,3 +30,4 @@ serde_json = "1.0"
3030
default = ["zstd"]
3131
test-utils = ["dep:eyre", "scroll-l1/test-utils"]
3232
zstd = ["dep:zstd"]
33+
ruzstd = ["dep:ruzstd"]

crates/codec/src/decoding/v2/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub fn decode_v2(calldata: &[u8], blob: &[u8]) -> Result<Batch, DecodingError> {
3535

3636
// get blob iterator and collect, skipping unused bytes.
3737
let compressed_heap_blob = BlobSliceIter::from_blob_slice(blob).copied().collect::<Vec<_>>();
38-
let uncompressed_heap_blob = decompress_blob_data(&compressed_heap_blob);
38+
let uncompressed_heap_blob = decompress_blob_data(&compressed_heap_blob)?;
3939
let buf = &mut (uncompressed_heap_blob.as_slice());
4040

4141
// check buf len.

crates/codec/src/decoding/v2/zstd.rs

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,46 @@
22
33
use std::io::Read;
44

5-
#[cfg(feature = "zstd")]
6-
use ruzstd as _;
5+
#[cfg(not(any(feature = "zstd", feature = "ruzstd")))]
6+
compile_error!("You must enable exactly one of the `zstd` or `ruzstd` features");
7+
#[cfg(all(feature = "zstd", feature = "ruzstd"))]
8+
compile_error!("Features `zstd` and `ruzstd` are mutually exclusive");
79

810
/// The ZSTD magic number for zstd compressed data header.
911
const ZSTD_MAGIC_NUMBER: [u8; 4] = [0x28, 0xb5, 0x2f, 0xfd];
1012

13+
/// Result type for Zstd operations.
14+
type Result<T> = std::result::Result<T, ZstdError>;
15+
16+
/// Zstd error type.
17+
#[derive(Debug)]
18+
pub struct ZstdError(Box<dyn std::error::Error + Send + Sync>);
19+
20+
impl std::fmt::Display for ZstdError {
21+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22+
write!(f, "ZstdError: {}", self.0)
23+
}
24+
}
25+
26+
impl std::error::Error for ZstdError {}
27+
28+
impl ZstdError {
29+
/// Consumes the error and returns the inner error.
30+
pub fn into_inner(self) -> Box<dyn std::error::Error + Send + Sync> {
31+
self.0
32+
}
33+
}
34+
1135
/// Uncompress the provided data.
1236
#[cfg(feature = "zstd")]
13-
pub fn decompress_blob_data(data: &[u8]) -> Vec<u8> {
37+
pub fn decompress_blob_data(data: &[u8]) -> Result<Vec<u8>> {
1438
use zstd::Decoder;
1539
let mut header_data = ZSTD_MAGIC_NUMBER.to_vec();
1640

1741
header_data.extend_from_slice(data);
1842

1943
// init decoder and owned output data.
20-
let mut decoder = Decoder::new(header_data.as_slice()).unwrap();
44+
let mut decoder = Decoder::new(header_data.as_slice()).map_err(|e| ZstdError(Box::new(e)))?;
2145
// heuristic: use data length as the allocated output capacity.
2246
let mut output = Vec::with_capacity(header_data.len());
2347

@@ -33,22 +57,23 @@ pub fn decompress_blob_data(data: &[u8]) -> Vec<u8> {
3357
output.extend_from_slice(&dst[..size]);
3458
}
3559

36-
output
60+
Ok(output)
3761
}
3862

3963
/// Uncompress the provided data.
40-
#[cfg(not(feature = "zstd"))]
41-
pub fn decompress_blob_data(data: &[u8]) -> Vec<u8> {
64+
#[cfg(feature = "ruzstd")]
65+
pub fn decompress_blob_data(data: &[u8]) -> Result<Vec<u8>> {
4266
use ruzstd::decoding::StreamingDecoder;
4367

4468
let mut header_data = ZSTD_MAGIC_NUMBER.to_vec();
4569
header_data.extend_from_slice(data);
4670

4771
// init decoder and owned output data.
48-
let mut decoder = StreamingDecoder::new(header_data.as_slice()).unwrap();
72+
let mut decoder =
73+
StreamingDecoder::new(header_data.as_slice()).map_err(|e| ZstdError(Box::new(e)))?;
4974
// heuristic: use data length as the allocated output capacity.
5075
let mut output = Vec::with_capacity(header_data.len());
51-
decoder.read_to_end(&mut output).unwrap();
76+
decoder.read_to_end(&mut output).map_err(|e| ZstdError(Box::new(e)))?;
5277

53-
output
78+
Ok(output)
5479
}

crates/codec/src/decoding/v4/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub fn decode_v4(calldata: &[u8], blob: &[u8]) -> Result<Batch, DecodingError> {
3131
debug_assert!(is_compressed == 1 || is_compressed == 0, "incorrect compressed byte flag");
3232

3333
let buf = if is_compressed == 1 {
34-
heap_blob = decompress_blob_data(&heap_blob[1..]);
34+
heap_blob = decompress_blob_data(&heap_blob[1..])?;
3535
&mut heap_blob.as_slice()
3636
} else {
3737
&mut (&heap_blob[1..])

crates/codec/src/decoding/v7/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ pub fn decode_v7(blob: &[u8]) -> Result<Batch, DecodingError> {
4949

5050
// uncompress if necessary.
5151
let buf = if is_compressed == 1 {
52-
heap_blob = decompress_blob_data(buf);
52+
heap_blob = decompress_blob_data(buf)?;
5353
&mut heap_blob.as_slice()
5454
} else {
5555
buf

crates/codec/src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ pub enum DecodingError {
3030
InvalidCommitBatchCall(#[from] InvalidCommitBatchCall),
3131
#[error("end of file")]
3232
Eof,
33+
#[error("zstd decompression error occurred: {0}")]
34+
ZstdDecompression(Box<dyn std::error::Error + Send + Sync + 'static>),
3335
#[error("decoding error occurred: {0}")]
3436
Other(Box<dyn std::error::Error + Send + Sync + 'static>),
3537
}
@@ -46,3 +48,9 @@ impl From<String> for DecodingError {
4648
DecodingError::Other(value.into())
4749
}
4850
}
51+
52+
impl From<crate::decoding::v2::zstd::ZstdError> for DecodingError {
53+
fn from(e: crate::decoding::v2::zstd::ZstdError) -> Self {
54+
DecodingError::ZstdDecompression(e.into_inner())
55+
}
56+
}

0 commit comments

Comments
 (0)