From fa4da562af439928392d7e5946be262e24a211b3 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Tue, 9 Jun 2026 10:03:03 -0400 Subject: [PATCH 01/15] Call fsync before renaming metadata file --- raphtory-graphql/src/model/mod.rs | 15 --------------- raphtory/src/serialise/graph_folder.rs | 7 ++++++- 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/raphtory-graphql/src/model/mod.rs b/raphtory-graphql/src/model/mod.rs index ea5a706c58..84a62ea0ae 100644 --- a/raphtory-graphql/src/model/mod.rs +++ b/raphtory-graphql/src/model/mod.rs @@ -178,7 +178,6 @@ impl QueryRoot { } /// Returns a graph - async fn graph<'a>( ctx: &Context<'a>, #[graphql( @@ -203,7 +202,6 @@ impl QueryRoot { /// Returns lightweight metadata for a graph (node/edge counts, timestamps) without loading it. /// Requires at least INTROSPECT permission. - async fn graph_metadata<'a>( ctx: &Context<'a>, #[graphql(desc = "Graph path relative to the root namespace.")] path: String, @@ -230,7 +228,6 @@ impl QueryRoot { /// Update graph query, has side effects to update graph state /// /// Returns:: GqlMutableGraph - async fn update_graph<'a>( ctx: &Context<'a>, #[graphql(desc = "Graph path relative to the root namespace.")] path: String, @@ -247,7 +244,6 @@ impl QueryRoot { /// Update graph query, has side effects to update graph state /// /// Returns:: GqlMutableGraph - async fn vectorise_graph<'a>( ctx: &Context<'a>, #[graphql(desc = "Graph path relative to the root namespace.")] path: String, @@ -279,7 +275,6 @@ impl QueryRoot { /// Create vectorised graph in the format used for queries /// /// Returns:: GqlVectorisedGraph - async fn vectorised_graph<'a>( ctx: &Context<'a>, #[graphql(desc = "Graph path relative to the root namespace.")] path: &str, @@ -310,7 +305,6 @@ impl QueryRoot { /// Returns a specific namespace at a given path /// /// Returns:: Namespace or error if no namespace found - async fn namespace<'a>(ctx: &Context<'a>, path: String) -> Result { let data = ctx.data_unchecked::(); Ok(Namespace::try_new(data.work_dir.clone(), path)?) @@ -332,7 +326,6 @@ impl QueryRoot { /// Encodes graph and returns as string. /// /// Returns:: Base64 url safe encoded string - async fn receive_graph<'a>( ctx: &Context<'a>, #[graphql(desc = "Graph path relative to the root namespace.")] path: String, @@ -365,7 +358,6 @@ impl Mut { } /// Delete graph from a path on the server. - async fn delete_graph<'a>( ctx: &Context<'a>, #[graphql(desc = "Graph path relative to the root namespace.")] path: String, @@ -379,7 +371,6 @@ impl Mut { } /// Creates a new graph. - async fn new_graph<'a>( ctx: &Context<'a>, #[graphql(desc = "Destination path relative to the root namespace.")] path: String, @@ -413,7 +404,6 @@ impl Mut { } /// Move graph from a path on the server to a new_path on the server. - async fn move_graph<'a>( ctx: &Context<'a>, #[graphql(desc = "Current graph path relative to the root namespace.")] path: &str, @@ -439,7 +429,6 @@ impl Mut { } /// Copy graph from a path on the server to a new_path on the server. - async fn copy_graph<'a>( ctx: &Context<'a>, #[graphql(desc = "Source graph path relative to the root namespace.")] path: &str, @@ -471,7 +460,6 @@ impl Mut { /// /// Returns:: /// name of the new graph - async fn upload_graph<'a>( ctx: &Context<'a>, #[graphql(desc = "Destination path relative to the root namespace.")] path: String, @@ -496,7 +484,6 @@ impl Mut { /// /// Returns:: /// path of the new graph - async fn send_graph<'a>( ctx: &Context<'a>, #[graphql(desc = "Destination path relative to the root namespace.")] path: &str, @@ -589,7 +576,6 @@ impl Mut { /// /// Returns:: /// name of the new graph - async fn create_subgraph<'a>( ctx: &Context<'a>, #[graphql(desc = "Source graph path relative to the root namespace.")] parent_path: &str, @@ -624,7 +610,6 @@ impl Mut { } /// (Experimental) Creates search index. - async fn create_index<'a>( ctx: &Context<'a>, #[graphql(desc = "Graph path relative to the root namespace.")] path: &str, diff --git a/raphtory/src/serialise/graph_folder.rs b/raphtory/src/serialise/graph_folder.rs index 1cf96c95ee..9c0e955ffd 100644 --- a/raphtory/src/serialise/graph_folder.rs +++ b/raphtory/src/serialise/graph_folder.rs @@ -234,6 +234,7 @@ pub trait GraphPaths { let mut file = File::open(self.meta_path()?)?; file.read_to_string(&mut json)?; } + let metadata: Metadata = serde_json::from_str(&json)?; Ok(metadata.meta) } @@ -245,11 +246,15 @@ pub trait GraphPaths { path: graph_path, meta: metadata, }; + let tmp_path = self.data_path()?.path.join(".tmp"); let tmp_file = File::create(&tmp_path)?; - serde_json::to_writer(tmp_file, &meta)?; + serde_json::to_writer(&tmp_file, &meta)?; + tmp_file.sync_all()?; // Flush data to disk before rename. + let path = self.meta_path()?; fs::rename(tmp_path, path)?; + Ok(()) } From ff1ffe7f65649b731bd39fa96777def1f1f2270c Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Tue, 9 Jun 2026 14:27:35 -0400 Subject: [PATCH 02/15] Use atomic rename for config file --- db4-storage/src/persist/config.rs | 11 ++++++++--- raphtory/src/serialise/graph_folder.rs | 11 ++++++----- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/db4-storage/src/persist/config.rs b/db4-storage/src/persist/config.rs index 3303e6011f..4279f16b9d 100644 --- a/db4-storage/src/persist/config.rs +++ b/db4-storage/src/persist/config.rs @@ -5,6 +5,7 @@ use clap::{ }; use serde::{Deserialize, Serialize, de::DeserializeOwned}; use std::{iter, path::Path}; +use tempfile::NamedTempFile; use tracing::error; pub const DEFAULT_MAX_PAGE_LEN_NODES: u32 = 600_000; // 2^17 @@ -32,9 +33,13 @@ pub trait ConfigOps: Serialize + DeserializeOwned + Args + Sized { } fn save_to_dir(&self, dir: &Path) -> Result<(), StorageError> { - let config_file = dir.join(CONFIG_FILE_NAME); - let config_file = std::fs::File::create(&config_file)?; - serde_json::to_writer_pretty(config_file, self)?; + let config_path = dir.join(CONFIG_FILE_NAME); + let mut tmp_file = NamedTempFile::new_in(dir)?; + serde_json::to_writer_pretty(&mut tmp_file, self)?; + tmp_file.as_file().sync_all()?; + tmp_file + .persist(&config_path) + .map_err(std::io::Error::from)?; Ok(()) } diff --git a/raphtory/src/serialise/graph_folder.rs b/raphtory/src/serialise/graph_folder.rs index 9c0e955ffd..0a2980bd4b 100644 --- a/raphtory/src/serialise/graph_folder.rs +++ b/raphtory/src/serialise/graph_folder.rs @@ -22,6 +22,7 @@ use std::{ io::{self, ErrorKind, Read, Seek, Write}, path::{Path, PathBuf}, }; +use tempfile::NamedTempFile; use walkdir::WalkDir; use zip::{write::FileOptions, ZipArchive, ZipWriter}; @@ -247,13 +248,13 @@ pub trait GraphPaths { meta: metadata, }; - let tmp_path = self.data_path()?.path.join(".tmp"); - let tmp_file = File::create(&tmp_path)?; - serde_json::to_writer(&tmp_file, &meta)?; - tmp_file.sync_all()?; // Flush data to disk before rename. + let data_dir = &self.data_path()?.path; + let mut tmp_file = NamedTempFile::new_in(data_dir)?; + serde_json::to_writer(&mut tmp_file, &meta)?; + tmp_file.as_file().sync_all()?; let path = self.meta_path()?; - fs::rename(tmp_path, path)?; + tmp_file.persist(&path).map_err(io::Error::from)?; Ok(()) } From 018cbdf85dc99441d426ed5f1abdf606a72c84f9 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Mon, 15 Jun 2026 14:39:41 -0400 Subject: [PATCH 03/15] Skip shutdown flush if dropped during recovery --- db4-storage/src/pages/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/db4-storage/src/pages/mod.rs b/db4-storage/src/pages/mod.rs index c849ce8549..28422e5d10 100644 --- a/db4-storage/src/pages/mod.rs +++ b/db4-storage/src/pages/mod.rs @@ -377,6 +377,11 @@ impl< let wal = self.ext.wal(); let control_file = self.ext.control_file(); + // If the DB is dropped in the middle of crash recovery, skip the shutdown flush. + if control_file.db_state() == DBState::CrashRecovery { + return; + } + match self.flush() { Ok(_) => { // Log a checkpoint record in the WAL, indicating that the DB was shutdown From d68a5dbfbdc6dafe40594a3c8a303f2f6264112f Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Tue, 16 Jun 2026 11:11:14 -0400 Subject: [PATCH 04/15] Add layer as arg to add_node in GraphQL client --- raphtory-graphql/src/client/remote_graph.rs | 32 ++++++++++++++++--- .../src/python/client/remote_graph.rs | 9 ++++-- 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/raphtory-graphql/src/client/remote_graph.rs b/raphtory-graphql/src/client/remote_graph.rs index de8ab115bd..badea3d512 100644 --- a/raphtory-graphql/src/client/remote_graph.rs +++ b/raphtory-graphql/src/client/remote_graph.rs @@ -55,11 +55,18 @@ impl GraphQLRemoteGraph { id: G, properties: Option>, node_type: Option, + layer: Option, ) -> Result { let template = r#" { updateGraph(path: "{{ path }}") { - addNode(time: {{ time }}, name: "{{ name }}" {% if properties is not none %}, properties: {{ properties | safe }} {% endif %}{% if node_type is not none %}, nodeType: "{{ node_type }}"{% endif %}) { + addNode( + time: {{ time }}, + name: "{{ name }}" + {% if properties is not none %}, properties: {{ properties | safe }}{% endif %} + {% if node_type is not none %}, nodeType: "{{ node_type }}"{% endif %} + {% if layer is not none %}, layer: "{{ layer }}"{% endif %} + ) { success } } @@ -72,6 +79,7 @@ impl GraphQLRemoteGraph { name => id.to_string(), properties => properties.map(|p| build_property_string(p)), node_type => node_type, + layer => layer, }; let query = build_query(template, ctx)?; @@ -106,7 +114,12 @@ impl GraphQLRemoteGraph { let template = r#" { updateGraph(path: "{{ path }}") { - createNode(time: {{ time }}, name: "{{ name }}" {% if properties is not none %}, properties: {{ properties | safe }} {% endif %}{% if node_type is not none %}, nodeType: "{{ node_type }}"{% endif %}) { + createNode( + time: {{ time }}, + name: "{{ name }}" + {% if properties is not none %}, properties: {{ properties | safe }}{% endif %} + {% if node_type is not none %}, nodeType: "{{ node_type }}"{% endif %} + ) { success } } @@ -153,7 +166,13 @@ impl GraphQLRemoteGraph { let template = r#" { updateGraph(path: "{{ path }}") { - addEdge(time: {{ time }}, src: "{{ src }}", dst: "{{ dst }}" {% if properties is not none %}, properties: {{ properties | safe }} {% endif %}{% if layer is not none %}, layer: "{{ layer }}"{% endif %}) { + addEdge( + time: {{ time }}, + src: "{{ src }}", + dst: "{{ dst }}" + {% if properties is not none %}, properties: {{ properties | safe }}{% endif %} + {% if layer is not none %}, layer: "{{ layer }}"{% endif %} + ) { success } } @@ -297,7 +316,12 @@ impl GraphQLRemoteGraph { let template = r#" { updateGraph(path: "{{ path }}") { - deleteEdge(time: {{ time }}, src: "{{ src }}", dst: "{{ dst }}" {% if layer is not none %}, layer: "{{ layer }}"{% endif %}) { + deleteEdge( + time: {{ time }}, + src: "{{ src }}", + dst: "{{ dst }}" + {% if layer is not none %}, layer: "{{ layer }}"{% endif %} + ) { success } } diff --git a/raphtory-graphql/src/python/client/remote_graph.rs b/raphtory-graphql/src/python/client/remote_graph.rs index 15d0b0992d..8c545c1159 100644 --- a/raphtory-graphql/src/python/client/remote_graph.rs +++ b/raphtory-graphql/src/python/client/remote_graph.rs @@ -219,22 +219,27 @@ impl PyRemoteGraph { /// id (str | int): The id of the node. /// properties (dict, optional): The properties of the node. /// node_type (str, optional): The optional string which will be used as a node type + /// layer (str, optional): The optional layer where the node update should be written /// /// Returns: /// RemoteNode: the new remote node - #[pyo3(signature = (timestamp, id, properties = None, node_type = None))] + #[pyo3(signature = (timestamp, id, properties = None, node_type = None, layer = None))] pub fn add_node( &self, timestamp: EventTime, id: GID, properties: Option>, node_type: Option<&str>, + layer: Option<&str>, ) -> Result { let graph = Arc::clone(&self.graph); let node_type = node_type.map(|s| s.to_string()); + let layer = layer.map(|s| s.to_string()); let node = execute_async_task(move || async move { - graph.add_node(timestamp, id, properties, node_type).await + graph + .add_node(timestamp, id, properties, node_type, layer) + .await })?; Ok(PyRemoteNode::new(node)) From b4ec87570b55730cf8ecaf896ee16882fc183777 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Wed, 17 Jun 2026 13:32:40 -0400 Subject: [PATCH 05/15] Rename LRef to LayerRef --- db4-storage/src/gen_ts.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/db4-storage/src/gen_ts.rs b/db4-storage/src/gen_ts.rs index 60a432048e..49dad06cbf 100644 --- a/db4-storage/src/gen_ts.rs +++ b/db4-storage/src/gen_ts.rs @@ -10,18 +10,18 @@ use std::ops::Range; #[derive(Clone, Debug)] pub enum LayerIter<'a> { One(LayerId), - LRef(&'a LayerIds), + LayerRef(&'a LayerIds), Multiple(Multiple), } -pub static ALL_LAYERS: LayerIter<'static> = LayerIter::LRef(&LayerIds::All); -pub static NONE_LAYERS: LayerIter<'static> = LayerIter::LRef(&LayerIds::None); +pub static ALL_LAYERS: LayerIter<'static> = LayerIter::LayerRef(&LayerIds::All); +pub static NONE_LAYERS: LayerIter<'static> = LayerIter::LayerRef(&LayerIds::None); impl<'a> LayerIter<'a> { pub fn into_iter(self, num_layers: usize) -> impl Iterator + Send + Sync + 'a { match self { LayerIter::One(id) => Iter3::I(std::iter::once(id)), - LayerIter::LRef(layers) => Iter3::J(layers.iter(num_layers)), + LayerIter::LayerRef(layers) => Iter3::J(layers.iter(num_layers)), LayerIter::Multiple(ids) => Iter3::K(ids.into_iter()), } } @@ -41,7 +41,7 @@ impl From for LayerIter<'_> { impl<'a> From<&'a LayerIds> for LayerIter<'a> { fn from(layers: &'a LayerIds) -> Self { - LayerIter::LRef(layers) + LayerIter::LayerRef(layers) } } From 24a78049e45cd6314dc717e5056dc4fb54581557 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Wed, 17 Jun 2026 13:36:36 -0400 Subject: [PATCH 06/15] Rename gen_ts to generic_time_ops --- db4-storage/src/api/nodes.rs | 2 +- db4-storage/src/{gen_ts.rs => generic_time_ops.rs} | 0 db4-storage/src/lib.rs | 4 ++-- db4-storage/src/segments/additions.rs | 2 +- db4-storage/src/segments/edge/entry.rs | 2 +- db4-storage/src/segments/node/entry.rs | 2 +- raphtory-storage/src/graph/nodes/node_entry.rs | 2 +- raphtory-storage/src/graph/nodes/node_storage_ops.rs | 2 +- .../src/db/api/storage/graph/storage_ops/time_semantics.rs | 2 +- .../db/api/view/internal/time_semantics/filtered_node.rs | 6 +++--- 10 files changed, 12 insertions(+), 12 deletions(-) rename db4-storage/src/{gen_ts.rs => generic_time_ops.rs} (100%) diff --git a/db4-storage/src/api/nodes.rs b/db4-storage/src/api/nodes.rs index 51ac8ab405..5d6c72de7e 100644 --- a/db4-storage/src/api/nodes.rs +++ b/db4-storage/src/api/nodes.rs @@ -31,7 +31,7 @@ use std::{ use crate::{ LocalPOS, error::StorageError, - gen_ts::LayerIter, + generic_time_ops::LayerIter, pages::node_store::increment_and_clamp, segments::node::segment::MemNodeSegment, utils::{Iter2, Iter3, Iter4}, diff --git a/db4-storage/src/gen_ts.rs b/db4-storage/src/generic_time_ops.rs similarity index 100% rename from db4-storage/src/gen_ts.rs rename to db4-storage/src/generic_time_ops.rs diff --git a/db4-storage/src/lib.rs b/db4-storage/src/lib.rs index d569e955bd..f74a3c2672 100644 --- a/db4-storage/src/lib.rs +++ b/db4-storage/src/lib.rs @@ -1,5 +1,5 @@ use crate::{ - gen_ts::{ + generic_time_ops::{ AdditionCellsRef, DeletionCellsRef, EdgeAdditionCellsRef, GenericTimeOps, PropAdditionCellsRef, }, @@ -35,7 +35,7 @@ use std::{ pub mod api; pub mod dir; -pub mod gen_ts; +pub mod generic_time_ops; pub mod generic_t_props; pub mod pages; pub mod persist; diff --git a/db4-storage/src/segments/additions.rs b/db4-storage/src/segments/additions.rs index 93ae2dbbad..a22b66a456 100644 --- a/db4-storage/src/segments/additions.rs +++ b/db4-storage/src/segments/additions.rs @@ -6,7 +6,7 @@ use raphtory_core::{ storage::timeindex::{EventTime, TimeIndexOps, TimeIndexWindow}, }; -use crate::{gen_ts::EdgeEventOps, utils::Iter4}; +use crate::{generic_time_ops::EdgeEventOps, utils::Iter4}; #[derive(Clone, Debug)] pub enum MemAdditions<'a> { diff --git a/db4-storage/src/segments/edge/entry.rs b/db4-storage/src/segments/edge/entry.rs index ebce52a74d..e01fd4d6bb 100644 --- a/db4-storage/src/segments/edge/entry.rs +++ b/db4-storage/src/segments/edge/entry.rs @@ -1,7 +1,7 @@ use crate::{ EdgeAdditions, EdgeDeletions, EdgeTProps, LocalPOS, api::edges::{EdgeEntryOps, EdgeRefOps}, - gen_ts::{AdditionCellsRef, DeletionCellsRef, WithTimeCells}, + generic_time_ops::{AdditionCellsRef, DeletionCellsRef, WithTimeCells}, generic_t_props::WithTProps, segments::{additions::MemAdditions, edge::segment::MemEdgeSegment}, }; diff --git a/db4-storage/src/segments/node/entry.rs b/db4-storage/src/segments/node/entry.rs index 7d0bc872f0..60f046bf23 100644 --- a/db4-storage/src/segments/node/entry.rs +++ b/db4-storage/src/segments/node/entry.rs @@ -1,7 +1,7 @@ use crate::{ LocalPOS, NodeEdgeAdditions, NodePropAdditions, NodeTProps, api::nodes::{NodeEntryOps, NodeRefOps}, - gen_ts::{EdgeAdditionCellsRef, LayerIter, PropAdditionCellsRef, WithTimeCells}, + generic_time_ops::{EdgeAdditionCellsRef, LayerIter, PropAdditionCellsRef, WithTimeCells}, generic_t_props::WithTProps, segments::{additions::MemAdditions, node::segment::MemNodeSegment}, }; diff --git a/raphtory-storage/src/graph/nodes/node_entry.rs b/raphtory-storage/src/graph/nodes/node_entry.rs index 21a93f91f0..8cb8a39bf8 100644 --- a/raphtory-storage/src/graph/nodes/node_entry.rs +++ b/raphtory-storage/src/graph/nodes/node_entry.rs @@ -8,7 +8,7 @@ use raphtory_api::core::{ use raphtory_core::storage::timeindex::EventTime; use storage::{ api::nodes::{self, NodeEntryOps}, - gen_ts::LayerIter, + generic_time_ops::LayerIter, utils::Iter2, NodeEntry, NodeEntryRef, }; diff --git a/raphtory-storage/src/graph/nodes/node_storage_ops.rs b/raphtory-storage/src/graph/nodes/node_storage_ops.rs index ccb7ddee3e..d8ae43c4de 100644 --- a/raphtory-storage/src/graph/nodes/node_storage_ops.rs +++ b/raphtory-storage/src/graph/nodes/node_storage_ops.rs @@ -9,7 +9,7 @@ use raphtory_api::core::{ }; use raphtory_core::{entities::LayerVariants, storage::timeindex::EventTime}; use std::{borrow::Cow, ops::Range, sync::Arc}; -use storage::{api::nodes::NodeRefOps, gen_ts::LayerIter, utils::Iter3, NodeEntryRef}; +use storage::{api::nodes::NodeRefOps, generic_time_ops::LayerIter, utils::Iter3, NodeEntryRef}; pub trait NodeStorageOps<'a>: Copy + Sized + Send + Sync + 'a { fn degree(self, layers: &LayerIds, dir: Direction) -> usize; diff --git a/raphtory/src/db/api/storage/graph/storage_ops/time_semantics.rs b/raphtory/src/db/api/storage/graph/storage_ops/time_semantics.rs index 3ebabedee4..d9792327a1 100644 --- a/raphtory/src/db/api/storage/graph/storage_ops/time_semantics.rs +++ b/raphtory/src/db/api/storage/graph/storage_ops/time_semantics.rs @@ -14,7 +14,7 @@ use rayon::iter::ParallelIterator; use std::ops::Range; use storage::{ api::graph_props::{GraphPropEntryOps, GraphPropRefOps}, - gen_ts::ALL_LAYERS, + generic_time_ops::ALL_LAYERS, }; impl GraphTimeSemanticsOps for GraphStorage { diff --git a/raphtory/src/db/api/view/internal/time_semantics/filtered_node.rs b/raphtory/src/db/api/view/internal/time_semantics/filtered_node.rs index 3a01ea427c..6f4d61e984 100644 --- a/raphtory/src/db/api/view/internal/time_semantics/filtered_node.rs +++ b/raphtory/src/db/api/view/internal/time_semantics/filtered_node.rs @@ -16,7 +16,7 @@ use raphtory_api::core::{ }; use raphtory_storage::{core_ops::CoreGraphOps, graph::nodes::node_storage_ops::NodeStorageOps}; use std::{ops::Range, sync::Arc}; -use storage::gen_ts::LayerIter; +use storage::generic_time_ops::LayerIter; #[derive(Debug, Clone)] pub struct NodeHistory<'a, G> { @@ -229,7 +229,7 @@ impl<'b, G: GraphViewOps<'b>> TimeIndexOps<'b> for NodeHistory<'b, G> { fn layer_ids_with_static(layer_ids: &LayerIds) -> LayerIter<'_> { match layer_ids { // All layers already includes STATIC - LayerIds::All => LayerIter::LRef(layer_ids), + LayerIds::All => LayerIter::LayerRef(layer_ids), // No layers + static = just static LayerIds::None => LayerIter::One(STATIC_GRAPH_LAYER_ID), LayerIds::One(id) => { @@ -244,7 +244,7 @@ fn layer_ids_with_static(layer_ids: &LayerIds) -> LayerIter<'_> { } LayerIds::Multiple(ids) => { if ids.contains(STATIC_GRAPH_LAYER_ID) { - LayerIter::LRef(layer_ids) + LayerIter::LayerRef(layer_ids) } else { let mut combined: Vec<_> = std::iter::once(STATIC_GRAPH_LAYER_ID) .chain(ids.iter()) From 27d711a49b1f305ef6fbb6a55c38f70dee826d13 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Wed, 17 Jun 2026 15:44:38 -0400 Subject: [PATCH 07/15] Cleanup some docs --- raphtory-graphql/src/client/mod.rs | 2 -- raphtory-graphql/src/client/raphtory_client.rs | 2 +- raphtory-graphql/src/client/remote_edge.rs | 4 +--- raphtory-graphql/src/client/remote_graph.rs | 2 +- raphtory-graphql/src/client/remote_node.rs | 4 +--- 5 files changed, 4 insertions(+), 10 deletions(-) diff --git a/raphtory-graphql/src/client/mod.rs b/raphtory-graphql/src/client/mod.rs index 8eaf1cf6f8..ebd89fe366 100644 --- a/raphtory-graphql/src/client/mod.rs +++ b/raphtory-graphql/src/client/mod.rs @@ -1,5 +1,3 @@ -//! Pure Rust GraphQL client for Raphtory GraphQL server. - mod error; pub mod raphtory_client; pub mod remote_edge; diff --git a/raphtory-graphql/src/client/raphtory_client.rs b/raphtory-graphql/src/client/raphtory_client.rs index c8af4c36ae..1318d50a9a 100644 --- a/raphtory-graphql/src/client/raphtory_client.rs +++ b/raphtory-graphql/src/client/raphtory_client.rs @@ -8,7 +8,7 @@ use serde_json::{json, Value as JsonValue}; use std::{collections::HashMap, io::Cursor}; use url::Url; -/// Pure Rust client for Raphtory GraphQL operations. +/// Client for interacting with a Raphtory GraphQL server. #[derive(Clone, Debug)] pub struct RaphtoryGraphQLClient { pub(crate) url: Url, diff --git a/raphtory-graphql/src/client/remote_edge.rs b/raphtory-graphql/src/client/remote_edge.rs index fcacc74e5e..5cc34b16c0 100644 --- a/raphtory-graphql/src/client/remote_edge.rs +++ b/raphtory-graphql/src/client/remote_edge.rs @@ -1,5 +1,3 @@ -//! Pure Rust remote edge client for GraphQL updateGraph.edge(...) operations. - use crate::client::{ build_property_string, raphtory_client::RaphtoryGraphQLClient, remote_graph::build_query, ClientError, @@ -10,7 +8,7 @@ use raphtory_api::core::{ }; use std::collections::HashMap; -/// Pure Rust remote edge wrapper around `RaphtoryGraphQLClient`. +/// A handle to a remote edge on the server. #[derive(Clone)] pub struct GraphQLRemoteEdge { pub path: String, diff --git a/raphtory-graphql/src/client/remote_graph.rs b/raphtory-graphql/src/client/remote_graph.rs index badea3d512..be1173726b 100644 --- a/raphtory-graphql/src/client/remote_graph.rs +++ b/raphtory-graphql/src/client/remote_graph.rs @@ -22,7 +22,7 @@ pub fn build_query(template: &str, context: Value) -> Result Date: Thu, 18 Jun 2026 09:52:29 -0400 Subject: [PATCH 08/15] Rename error_if_exists to require_new --- raphtory/src/db/api/mutation/addition_ops.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/raphtory/src/db/api/mutation/addition_ops.rs b/raphtory/src/db/api/mutation/addition_ops.rs index a5c510a37c..352c8ce7f5 100644 --- a/raphtory/src/db/api/mutation/addition_ops.rs +++ b/raphtory/src/db/api/mutation/addition_ops.rs @@ -172,8 +172,8 @@ impl> + StaticGraphViewOps> Addit node_type: Option<&str>, layer: Option<&str>, ) -> Result, GraphError> { - let error_if_exists = false; - add_node_impl(self, t, v, props, node_type, error_if_exists, layer) + let require_new = false; + add_node_impl(self, t, v, props, node_type, require_new, layer) } fn create_node< @@ -191,8 +191,8 @@ impl> + StaticGraphViewOps> Addit node_type: Option<&str>, layer: Option<&str>, ) -> Result, GraphError> { - let error_if_exists = true; - add_node_impl(self, t, v, props, node_type, error_if_exists, layer) + let require_new = true; + add_node_impl(self, t, v, props, node_type, require_new, layer) } fn add_edge< @@ -336,7 +336,7 @@ fn add_node_impl< v: V, props: PII, node_type: Option<&str>, - error_if_exists: bool, + require_new: bool, layer: Option<&str>, ) -> Result, GraphError> { let transaction_manager = graph.core_graph().transaction_manager()?; @@ -389,7 +389,7 @@ fn add_node_impl< let is_new = writer.node().is_new(); let node_id = writer.node().inner(); - if error_if_exists && !is_new { + if require_new && !is_new { drop(writer); let node_id = graph.node(node_id).unwrap().id(); return Err(GraphError::NodeExistsError(node_id)); From ece0e20d8516a5630d47c088fdf57aa0fe6965af Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Thu, 18 Jun 2026 12:23:57 -0400 Subject: [PATCH 09/15] Set layer_id when replaying add_node --- db4-graph/src/replay.rs | 25 +++++++++++++++---- db4-storage/src/pages/mod.rs | 4 +-- db4-storage/src/wal/entry.rs | 2 +- db4-storage/src/wal/mod.rs | 6 +++-- .../src/core/entities/properties/meta.rs | 2 +- raphtory-storage/src/mutation/addition_ops.rs | 3 ++- .../src/mutation/addition_ops_ext.rs | 2 ++ raphtory/src/db/api/mutation/addition_ops.rs | 2 +- raphtory/src/db/graph/edge.rs | 2 +- 9 files changed, 34 insertions(+), 14 deletions(-) diff --git a/db4-graph/src/replay.rs b/db4-graph/src/replay.rs index fffe0dca6a..06baeac977 100644 --- a/db4-graph/src/replay.rs +++ b/db4-graph/src/replay.rs @@ -39,9 +39,9 @@ where dst_name: Option, dst_id: VID, eid: EID, + props: Vec<(String, usize, Prop)>, layer_name: Option, layer_id: LayerId, - props: Vec<(String, usize, Prop)>, ) -> Result<(), StorageError> { // Insert node ids into resolver. if let Some(src_name) = src_name.as_ref() { @@ -457,6 +457,8 @@ where node_id: VID, node_type_and_id: Option<(String, usize)>, props: Vec<(String, usize, Prop)>, + layer_name: Option, + layer_id: LayerId, ) -> Result<(), StorageError> { // Insert node id into resolver. if let Some(ref name) = node_name { @@ -465,6 +467,19 @@ where .set(name.as_ref(), node_id)?; } + // Make layer name -> id mapping available to both edge and node meta. + if let Some(name) = layer_name.as_deref() { + self.graph() + .edge_meta() + .layer_meta() + .set_id(name, layer_id.0); + + self.graph() + .node_meta() + .layer_meta() + .set_id(name, layer_id.0); + } + // Resolve segment and check LSN. let (segment_id, pos) = self.graph().storage().nodes().resolve_pos(node_id); self.resize_segments_to_vid(node_id); @@ -490,13 +505,13 @@ where .set_id(node_type.as_str(), node_type_id); } - let node_writer = self.nodes.get_mut(segment_id).ok_or_else(|| { + let node_segment = self.nodes.get_mut(segment_id).ok_or_else(|| { StorageError::GenericFailure(format!( "Node segment {segment_id} not found during replay_add_node" )) })?; - let mut node_writer = node_writer.writer(); + let mut node_writer = node_segment.writer(); if !node_writer.has_node(pos, STATIC_GRAPH_LAYER_ID) { node_writer.increment_seg_num_nodes(); @@ -510,11 +525,11 @@ where node_writer.store_node_type(pos, STATIC_GRAPH_LAYER_ID, node_type_id); } - // Add the node with its timestamp and props. + // Add the node with its timestamp and props to the specified layer. node_writer.add_props( t, pos, - STATIC_GRAPH_LAYER_ID, + layer_id, props .into_iter() .map(|(_, prop_id, prop_value)| (prop_id, prop_value)), diff --git a/db4-storage/src/pages/mod.rs b/db4-storage/src/pages/mod.rs index 28422e5d10..ab8415163f 100644 --- a/db4-storage/src/pages/mod.rs +++ b/db4-storage/src/pages/mod.rs @@ -377,8 +377,8 @@ impl< let wal = self.ext.wal(); let control_file = self.ext.control_file(); - // If the DB is dropped in the middle of crash recovery, skip the shutdown flush. - if control_file.db_state() == DBState::CrashRecovery { + // Run a clean shutdown only if the DB is still Running. + if control_file.db_state() != DBState::Running { return; } diff --git a/db4-storage/src/wal/entry.rs b/db4-storage/src/wal/entry.rs index 002723af7d..d9ee20d39c 100644 --- a/db4-storage/src/wal/entry.rs +++ b/db4-storage/src/wal/entry.rs @@ -21,9 +21,9 @@ impl GraphWalOps for NoWal { _dst_name: Option>, _dst_id: VID, _eid: EID, + _props: Vec<(&str, usize, Prop)>, _layer_name: Option<&str>, _layer_id: LayerId, - _props: Vec<(&str, usize, Prop)>, ) -> Result { Ok(0) } diff --git a/db4-storage/src/wal/mod.rs b/db4-storage/src/wal/mod.rs index e933504121..efa4be9ddf 100644 --- a/db4-storage/src/wal/mod.rs +++ b/db4-storage/src/wal/mod.rs @@ -81,9 +81,9 @@ pub trait GraphWalOps { dst_name: Option>, dst_id: VID, eid: EID, + props: Vec<(&str, usize, Prop)>, layer_name: Option<&str>, layer_id: LayerId, - props: Vec<(&str, usize, Prop)>, ) -> Result; fn log_add_edge_metadata( @@ -183,9 +183,9 @@ pub trait GraphReplay { dst_name: Option, dst_id: VID, eid: EID, + props: Vec<(String, usize, Prop)>, layer_name: Option, layer_id: LayerId, - props: Vec<(String, usize, Prop)>, ) -> Result<(), StorageError>; fn replay_add_edge_metadata( @@ -220,6 +220,8 @@ pub trait GraphReplay { node_id: VID, node_type_and_id: Option<(String, usize)>, props: Vec<(String, usize, Prop)>, + layer_name: Option, + layer_id: LayerId, ) -> Result<(), StorageError>; fn replay_add_node_metadata( diff --git a/raphtory-api/src/core/entities/properties/meta.rs b/raphtory-api/src/core/entities/properties/meta.rs index 2296342cff..de17571649 100644 --- a/raphtory-api/src/core/entities/properties/meta.rs +++ b/raphtory-api/src/core/entities/properties/meta.rs @@ -13,7 +13,7 @@ use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; use std::{ - fmt::{Debug, Formatter, Write}, + fmt::{Debug, Formatter}, ops::{Deref, DerefMut}, sync::{ atomic::{self, AtomicUsize}, diff --git a/raphtory-storage/src/mutation/addition_ops.rs b/raphtory-storage/src/mutation/addition_ops.rs index 8f7c9bcc20..34dfa217ad 100644 --- a/raphtory-storage/src/mutation/addition_ops.rs +++ b/raphtory-storage/src/mutation/addition_ops.rs @@ -56,7 +56,8 @@ pub trait InternalAdditionOps { node_type: Option<&str>, ) -> Result<(VID, usize), Self::Error>; - /// SAFETY this function assumes it is called from behind a sharded structure that does not allow the same id to be resolved at the same time by more than 1 thread + /// SAFETY this function assumes it is called from behind a sharded structure that does not + /// allow the same id to be resolved at the same time by more than 1 thread unsafe fn bulk_load_resolve_node(&self, id: GidRef<'_>) -> Result; /// validate the GidRef is the correct type diff --git a/raphtory-storage/src/mutation/addition_ops_ext.rs b/raphtory-storage/src/mutation/addition_ops_ext.rs index a37082717c..c1b71b27c2 100644 --- a/raphtory-storage/src/mutation/addition_ops_ext.rs +++ b/raphtory-storage/src/mutation/addition_ops_ext.rs @@ -683,8 +683,10 @@ impl InternalAdditionOps for TemporalGraph { } }, }; + let (segment_id, _) = self.storage().nodes().resolve_pos(node_vid); let writer = self.storage().node_writer(segment_id); + Ok(AtomicAddNode { writer, vid: MaybeNew::Existing(node_vid), diff --git a/raphtory/src/db/api/mutation/addition_ops.rs b/raphtory/src/db/api/mutation/addition_ops.rs index 352c8ce7f5..9f800808c2 100644 --- a/raphtory/src/db/api/mutation/addition_ops.rs +++ b/raphtory/src/db/api/mutation/addition_ops.rs @@ -265,9 +265,9 @@ impl> + StaticGraphViewOps> Addit dst_gid, dst_id, edge_id, + props_for_wal, layer, layer_id, - props_for_wal, )?; let props = props_with_status diff --git a/raphtory/src/db/graph/edge.rs b/raphtory/src/db/graph/edge.rs index 62316ce79a..a9c02cc4e6 100644 --- a/raphtory/src/db/graph/edge.rs +++ b/raphtory/src/db/graph/edge.rs @@ -505,9 +505,9 @@ impl EdgeView { dst_name, dst, edge_id, + props_for_wal, layer, layer_id, - props_for_wal, ) .map_err(into_graph_err)?; From 8e63f3e98ffe9fa01f707800fb782ec153e5a5a0 Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Thu, 18 Jun 2026 12:29:32 -0400 Subject: [PATCH 10/15] Run fmt --- db4-storage/src/lib.rs | 4 +- db4-storage/src/segments/edge/entry.rs | 2 +- db4-storage/src/segments/node/entry.rs | 2 +- raphtory-graphql/src/lib.rs | 3 - raphtory-graphql/src/model/graph/filtering.rs | 43 +++++-- raphtory-tests/tests/test_filters.rs | 46 ++++---- raphtory/src/db/api/state/ops/filter.rs | 22 ++-- .../graph/views/filter/model/degree_filter.rs | 105 +++++++++++------- .../src/db/graph/views/filter/model/mod.rs | 3 +- .../views/filter/model/node_filter/mod.rs | 45 +++++--- .../src/python/filter/node_filter_builders.rs | 12 +- 11 files changed, 178 insertions(+), 109 deletions(-) diff --git a/db4-storage/src/lib.rs b/db4-storage/src/lib.rs index f74a3c2672..b565625141 100644 --- a/db4-storage/src/lib.rs +++ b/db4-storage/src/lib.rs @@ -1,9 +1,9 @@ use crate::{ + generic_t_props::GenericTProps, generic_time_ops::{ AdditionCellsRef, DeletionCellsRef, EdgeAdditionCellsRef, GenericTimeOps, PropAdditionCellsRef, }, - generic_t_props::GenericTProps, pages::{ GraphStore, ReadLockedGraphStore, edge_store::ReadLockedEdgeStorage, node_store::ReadLockedNodeStorage, @@ -35,8 +35,8 @@ use std::{ pub mod api; pub mod dir; -pub mod generic_time_ops; pub mod generic_t_props; +pub mod generic_time_ops; pub mod pages; pub mod persist; pub mod properties; diff --git a/db4-storage/src/segments/edge/entry.rs b/db4-storage/src/segments/edge/entry.rs index e01fd4d6bb..ffc604b2d1 100644 --- a/db4-storage/src/segments/edge/entry.rs +++ b/db4-storage/src/segments/edge/entry.rs @@ -1,8 +1,8 @@ use crate::{ EdgeAdditions, EdgeDeletions, EdgeTProps, LocalPOS, api::edges::{EdgeEntryOps, EdgeRefOps}, - generic_time_ops::{AdditionCellsRef, DeletionCellsRef, WithTimeCells}, generic_t_props::WithTProps, + generic_time_ops::{AdditionCellsRef, DeletionCellsRef, WithTimeCells}, segments::{additions::MemAdditions, edge::segment::MemEdgeSegment}, }; use raphtory_api::core::entities::{LayerId, edges::edge_ref::Dir, properties::prop::Prop}; diff --git a/db4-storage/src/segments/node/entry.rs b/db4-storage/src/segments/node/entry.rs index 60f046bf23..3e93366cb3 100644 --- a/db4-storage/src/segments/node/entry.rs +++ b/db4-storage/src/segments/node/entry.rs @@ -1,8 +1,8 @@ use crate::{ LocalPOS, NodeEdgeAdditions, NodePropAdditions, NodeTProps, api::nodes::{NodeEntryOps, NodeRefOps}, - generic_time_ops::{EdgeAdditionCellsRef, LayerIter, PropAdditionCellsRef, WithTimeCells}, generic_t_props::WithTProps, + generic_time_ops::{EdgeAdditionCellsRef, LayerIter, PropAdditionCellsRef, WithTimeCells}, segments::{additions::MemAdditions, node::segment::MemNodeSegment}, }; use itertools::Itertools; diff --git a/raphtory-graphql/src/lib.rs b/raphtory-graphql/src/lib.rs index 184f6a0049..1a3f2acfde 100644 --- a/raphtory-graphql/src/lib.rs +++ b/raphtory-graphql/src/lib.rs @@ -536,7 +536,6 @@ mod graphql_test { graph } - fn degree_graph_with_add_edge_only() -> Graph { let graph = Graph::new(); @@ -646,8 +645,6 @@ mod graphql_test { ); } - - #[tokio::test] async fn test_unique_temporal_properties() { let g = Graph::new(); diff --git a/raphtory-graphql/src/model/graph/filtering.rs b/raphtory-graphql/src/model/graph/filtering.rs index 3033016e1b..2ab6872c9c 100644 --- a/raphtory-graphql/src/model/graph/filtering.rs +++ b/raphtory-graphql/src/model/graph/filtering.rs @@ -1,4 +1,7 @@ -use crate::model::{graph::{node_id::GqlNodeId, property::Value, timeindex::GqlTimeInput}, plugins::operation}; +use crate::model::{ + graph::{node_id::GqlNodeId, property::Value, timeindex::GqlTimeInput}, + plugins::operation, +}; use async_graphql::dynamic::ValueAccessor; use dynamic_graphql::{ internal::{ @@ -7,16 +10,35 @@ use dynamic_graphql::{ Enum, InputObject, OneOfInput, }; use raphtory::{ - db::{api::{state::ops::Degree, view::internal::filtered_edge}, graph::views::filter::model::{ - ComposableFilter, DynFilter, DynView, NoFilter, ViewWrapOps, degree_filter::DegreeFilter, edge_filter::{CompositeEdgeFilter, EdgeFilter}, filter::{Filter, FilterValue}, filter_operator::FilterOperator, graph_filter::GraphFilter, is_active_edge_filter::IsActiveEdge, is_active_node_filter::IsActiveNode, is_deleted_filter::IsDeletedEdge, is_self_loop_filter::IsSelfLoopEdge, is_valid_filter::IsValidEdge, latest_filter::Latest as LatestWrap, layered_filter::Layered, node_filter::{CompositeNodeFilter, NodeFilter}, property_filter::{Op, PropertyFilter, PropertyFilterValue, PropertyRef}, snapshot_filter::{SnapshotAt as SnapshotAtWrap, SnapshotLatest as SnapshotLatestWrap}, windowed_filter::Windowed - }}, + db::{ + api::{state::ops::Degree, view::internal::filtered_edge}, + graph::views::filter::model::{ + degree_filter::DegreeFilter, + edge_filter::{CompositeEdgeFilter, EdgeFilter}, + filter::{Filter, FilterValue}, + filter_operator::FilterOperator, + graph_filter::GraphFilter, + is_active_edge_filter::IsActiveEdge, + is_active_node_filter::IsActiveNode, + is_deleted_filter::IsDeletedEdge, + is_self_loop_filter::IsSelfLoopEdge, + is_valid_filter::IsValidEdge, + latest_filter::Latest as LatestWrap, + layered_filter::Layered, + node_filter::{CompositeNodeFilter, NodeFilter}, + property_filter::{Op, PropertyFilter, PropertyFilterValue, PropertyRef}, + snapshot_filter::{SnapshotAt as SnapshotAtWrap, SnapshotLatest as SnapshotLatestWrap}, + windowed_filter::Windowed, + ComposableFilter, DynFilter, DynView, NoFilter, ViewWrapOps, + }, + }, errors::GraphError, }; -use raphtory_api::core::Direction; use raphtory_api::core::{ entities::{properties::prop::Prop, Layer, GID}, storage::timeindex::{AsTime, EventTime}, utils::time::IntoTime, + Direction, }; use serde::{Deserialize, Serialize}; use std::{ @@ -304,7 +326,6 @@ pub struct PropertyFilterNew { pub where_: PropCondition, } - /// Filters nodes by computed degree with a directional scope. /// /// `DegreeFilterNew` lets callers filter on: @@ -330,8 +351,8 @@ pub enum DegreeDirection { impl From for Direction { fn from(d: DegreeDirection) -> Self { match d { - DegreeDirection::In => Direction::IN, - DegreeDirection::Out => Direction::OUT, + DegreeDirection::In => Direction::IN, + DegreeDirection::Out => Direction::OUT, DegreeDirection::Both => Direction::BOTH, } } @@ -342,7 +363,7 @@ impl From for String { match d { DegreeDirection::In => "in_degree".to_string(), DegreeDirection::Out => "out_degree".to_string(), - DegreeDirection::Both => "degree".to_string(), + DegreeDirection::Both => "degree".to_string(), } } } @@ -1440,8 +1461,8 @@ impl TryFrom for CompositeNodeFilter { direction: core_direction, operator, value, - ops - })) + ops, + })) } GqlNodeFilter::Property(prop) => { let prop_ref = PropertyRef::Property(prop.name.clone()); diff --git a/raphtory-tests/tests/test_filters.rs b/raphtory-tests/tests/test_filters.rs index a4bc92771c..5976a103ba 100644 --- a/raphtory-tests/tests/test_filters.rs +++ b/raphtory-tests/tests/test_filters.rs @@ -1647,32 +1647,38 @@ fn init_edges_graph_with_str_ids_del< mod test_node_filter { -use crate::{ + use crate::{ init_nodes_graph, init_nodes_graph_with_num_ids, init_nodes_graph_with_str_ids, IdentityGraphTransformer, }; + use proptest::proptest; use raphtory::{ - algorithms::alternating_mask::alternating_mask, core::entities::VID, db::{ - api::view::{Filter, filter_ops::NodeSelect}, - graph::{ - views::filter::{ - CreateFilter, model::{ - ComposableFilter, CompositeNodeFilter, NodeViewFilterOps, PropertyFilterFactory, TryAsCompositeFilter, ViewWrapOps, degree_filter::DegreeFilterFactory, node_filter::ops::{NodeFilterOps, NodeIdFilterOps}, property_filter::ops::{ListAggOps, PropertyFilterOps} - } + algorithms::alternating_mask::alternating_mask, + core::entities::VID, + db::{ + api::view::{filter_ops::NodeSelect, Filter}, + graph::views::filter::{ + model::{ + degree_filter::DegreeFilterFactory, + node_filter::ops::{NodeFilterOps, NodeIdFilterOps}, + property_filter::ops::{ElemQualifierOps, ListAggOps, PropertyFilterOps}, + ComposableFilter, CompositeNodeFilter, NodeViewFilterOps, + PropertyFilterFactory, TryAsCompositeFilter, ViewWrapOps, }, + CreateFilter, }, - }, errors::GraphError, prelude::{ - AdditionOps, Graph, GraphViewOps, NO_PROPS, NodeFilter, NodeStateOps, NodeViewOps, TimeOps - } + }, + errors::GraphError, + prelude::{ + AdditionOps, Graph, GraphViewOps, IntoProp, NodeFilter, NodeStateOps, NodeViewOps, + TimeOps, NO_PROPS, + }, }; + use raphtory_api::core::{entities::properties::prop::Prop, Direction}; use raphtory_tests::assertions::{ assert_filter_nodes_results, assert_search_nodes_results, assert_select_nodes_results, TestVariants, }; - use raphtory_api::core::{Direction, entities::properties::prop::Prop}; - use raphtory::prelude::IntoProp; - use raphtory::db::graph::views::filter::model::property_filter::ops::ElemQualifierOps; - use proptest::proptest; fn sort_vids(mut vids: Vec) -> Vec { vids.sort(); @@ -1717,7 +1723,8 @@ use crate::{ .map(|n| n.node) .collect::>(); - let expected_filter_nodes = candidates_with_history_after_filtering(graph, expected_select_nodes.clone()); + let expected_filter_nodes = + candidates_with_history_after_filtering(graph, expected_select_nodes.clone()); let filtered_event_graph = graph.filter(filter.clone()).unwrap(); let filtered_event_nodes = sort_vids( @@ -1799,7 +1806,6 @@ use crate::{ graph } - fn degree_graph_with_add_edge_only() -> Graph { let graph = Graph::new(); @@ -1844,7 +1850,6 @@ use crate::{ graph } - // Property-based tests for degree filtering proptest! { #[test] @@ -2033,7 +2038,7 @@ use crate::{ |d| d > threshold as usize && d < (threshold + 5) as usize, &format!("OUT > {} AND OUT < {}", threshold, threshold + 5), ); - } + } #[test] fn prop_degree_filter_or(threshold in 0u64..15) { @@ -2158,7 +2163,7 @@ use crate::{ &format!("OUT is_not_in({}, {})", val1, val2), ); } - } + } #[test] fn test_degree_filter_with_invalid_expressions() { @@ -13021,4 +13026,3 @@ mod test_edge_composite_filter { ); } } - diff --git a/raphtory/src/db/api/state/ops/filter.rs b/raphtory/src/db/api/state/ops/filter.rs index 11c5a90747..25e4b70d47 100644 --- a/raphtory/src/db/api/state/ops/filter.rs +++ b/raphtory/src/db/api/state/ops/filter.rs @@ -2,18 +2,25 @@ use crate::{ db::{ api::{ state::{ - Index, ops::{Const, Degree, IntoDynNodeOp, NodeOp, TypeId} + ops::{Const, Degree, IntoDynNodeOp, NodeOp, TypeId}, + Index, }, view::internal::{GraphView, NodeList}, }, graph::{ create_node_type_filter, - views::filter::model::{FilterOperator, degree_filter::DegreeFilter, filter::{Filter, FilterValue}, node_filter::NodeFilter, property_filter::PropertyFilterValue}, + views::filter::model::{ + degree_filter::DegreeFilter, + filter::{Filter, FilterValue}, + node_filter::NodeFilter, + property_filter::PropertyFilterValue, + FilterOperator, + }, }, }, prelude::{GraphViewOps, PropertyFilter}, }; -use raphtory_api::core::entities::{VID, properties::prop::Prop}; +use raphtory_api::core::entities::{properties::prop::Prop, VID}; use raphtory_core::entities::nodes::node_ref::AsNodeRef; use raphtory_storage::graph::{graph::GraphStorage, nodes::node_storage_ops::NodeStorageOps}; use std::sync::Arc; @@ -225,19 +232,19 @@ impl NodeOp for NodePropertyFilterOp { pub struct NodeDegreeFilterOp { degree: Degree, operator: FilterOperator, - value: PropertyFilterValue + value: PropertyFilterValue, } impl NodeDegreeFilterOp { pub(crate) fn new(graph: G, filter: DegreeFilter) -> Self { let degree = Degree { dir: filter.direction, - view: graph + view: graph, }; Self { degree, operator: filter.operator, - value: filter.value + value: filter.value, } } } @@ -248,7 +255,8 @@ impl NodeOp for NodeDegreeFilterOp { fn apply(&self, storage: &GraphStorage, node: VID) -> Self::Output { let node_degree = self.degree.apply(storage, node); let node_degree_prop = Prop::U64(node_degree as u64); - self.operator.apply_to_property(&self.value, Some(&node_degree_prop)) + self.operator + .apply_to_property(&self.value, Some(&node_degree_prop)) } } diff --git a/raphtory/src/db/graph/views/filter/model/degree_filter.rs b/raphtory/src/db/graph/views/filter/model/degree_filter.rs index 18e5f3d463..ef6bc5c56f 100644 --- a/raphtory/src/db/graph/views/filter/model/degree_filter.rs +++ b/raphtory/src/db/graph/views/filter/model/degree_filter.rs @@ -1,24 +1,35 @@ -use std::collections::HashSet; -use std::sync::Arc; - -use raphtory_api::core::entities::properties::prop::PropType; -use raphtory_api::core::{Direction, entities::properties::prop::Prop}; -use raphtory_core::entities::{VID}; +use std::{collections::HashSet, sync::Arc}; + +use crate::{ + db::{ + api::{ + state::ops::{filter::NodeDegreeFilterOp, GraphView}, + view::{GraphViewOps, NodeViewOps}, + }, + graph::views::filter::{ + model, + model::{ + property_filter::{ + builders::{PropertyExprBuilder, PropertyExprBuilderInput}, + Op, PropertyFilter, PropertyFilterInput, PropertyFilterValue, PropertyRef, + }, + CombinedFilter, ComposableFilter, CompositeNodeFilter, EntityMarker, + FilterOperator, InternalPropertyFilterBuilder, NodeFilter, TryAsCompositeFilter, + }, + node_filtered_graph::NodeFilteredGraph, + CreateFilter, + }, + }, + errors::GraphError, +}; +use raphtory_api::core::{ + entities::properties::prop::{Prop, PropType}, + Direction, +}; +use raphtory_core::entities::VID; use raphtory_storage::graph::nodes::{node_ref::NodeStorageRef, node_storage_ops::NodeStorageOps}; -use crate::db::api::state::ops::GraphView; -use crate::db::api::state::ops::filter::NodeDegreeFilterOp; -use crate::db::graph::views::filter::CreateFilter; -use crate::db::graph::views::filter::model::{ComposableFilter, CompositeNodeFilter, NodeFilter}; -use crate::db::graph::views::filter::model::property_filter::{Op, PropertyFilterInput, PropertyRef, PropertyFilter}; -use crate::db::graph::views::filter::model::property_filter::builders::{PropertyExprBuilder, PropertyExprBuilderInput}; -use crate::db::graph::views::filter::model::{CombinedFilter, EntityMarker, InternalPropertyFilterBuilder, TryAsCompositeFilter}; -use crate::db::graph::views::filter::model; -use crate::db::graph::views::filter::node_filtered_graph::NodeFilteredGraph; -use crate::db::{api::view::{GraphViewOps, NodeViewOps}, graph::views::filter::model::{FilterOperator, property_filter::PropertyFilterValue}}; -use crate::errors::GraphError; use std::{fmt, fmt::Display}; - #[derive(Clone)] pub struct DegreeFilterBuilder { direction: Direction, @@ -42,7 +53,6 @@ pub struct DegreeFilter { pub ops: Vec, } - impl CreateFilter for DegreeFilter { type EntityFiltered<'graph, G: GraphViewOps<'graph>> = NodeFilteredGraph>; @@ -73,22 +83,30 @@ impl CreateFilter for DegreeFilter { )); } match self.operator { - FilterOperator::Eq | FilterOperator::Ne| FilterOperator::Gt | FilterOperator::Ge | FilterOperator::Lt | FilterOperator::Le | FilterOperator::IsIn | FilterOperator::IsNotIn => {}, + FilterOperator::Eq + | FilterOperator::Ne + | FilterOperator::Gt + | FilterOperator::Ge + | FilterOperator::Lt + | FilterOperator::Le + | FilterOperator::IsIn + | FilterOperator::IsNotIn => {} _ => { - return Err(GraphError::InvalidFilter( - format!("degree filter does not support operator {:?}", self.operator) - )); + return Err(GraphError::InvalidFilter(format!( + "degree filter does not support operator {:?}", + self.operator + ))); } } let value = match self.value { PropertyFilterValue::Single(ref prop_val) => { let casted_val = prop_val.clone().try_cast(PropType::U64).ok_or_else(|| { GraphError::InvalidFilter(format!( - "degree filter expects an integer value, got {}", + "degree filter expects an integer value, got {}", prop_val.to_string() )) })?; - + PropertyFilterValue::Single(casted_val) } PropertyFilterValue::Set(ref prop_vals) => { @@ -97,7 +115,7 @@ impl CreateFilter for DegreeFilter { .map(|val| { val.clone().try_cast(PropType::U64).ok_or_else(|| { GraphError::InvalidFilter(format!( - "degree filter expects an integer value, got {}", + "degree filter expects an integer value, got {}", val.to_string() )) }) @@ -108,11 +126,11 @@ impl CreateFilter for DegreeFilter { } PropertyFilterValue::None => { return Err(GraphError::InvalidFilter( - "degree filter requires a value".to_string() + "degree filter requires a value".to_string(), )); } - }; - let mut filter = self.clone(); + }; + let mut filter = self.clone(); filter.value = value; Ok(NodeDegreeFilterOp::new(graph, filter)) } @@ -126,19 +144,20 @@ impl CreateFilter for DegreeFilter { } impl TryAsCompositeFilter for DegreeFilter { - fn try_as_composite_edge_filter(&self) -> Result { - Err(GraphError::NotSupported) + fn try_as_composite_edge_filter( + &self, + ) -> Result { + Err(GraphError::NotSupported) } fn try_as_composite_exploded_edge_filter( &self, - ) -> Result - { - Err(GraphError::NotSupported) - } + ) -> Result { + Err(GraphError::NotSupported) + } fn try_as_composite_node_filter(&self) -> Result { Ok(CompositeNodeFilter::Degree(self.clone())) } -} +} fn property_ref(direction: &Direction) -> PropertyRef { match direction { @@ -150,7 +169,7 @@ fn property_ref(direction: &Direction) -> PropertyRef { impl InternalPropertyFilterBuilder for DegreeFilterBuilder where - DegreeFilter: CombinedFilter + DegreeFilter: CombinedFilter, { type Filter = DegreeFilter; type ExprBuilder = DegreeFilterBuilder; @@ -170,10 +189,10 @@ where fn filter(&self, filter: PropertyFilterInput) -> Self::Filter { DegreeFilter { - value: filter.prop_value, - direction: self.direction, - operator: filter.operator, - ops: filter.ops, + value: filter.prop_value, + direction: self.direction, + operator: filter.operator, + ops: filter.ops, } } @@ -187,7 +206,7 @@ where impl ComposableFilter for DegreeFilter {} pub trait DegreeFilterFactory { - fn in_degree(&self) -> DegreeFilterBuilder; + fn in_degree(&self) -> DegreeFilterBuilder; fn out_degree(&self) -> DegreeFilterBuilder; fn degree(&self) -> DegreeFilterBuilder; } @@ -203,4 +222,4 @@ impl Display for DegreeFilter { }; property_filter.fmt(f) } -} +} diff --git a/raphtory/src/db/graph/views/filter/model/mod.rs b/raphtory/src/db/graph/views/filter/model/mod.rs index ef20e4609e..b95369693f 100644 --- a/raphtory/src/db/graph/views/filter/model/mod.rs +++ b/raphtory/src/db/graph/views/filter/model/mod.rs @@ -56,6 +56,7 @@ use raphtory_api::core::{ use std::{ops::Deref, sync::Arc}; pub mod and_filter; +pub mod degree_filter; pub mod edge_filter; pub mod exploded_edge_filter; pub mod filter; @@ -75,8 +76,6 @@ pub mod or_filter; pub mod property_filter; pub mod snapshot_filter; pub mod windowed_filter; -pub mod degree_filter; - #[derive(Debug, Copy, Clone)] pub struct NoFilter; diff --git a/raphtory/src/db/graph/views/filter/model/node_filter/mod.rs b/raphtory/src/db/graph/views/filter/model/node_filter/mod.rs index 4dd4cc38fd..85bf52f82e 100644 --- a/raphtory/src/db/graph/views/filter/model/node_filter/mod.rs +++ b/raphtory/src/db/graph/views/filter/model/node_filter/mod.rs @@ -1,31 +1,47 @@ use crate::{ + api::core::Direction, db::{ api::{ state::{ - NodeStateValue, TypedNodeState, ops::{ - NodeOp, TypeId, filter::{ + ops::{ + filter::{ AndOp, MaskOp, NodeIdFilterOp, NodeNameFilterOp, NodeTypeFilterOp, NotOp, OrOp, - } - } + }, + NodeOp, TypeId, + }, + NodeStateValue, TypedNodeState, }, - view::{BoxableGraphView, internal::GraphView}, + view::{internal::GraphView, BoxableGraphView}, }, graph::views::filter::{ - CreateFilter, model::{ - AndFilter, CombinedFilter, ComposableFilter, CompositeExplodedEdgeFilter, EntityMarker, InternalPropertyFilterFactory, InternalViewWrapOps, NodeViewFilterOps, NotFilter, OrFilter, TryAsCompositeFilter, Wrap, degree_filter::{DegreeFilter, DegreeFilterBuilder}, edge_filter::CompositeEdgeFilter, filter::Filter, is_active_node_filter::IsActiveNode, latest_filter::Latest, layered_filter::Layered, node_filter::{ + model::{ + degree_filter::{DegreeFilter, DegreeFilterBuilder, DegreeFilterFactory}, + edge_filter::CompositeEdgeFilter, + filter::Filter, + is_active_node_filter::IsActiveNode, + latest_filter::Latest, + layered_filter::Layered, + node_filter::{ builders::{NodeIdFilterBuilder, NodeNameFilterBuilder, NodeTypeFilterBuilder}, validate::validate, - }, node_state_filter::NodeStateBoolColOp, property_filter::builders::{MetadataFilterBuilder, PropertyFilterBuilder}, snapshot_filter::{SnapshotAt, SnapshotLatest}, windowed_filter::Windowed - }, node_filtered_graph::NodeFilteredGraph + }, + node_state_filter::NodeStateBoolColOp, + property_filter::builders::{MetadataFilterBuilder, PropertyFilterBuilder}, + snapshot_filter::{SnapshotAt, SnapshotLatest}, + windowed_filter::Windowed, + AndFilter, CombinedFilter, ComposableFilter, CompositeExplodedEdgeFilter, + EntityMarker, InternalPropertyFilterFactory, InternalViewWrapOps, + NodeViewFilterOps, NotFilter, OrFilter, TryAsCompositeFilter, Wrap, + }, + node_filtered_graph::NodeFilteredGraph, + CreateFilter, }, }, errors::GraphError, prelude::{GraphViewOps, PropertyFilter}, }; use raphtory_api::core::storage::timeindex::EventTime; -use crate::api::core::Direction; -use crate::db::graph::views::filter::model::degree_filter::DegreeFilterFactory; use std::{fmt, fmt::Display, sync::Arc}; pub mod builders; @@ -106,19 +122,18 @@ impl InternalPropertyFilterFactory for NodeFilter { impl DegreeFilterFactory for NodeFilter { fn degree(&self) -> DegreeFilterBuilder { - DegreeFilterBuilder::new(Direction::BOTH) + DegreeFilterBuilder::new(Direction::BOTH) } fn in_degree(&self) -> DegreeFilterBuilder { - DegreeFilterBuilder::new(Direction::IN) + DegreeFilterBuilder::new(Direction::IN) } fn out_degree(&self) -> DegreeFilterBuilder { - DegreeFilterBuilder::new(Direction::OUT) + DegreeFilterBuilder::new(Direction::OUT) } } - impl NodeViewFilterOps for NodeFilter { type Output = T; diff --git a/raphtory/src/python/filter/node_filter_builders.rs b/raphtory/src/python/filter/node_filter_builders.rs index 84317343c8..1fd4147f11 100644 --- a/raphtory/src/python/filter/node_filter_builders.rs +++ b/raphtory/src/python/filter/node_filter_builders.rs @@ -1,8 +1,14 @@ use crate::{ db::graph::views::filter::model::{ - NodeViewFilterOps, PropertyFilterFactory, ViewWrapOps, degree_filter::DegreeFilterFactory, node_filter::{ - NodeFilter, builders::{NodeIdFilterBuilder, NodeNameFilterBuilder, NodeTypeFilterBuilder}, ops::{NodeFilterOps, NodeIdFilterOps} - }, node_state_filter::NodeStateBoolColOp, property_filter::builders::{MetadataFilterBuilder, PropertyFilterBuilder} + degree_filter::DegreeFilterFactory, + node_filter::{ + builders::{NodeIdFilterBuilder, NodeNameFilterBuilder, NodeTypeFilterBuilder}, + ops::{NodeFilterOps, NodeIdFilterOps}, + NodeFilter, + }, + node_state_filter::NodeStateBoolColOp, + property_filter::builders::{MetadataFilterBuilder, PropertyFilterBuilder}, + NodeViewFilterOps, PropertyFilterFactory, ViewWrapOps, }, python::{ filter::{ From 9cb73143f19d99a6d2f89afa8ff667e11ab4e65f Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 18 Jun 2026 19:15:35 +0000 Subject: [PATCH 11/15] chore: apply tidy-public auto-fixes --- python/python/raphtory/graphql/__init__.pyi | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/python/raphtory/graphql/__init__.pyi b/python/python/raphtory/graphql/__init__.pyi index 38b0a230fb..5ca61e19e0 100644 --- a/python/python/raphtory/graphql/__init__.pyi +++ b/python/python/raphtory/graphql/__init__.pyi @@ -417,6 +417,7 @@ class RemoteGraph(object): id: str | int, properties: Optional[dict] = None, node_type: Optional[str] = None, + layer: Optional[str] = None, ) -> RemoteNode: """ Adds a new node with the given id and properties to the remote graph. @@ -426,6 +427,7 @@ class RemoteGraph(object): id (str | int): The id of the node. properties (dict, optional): The properties of the node. node_type (str, optional): The optional string which will be used as a node type + layer (str, optional): The optional layer where the node update should be written Returns: RemoteNode: the new remote node From bbfe3a5020c1c56c942c34d5d66c376f09d2ca8c Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Thu, 18 Jun 2026 16:23:40 -0400 Subject: [PATCH 12/15] layer --- raphtory-tests/src/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raphtory-tests/src/utils.rs b/raphtory-tests/src/utils.rs index 52f4301c07..ff7b095058 100644 --- a/raphtory-tests/src/utils.rs +++ b/raphtory-tests/src/utils.rs @@ -1310,9 +1310,9 @@ pub enum GraphMutation { src: u64, dst: u64, time: i64, - layer: Option>, props: Vec<(String, Prop)>, metadata: Vec<(String, Prop)>, + layer: Option>, }, DeleteEdge { src: u64, From 23b57b958c7a6b3fdac6d631f3eacec3477a59cb Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Thu, 18 Jun 2026 22:14:19 -0400 Subject: [PATCH 13/15] Modify replay to take None for starting LSN --- db4-storage/src/wal/entry.rs | 2 +- db4-storage/src/wal/mod.rs | 6 ++++-- db4-storage/src/wal/no_wal.rs | 2 +- raphtory-storage/src/recovery_ops.rs | 4 ++-- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/db4-storage/src/wal/entry.rs b/db4-storage/src/wal/entry.rs index d9ee20d39c..f772e9bb30 100644 --- a/db4-storage/src/wal/entry.rs +++ b/db4-storage/src/wal/entry.rs @@ -126,7 +126,7 @@ impl GraphWalOps for NoWal { fn replay_to_graph( &self, _graph: &mut G, - _start: LSN, + _start: Option, ) -> Result { panic!("NoWAL does not support replay") } diff --git a/db4-storage/src/wal/mod.rs b/db4-storage/src/wal/mod.rs index efa4be9ddf..acaa1d21fa 100644 --- a/db4-storage/src/wal/mod.rs +++ b/db4-storage/src/wal/mod.rs @@ -25,7 +25,8 @@ pub trait WalOps { fn read(&self, lsn: LSN) -> Result, StorageError>; /// Returns an iterator over the entries in the wal, starting from the given LSN. - fn replay(&self, start: LSN) -> impl Iterator>; + /// If `start` is `None`, replay begins at the first record in the WAL stream. + fn replay(&self, start: Option) -> impl Iterator>; /// Returns the current position in the WAL stream. fn position(&self) -> LSN; @@ -163,11 +164,12 @@ pub trait GraphWalOps { fn read_shutdown_checkpoint(&self, lsn: LSN) -> Result; /// Replays and applies all the entries in the wal to the given graph, starting from the given LSN. + /// If `start` is `None`, replay begins at the first record in the WAL stream. /// Returns the LSN immediately after the last entry in the WAL stream on success. fn replay_to_graph( &self, graph: &mut G, - start: LSN, + start: Option, ) -> Result; } diff --git a/db4-storage/src/wal/no_wal.rs b/db4-storage/src/wal/no_wal.rs index 7f563080ca..321dd04d32 100644 --- a/db4-storage/src/wal/no_wal.rs +++ b/db4-storage/src/wal/no_wal.rs @@ -17,7 +17,7 @@ impl WalOps for NoWal { Ok(()) } - fn replay(&self, _start: LSN) -> impl Iterator> { + fn replay(&self, _start: Option) -> impl Iterator> { let error = "Recovery is not supported for NoWAL"; std::iter::once(Err(StorageError::GenericFailure(error.to_string()))) } diff --git a/raphtory-storage/src/recovery_ops.rs b/raphtory-storage/src/recovery_ops.rs index 1f94473a9a..ad74dd3f96 100644 --- a/raphtory-storage/src/recovery_ops.rs +++ b/raphtory-storage/src/recovery_ops.rs @@ -29,9 +29,9 @@ pub trait RecoveryOps: DurabilityOps + InternalAdditionOps Date: Fri, 19 Jun 2026 09:09:17 -0400 Subject: [PATCH 14/15] Cargo fmt --- db4-storage/src/wal/mod.rs | 5 ++++- db4-storage/src/wal/no_wal.rs | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/db4-storage/src/wal/mod.rs b/db4-storage/src/wal/mod.rs index acaa1d21fa..e78e6b45ba 100644 --- a/db4-storage/src/wal/mod.rs +++ b/db4-storage/src/wal/mod.rs @@ -26,7 +26,10 @@ pub trait WalOps { /// Returns an iterator over the entries in the wal, starting from the given LSN. /// If `start` is `None`, replay begins at the first record in the WAL stream. - fn replay(&self, start: Option) -> impl Iterator>; + fn replay( + &self, + start: Option, + ) -> impl Iterator>; /// Returns the current position in the WAL stream. fn position(&self) -> LSN; diff --git a/db4-storage/src/wal/no_wal.rs b/db4-storage/src/wal/no_wal.rs index 321dd04d32..05e3d9e0ad 100644 --- a/db4-storage/src/wal/no_wal.rs +++ b/db4-storage/src/wal/no_wal.rs @@ -17,7 +17,10 @@ impl WalOps for NoWal { Ok(()) } - fn replay(&self, _start: Option) -> impl Iterator> { + fn replay( + &self, + _start: Option, + ) -> impl Iterator> { let error = "Recovery is not supported for NoWAL"; std::iter::once(Err(StorageError::GenericFailure(error.to_string()))) } From 6898b0a57456e6db4c9a5b9b8c888f946f6ef2fc Mon Sep 17 00:00:00 2001 From: Fadhil Abubaker Date: Mon, 22 Jun 2026 22:53:17 -0400 Subject: [PATCH 15/15] Run flush on drop even if there's no WAL --- db4-storage/src/pages/mod.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/db4-storage/src/pages/mod.rs b/db4-storage/src/pages/mod.rs index ab8415163f..48cb4e4b94 100644 --- a/db4-storage/src/pages/mod.rs +++ b/db4-storage/src/pages/mod.rs @@ -377,8 +377,13 @@ impl< let wal = self.ext.wal(); let control_file = self.ext.control_file(); - // Run a clean shutdown only if the DB is still Running. - if control_file.db_state() != DBState::Running { + // Skip running a clean flush if the DB is in shutdown or crash recovery state. + // Note that the state can be Shutdown after the graph is loaded and before recovery + // is complete. + if matches!( + control_file.db_state(), + DBState::Shutdown | DBState::CrashRecovery + ) { return; }