Skip to content

Commit bfb2d3c

Browse files
committed
feat: add replay mode
Allows replaying transactions when starting from a snapshot. Very much a beta feature still.
1 parent f252905 commit bfb2d3c

File tree

8 files changed

+360
-11
lines changed

8 files changed

+360
-11
lines changed

runner/benchmark/definition.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,18 @@ type DatadirConfig struct {
9696
Validator *string `yaml:"validator"`
9797
}
9898

99+
// ReplayConfig specifies configuration for replaying transactions from an
100+
// external node instead of generating synthetic transactions.
101+
type ReplayConfig struct {
102+
// SourceRPCURL is the RPC endpoint of the node to fetch transactions from
103+
SourceRPCURL string `yaml:"source_rpc_url"`
104+
105+
// StartBlock is the first block to replay transactions from.
106+
// If not specified (0), it will be automatically detected from the
107+
// snapshot's head block + 1.
108+
StartBlock uint64 `yaml:"start_block,omitempty"`
109+
}
110+
99111
// TestDefinition is the user-facing YAML configuration for specifying a
100112
// matrix of benchmark runs.
101113
type TestDefinition struct {
@@ -105,6 +117,7 @@ type TestDefinition struct {
105117
Tags *map[string]string `yaml:"tags"`
106118
Variables []Param `yaml:"variables"`
107119
ProofProgram *ProofProgramOptions `yaml:"proof_program"`
120+
Replay *ReplayConfig `yaml:"replay"`
108121
}
109122

110123
func (bc *TestDefinition) Check() error {

runner/benchmark/matrix.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type TestPlan struct {
1717
Snapshot *SnapshotDefinition
1818
ProofProgram *ProofProgramOptions
1919
Thresholds *ThresholdConfig
20+
Replay *ReplayConfig
2021
}
2122

2223
func NewTestPlanFromConfig(c TestDefinition, testFileName string, config *BenchmarkConfig) (*TestPlan, error) {
@@ -42,6 +43,7 @@ func NewTestPlanFromConfig(c TestDefinition, testFileName string, config *Benchm
4243
Snapshot: c.Snapshot,
4344
ProofProgram: proofProgram,
4445
Thresholds: c.Metrics,
46+
Replay: c.Replay,
4547
}, nil
4648
}
4749

runner/network/consensus/client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ type ConsensusClientOptions struct {
2121
GasLimit uint64
2222
// GasLimitSetup is the gas limit for the setup payload
2323
GasLimitSetup uint64
24+
// AllowTxFailures allows transactions to fail without stopping the benchmark.
25+
// When true, failed transactions are logged as warnings instead of errors.
26+
// Useful for replay mode where some transactions may fail due to state differences.
27+
AllowTxFailures bool
2428
}
2529

2630
// BaseConsensusClient contains common functionality shared between different consensus client implementations.

runner/network/consensus/validator_consensus.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,18 +64,18 @@ func (f *SyncingConsensusClient) propose(ctx context.Context, payload *engine.Ex
6464
}
6565

6666
// Start starts the fake consensus client.
67-
func (f *SyncingConsensusClient) Start(ctx context.Context, payloads []engine.ExecutableData, metricsCollector metrics.Collector, firstTestBlock uint64) error {
67+
func (f *SyncingConsensusClient) Start(ctx context.Context, payloads []engine.ExecutableData, metricsCollector metrics.Collector, lastSetupBlock uint64) error {
6868
f.log.Info("Starting sync benchmark", "num_payloads", len(payloads))
6969
m := metrics.NewBlockMetrics()
7070
for i := 0; i < len(payloads); i++ {
71-
m.SetBlockNumber(uint64(max(0, int(payloads[i].Number)-int(firstTestBlock))))
71+
m.SetBlockNumber(uint64(max(0, int(payloads[i].Number)-int(lastSetupBlock))))
7272
f.log.Info("Proposing payload", "payload_index", i)
7373
err := f.propose(ctx, &payloads[i], m)
7474
if err != nil {
7575
return err
7676
}
7777

78-
if payloads[i].Number >= firstTestBlock {
78+
if payloads[i].Number > lastSetupBlock {
7979
err = metricsCollector.Collect(ctx, m)
8080
if err != nil {
8181
f.log.Error("Failed to collect metrics", "error", err)
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package mempool
2+
3+
import (
4+
"context"
5+
"math/big"
6+
"sync"
7+
8+
"github.com/ethereum/go-ethereum/common"
9+
"github.com/ethereum/go-ethereum/core/types"
10+
"github.com/ethereum/go-ethereum/ethclient"
11+
"github.com/ethereum/go-ethereum/log"
12+
)
13+
14+
// ReplayMempool fetches transactions from an external node and replays them.
15+
// It iterates through blocks from a source node and provides transactions
16+
// block-by-block for the benchmark to replay.
17+
type ReplayMempool struct {
18+
log log.Logger
19+
client *ethclient.Client
20+
21+
lock sync.Mutex
22+
23+
// startBlock is the first block to fetch transactions from
24+
startBlock uint64
25+
26+
// currentBlock tracks which block we're fetching next
27+
currentBlock uint64
28+
29+
// chainID for transaction signing validation
30+
chainID *big.Int
31+
32+
// addressNonce tracks the latest nonce for each address
33+
addressNonce map[common.Address]uint64
34+
}
35+
36+
// NewReplayMempool creates a new ReplayMempool that fetches transactions
37+
// from the given RPC endpoint starting from the specified block.
38+
func NewReplayMempool(log log.Logger, rpcURL string, startBlock uint64, chainID *big.Int) (*ReplayMempool, error) {
39+
client, err := ethclient.Dial(rpcURL)
40+
if err != nil {
41+
return nil, err
42+
}
43+
44+
return &ReplayMempool{
45+
log: log,
46+
client: client,
47+
startBlock: startBlock,
48+
currentBlock: startBlock,
49+
chainID: chainID,
50+
addressNonce: make(map[common.Address]uint64),
51+
}, nil
52+
}
53+
54+
// AddTransactions is a no-op for ReplayMempool since transactions come from the source node.
55+
func (m *ReplayMempool) AddTransactions(_ []*types.Transaction) {
56+
// No-op: transactions are fetched from the source node, not added manually
57+
}
58+
59+
// NextBlock fetches the next block from the source node and returns its transactions.
60+
// Returns (mempoolTxs, sequencerTxs) where:
61+
// - mempoolTxs: regular transactions to be sent via eth_sendRawTransaction
62+
// - sequencerTxs: deposit transactions to be included in payload attributes
63+
func (m *ReplayMempool) NextBlock() ([][]byte, [][]byte) {
64+
m.lock.Lock()
65+
defer m.lock.Unlock()
66+
67+
ctx := context.Background()
68+
69+
block, err := m.client.BlockByNumber(ctx, big.NewInt(int64(m.currentBlock)))
70+
if err != nil {
71+
m.log.Warn("Failed to fetch block", "block", m.currentBlock, "error", err)
72+
return nil, nil
73+
}
74+
75+
m.log.Info("Fetched block for replay",
76+
"block", m.currentBlock,
77+
"txs", len(block.Transactions()),
78+
"gas_used", block.GasUsed(),
79+
)
80+
81+
m.currentBlock++
82+
83+
mempoolTxs := make([][]byte, 0)
84+
sequencerTxs := make([][]byte, 0)
85+
86+
for _, tx := range block.Transactions() {
87+
// Track nonces for GetTransactionCount
88+
from, err := types.Sender(types.NewIsthmusSigner(m.chainID), tx)
89+
if err != nil {
90+
// Try with London signer for older transactions
91+
from, err = types.Sender(types.NewLondonSigner(m.chainID), tx)
92+
if err != nil {
93+
m.log.Warn("Failed to get sender", "tx", tx.Hash(), "error", err)
94+
continue
95+
}
96+
}
97+
m.addressNonce[from] = tx.Nonce()
98+
99+
txBytes, err := tx.MarshalBinary()
100+
if err != nil {
101+
m.log.Warn("Failed to marshal transaction", "tx", tx.Hash(), "error", err)
102+
continue
103+
}
104+
105+
// Deposit transactions go to sequencer, others go to mempool
106+
if tx.Type() == types.DepositTxType {
107+
sequencerTxs = append(sequencerTxs, txBytes)
108+
} else {
109+
mempoolTxs = append(mempoolTxs, txBytes)
110+
}
111+
}
112+
113+
return mempoolTxs, sequencerTxs
114+
}
115+
116+
// GetTransactionCount returns the latest nonce for an address.
117+
func (m *ReplayMempool) GetTransactionCount(address common.Address) uint64 {
118+
m.lock.Lock()
119+
defer m.lock.Unlock()
120+
return m.addressNonce[address]
121+
}
122+
123+
// CurrentBlock returns the current block number being replayed.
124+
func (m *ReplayMempool) CurrentBlock() uint64 {
125+
m.lock.Lock()
126+
defer m.lock.Unlock()
127+
return m.currentBlock
128+
}
129+
130+
// Close closes the underlying RPC client connection.
131+
func (m *ReplayMempool) Close() {
132+
m.client.Close()
133+
}
134+
135+
var _ FakeMempool = &ReplayMempool{}
136+

runner/network/network_benchmark.go

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,11 @@ type NetworkBenchmark struct {
4545

4646
transactionPayload payload.Definition
4747
ports portmanager.PortManager
48+
replayConfig *benchmark.ReplayConfig
4849
}
4950

5051
// NewNetworkBenchmark creates a new network benchmark and initializes the payload worker and consensus client
51-
func NewNetworkBenchmark(config *benchtypes.TestConfig, log log.Logger, sequencerOptions *config.InternalClientOptions, validatorOptions *config.InternalClientOptions, proofConfig *benchmark.ProofProgramOptions, transactionPayload payload.Definition, ports portmanager.PortManager) (*NetworkBenchmark, error) {
52+
func NewNetworkBenchmark(config *benchtypes.TestConfig, log log.Logger, sequencerOptions *config.InternalClientOptions, validatorOptions *config.InternalClientOptions, proofConfig *benchmark.ProofProgramOptions, transactionPayload payload.Definition, ports portmanager.PortManager, replayConfig *benchmark.ReplayConfig) (*NetworkBenchmark, error) {
5253
return &NetworkBenchmark{
5354
log: log,
5455
sequencerOptions: sequencerOptions,
@@ -57,6 +58,7 @@ func NewNetworkBenchmark(config *benchtypes.TestConfig, log log.Logger, sequence
5758
proofConfig: proofConfig,
5859
transactionPayload: transactionPayload,
5960
ports: ports,
61+
replayConfig: replayConfig,
6062
}, nil
6163
}
6264

@@ -107,15 +109,35 @@ func (nb *NetworkBenchmark) benchmarkSequencer(ctx context.Context, l1Chain *l1C
107109
}
108110
}()
109111

110-
benchmark := newSequencerBenchmark(nb.log, *nb.testConfig, sequencerClient, l1Chain, nb.transactionPayload)
111-
executionData, lastBlock, err := benchmark.Run(ctx, metricsCollector)
112+
var executionData []engine.ExecutableData
113+
var lastBlock uint64
114+
115+
// Use replay benchmark if replay config is provided
116+
if nb.replayConfig != nil {
117+
nb.log.Info("Using replay sequencer benchmark",
118+
"source_rpc", nb.replayConfig.SourceRPCURL,
119+
"start_block", nb.replayConfig.StartBlock,
120+
)
121+
replayBenchmark := NewReplaySequencerBenchmark(
122+
nb.log,
123+
*nb.testConfig,
124+
sequencerClient,
125+
l1Chain,
126+
nb.replayConfig.SourceRPCURL,
127+
nb.replayConfig.StartBlock,
128+
)
129+
executionData, lastBlock, err = replayBenchmark.Run(ctx, metricsCollector)
130+
} else {
131+
benchmark := newSequencerBenchmark(nb.log, *nb.testConfig, sequencerClient, l1Chain, nb.transactionPayload)
132+
executionData, lastBlock, err = benchmark.Run(ctx, metricsCollector)
133+
}
112134

113135
if err != nil {
114136
sequencerClient.Stop()
115-
return nil, 0, nil, fmt.Errorf("failed to run sequencer benchmark: %w", err)
137+
return nil, 0, nil, err
116138
}
117139

118-
return executionData, lastBlock, sequencerClient, nil
140+
return executionData, lastBlock, sequencerClient, err
119141
}
120142

121143
func (nb *NetworkBenchmark) benchmarkValidator(ctx context.Context, payloads []engine.ExecutableData, lastSetupBlock uint64, l1Chain *l1Chain, sequencerClient types.ExecutionClient) error {

0 commit comments

Comments
 (0)