Support FoundationDB for consensus and timestamp oracle #33819
Conversation
Still losing data somewhere, it's getting closer though:
Tell me if you want any help with setting up FoundationDB in mzcompose. I'm very interested to see the benchmark results, as well as the limits test to find the new limits (with the artificial limits it currently imposes because things became too slow removed).
I added a testdrive variant that runs against FoundationDB, but it requires a bunch of changes to make Mz compile in docker. One issue is that the FoundationDB client library needs to be dynamically linked, which is a novel problem for us. At the moment, everything in Materialize is statically linked, so we need to make sure that our base images contain the right library for the compile and runtime to be happy.
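For context, a minimal sketch of what dynamic linking against the FoundationDB C client could look like in a Cargo build script; the library name (`fdb_c`) matches the upstream client, but the search path is an assumption about the base image layout, not the PR's actual setup:

```rust
// build.rs -- hedged sketch: link the FoundationDB C client dynamically.
// The search path below is an assumed install location inside the base image.
fn main() {
    // Where the compile/runtime image is assumed to provide libfdb_c.so.
    println!("cargo:rustc-link-search=native=/usr/lib");
    // Ask rustc to link libfdb_c as a shared library rather than statically.
    println!("cargo:rustc-link-lib=dylib=fdb_c");
}
```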
Feature Benchmark looks pretty similar, ~2% slower on average: https://docs.google.com/spreadsheets/d/1iC-gxHKOgz-kkQDKgsq_aem-Y_8cVMZaeBDqT1sR5JE/edit?gid=2146535294#gid=2146535294 But it mostly doesn't benchmark DDLs.
The feature benchmark had a few small regressions in SmallInserts and Subscribes: https://buildkite.com/materialize/nightly/builds/13768
When enabling FoundationDB consensus in Parallel Workload with 10x the number of objects (to stress it a bit more), I'm seeing a novel panic: Parallel Workload (0dt deploy). Could it be related to FoundationDB? If not, I'll open a separate issue.
Regarding the formatting change, it's caused by the following diff:

diff --git i/Cargo.lock w/Cargo.lock
index 8442f81338..9b30f4e517 100644
--- i/Cargo.lock
+++ w/Cargo.lock
@@ -1451,6 +1451,8 @@ dependencies = [
"cexpr",
"clang-sys",
"itertools 0.10.5",
+ "log",
+ "prettyplease",
"proc-macro2",
"quote",
"regex",
@@ -6404,6 +6406,7 @@ version = "0.0.0"
dependencies = [
"aho-corasick",
"anyhow",
+ "bindgen",
"bytes",
"bytesize",
"chrono",
diff --git i/src/expr/Cargo.toml w/src/expr/Cargo.toml
index d8818fe866..4054f044e3 100644
--- i/src/expr/Cargo.toml
+++ w/src/expr/Cargo.toml
@@ -18,6 +18,7 @@ name = "window_functions"
harness = false
[dependencies]
+bindgen = { version = "0.72.1", features = ["prettyplease"] }
aho-corasick = "1.1.4"
anyhow = "1.0.100"
bytes = "1.10.1"
diff --git i/src/expr/src/scalar/snapshots/mz_expr__scalar__func__array_array_concat.snap w/src/expr/src/scalar/snapshots/mz_expr__scalar__func__array_array_concat.snap
index e82c0b728a..a36548b107 100644
--- i/src/expr/src/scalar/snapshots/mz_expr__scalar__func__array_array_concat.snap
+++ w/src/expr/src/scalar/snapshots/mz_expr__scalar__func__array_array_concat.snap
@@ -1,6 +1,6 @@
---
source: src/expr/src/scalar/func.rs
-expression: "#[sqlfunc(\n output_type_expr = \"input_type_a.scalar_type.without_modifiers().nullable(true)\",\n is_infix_op = true,\n sqlname = \"||\",\n propagates_nulls = false,\n introduces_nulls = false\n)]\nfn array_array_concat<'a>(\n a: Datum<'a>,\n b: Datum<'a>,\n temp_storage: &'a RowArena,\n) -> Result<Datum<'a>, EvalError> {\n if a.is_null() {\n return Ok(b);\n } else if b.is_null() {\n return Ok(a);\n }\n let a_array = a.unwrap_array();\n let b_array = b.unwrap_array();\n let a_dims: Vec<ArrayDimension> = a_array.dims().into_iter().collect();\n let b_dims: Vec<ArrayDimension> = b_array.dims().into_iter().collect();\n let a_ndims = a_dims.len();\n let b_ndims = b_dims.len();\n if a_ndims == 0 {\n return Ok(b);\n } else if b_ndims == 0 {\n return Ok(a);\n }\n #[allow(clippy::as_conversions)]\n if (a_ndims as isize - b_ndims as isize).abs() > 1 {\n return Err(EvalError::IncompatibleArrayDimensions {\n dims: Some((a_ndims, b_ndims)),\n });\n }\n let mut dims;\n match a_ndims.cmp(&b_ndims) {\n Ordering::Equal => {\n if &a_dims[1..] != &b_dims[1..] {\n return Err(EvalError::IncompatibleArrayDimensions {\n dims: None,\n });\n }\n dims = vec![\n ArrayDimension { lower_bound : a_dims[0].lower_bound, length : a_dims[0]\n .length + b_dims[0].length, }\n ];\n dims.extend(&a_dims[1..]);\n }\n Ordering::Less => {\n if &a_dims[..] != &b_dims[1..] {\n return Err(EvalError::IncompatibleArrayDimensions {\n dims: None,\n });\n }\n dims = vec![\n ArrayDimension { lower_bound : b_dims[0].lower_bound, length : b_dims[0]\n .length + 1, }\n ];\n dims.extend(a_dims);\n }\n Ordering::Greater => {\n if &a_dims[1..] != &b_dims[..] {\n return Err(EvalError::IncompatibleArrayDimensions {\n dims: None,\n });\n }\n dims = vec![\n ArrayDimension { lower_bound : a_dims[0].lower_bound, length : a_dims[0]\n .length + 1, }\n ];\n dims.extend(b_dims);\n }\n }\n let elems = a_array.elements().iter().chain(b_array.elements().iter());\n Ok(temp_storage.try_make_datum(|packer| packer.try_push_array(&dims, elems))?)\n}\n"
+expression: "#[sqlfunc(\n output_type_expr = \"input_type_a.scalar_type.without_modifiers().nullable(true)\",\n is_infix_op = true,\n sqlname = \"||\",\n propagates_nulls = false,\n introduces_nulls = false\n)]\nfn array_array_concat<'a>(\n a: Datum<'a>,\n b: Datum<'a>,\n temp_storage: &'a RowArena,\n) -> Result<Datum<'a>, EvalError> {\n if a.is_null() {\n return Ok(b);\n } else if b.is_null() {\n return Ok(a);\n }\n let a_array = a.unwrap_array();\n let b_array = b.unwrap_array();\n let a_dims: Vec<ArrayDimension> = a_array.dims().into_iter().collect();\n let b_dims: Vec<ArrayDimension> = b_array.dims().into_iter().collect();\n let a_ndims = a_dims.len();\n let b_ndims = b_dims.len();\n if a_ndims == 0 {\n return Ok(b);\n } else if b_ndims == 0 {\n return Ok(a);\n }\n #[allow(clippy::as_conversions)]\n if (a_ndims as isize - b_ndims as isize).abs() > 1 {\n return Err(EvalError::IncompatibleArrayDimensions {\n dims: Some((a_ndims, b_ndims)),\n });\n }\n let mut dims;\n match a_ndims.cmp(&b_ndims) {\n Ordering::Equal => {\n if &a_dims[1..] != &b_dims[1..] {\n return Err(EvalError::IncompatibleArrayDimensions {\n dims: None,\n });\n }\n dims = vec![\n ArrayDimension {\n lower_bound: a_dims[0].lower_bound,\n length: a_dims[0].length + b_dims[0].length,\n },\n ];\n dims.extend(&a_dims[1..]);\n }\n Ordering::Less => {\n if &a_dims[..] != &b_dims[1..] {\n return Err(EvalError::IncompatibleArrayDimensions {\n dims: None,\n });\n }\n dims = vec![\n ArrayDimension {\n lower_bound: b_dims[0].lower_bound,\n length: b_dims[0].length + 1,\n },\n ];\n dims.extend(a_dims);\n }\n Ordering::Greater => {\n if &a_dims[1..] != &b_dims[..] {\n return Err(EvalError::IncompatibleArrayDimensions {\n dims: None,\n });\n }\n dims = vec![\n ArrayDimension {\n lower_bound: a_dims[0].lower_bound,\n length: a_dims[0].length + 1,\n },\n ];\n dims.extend(b_dims);\n }\n }\n let elems = a_array.elements().iter().chain(b_array.elements().iter());\n Ok(temp_storage.try_make_datum(|packer| packer.try_push_array(&dims, elems))?)\n}\n"
---
#[derive(
proptest_derive::Arbitrary,
@@ -97,8 +97,10 @@ fn array_array_concat<'a>(
});
}
dims = vec![
- ArrayDimension { lower_bound : a_dims[0].lower_bound, length : a_dims[0]
- .length + b_dims[0].length, }
+ ArrayDimension {
+ lower_bound: a_dims[0].lower_bound,
+ length: a_dims[0].length + b_dims[0].length,
+ },
];
dims.extend(&a_dims[1..]);
}
@@ -109,8 +111,10 @@ fn array_array_concat<'a>(
});
}
dims = vec![
- ArrayDimension { lower_bound : b_dims[0].lower_bound, length : b_dims[0]
- .length + 1, }
+ ArrayDimension {
+ lower_bound: b_dims[0].lower_bound,
+ length: b_dims[0].length + 1,
+ },
];
dims.extend(a_dims);
}
@@ -121,8 +125,10 @@ fn array_array_concat<'a>(
});
}
dims = vec![
- ArrayDimension { lower_bound : a_dims[0].lower_bound, length : a_dims[0]
- .length + 1, }
+ ArrayDimension {
+ lower_bound: a_dims[0].lower_bound,
+ length: a_dims[0].length + 1,
+ },
];
dims.extend(b_dims);
}
diff --git i/src/expr/src/scalar/snapshots/mz_expr__scalar__func__parse_ident.snap w/src/expr/src/scalar/snapshots/mz_expr__scalar__func__parse_ident.snap
index 431ef1191a..b82fda3bfc 100644
--- i/src/expr/src/scalar/snapshots/mz_expr__scalar__func__parse_ident.snap
+++ w/src/expr/src/scalar/snapshots/mz_expr__scalar__func__parse_ident.snap
@@ -1,6 +1,6 @@
---
source: src/expr/src/scalar/func.rs
-expression: "#[sqlfunc()]\nfn parse_ident<'a>(\n ident: &'a str,\n strict: bool,\n) -> Result<ArrayRustType<Cow<'a, str>>, EvalError> {\n fn is_ident_start(c: char) -> bool {\n matches!(c, 'A'..='Z' | 'a'..='z' | '_' | '\\u{80}'..= char::MAX)\n }\n fn is_ident_cont(c: char) -> bool {\n matches!(c, '0'..='9' | '$') || is_ident_start(c)\n }\n let mut elems = vec![];\n let buf = &mut LexBuf::new(ident);\n let mut after_dot = false;\n buf.take_while(|ch| ch.is_ascii_whitespace());\n loop {\n let mut missing_ident = true;\n let c = buf.next();\n if c == Some('\"') {\n let s = buf.take_while(|ch| !matches!(ch, '\"'));\n if buf.next() != Some('\"') {\n return Err(EvalError::InvalidIdentifier {\n ident: ident.into(),\n detail: Some(\"String has unclosed double quotes.\".into()),\n });\n }\n elems.push(Cow::Borrowed(s));\n missing_ident = false;\n } else if c.map(is_ident_start).unwrap_or(false) {\n buf.prev();\n let s = buf.take_while(is_ident_cont);\n elems.push(Cow::Owned(s.to_ascii_lowercase()));\n missing_ident = false;\n }\n if missing_ident {\n if c == Some('.') {\n return Err(EvalError::InvalidIdentifier {\n ident: ident.into(),\n detail: Some(\"No valid identifier before \\\".\\\".\".into()),\n });\n } else if after_dot {\n return Err(EvalError::InvalidIdentifier {\n ident: ident.into(),\n detail: Some(\"No valid identifier after \\\".\\\".\".into()),\n });\n } else {\n return Err(EvalError::InvalidIdentifier {\n ident: ident.into(),\n detail: None,\n });\n }\n }\n buf.take_while(|ch| ch.is_ascii_whitespace());\n match buf.next() {\n Some('.') => {\n after_dot = true;\n buf.take_while(|ch| ch.is_ascii_whitespace());\n }\n Some(_) if strict => {\n return Err(EvalError::InvalidIdentifier {\n ident: ident.into(),\n detail: None,\n });\n }\n _ => break,\n }\n }\n Ok(elems.into())\n}\n"
+expression: "#[sqlfunc()]\nfn parse_ident<'a>(\n ident: &'a str,\n strict: bool,\n) -> Result<ArrayRustType<Cow<'a, str>>, EvalError> {\n fn is_ident_start(c: char) -> bool {\n matches!(c, 'A'..='Z' | 'a'..='z' | '_' | '\\u{80}'..=char::MAX)\n }\n fn is_ident_cont(c: char) -> bool {\n matches!(c, '0'..='9' | '$') || is_ident_start(c)\n }\n let mut elems = vec![];\n let buf = &mut LexBuf::new(ident);\n let mut after_dot = false;\n buf.take_while(|ch| ch.is_ascii_whitespace());\n loop {\n let mut missing_ident = true;\n let c = buf.next();\n if c == Some('\"') {\n let s = buf.take_while(|ch| !matches!(ch, '\"'));\n if buf.next() != Some('\"') {\n return Err(EvalError::InvalidIdentifier {\n ident: ident.into(),\n detail: Some(\"String has unclosed double quotes.\".into()),\n });\n }\n elems.push(Cow::Borrowed(s));\n missing_ident = false;\n } else if c.map(is_ident_start).unwrap_or(false) {\n buf.prev();\n let s = buf.take_while(is_ident_cont);\n elems.push(Cow::Owned(s.to_ascii_lowercase()));\n missing_ident = false;\n }\n if missing_ident {\n if c == Some('.') {\n return Err(EvalError::InvalidIdentifier {\n ident: ident.into(),\n detail: Some(\"No valid identifier before \\\".\\\".\".into()),\n });\n } else if after_dot {\n return Err(EvalError::InvalidIdentifier {\n ident: ident.into(),\n detail: Some(\"No valid identifier after \\\".\\\".\".into()),\n });\n } else {\n return Err(EvalError::InvalidIdentifier {\n ident: ident.into(),\n detail: None,\n });\n }\n }\n buf.take_while(|ch| ch.is_ascii_whitespace());\n match buf.next() {\n Some('.') => {\n after_dot = true;\n buf.take_while(|ch| ch.is_ascii_whitespace());\n }\n Some(_) if strict => {\n return Err(EvalError::InvalidIdentifier {\n ident: ident.into(),\n detail: None,\n });\n }\n _ => break,\n }\n }\n Ok(elems.into())\n}\n"
---
#[derive(
proptest_derive::Arbitrary,
@@ -57,7 +57,7 @@ fn parse_ident<'a>(
strict: bool,
) -> Result<ArrayRustType<Cow<'a, str>>, EvalError> {
fn is_ident_start(c: char) -> bool {
- matches!(c, 'A'..='Z' | 'a'..='z' | '_' | '\u{80}'..= char::MAX)
+ matches!(c, 'A'..='Z' | 'a'..='z' | '_' | '\u{80}'..=char::MAX)
}
fn is_ident_cont(c: char) -> bool {
    matches!(c, '0'..='9' | '$') || is_ident_start(c)
bkirwi left a comment:
I approve of the persist changes! All suggestions nonblocking.
Pull request overview
This PR implements FoundationDB support as an alternative backend for consensus and timestamp oracle functionality in Materialize.
Changes:
- Adds FoundationDB implementations for consensus (FdbConsensus) and timestamp oracle (FdbTimestampOracle)
- Refactors metadata store configuration to support multiple backends through a unified interface (see the sketch after this list)
- Updates test infrastructure to allow switching between PostgreSQL, CockroachDB, and FoundationDB backends
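To illustrate the unified-interface idea, here is a minimal sketch of backend selection keyed on the connection URL scheme; the type and method names are hypothetical and not the PR's actual API:

```rust
// Hedged sketch: pick a metadata-store backend from a connection URL.
// `MetadataBackend` and `from_url` are illustrative names only.
#[derive(Debug)]
enum MetadataBackend {
    /// Postgres/CockroachDB-style connection string.
    Postgres(String),
    /// FoundationDB cluster-file path or connection string.
    FoundationDb(String),
}

impl MetadataBackend {
    fn from_url(url: &str) -> Result<Self, String> {
        if url.starts_with("postgres://") || url.starts_with("postgresql://") {
            Ok(MetadataBackend::Postgres(url.to_string()))
        } else if let Some(rest) = url.strip_prefix("foundationdb:") {
            Ok(MetadataBackend::FoundationDb(rest.to_string()))
        } else {
            Err(format!("unsupported metadata store URL: {url}"))
        }
    }
}

fn main() {
    // Mirrors the `--postgres-url foundationdb:...` invocation from the PR description.
    let backend = MetadataBackend::from_url("foundationdb:/etc/foundationdb/fdb.cluster");
    println!("{backend:?}");
}
```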
Reviewed changes
Copilot reviewed 69 out of 73 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| src/foundationdb/ | New crate providing shared FoundationDB utilities, including network initialization and URL parsing (see the connectivity sketch after this table) |
| src/persist/src/foundationdb.rs | Implements the Consensus trait backed by FoundationDB with a key-value storage schema |
| src/timestamp-oracle/src/foundationdb_oracle.rs | Implements the TimestampOracle trait backed by FoundationDB for read/write timestamp tracking |
| src/timestamp-oracle/src/config.rs | Unified configuration enum allowing selection between Postgres and FoundationDB backends |
| misc/python/materialize/mzcompose/services/metadata_store.py | New module centralizing metadata store selection logic |
| misc/python/materialize/mzcompose/services/foundationdb.py | FoundationDB service definition for mzcompose with dynamic cluster file generation |
| test/foundationdb/ | Test suite validating FoundationDB backend functionality |
| Various test files | Updates import paths to use the new metadata_store module |
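As a rough illustration of what the shared utilities in src/foundationdb/ have to set up, a hedged sketch using the `foundationdb` Rust crate (exact APIs vary by crate version); the cluster-file path and key are placeholders, and a tokio runtime is assumed:

```rust
// Hedged sketch: start the FDB client network, open a database from a
// cluster file, and do one transactional read as a connectivity check.
use foundationdb::Database;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The client network thread must be started once per process before any
    // Database is created; the returned guard stops it again when dropped.
    let _network = unsafe { foundationdb::boot() };

    // Passing `None` would fall back to the default cluster file location.
    let db = Database::new(Some("/etc/foundationdb/fdb.cluster"))?;

    // A single non-snapshot read inside a transaction.
    let trx = db.create_trx()?;
    let value = trx.get(b"health-check", false).await?;
    println!("health-check key present: {}", value.is_some());
    Ok(())
}
```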
Implement a consensus provider and timestamp oracle based on FoundationDB. Signed-off-by: Moritz Hoffmann <[email protected]>
* Move fdb testdrive to nightly.
* Extract metadata-store logic into its own module.
* Integrate the tests into regular tests and use `libc::_exit` to avoid segfaults (a sketch of this pattern follows below).

Signed-off-by: Moritz Hoffmann <[email protected]>
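For reference, a minimal sketch of the `libc::_exit` pattern from the commit above, assuming the goal is to skip process-exit cleanup (such as the FDB client network teardown) that can otherwise segfault; the test-driver function is a hypothetical stand-in:

```rust
// Hedged sketch: exit without running atexit handlers or destructors, so a
// flaky native teardown path cannot segfault after the tests already passed.
use std::io::Write;

fn run_tests() -> i32 {
    // Hypothetical stand-in for the real test driver.
    0
}

fn main() {
    let code = run_tests();

    // Flush buffered output before bypassing normal shutdown.
    std::io::stdout().flush().ok();

    // `_exit` terminates the process immediately: no destructors, no atexit.
    unsafe { libc::_exit(code) }
}
```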
Implements a consensus and timestamp oracle backend to talk to FoundationDB.
Run:
bin/environmentd --optimized --postgres-url foundationdb:

Status:
* libfdb_c on aarch64 Mac. TODO