Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions db4-graph/src/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ where
dst_name: Option<GID>,
dst_id: VID,
eid: EID,
props: Vec<(String, usize, Prop)>,
layer_name: Option<String>,
layer_id: LayerId,
props: Vec<(String, usize, Prop)>,
) -> Result<(), StorageError> {
// Insert node ids into resolver.
if let Some(src_name) = src_name.as_ref() {
Expand Down Expand Up @@ -457,6 +457,8 @@ where
node_id: VID,
node_type_and_id: Option<(String, usize)>,
props: Vec<(String, usize, Prop)>,
layer_name: Option<String>,
layer_id: LayerId,
) -> Result<(), StorageError> {
// Insert node id into resolver.
if let Some(ref name) = node_name {
Expand All @@ -465,6 +467,19 @@ where
.set(name.as_ref(), node_id)?;
}

// Make layer name -> id mapping available to both edge and node meta.
if let Some(name) = layer_name.as_deref() {
self.graph()
.edge_meta()
.layer_meta()
.set_id(name, layer_id.0);

self.graph()
.node_meta()
.layer_meta()
.set_id(name, layer_id.0);
}

// Resolve segment and check LSN.
let (segment_id, pos) = self.graph().storage().nodes().resolve_pos(node_id);
self.resize_segments_to_vid(node_id);
Expand All @@ -490,13 +505,13 @@ where
.set_id(node_type.as_str(), node_type_id);
}

let node_writer = self.nodes.get_mut(segment_id).ok_or_else(|| {
let node_segment = self.nodes.get_mut(segment_id).ok_or_else(|| {
StorageError::GenericFailure(format!(
"Node segment {segment_id} not found during replay_add_node"
))
})?;

let mut node_writer = node_writer.writer();
let mut node_writer = node_segment.writer();

if !node_writer.has_node(pos, STATIC_GRAPH_LAYER_ID) {
node_writer.increment_seg_num_nodes();
Expand All @@ -510,11 +525,11 @@ where
node_writer.store_node_type(pos, STATIC_GRAPH_LAYER_ID, node_type_id);
}

// Add the node with its timestamp and props.
// Add the node with its timestamp and props to the specified layer.
node_writer.add_props(
t,
pos,
STATIC_GRAPH_LAYER_ID,
layer_id,
props
.into_iter()
.map(|(_, prop_id, prop_value)| (prop_id, prop_value)),
Expand Down
2 changes: 1 addition & 1 deletion db4-storage/src/api/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::{
use crate::{
LocalPOS,
error::StorageError,
gen_ts::LayerIter,
generic_time_ops::LayerIter,
pages::node_store::increment_and_clamp,
segments::node::segment::MemNodeSegment,
utils::{Iter2, Iter3, Iter4},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ use std::ops::Range;
#[derive(Clone, Debug)]
pub enum LayerIter<'a> {
One(LayerId),
LRef(&'a LayerIds),
LayerRef(&'a LayerIds),
Multiple(Multiple),
}

pub static ALL_LAYERS: LayerIter<'static> = LayerIter::LRef(&LayerIds::All);
pub static NONE_LAYERS: LayerIter<'static> = LayerIter::LRef(&LayerIds::None);
pub static ALL_LAYERS: LayerIter<'static> = LayerIter::LayerRef(&LayerIds::All);
pub static NONE_LAYERS: LayerIter<'static> = LayerIter::LayerRef(&LayerIds::None);

impl<'a> LayerIter<'a> {
pub fn into_iter(self, num_layers: usize) -> impl Iterator<Item = LayerId> + Send + Sync + 'a {
match self {
LayerIter::One(id) => Iter3::I(std::iter::once(id)),
LayerIter::LRef(layers) => Iter3::J(layers.iter(num_layers)),
LayerIter::LayerRef(layers) => Iter3::J(layers.iter(num_layers)),
LayerIter::Multiple(ids) => Iter3::K(ids.into_iter()),
}
}
Expand All @@ -41,7 +41,7 @@ impl From<LayerId> for LayerIter<'_> {

impl<'a> From<&'a LayerIds> for LayerIter<'a> {
fn from(layers: &'a LayerIds) -> Self {
LayerIter::LRef(layers)
LayerIter::LayerRef(layers)
}
}

Expand Down
6 changes: 3 additions & 3 deletions db4-storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{
gen_ts::{
generic_t_props::GenericTProps,
generic_time_ops::{
AdditionCellsRef, DeletionCellsRef, EdgeAdditionCellsRef, GenericTimeOps,
PropAdditionCellsRef,
},
generic_t_props::GenericTProps,
pages::{
GraphStore, ReadLockedGraphStore, edge_store::ReadLockedEdgeStorage,
node_store::ReadLockedNodeStorage,
Expand Down Expand Up @@ -35,8 +35,8 @@ use std::{

pub mod api;
pub mod dir;
pub mod gen_ts;
pub mod generic_t_props;
pub mod generic_time_ops;
pub mod pages;
pub mod persist;
pub mod properties;
Expand Down
5 changes: 5 additions & 0 deletions db4-storage/src/pages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,11 @@ impl<
let wal = self.ext.wal();
let control_file = self.ext.control_file();

// Run a clean shutdown only if the DB is still Running.
if control_file.db_state() != DBState::Running {
return;
}

match self.flush() {
Ok(_) => {
// Log a checkpoint record in the WAL, indicating that the DB was shutdown
Expand Down
11 changes: 8 additions & 3 deletions db4-storage/src/persist/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use clap::{
};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use std::{iter, path::Path};
use tempfile::NamedTempFile;
use tracing::error;

pub const DEFAULT_MAX_PAGE_LEN_NODES: u32 = 600_000; // 2^17
Expand Down Expand Up @@ -32,9 +33,13 @@ pub trait ConfigOps: Serialize + DeserializeOwned + Args + Sized {
}

fn save_to_dir(&self, dir: &Path) -> Result<(), StorageError> {
let config_file = dir.join(CONFIG_FILE_NAME);
let config_file = std::fs::File::create(&config_file)?;
serde_json::to_writer_pretty(config_file, self)?;
let config_path = dir.join(CONFIG_FILE_NAME);
let mut tmp_file = NamedTempFile::new_in(dir)?;
serde_json::to_writer_pretty(&mut tmp_file, self)?;
tmp_file.as_file().sync_all()?;
tmp_file
.persist(&config_path)
.map_err(std::io::Error::from)?;
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion db4-storage/src/segments/additions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use raphtory_core::{
storage::timeindex::{EventTime, TimeIndexOps, TimeIndexWindow},
};

use crate::{gen_ts::EdgeEventOps, utils::Iter4};
use crate::{generic_time_ops::EdgeEventOps, utils::Iter4};

#[derive(Clone, Debug)]
pub enum MemAdditions<'a> {
Expand Down
2 changes: 1 addition & 1 deletion db4-storage/src/segments/edge/entry.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{
EdgeAdditions, EdgeDeletions, EdgeTProps, LocalPOS,
api::edges::{EdgeEntryOps, EdgeRefOps},
gen_ts::{AdditionCellsRef, DeletionCellsRef, WithTimeCells},
generic_t_props::WithTProps,
generic_time_ops::{AdditionCellsRef, DeletionCellsRef, WithTimeCells},
segments::{additions::MemAdditions, edge::segment::MemEdgeSegment},
};
use raphtory_api::core::entities::{LayerId, edges::edge_ref::Dir, properties::prop::Prop};
Expand Down
2 changes: 1 addition & 1 deletion db4-storage/src/segments/node/entry.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{
LocalPOS, NodeEdgeAdditions, NodePropAdditions, NodeTProps,
api::nodes::{NodeEntryOps, NodeRefOps},
gen_ts::{EdgeAdditionCellsRef, LayerIter, PropAdditionCellsRef, WithTimeCells},
generic_t_props::WithTProps,
generic_time_ops::{EdgeAdditionCellsRef, LayerIter, PropAdditionCellsRef, WithTimeCells},
segments::{additions::MemAdditions, node::segment::MemNodeSegment},
};
use itertools::Itertools;
Expand Down
4 changes: 2 additions & 2 deletions db4-storage/src/wal/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ impl GraphWalOps for NoWal {
_dst_name: Option<GidRef<'_>>,
_dst_id: VID,
_eid: EID,
_props: Vec<(&str, usize, Prop)>,
_layer_name: Option<&str>,
_layer_id: LayerId,
_props: Vec<(&str, usize, Prop)>,
) -> Result<LSN, StorageError> {
Ok(0)
}
Expand Down Expand Up @@ -126,7 +126,7 @@ impl GraphWalOps for NoWal {
fn replay_to_graph<G: GraphReplay>(
&self,
_graph: &mut G,
_start: LSN,
_start: Option<LSN>,
) -> Result<LSN, StorageError> {
panic!("NoWAL does not support replay")
}
Expand Down
15 changes: 11 additions & 4 deletions db4-storage/src/wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ pub trait WalOps {
fn read(&self, lsn: LSN) -> Result<Option<ReplayRecord>, StorageError>;

/// Returns an iterator over the entries in the wal, starting from the given LSN.
fn replay(&self, start: LSN) -> impl Iterator<Item = Result<ReplayRecord, StorageError>>;
/// If `start` is `None`, replay begins at the first record in the WAL stream.
fn replay(
&self,
start: Option<LSN>,
) -> impl Iterator<Item = Result<ReplayRecord, StorageError>>;

/// Returns the current position in the WAL stream.
fn position(&self) -> LSN;
Expand Down Expand Up @@ -81,9 +85,9 @@ pub trait GraphWalOps {
dst_name: Option<GidRef<'_>>,
dst_id: VID,
eid: EID,
props: Vec<(&str, usize, Prop)>,
layer_name: Option<&str>,
layer_id: LayerId,
props: Vec<(&str, usize, Prop)>,
) -> Result<LSN, StorageError>;

fn log_add_edge_metadata(
Expand Down Expand Up @@ -163,11 +167,12 @@ pub trait GraphWalOps {
fn read_shutdown_checkpoint(&self, lsn: LSN) -> Result<LSN, StorageError>;

/// Replays and applies all the entries in the wal to the given graph, starting from the given LSN.
/// If `start` is `None`, replay begins at the first record in the WAL stream.
/// Returns the LSN immediately after the last entry in the WAL stream on success.
fn replay_to_graph<G: GraphReplay>(
&self,
graph: &mut G,
start: LSN,
start: Option<LSN>,
) -> Result<LSN, StorageError>;
}

Expand All @@ -183,9 +188,9 @@ pub trait GraphReplay {
dst_name: Option<GID>,
dst_id: VID,
eid: EID,
props: Vec<(String, usize, Prop)>,
layer_name: Option<String>,
layer_id: LayerId,
props: Vec<(String, usize, Prop)>,
) -> Result<(), StorageError>;

fn replay_add_edge_metadata(
Expand Down Expand Up @@ -220,6 +225,8 @@ pub trait GraphReplay {
node_id: VID,
node_type_and_id: Option<(String, usize)>,
props: Vec<(String, usize, Prop)>,
layer_name: Option<String>,
layer_id: LayerId,
) -> Result<(), StorageError>;

fn replay_add_node_metadata(
Expand Down
5 changes: 4 additions & 1 deletion db4-storage/src/wal/no_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ impl WalOps for NoWal {
Ok(())
}

fn replay(&self, _start: LSN) -> impl Iterator<Item = Result<ReplayRecord, StorageError>> {
fn replay(
&self,
_start: Option<LSN>,
) -> impl Iterator<Item = Result<ReplayRecord, StorageError>> {
let error = "Recovery is not supported for NoWAL";
std::iter::once(Err(StorageError::GenericFailure(error.to_string())))
}
Expand Down
2 changes: 2 additions & 0 deletions python/python/raphtory/graphql/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ class RemoteGraph(object):
id: str | int,
properties: Optional[dict] = None,
node_type: Optional[str] = None,
layer: Optional[str] = None,
) -> RemoteNode:
"""
Adds a new node with the given id and properties to the remote graph.
Expand All @@ -426,6 +427,7 @@ class RemoteGraph(object):
id (str | int): The id of the node.
properties (dict, optional): The properties of the node.
node_type (str, optional): The optional string which will be used as a node type
layer (str, optional): The optional layer where the node update should be written

Returns:
RemoteNode: the new remote node
Expand Down
2 changes: 0 additions & 2 deletions raphtory-graphql/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//! Pure Rust GraphQL client for Raphtory GraphQL server.
mod error;
pub mod raphtory_client;
pub mod remote_edge;
Expand Down
2 changes: 1 addition & 1 deletion raphtory-graphql/src/client/raphtory_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde_json::{json, Value as JsonValue};
use std::{collections::HashMap, io::Cursor};
use url::Url;

/// Pure Rust client for Raphtory GraphQL operations.
/// Client for interacting with a Raphtory GraphQL server.
#[derive(Clone, Debug)]
pub struct RaphtoryGraphQLClient {
pub(crate) url: Url,
Expand Down
4 changes: 1 addition & 3 deletions raphtory-graphql/src/client/remote_edge.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//! Pure Rust remote edge client for GraphQL updateGraph.edge(...) operations.
use crate::client::{
build_property_string, raphtory_client::RaphtoryGraphQLClient, remote_graph::build_query,
ClientError,
Expand All @@ -10,7 +8,7 @@ use raphtory_api::core::{
};
use std::collections::HashMap;

/// Pure Rust remote edge wrapper around `RaphtoryGraphQLClient`.
/// A handle to a remote edge on the server.
#[derive(Clone)]
pub struct GraphQLRemoteEdge {
pub path: String,
Expand Down
Loading
Loading