Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 57 additions & 57 deletions src/bootstrap/replication/service.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::bootstrap::replication::error::BootstrapError;
use crate::bootstrap::replication::peer_discovery::PeerDiscoverer;
// use crate::bootstrap::replication::peer_discovery::PeerDiscoverer;
use crate::bootstrap::replication::rpc_client::RpcClientsManager;
use crate::cfg::Config;
use crate::core::validations;
use crate::core::validations::message::validate_message_hash;
use crate::network::gossip;
// use crate::network::gossip;
use crate::proto::shard_trie_entry_with_message::TrieMessage;
use crate::proto::{self, ReplicationTriePartStatus, ShardSnapshotMetadata};
use crate::storage::store::block_engine::BlockEngine;
Expand Down Expand Up @@ -40,7 +40,7 @@ use std::{
};
use std::{fs, io};
use tokio::signal::ctrl_c;
use tokio::sync::{oneshot, Mutex, RwLock};
use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinSet;
use tracing::{debug, error, info, warn};

Expand Down Expand Up @@ -150,7 +150,7 @@ pub struct ReplicatorBootstrap {
shutdown: Arc<AtomicBool>,
fc_network: crate::proto::FarcasterNetwork,
statsd_client: StatsdClientWrapper,
gossip_config: gossip::Config,
// gossip_config: gossip::Config,
data_shard_ids: Vec<u32>,
shard0_metadata: Arc<Mutex<ShardSnapshotMetadata>>,
rocksdb_dir: String,
Expand All @@ -165,7 +165,7 @@ impl ReplicatorBootstrap {
shutdown: Arc::new(AtomicBool::new(false)),
fc_network: app_config.fc_network,
statsd_client,
gossip_config: app_config.gossip.clone(),
// gossip_config: app_config.gossip.clone(),
data_shard_ids: app_config.consensus.shard_ids.clone(),
shard0_metadata: Arc::new(Mutex::new(ShardSnapshotMetadata::default())),
rocksdb_dir: app_config.rocksdb_dir.clone(),
Expand All @@ -186,7 +186,7 @@ impl ReplicatorBootstrap {
shutdown: Arc::new(AtomicBool::new(false)),
fc_network: app_config.fc_network,
statsd_client,
gossip_config: app_config.gossip.clone(),
// gossip_config: app_config.gossip.clone(),
data_shard_ids: app_config.consensus.shard_ids.clone(),
shard0_metadata: Arc::new(Mutex::new(ShardSnapshotMetadata::default())),
rocksdb_dir: app_config.rocksdb_dir.clone(),
Expand Down Expand Up @@ -269,39 +269,39 @@ impl ReplicatorBootstrap {
}

// Determine target height from first manager (all should agree) for discovery validation
let first_manager = rpc_client_managers
.values()
.next()
.ok_or_else(|| BootstrapError::GenericError("No RPC managers initialized".into()))?;
let target_height = first_manager.get_metadata().height;

let (mut disc_shutdown_tx_opt, mut discovery_thread_handle_opt) = {
let (disc_shutdown_tx, disc_shutdown_rx) = oneshot::channel();

// Clone/move all inputs needed inside the new thread. We construct PeerDiscoverer
// inside the thread so we don't have to move a !Send swarm across threads.
let gossip_config = self.gossip_config.clone();
let rpc_mgr = first_manager.clone();
let statsd = self.statsd_client.clone();
let network = self.fc_network;
let handle = tokio::spawn(async move {
match PeerDiscoverer::new(
&gossip_config,
rpc_mgr,
target_height,
network,
statsd,
disc_shutdown_rx,
)
.await
{
Ok(peer_discoverer) => peer_discoverer.run().await,
Err(e) => warn!("Peer discovery disabled (failed to start): {}", e),
}
});

(Some(disc_shutdown_tx), Some(handle))
};
// let first_manager = rpc_client_managers
// .values()
// .next()
// .ok_or_else(|| BootstrapError::GenericError("No RPC managers initialized".into()))?;
// let target_height = first_manager.get_metadata().height;

// let (mut disc_shutdown_tx_opt, mut discovery_thread_handle_opt) = {
// let (disc_shutdown_tx, disc_shutdown_rx) = oneshot::channel();

// // Clone/move all inputs needed inside the new thread. We construct PeerDiscoverer
// // inside the thread so we don't have to move a !Send swarm across threads.
// let gossip_config = self.gossip_config.clone();
// let rpc_mgr = first_manager.clone();
// let statsd = self.statsd_client.clone();
// let network = self.fc_network;
// let handle = tokio::spawn(async move {
// match PeerDiscoverer::new(
// &gossip_config,
// rpc_mgr,
// target_height,
// network,
// statsd,
// disc_shutdown_rx,
// )
// .await
// {
// Ok(peer_discoverer) => peer_discoverer.run().await,
// Err(e) => warn!("Peer discovery disabled (failed to start): {}", e),
// }
// });

// (Some(disc_shutdown_tx), Some(handle))
// };

// Create tasks for each shard to run in parallel
let mut shard_tasks = JoinSet::new();
Expand Down Expand Up @@ -385,26 +385,26 @@ impl ReplicatorBootstrap {
info!("All shard tasks have been shut down.");
};

let shutdown_gossip_discovery_fn =
async |disc_shutdown_tx: &mut Option<oneshot::Sender<()>>,
discovery_thread_handle: &mut Option<tokio::task::JoinHandle<()>>| {
if let (Some(tx), Some(handle)) =
(disc_shutdown_tx.take(), discovery_thread_handle.take())
{
let _ = tx.send(());
let _ = handle.await;
}
};
// let shutdown_gossip_discovery_fn =
// async |disc_shutdown_tx: &mut Option<oneshot::Sender<()>>,
// discovery_thread_handle: &mut Option<tokio::task::JoinHandle<()>>| {
// if let (Some(tx), Some(handle)) =
// (disc_shutdown_tx.take(), discovery_thread_handle.take())
// {
// let _ = tx.send(());
// let _ = handle.await;
// }
// };

loop {
if shard_tasks.is_empty() {
// All tasks completed successfully
// Shut down discovery task
shutdown_gossip_discovery_fn(
&mut disc_shutdown_tx_opt,
&mut discovery_thread_handle_opt,
)
.await;
// shutdown_gossip_discovery_fn(
// &mut disc_shutdown_tx_opt,
// &mut discovery_thread_handle_opt,
// )
// .await;

// Write the final metadata from the server into the new DB.
self.write_final_metadata_to_db(
Expand Down Expand Up @@ -444,13 +444,13 @@ impl ReplicatorBootstrap {
error!("Shard task failed: {}", e);
// Shutdown all remaining tasks as well
shutdown_and_drain_tasks_fn(&mut shard_tasks).await;
shutdown_gossip_discovery_fn(&mut disc_shutdown_tx_opt, &mut discovery_thread_handle_opt).await;
// shutdown_gossip_discovery_fn(&mut disc_shutdown_tx_opt, &mut discovery_thread_handle_opt).await;

return Err(e);
}
Err(e) => {
error!("Shard task join error: {}", e);
shutdown_gossip_discovery_fn(&mut disc_shutdown_tx_opt, &mut discovery_thread_handle_opt).await;
// shutdown_gossip_discovery_fn(&mut disc_shutdown_tx_opt, &mut discovery_thread_handle_opt).await;
return Err(BootstrapError::TransactionReplayError(format!(
"Shard task join error: {}",
e
Expand All @@ -461,7 +461,7 @@ impl ReplicatorBootstrap {
_ = ctrl_c() => {
info!("Shutdown signal received, stopping all shard replication tasks");
shutdown_and_drain_tasks_fn(&mut shard_tasks).await;
shutdown_gossip_discovery_fn(&mut disc_shutdown_tx_opt, &mut discovery_thread_handle_opt).await;
// shutdown_gossip_discovery_fn(&mut disc_shutdown_tx_opt, &mut discovery_thread_handle_opt).await;

return Ok(WorkUnitResponse::Stopped);
}
Expand Down
Loading