Skip to content

Commit 2d7f148

Browse files
Copilot, linhr, lonless9
authored
chore: update DataFusion dependencies to 53.0.0 (#1442)
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: linhr <5601366+linhr@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: lonless9 <167735979+lonless9@users.noreply.github.com>
Co-authored-by: XL Liang <xiaolong@lakesail.com>
1 parent d56bfca commit 2d7f148

File tree

107 files changed

+3753
-3948
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

107 files changed

+3753
-3948
lines changed

Cargo.lock

Lines changed: 158 additions & 167 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -142,33 +142,33 @@ prost-types = "0.14"
142142
# The `axum` version must match the one used in `tonic` (replace `RELEASE` with the release we are using):
143143
# https://github.com/hyperium/tonic/blob/vRELEASE/tonic/Cargo.toml
144144
axum = "0.8.8"
145-
datafusion = { version = "52.1.0", features = ["serde", "avro", "sql"] }
146-
datafusion-common = { version = "52.1.0", features = ["object_store", "avro"] }
147-
datafusion-datasource = { version = "52.1.0" }
148-
datafusion-expr = { version = "52.1.0" }
149-
datafusion-expr-common = { version = "52.1.0" }
150-
datafusion-proto = { version = "52.1.0" }
151-
datafusion-functions = { version = "52.1.0" }
152-
datafusion-functions-nested = { version = "52.1.0" }
153-
datafusion-physical-expr = { version = "52.1.0" }
154-
datafusion-session = { version = "52.1.0" }
155-
datafusion-spark = { version = "52.1.0" }
145+
datafusion = { version = "53.0.0", features = ["serde", "avro", "sql"] }
146+
datafusion-common = { version = "53.0.0", features = ["object_store", "avro"] }
147+
datafusion-datasource = { version = "53.0.0" }
148+
datafusion-expr = { version = "53.0.0" }
149+
datafusion-expr-common = { version = "53.0.0" }
150+
datafusion-proto = { version = "53.0.0" }
151+
datafusion-functions = { version = "53.0.0" }
152+
datafusion-functions-nested = { version = "53.0.0" }
153+
datafusion-physical-expr = { version = "53.0.0" }
154+
datafusion-session = { version = "53.0.0" }
155+
datafusion-spark = { version = "53.0.0", features = ["core"] }
156156
# The `pyo3` version must match the one used in `arrow-pyarrow` (replace `RELEASE` with the release we are using):
157157
# https://github.com/apache/arrow-rs/blob/RELEASE/arrow-pyarrow/Cargo.toml
158-
pyo3 = { version = "0.26.0", features = ["serde"] }
158+
pyo3 = { version = "0.28.0", features = ["serde"] }
159159
# Jiter has a dependency on pyo3, which needs to match the version used in Sail.
160-
# https://github.com/pydantic/jiter/blob/v0.11.1/Cargo.toml
161-
jiter = { version = "0.11.1" }
162-
arrow = { version = "57.1.0", features = ["chrono-tz"] }
163-
arrow-buffer = { version = "57.1.0" }
164-
arrow-schema = { version = "57.1.0", features = ["serde"] }
165-
arrow-flight = { version = "57.1.0" }
166-
arrow-pyarrow = { version = "57.1.0" }
167-
parquet = { version = "57.3.0" }
168-
serde_arrow = { version = "0.13.7", features = ["arrow-57"] }
160+
# https://github.com/pydantic/jiter/blob/v0.13.0/Cargo.toml
161+
jiter = { version = "0.13.0" }
162+
arrow = { version = "58.0.0", features = ["chrono-tz"] }
163+
arrow-buffer = { version = "58.0.0" }
164+
arrow-schema = { version = "58.0.0", features = ["serde"] }
165+
arrow-flight = { version = "58.0.0" }
166+
arrow-pyarrow = { version = "58.0.0" }
167+
parquet = { version = "58.0.0" }
168+
serde_arrow = { version = "0.14.0", features = ["arrow-58"] }
169169
# The `object_store` version must match the one used in DataFusion.
170-
object_store = { version = "0.12.4", features = ["aws", "gcp", "azure", "http"] }
171-
hdfs-native-object-store = "0.15.0"
170+
object_store = { version = "0.13.2", features = ["aws", "gcp", "azure", "http"] }
171+
hdfs-native-object-store = "0.16.0"
172172

173173
######
174174
# This is the end of the manually managed dependencies.

crates/sail-cache/src/file_listing_cache.rs

Lines changed: 13 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,14 @@ use std::sync::Arc;
44
use std::time::Duration;
55

66
use datafusion::common::{Result as DataFusionResult, TableReference};
7-
use datafusion::execution::cache::cache_manager::ListFilesCache;
7+
use datafusion::execution::cache::cache_manager::{CachedFileList, ListFilesCache};
88
use datafusion::execution::cache::{CacheAccessor, ListFilesEntry, TableScopedPath};
99
use log::debug;
1010
use moka::sync::Cache;
11-
use object_store::path::Path;
1211
use object_store::ObjectMeta;
1312

1413
pub struct MokaFileListingCache {
15-
objects: Cache<TableScopedPath, Arc<Vec<ObjectMeta>>>,
14+
objects: Cache<TableScopedPath, CachedFileList>,
1615
ttl: Option<Duration>,
1716
max_entries: Option<u64>,
1817
}
@@ -58,63 +57,17 @@ fn meta_heap_bytes(object_meta: &ObjectMeta) -> usize {
5857
size
5958
}
6059

61-
impl CacheAccessor<TableScopedPath, Arc<Vec<ObjectMeta>>> for MokaFileListingCache {
62-
type Extra = Option<Path>;
63-
64-
fn get(&self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
65-
self.get_with_extra(k, &None)
60+
impl CacheAccessor<TableScopedPath, CachedFileList> for MokaFileListingCache {
61+
fn get(&self, k: &TableScopedPath) -> Option<CachedFileList> {
62+
self.objects.get(k)
6663
}
6764

68-
fn get_with_extra(
69-
&self,
70-
k: &TableScopedPath,
71-
prefix: &Self::Extra,
72-
) -> Option<Arc<Vec<ObjectMeta>>> {
73-
let objects = self.objects.get(k)?;
74-
75-
let Some(prefix) = prefix else {
76-
return Some(objects);
77-
};
78-
79-
// Build full prefix: table_base/prefix
80-
let table_base = &k.path;
81-
let mut parts: Vec<_> = table_base.parts().collect();
82-
parts.extend(prefix.parts());
83-
let full_prefix = Path::from_iter(parts);
84-
let full_prefix_str = full_prefix.as_ref();
85-
86-
let filtered = objects
87-
.iter()
88-
.filter(|meta| meta.location.as_ref().starts_with(full_prefix_str))
89-
.cloned()
90-
.collect::<Vec<_>>();
91-
92-
if filtered.is_empty() {
93-
None
94-
} else {
95-
Some(Arc::new(filtered))
96-
}
97-
}
98-
99-
fn put(
100-
&self,
101-
key: &TableScopedPath,
102-
value: Arc<Vec<ObjectMeta>>,
103-
) -> Option<Arc<Vec<ObjectMeta>>> {
65+
fn put(&self, key: &TableScopedPath, value: CachedFileList) -> Option<CachedFileList> {
10466
self.objects.insert(key.clone(), value);
10567
None
10668
}
10769

108-
fn put_with_extra(
109-
&self,
110-
key: &TableScopedPath,
111-
value: Arc<Vec<ObjectMeta>>,
112-
_e: &Self::Extra,
113-
) -> Option<Arc<Vec<ObjectMeta>>> {
114-
self.put(key, value)
115-
}
116-
117-
fn remove(&self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
70+
fn remove(&self, k: &TableScopedPath) -> Option<CachedFileList> {
11871
self.objects.remove(k)
11972
}
12073

@@ -157,14 +110,14 @@ impl ListFilesCache for MokaFileListingCache {
157110
fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry> {
158111
self.objects
159112
.iter()
160-
.map(|(table_scoped_path, metas)| {
161-
let metas = Arc::clone(&metas);
113+
.map(|(table_scoped_path, cached)| {
114+
let metas = Arc::clone(&cached.files);
162115
let size_bytes = (metas.capacity() * size_of::<ObjectMeta>())
163116
+ metas.iter().map(meta_heap_bytes).sum::<usize>();
164117
(
165118
(*table_scoped_path).clone(),
166119
ListFilesEntry {
167-
metas,
120+
metas: cached.clone(),
168121
size_bytes,
169122
// moka handles expiration; we don't have per-entry expiration time
170123
expires: None,
@@ -200,7 +153,7 @@ mod tests {
200153
#[test]
201154
fn test_file_listing_cache() {
202155
let meta = ObjectMeta {
203-
location: Path::from("test"),
156+
location: object_store::path::Path::from("test"),
204157
last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
205158
.unwrap()
206159
.into(),
@@ -216,9 +169,9 @@ mod tests {
216169
};
217170
assert!(cache.get(&key).is_none());
218171

219-
cache.put(&key, vec![meta.clone()].into());
172+
cache.put(&key, CachedFileList::new(vec![meta.clone()]));
220173
assert_eq!(
221-
cache.get(&key).unwrap().first().unwrap().clone(),
174+
cache.get(&key).unwrap().files.first().unwrap().clone(),
222175
meta.clone()
223176
);
224177
}

crates/sail-cache/src/file_metadata_cache.rs

Lines changed: 34 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,18 @@
11
use std::collections::HashMap;
2-
use std::sync::Arc;
32
use std::time::Duration;
43

54
use datafusion::execution::cache::cache_manager::{
6-
FileMetadata, FileMetadataCache, FileMetadataCacheEntry,
5+
CachedFileMetadataEntry, FileMetadataCache, FileMetadataCacheEntry,
76
};
87
use datafusion::execution::cache::CacheAccessor;
98
use log::debug;
109
use moka::policy::EvictionPolicy;
1110
use moka::sync::Cache;
1211
use object_store::path::Path;
13-
use object_store::ObjectMeta;
1412

1513
pub struct MokaFileMetadataCache {
1614
size_limit: Option<u64>,
17-
metadata: Cache<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
15+
metadata: Cache<Path, CachedFileMetadataEntry>,
1816
}
1917

2018
impl MokaFileMetadataCache {
@@ -34,11 +32,9 @@ impl MokaFileMetadataCache {
3432
Self::NAME
3533
);
3634
builder = builder
37-
.weigher(
38-
|_key: &Path, (_, meta): &(ObjectMeta, Arc<dyn FileMetadata>)| -> u32 {
39-
meta.memory_size() as u32
40-
},
41-
)
35+
.weigher(|_key: &Path, entry: &CachedFileMetadataEntry| -> u32 {
36+
entry.file_metadata.memory_size() as u32
37+
})
4238
.max_capacity(size_limit);
4339
} else {
4440
debug!("No size limit set for {}", Self::NAME);
@@ -65,67 +61,38 @@ impl FileMetadataCache for MokaFileMetadataCache {
6561
fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry> {
6662
self.metadata
6763
.iter()
68-
.map(|(path, (object_meta, meta))| {
64+
.map(|(path, entry)| {
6965
(
7066
path.as_ref().clone(),
7167
FileMetadataCacheEntry {
72-
object_meta,
73-
size_bytes: meta.memory_size(),
68+
object_meta: entry.meta.clone(),
69+
size_bytes: entry.file_metadata.memory_size(),
7470
// TODO: get hits from the cache
7571
hits: 0,
76-
extra: meta.extra_info(),
72+
extra: entry.file_metadata.extra_info(),
7773
},
7874
)
7975
})
8076
.collect()
8177
}
8278
}
8379

84-
impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for MokaFileMetadataCache {
85-
type Extra = ObjectMeta;
86-
87-
fn get(&self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
88-
self.metadata
89-
.get(&k.location)
90-
.and_then(|(extra, metadata)| {
91-
if extra.size == k.size && extra.last_modified == k.last_modified {
92-
Some(Arc::clone(&metadata))
93-
} else {
94-
None
95-
}
96-
})
97-
}
98-
99-
fn get_with_extra(&self, k: &ObjectMeta, _e: &Self::Extra) -> Option<Arc<dyn FileMetadata>> {
100-
self.get(k)
80+
impl CacheAccessor<Path, CachedFileMetadataEntry> for MokaFileMetadataCache {
81+
fn get(&self, k: &Path) -> Option<CachedFileMetadataEntry> {
82+
self.metadata.get(k)
10183
}
10284

103-
fn put(&self, key: &ObjectMeta, value: Arc<dyn FileMetadata>) -> Option<Arc<dyn FileMetadata>> {
104-
self.metadata
105-
.insert(key.location.clone(), (key.clone(), value));
85+
fn put(&self, key: &Path, value: CachedFileMetadataEntry) -> Option<CachedFileMetadataEntry> {
86+
self.metadata.insert(key.clone(), value);
10687
None
10788
}
10889

109-
fn put_with_extra(
110-
&self,
111-
key: &ObjectMeta,
112-
value: Arc<dyn FileMetadata>,
113-
_e: &Self::Extra,
114-
) -> Option<Arc<dyn FileMetadata>> {
115-
self.put(key, value)
90+
fn remove(&self, k: &Path) -> Option<CachedFileMetadataEntry> {
91+
self.metadata.remove(k)
11692
}
11793

118-
fn remove(&self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
119-
self.metadata
120-
.remove(&k.location)
121-
.map(|(_, metadata)| metadata)
122-
}
123-
124-
fn contains_key(&self, k: &ObjectMeta) -> bool {
125-
self.metadata
126-
.get(&k.location)
127-
.map(|(extra, _)| extra.size == k.size && extra.last_modified == k.last_modified)
128-
.unwrap_or(false)
94+
fn contains_key(&self, k: &Path) -> bool {
95+
self.metadata.contains_key(k)
12996
}
13097

13198
fn len(&self) -> usize {
@@ -184,47 +151,36 @@ mod tests {
184151
version: None,
185152
};
186153

187-
let metadata: Arc<dyn FileMetadata> = Arc::new(TestFileMetadata {
154+
let file_metadata: Arc<dyn FileMetadata> = Arc::new(TestFileMetadata {
188155
metadata: "retrieved_metadata".to_owned(),
189156
});
157+
let entry = CachedFileMetadataEntry::new(object_meta.clone(), Arc::clone(&file_metadata));
190158

191159
let cache = MokaFileMetadataCache::new(None, None);
192-
assert!(cache.get(&object_meta).is_none());
160+
assert!(cache.get(&object_meta.location).is_none());
193161

194162
// put
195-
cache.put(&object_meta, metadata);
163+
cache.put(&object_meta.location, entry.clone());
196164

197165
// get and contains of a valid entry
198-
assert!(cache.contains_key(&object_meta));
199-
let value = cache.get(&object_meta);
166+
assert!(cache.contains_key(&object_meta.location));
167+
let value = cache.get(&object_meta.location);
200168
assert!(value.is_some());
201-
let test_file_metadata = Arc::downcast::<TestFileMetadata>(value.unwrap());
202-
assert!(test_file_metadata.is_ok());
203-
assert_eq!(test_file_metadata.unwrap().metadata, "retrieved_metadata");
204-
205-
// file size changed
206-
let mut object_meta2 = object_meta.clone();
207-
object_meta2.size = 2048;
208-
assert!(cache.get(&object_meta2).is_none());
209-
assert!(!cache.contains_key(&object_meta2));
210-
211-
// file last_modified changed
212-
let mut object_meta2 = object_meta.clone();
213-
object_meta2.last_modified = DateTime::parse_from_rfc3339("2025-07-29T13:13:13+00:00")
214-
.unwrap()
215-
.into();
216-
assert!(cache.get(&object_meta2).is_none());
217-
assert!(!cache.contains_key(&object_meta2));
169+
let cached_entry = value.unwrap();
170+
assert!(cached_entry.is_valid_for(&object_meta));
171+
let test_meta = Arc::downcast::<TestFileMetadata>(cached_entry.file_metadata);
172+
assert!(test_meta.is_ok());
173+
assert_eq!(test_meta.unwrap().metadata, "retrieved_metadata");
218174

219175
// different file
220176
let mut object_meta2 = object_meta.clone();
221177
object_meta2.location = Path::from("test2");
222-
assert!(cache.get(&object_meta2).is_none());
223-
assert!(!cache.contains_key(&object_meta2));
178+
assert!(cache.get(&object_meta2.location).is_none());
179+
assert!(!cache.contains_key(&object_meta2.location));
224180

225181
// remove
226-
cache.remove(&object_meta);
227-
assert!(cache.get(&object_meta).is_none());
228-
assert!(!cache.contains_key(&object_meta));
182+
cache.remove(&object_meta.location);
183+
assert!(cache.get(&object_meta.location).is_none());
184+
assert!(!cache.contains_key(&object_meta.location));
229185
}
230186
}

0 commit comments

Comments (0)