Skip to content

Commit f396937

Browse files
authored
feat: shutdown refactor (#472)
* shutdown refactor * clean up * use unused ports of anvil and run tests concurrently
1 parent 01509c1 commit f396937

File tree

12 files changed

+154
-259
lines changed

12 files changed

+154
-259
lines changed

Makefile

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ test-l1-sync:
7878
--locked \
7979
--all-features \
8080
--no-fail-fast \
81-
--test-threads=1 \
8281
--failure-output immediate \
8382
-E 'binary(l1_sync)'
8483

crates/chain-orchestrator/src/error.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,6 @@ pub enum ChainOrchestratorError {
9292
/// An error occurred while handling rollup node primitives.
9393
#[error("An error occurred while handling rollup node primitives: {0}")]
9494
RollupNodePrimitiveError(rollup_node_primitives::RollupNodePrimitiveError),
95-
/// Shutdown requested - this is not a real error but used to signal graceful shutdown.
96-
#[cfg(feature = "test-utils")]
97-
#[error("Shutdown requested")]
98-
Shutdown,
9995
}
10096

10197
impl CanRetry for ChainOrchestratorError {

crates/chain-orchestrator/src/event.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,6 @@ pub enum ChainOrchestratorEvent {
119119
},
120120
/// The head of the fork choice state has been updated in the engine driver.
121121
FcsHeadUpdated(BlockInfo),
122+
/// The chain orchestrator is shutting down.
123+
Shutdown,
122124
}

crates/chain-orchestrator/src/handle/command.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ pub enum ChainOrchestratorCommand<N: FullNetwork<Primitives = ScrollNetworkPrimi
3535
/// Returns a database handle for direct database access.
3636
#[cfg(feature = "test-utils")]
3737
DatabaseHandle(oneshot::Sender<std::sync::Arc<scroll_db::Database>>),
38-
/// Request the `ChainOrchestrator` to shutdown immediately.
39-
#[cfg(feature = "test-utils")]
40-
Shutdown(oneshot::Sender<()>),
4138
}
4239

4340
/// The database queries that can be sent to the rollup manager.

crates/chain-orchestrator/src/handle/mod.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -149,13 +149,4 @@ impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> ChainOrchestratorHand
149149
self.send_command(ChainOrchestratorCommand::DatabaseHandle(tx));
150150
rx.await
151151
}
152-
153-
/// Sends a command to shutdown the `ChainOrchestrator` immediately.
154-
/// This will cause the `ChainOrchestrator`'s event loop to exit gracefully.
155-
#[cfg(feature = "test-utils")]
156-
pub async fn shutdown(&self) -> Result<(), oneshot::error::RecvError> {
157-
let (tx, rx) = oneshot::channel();
158-
self.send_command(ChainOrchestratorCommand::Shutdown(tx));
159-
rx.await
160-
}
161152
}

crates/chain-orchestrator/src/lib.rs

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -189,19 +189,12 @@ impl<
189189
biased;
190190

191191
_guard = &mut shutdown => {
192+
self.notify(ChainOrchestratorEvent::Shutdown);
192193
break;
193194
}
194195
Some(command) = self.handle_rx.recv() => {
195-
match self.handle_command(command).await {
196-
#[cfg(feature = "test-utils")]
197-
Err(ChainOrchestratorError::Shutdown) => {
198-
tracing::info!(target: "scroll::chain_orchestrator", "Shutdown requested, exiting gracefully");
199-
break;
200-
}
201-
Err(err) => {
202-
tracing::error!(target: "scroll::chain_orchestrator", ?err, "Error handling command");
203-
}
204-
Ok(_) => {}
196+
if let Err(err) = self.handle_command(command).await {
197+
tracing::error!(target: "scroll::chain_orchestrator", ?err, "Error handling command");
205198
}
206199
}
207200
Some(event) = async {
@@ -442,13 +435,6 @@ impl<
442435
ChainOrchestratorCommand::DatabaseHandle(tx) => {
443436
let _ = tx.send(self.database.clone());
444437
}
445-
#[cfg(feature = "test-utils")]
446-
ChainOrchestratorCommand::Shutdown(tx) => {
447-
tracing::info!(target: "scroll::chain_orchestrator", "Received shutdown command, exiting event loop");
448-
let _ = tx.send(());
449-
// Return an error to signal shutdown
450-
return Err(ChainOrchestratorError::Shutdown);
451-
}
452438
}
453439

454440
Ok(())

crates/node/src/test_utils/fixture.rs

Lines changed: 67 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,25 @@ use alloy_rpc_types_anvil::ReorgOptions;
1818
use alloy_rpc_types_eth::Block;
1919
use alloy_signer_local::PrivateKeySigner;
2020
use alloy_transport::layers::RetryBackoffLayer;
21-
use reth_chainspec::EthChainSpec;
2221
use reth_e2e_test_utils::{wallet::Wallet, NodeHelperType, TmpDB};
2322
use reth_eth_wire_types::BasicNetworkPrimitives;
2423
use reth_fs_util::remove_dir_all;
2524
use reth_network::NetworkHandle;
2625
use reth_node_builder::NodeTypes;
26+
use reth_node_core::exit::NodeExitFuture;
2727
use reth_node_types::NodeTypesWithDBAdapter;
2828
use reth_provider::providers::BlockchainProvider;
2929
use reth_scroll_chainspec::SCROLL_DEV;
3030
use reth_scroll_primitives::ScrollPrimitives;
3131
use reth_tasks::TaskManager;
3232
use reth_tokio_util::EventStream;
3333
use rollup_node_chain_orchestrator::{ChainOrchestratorEvent, ChainOrchestratorHandle};
34-
use rollup_node_primitives::BlockInfo;
3534
use rollup_node_sequencer::L1MessageInclusionMode;
3635
use scroll_alloy_consensus::ScrollPooledTransaction;
37-
use scroll_alloy_provider::{ScrollAuthApiEngineClient, ScrollEngineApi};
3836
use scroll_alloy_rpc_types::Transaction;
39-
use scroll_engine::{Engine, ForkchoiceState};
4037
use std::{
4138
fmt::{Debug, Formatter},
39+
ops::{Deref, DerefMut},
4240
path::PathBuf,
4341
sync::Arc,
4442
};
@@ -58,8 +56,6 @@ pub struct TestFixture {
5856
pub chain_spec: Arc<<ScrollRollupNode as NodeTypes>::ChainSpec>,
5957
/// Optional Anvil instance for L1 simulation.
6058
pub anvil: Option<anvil::NodeHandle>,
61-
/// The task manager. Held in order to avoid dropping the node.
62-
pub tasks: TaskManager,
6359
/// The configuration for the nodes.
6460
pub config: ScrollRollupNodeConfig,
6561
}
@@ -99,6 +95,9 @@ pub type ScrollNetworkHandle =
9995
pub type TestBlockChainProvider =
10096
BlockchainProvider<NodeTypesWithDBAdapter<ScrollRollupNode, TmpDB>>;
10197

98+
/// The test node type for Scroll nodes.
99+
pub type ScrollTestNode = NodeHelperType<ScrollRollupNode, TestBlockChainProvider>;
100+
102101
/// The node type (sequencer or follower).
103102
#[derive(Debug)]
104103
pub enum NodeType {
@@ -108,12 +107,51 @@ pub enum NodeType {
108107
Follower,
109108
}
110109

110+
/// Components of a test node.
111+
pub struct ScrollNodeTestComponents {
112+
/// The node helper type for the test node.
113+
pub node: ScrollTestNode,
114+
/// The task manager for the test node.
115+
pub task_manager: TaskManager,
116+
/// The exit future for the test node.
117+
pub exit_future: NodeExitFuture,
118+
}
119+
120+
impl ScrollNodeTestComponents {
121+
/// Create new test node components.
122+
pub async fn new(
123+
node: ScrollTestNode,
124+
task_manager: TaskManager,
125+
exit_future: NodeExitFuture,
126+
) -> Self {
127+
Self { node, task_manager, exit_future }
128+
}
129+
}
130+
131+
impl std::fmt::Debug for ScrollNodeTestComponents {
132+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133+
f.debug_struct("ScrollNodeTestComponents").finish()
134+
}
135+
}
136+
137+
impl DerefMut for ScrollNodeTestComponents {
138+
fn deref_mut(&mut self) -> &mut Self::Target {
139+
&mut self.node
140+
}
141+
}
142+
143+
impl Deref for ScrollNodeTestComponents {
144+
type Target = ScrollTestNode;
145+
146+
fn deref(&self) -> &Self::Target {
147+
&self.node
148+
}
149+
}
150+
111151
/// Handle to a single test node with its components.
112152
pub struct NodeHandle {
113153
/// The underlying node context.
114-
pub node: NodeHelperType<ScrollRollupNode, TestBlockChainProvider>,
115-
/// Engine instance for this node.
116-
pub engine: Engine<Arc<dyn ScrollEngineApi + Send + Sync + 'static>>,
154+
pub node: ScrollNodeTestComponents,
117155
/// Chain orchestrator listener.
118156
pub chain_orchestrator_rx: EventStream<ChainOrchestratorEvent>,
119157
/// Chain orchestrator handle.
@@ -123,6 +161,15 @@ pub struct NodeHandle {
123161
}
124162

125163
impl NodeHandle {
164+
/// Create a new node handle.
165+
pub async fn new(node: ScrollNodeTestComponents, typ: NodeType) -> eyre::Result<Self> {
166+
let rollup_manager_handle = node.inner.add_ons_handle.rollup_manager_handle.clone();
167+
let chain_orchestrator_rx =
168+
node.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?;
169+
170+
Ok(Self { node, chain_orchestrator_rx, rollup_manager_handle, typ })
171+
}
172+
126173
/// Returns true if this is a handle to the sequencer.
127174
pub const fn is_sequencer(&self) -> bool {
128175
matches!(self.typ, NodeType::Sequencer)
@@ -598,7 +645,6 @@ impl TestFixtureBuilder {
598645
// Start Anvil if requested
599646
let anvil = if self.anvil_config.enabled {
600647
let handle = Self::spawn_anvil(
601-
self.anvil_config.port,
602648
self.anvil_config.state_path.as_deref(),
603649
self.anvil_config.chain_id,
604650
self.anvil_config.block_time,
@@ -623,9 +669,7 @@ impl TestFixtureBuilder {
623669
None
624670
};
625671

626-
let tasks = TaskManager::current();
627-
let (nodes, dbs, wallet) = setup_engine(
628-
&tasks,
672+
let (node_components, dbs, wallet) = setup_engine(
629673
self.config.clone(),
630674
self.num_nodes,
631675
chain_spec.clone(),
@@ -635,59 +679,39 @@ impl TestFixtureBuilder {
635679
)
636680
.await?;
637681

638-
let mut node_handles = Vec::with_capacity(nodes.len());
639-
for (index, node) in nodes.into_iter().enumerate() {
640-
let genesis_hash = node.inner.chain_spec().genesis_hash();
641-
642-
// Create engine for the node
643-
let auth_client = node.inner.engine_http_client();
644-
let engine_client = Arc::new(ScrollAuthApiEngineClient::new(auth_client))
645-
as Arc<dyn ScrollEngineApi + Send + Sync + 'static>;
646-
let fcs = ForkchoiceState::new(
647-
BlockInfo { hash: genesis_hash, number: 0 },
648-
Default::default(),
649-
Default::default(),
650-
);
651-
let engine = Engine::new(Arc::new(engine_client), fcs);
652-
653-
// Get handles if available
654-
let rollup_manager_handle = node.inner.add_ons_handle.rollup_manager_handle.clone();
655-
let chain_orchestrator_rx =
656-
node.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?;
657-
658-
node_handles.push(Some(NodeHandle {
682+
let mut nodes = Vec::with_capacity(node_components.len());
683+
for (index, node) in node_components.into_iter().enumerate() {
684+
let handle = NodeHandle::new(
659685
node,
660-
engine,
661-
chain_orchestrator_rx,
662-
rollup_manager_handle,
663-
typ: if self.config.sequencer_args.sequencer_enabled && index == 0 {
686+
if self.config.sequencer_args.sequencer_enabled && index == 0 {
664687
NodeType::Sequencer
665688
} else {
666689
NodeType::Follower
667690
},
668-
}));
691+
)
692+
.await?;
693+
694+
nodes.push(Some(handle));
669695
}
670696

671697
Ok(TestFixture {
672-
nodes: node_handles,
698+
nodes,
673699
dbs,
674700
wallet: Arc::new(Mutex::new(wallet)),
675701
chain_spec,
676-
tasks,
677702
anvil,
678703
config: self.config,
679704
})
680705
}
681706

682707
/// Spawn an Anvil instance with the given configuration.
683708
async fn spawn_anvil(
684-
port: u16,
685709
state_path: Option<&std::path::Path>,
686710
chain_id: Option<u64>,
687711
block_time: Option<u64>,
688712
slots_in_an_epoch: u64,
689713
) -> eyre::Result<anvil::NodeHandle> {
690-
let mut config = anvil::NodeConfig { port, ..Default::default() };
714+
let mut config = anvil::NodeConfig { port: 0, ..Default::default() };
691715

692716
if let Some(id) = chain_id {
693717
config.chain_id = Some(id);

0 commit comments

Comments
 (0)