Skip to content

Commit b9aa89b

Browse files
committed
feat(amp-data-store): introduce crate with unified store interface
Consolidate data storage and caching into single abstraction to simplify architecture and eliminate the cached/uncached store distinction. - Unified `DataStore` combines storage and cache in single type - Integrated parquet metadata cache with memory-aware eviction - Centralized physical table operations and file naming - Single store interface across all services and modules Signed-off-by: Lorenzo Delgado <lorenzo@edgeandnode.com>
1 parent d2f39f0 commit b9aa89b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+1114
-1030
lines changed

Cargo.lock

Lines changed: 531 additions & 434 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ members = [
1010
"crates/bin/ampup",
1111
"crates/clients/flight",
1212
"crates/core/common",
13+
"crates/core/data-store",
1314
"crates/core/dataset-store",
1415
"crates/core/datasets-common",
1516
"crates/core/datasets-derived",

crates/bin/ampd/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ console-subscriber = ["dep:console-subscriber"]
1414
snmalloc = ["dep:snmalloc-rs"]
1515

1616
[dependencies]
17+
amp-data-store = { path = "../../core/data-store" }
1718
amp-object-store = { path = "../../core/object-store" }
1819
clap.workspace = true
1920
common = { path = "../../core/common" }

crates/bin/ampd/src/controller_cmd.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use std::{net::SocketAddr, sync::Arc};
22

33
use amp_config::Config as CommonConfig;
4+
use amp_data_store::DataStore;
45
use amp_dataset_store::{
56
DatasetStore, manifests::DatasetManifestsStore, providers::ProviderConfigsStore,
67
};
78
use amp_object_store::ObjectStoreCreationError;
8-
use common::{BoxError, store::Store};
9+
use common::BoxError;
910
use controller::config::Config;
1011
use monitoring::telemetry::metrics::Meter;
1112

@@ -16,8 +17,12 @@ pub async fn run(config: CommonConfig, meter: Option<Meter>, at: SocketAddr) ->
1617
.await
1718
.map_err(|err| Error::MetadataDbConnection(Box::new(err)))?;
1819

19-
let data_store = Store::new(metadata_db.clone(), config.data_store_url.clone())
20-
.map_err(Error::DataStoreCreation)?;
20+
let data_store = DataStore::new(
21+
metadata_db.clone(),
22+
config.data_store_url.clone(),
23+
config.parquet.cache_size_mb,
24+
)
25+
.map_err(Error::DataStoreCreation)?;
2126

2227
let dataset_store = {
2328
let provider_configs_store = ProviderConfigsStore::new(

crates/bin/ampd/src/server_cmd.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use std::sync::Arc;
22

33
use amp_config::{Addrs, Config as CommonConfig};
4+
use amp_data_store::DataStore;
45
use amp_dataset_store::{
56
DatasetStore, manifests::DatasetManifestsStore, providers::ProviderConfigsStore,
67
};
78
use amp_object_store::ObjectStoreCreationError;
8-
use common::{BoxError, store::Store};
9+
use common::BoxError;
910
use monitoring::telemetry::metrics::Meter;
1011
use server::config::Config as ServerConfig;
1112

@@ -21,8 +22,12 @@ pub async fn run(
2122
.await
2223
.map_err(|err| Error::MetadataDbConnection(Box::new(err)))?;
2324

24-
let data_store = Store::new(metadata_db.clone(), config.data_store_url.clone())
25-
.map_err(Error::DataStoreCreation)?;
25+
let data_store = DataStore::new(
26+
metadata_db.clone(),
27+
config.data_store_url.clone(),
28+
config.parquet.cache_size_mb,
29+
)
30+
.map_err(Error::DataStoreCreation)?;
2631

2732
let dataset_store = {
2833
let provider_configs_store = ProviderConfigsStore::new(
@@ -146,6 +151,5 @@ pub fn config_from_common(config: &CommonConfig) -> ServerConfig {
146151
max_mem_mb: config.max_mem_mb,
147152
query_max_mem_mb: config.query_max_mem_mb,
148153
spill_location: config.spill_location.clone(),
149-
parquet_cache_size_mb: config.parquet.cache_size_mb,
150154
}
151155
}

crates/bin/ampd/src/solo_cmd.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use std::{future::Future, pin::Pin, sync::Arc};
22

33
use amp_config::Config as CommonConfig;
4+
use amp_data_store::DataStore;
45
use amp_dataset_store::{
56
DatasetStore, manifests::DatasetManifestsStore, providers::ProviderConfigsStore,
67
};
78
use amp_object_store::ObjectStoreCreationError;
8-
use common::{BoxError, store::Store};
9+
use common::BoxError;
910
use monitoring::telemetry::metrics::Meter;
1011

1112
use crate::{controller_cmd, server_cmd, worker_cmd};
@@ -24,8 +25,12 @@ pub async fn run(
2425
.await
2526
.map_err(|err| Error::MetadataDbConnection(Box::new(err)))?;
2627

27-
let data_store = Store::new(metadata_db.clone(), config.data_store_url.clone())
28-
.map_err(Error::DataStoreCreation)?;
28+
let data_store = DataStore::new(
29+
metadata_db.clone(),
30+
config.data_store_url.clone(),
31+
config.parquet.cache_size_mb,
32+
)
33+
.map_err(Error::DataStoreCreation)?;
2934

3035
let dataset_store = {
3136
let provider_configs_store = ProviderConfigsStore::new(

crates/bin/ampd/src/worker_cmd.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use amp_config::Config;
2+
use amp_data_store::DataStore;
23
use amp_dataset_store::{
34
DatasetStore, manifests::DatasetManifestsStore, providers::ProviderConfigsStore,
45
};
56
use amp_object_store::ObjectStoreCreationError;
6-
use common::store::Store;
77
use monitoring::telemetry::metrics::Meter;
88
use worker::node_id::NodeId;
99

@@ -13,8 +13,12 @@ pub async fn run(config: Config, meter: Option<Meter>, node_id: NodeId) -> Resul
1313
.await
1414
.map_err(|err| Error::MetadataDbConnection(Box::new(err)))?;
1515

16-
let data_store = Store::new(metadata_db.clone(), config.data_store_url.clone())
17-
.map_err(Error::DataStoreCreation)?;
16+
let data_store = DataStore::new(
17+
metadata_db.clone(),
18+
config.data_store_url.clone(),
19+
config.parquet.cache_size_mb,
20+
)
21+
.map_err(Error::DataStoreCreation)?;
1822

1923
let dataset_store = {
2024
let provider_configs_store = ProviderConfigsStore::new(

crates/config/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,6 @@ impl Config {
314314
self.max_mem_mb,
315315
self.query_max_mem_mb,
316316
&self.spill_location,
317-
self.parquet.cache_size_mb,
318317
)
319318
}
320319

crates/core/common/Cargo.toml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ license-file.workspace = true
66

77
[dependencies]
88
alloy.workspace = true
9-
amp-object-store = { path = "../object-store" }
9+
amp-data-store = { path = "../data-store" }
1010
async-stream.workspace = true
1111
async-trait.workspace = true
1212
axum.workspace = true
@@ -17,9 +17,6 @@ datafusion.workspace = true
1717
datafusion-datasource.workspace = true
1818
datafusion-tracing.workspace = true
1919
datasets-common = { path = "../datasets-common" }
20-
figment.workspace = true
21-
foyer = "0.21"
22-
fs-err.workspace = true
2320
futures.workspace = true
2421
governor.workspace = true
2522
indoc.workspace = true

0 commit comments

Comments
 (0)