Skip to content

Commit 809f2f0

Browse files
committed
netsync: Simplify request maps.
This simplifies the request tracking for blocks, transactions, and mix messages by making use of a single global map for each type that tracks the peer instead of a global map plus an additional per-peer map for each type. Note that this change does make it very slightly more costly in terms of CPU usage to determine the in-flight requests for a specific peer, but, in practice, the new approach ends up being slightly faster overall despite that due to the combination of less map management performed under a mutex as well as the infrequent need to determine the in-flight requests for a specific peer. It also slightly reduces memory usage due to the removal of 3 maps per connected peer which totals to between 24 and 375 maps with typical default connection settings.
1 parent fea3cc0 commit 809f2f0

File tree

1 file changed

+83
-68
lines changed

1 file changed

+83
-68
lines changed

internal/netsync/manager.go

Lines changed: 83 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,6 @@ type Peer struct {
123123
// requested from the peer once.
124124
requestInitialStateOnce sync.Once
125125

126-
// These fields track pending requests for data from the peer. They are
127-
// owned by the sync manager and protected by its request mutex.
128-
requestedTxns map[chainhash.Hash]struct{}
129-
requestedBlocks map[chainhash.Hash]struct{}
130-
requestedMixMsgs map[chainhash.Hash]struct{}
131-
132126
// numConsecutiveOrphanHeaders tracks the number of consecutive header
133127
// messages sent by the peer that contain headers which do not connect. It
134128
// is used to detect peers that have either diverged so far they are no
@@ -164,11 +158,8 @@ type Peer struct {
164158
func NewPeer(peer *peerpkg.Peer) *Peer {
165159
servesData := peer.Services()&wire.SFNodeNetwork == wire.SFNodeNetwork
166160
return &Peer{
167-
Peer: peer,
168-
servesData: servesData,
169-
requestedTxns: make(map[chainhash.Hash]struct{}),
170-
requestedBlocks: make(map[chainhash.Hash]struct{}),
171-
requestedMixMsgs: make(map[chainhash.Hash]struct{}),
161+
Peer: peer,
162+
servesData: servesData,
172163
}
173164
}
174165

@@ -325,9 +316,9 @@ type SyncManager struct {
325316
// These fields track pending requests for data from all peers. They are
326317
// protected by the request mutex.
327318
requestMtx sync.Mutex
328-
requestedTxns map[chainhash.Hash]struct{}
319+
requestedTxns map[chainhash.Hash]*Peer
329320
requestedBlocks map[chainhash.Hash]*Peer
330-
requestedMixMsgs map[chainhash.Hash]struct{}
321+
requestedMixMsgs map[chainhash.Hash]*Peer
331322

332323
// The following fields are used to track the current sync peer. The peer
333324
// is protected by the associated mutex.
@@ -467,15 +458,48 @@ func (m *SyncManager) isRequestedBlock(hash *chainhash.Hash) bool {
467458
return ok
468459
}
469460

461+
// isRequestedBlockFromPeer returns whether or not the given block hash has been
462+
// requested from the given remote peer.
463+
//
464+
// This function MUST be called with the request mutex held (reads).
465+
func (m *SyncManager) isRequestedBlockFromPeer(peer *Peer, hash *chainhash.Hash) bool {
466+
requestedFrom, ok := m.requestedBlocks[*hash]
467+
return ok && requestedFrom == peer
468+
}
469+
470+
// isRequestedTxFromPeer returns whether or not the given transaction hash has
471+
// been requested from the given remote peer.
472+
//
473+
// This function MUST be called with the request mutex held (reads).
474+
func (m *SyncManager) isRequestedTxFromPeer(peer *Peer, hash *chainhash.Hash) bool {
475+
requestedFrom, ok := m.requestedTxns[*hash]
476+
return ok && requestedFrom == peer
477+
}
478+
479+
// isRequestedMixMsgFromPeer returns whether or not the given mix message hash
480+
// has been requested from the given remote peer.
481+
//
482+
// This function MUST be called with the request mutex held (reads).
483+
func (m *SyncManager) isRequestedMixMsgFromPeer(peer *Peer, hash *chainhash.Hash) bool {
484+
requestedFrom, ok := m.requestedMixMsgs[*hash]
485+
return ok && requestedFrom == peer
486+
}
487+
470488
// fetchNextBlocks creates and sends a request to the provided peer for the next
471489
// blocks to be downloaded based on the current headers.
472490
//
473491
// This function is safe for concurrent access.
474492
func (m *SyncManager) fetchNextBlocks(peer *Peer) {
475493
// Nothing to do if the target maximum number of blocks to request from the
476494
// peer at the same time are already in flight.
495+
var numInFlight int
477496
m.requestMtx.Lock()
478-
numInFlight := len(peer.requestedBlocks)
497+
for _, requestedFrom := range m.requestedBlocks {
498+
if requestedFrom != peer {
499+
continue
500+
}
501+
numInFlight++
502+
}
479503
m.requestMtx.Unlock()
480504
if numInFlight >= maxInFlightBlocks {
481505
return
@@ -515,7 +539,6 @@ func (m *SyncManager) fetchNextBlocks(peer *Peer) {
515539

516540
iv := wire.NewInvVect(wire.InvTypeBlock, hash)
517541
m.requestedBlocks[*hash] = peer
518-
peer.requestedBlocks[*hash] = struct{}{}
519542
gdmsg.AddInvVect(iv)
520543
}
521544
m.requestMtx.Unlock()
@@ -830,15 +853,18 @@ func (m *SyncManager) OnPeerDisconnected(peer *Peer) {
830853
inv.Type = wire.InvTypeTx
831854
m.requestMtx.Lock()
832855
TxHashes:
833-
for txHash := range peer.requestedTxns {
856+
for txHash, requestedFrom := range m.requestedTxns {
857+
if requestedFrom != peer {
858+
continue
859+
}
834860
inv.Hash = txHash
835861
m.peersMtx.Lock()
836862
for pp := range m.peers {
837863
if !pp.IsKnownInventory(&inv) {
838864
continue
839865
}
840866
requestQueues[pp] = append(requestQueues[pp], inv)
841-
pp.requestedTxns[txHash] = struct{}{}
867+
m.requestedTxns[txHash] = pp
842868
m.peersMtx.Unlock()
843869
continue TxHashes
844870
}
@@ -848,15 +874,18 @@ TxHashes:
848874
}
849875
inv.Type = wire.InvTypeBlock
850876
BlockHashes:
851-
for blockHash := range peer.requestedBlocks {
877+
for blockHash, requestedFrom := range m.requestedBlocks {
878+
if requestedFrom != peer {
879+
continue
880+
}
852881
inv.Hash = blockHash
853882
m.peersMtx.Lock()
854883
for pp := range m.peers {
855884
if !pp.IsKnownInventory(&inv) {
856885
continue
857886
}
858887
requestQueues[pp] = append(requestQueues[pp], inv)
859-
pp.requestedBlocks[blockHash] = struct{}{}
888+
m.requestedBlocks[blockHash] = pp
860889
m.peersMtx.Unlock()
861890
continue BlockHashes
862891
}
@@ -866,15 +895,18 @@ BlockHashes:
866895
}
867896
inv.Type = wire.InvTypeMix
868897
MixHashes:
869-
for mixHash := range peer.requestedMixMsgs {
898+
for mixHash, requestedFrom := range m.requestedMixMsgs {
899+
if requestedFrom != peer {
900+
continue
901+
}
870902
inv.Hash = mixHash
871903
m.peersMtx.Lock()
872904
for pp := range m.peers {
873905
if !pp.IsKnownInventory(&inv) {
874906
continue
875907
}
876908
requestQueues[pp] = append(requestQueues[pp], inv)
877-
pp.requestedMixMsgs[mixHash] = struct{}{}
909+
m.requestedMixMsgs[mixHash] = pp
878910
m.peersMtx.Unlock()
879911
continue MixHashes
880912
}
@@ -969,7 +1001,6 @@ func (m *SyncManager) OnTx(peer *Peer, tx *dcrutil.Tx) []*dcrutil.Tx {
9691001
// instances of trying to fetch it, or we failed to insert and thus
9701002
// we'll retry next time we get an inv.
9711003
m.requestMtx.Lock()
972-
delete(peer.requestedTxns, *txHash)
9731004
delete(m.requestedTxns, *txHash)
9741005
m.requestMtx.Unlock()
9751006

@@ -1034,7 +1065,6 @@ func (m *SyncManager) OnMixMsg(peer *Peer, msg mixing.Message) ([]mixing.Message
10341065
// to fetch it, or we failed to insert and thus we'll retry next time
10351066
// we get an inv.
10361067
m.requestMtx.Lock()
1037-
delete(peer.requestedMixMsgs, mixHash)
10381068
delete(m.requestedMixMsgs, mixHash)
10391069
m.requestMtx.Unlock()
10401070

@@ -1191,9 +1221,9 @@ func (m *SyncManager) OnBlock(peer *Peer, block *dcrutil.Block) {
11911221
// The remote peer is misbehaving when the block was not requested.
11921222
blockHash := block.Hash()
11931223
m.requestMtx.Lock()
1194-
_, exists := peer.requestedBlocks[*blockHash]
1224+
requested := m.isRequestedBlockFromPeer(peer, blockHash)
11951225
m.requestMtx.Unlock()
1196-
if !exists {
1226+
if !requested {
11971227
log.Warnf("Got unrequested block %v from %s -- disconnecting",
11981228
blockHash, peer)
11991229
peer.Disconnect()
@@ -1211,7 +1241,6 @@ func (m *SyncManager) OnBlock(peer *Peer, block *dcrutil.Block) {
12111241
// maps in order to help prevent duplicate requests.
12121242
forkLen, err := m.processBlock(block)
12131243
m.requestMtx.Lock()
1214-
delete(peer.requestedBlocks, *blockHash)
12151244
delete(m.requestedBlocks, *blockHash)
12161245
m.requestMtx.Unlock()
12171246
if err != nil {
@@ -1335,7 +1364,7 @@ func (m *SyncManager) OnBlock(peer *Peer, block *dcrutil.Block) {
13351364
m.syncPeerMtx.Unlock()
13361365
if isSyncPeer {
13371366
m.requestMtx.Lock()
1338-
numInFlight := len(peer.requestedBlocks)
1367+
numInFlight := len(m.requestedBlocks)
13391368
m.requestMtx.Unlock()
13401369
if numInFlight < minInFlightBlocks {
13411370
m.fetchNextBlocks(peer)
@@ -1717,7 +1746,6 @@ func (m *SyncManager) OnHeaders(peer *Peer, headersMsg *wire.MsgHeaders) {
17171746
}
17181747

17191748
m.requestedBlocks[*hash] = peer
1720-
peer.requestedBlocks[*hash] = struct{}{}
17211749
iv := wire.NewInvVect(wire.InvTypeBlock, hash)
17221750
gdmsg.AddInvVect(iv)
17231751
}
@@ -1760,21 +1788,18 @@ func (m *SyncManager) OnNotFound(peer *Peer, notFound *wire.MsgNotFound) {
17601788
m.requestMtx.Lock()
17611789
for _, inv := range notFound.InvList {
17621790
// Verify the hash was actually announced by the peer before deleting
1763-
// from the global requested maps.
1791+
// from the request maps.
17641792
switch inv.Type {
17651793
case wire.InvTypeBlock:
1766-
if _, exists := peer.requestedBlocks[inv.Hash]; exists {
1767-
delete(peer.requestedBlocks, inv.Hash)
1794+
if m.isRequestedBlockFromPeer(peer, &inv.Hash) {
17681795
delete(m.requestedBlocks, inv.Hash)
17691796
}
17701797
case wire.InvTypeTx:
1771-
if _, exists := peer.requestedTxns[inv.Hash]; exists {
1772-
delete(peer.requestedTxns, inv.Hash)
1798+
if m.isRequestedTxFromPeer(peer, &inv.Hash) {
17731799
delete(m.requestedTxns, inv.Hash)
17741800
}
17751801
case wire.InvTypeMix:
1776-
if _, exists := peer.requestedMixMsgs[inv.Hash]; exists {
1777-
delete(peer.requestedMixMsgs, inv.Hash)
1802+
if m.isRequestedMixMsgFromPeer(peer, &inv.Hash) {
17781803
delete(m.requestedMixMsgs, inv.Hash)
17791804
}
17801805
}
@@ -1885,8 +1910,7 @@ func (m *SyncManager) OnInv(peer *Peer, inv *wire.MsgInv) {
18851910
// Request the transaction if there is not one already pending.
18861911
m.requestMtx.Lock()
18871912
if _, exists := m.requestedTxns[iv.Hash]; !exists {
1888-
limitAdd(m.requestedTxns, iv.Hash, maxRequestedTxns)
1889-
limitAdd(peer.requestedTxns, iv.Hash, maxRequestedTxns)
1913+
limitAdd(m.requestedTxns, iv.Hash, peer, maxRequestedTxns)
18901914
requestQueue = append(requestQueue, iv)
18911915
}
18921916
m.requestMtx.Unlock()
@@ -1910,8 +1934,7 @@ func (m *SyncManager) OnInv(peer *Peer, inv *wire.MsgInv) {
19101934
// Request the mixing message if it is not already pending.
19111935
m.requestMtx.Lock()
19121936
if _, exists := m.requestedMixMsgs[iv.Hash]; !exists {
1913-
limitAdd(m.requestedMixMsgs, iv.Hash, maxRequestedMixMsgs)
1914-
limitAdd(peer.requestedMixMsgs, iv.Hash, maxRequestedMixMsgs)
1937+
limitAdd(m.requestedMixMsgs, iv.Hash, peer, maxRequestedMixMsgs)
19151938
requestQueue = append(requestQueue, iv)
19161939
}
19171940
m.requestMtx.Unlock()
@@ -1970,9 +1993,10 @@ func (m *SyncManager) OnInv(peer *Peer, inv *wire.MsgInv) {
19701993
// limitAdd is a helper function for maps that require a maximum limit by
19711994
// evicting a random value if adding the new value would cause it to
19721995
// overflow the maximum allowed.
1973-
func limitAdd(m map[chainhash.Hash]struct{}, hash chainhash.Hash, limit int) {
1974-
// Nothing to do if entry is already in the map.
1996+
func limitAdd(m map[chainhash.Hash]*Peer, hash chainhash.Hash, peer *Peer, limit int) {
1997+
// Replace existing entries.
19751998
if _, exists := m[hash]; exists {
1999+
m[hash] = peer
19762000
return
19772001
}
19782002
if len(m)+1 > limit {
@@ -1987,7 +2011,7 @@ func limitAdd(m map[chainhash.Hash]struct{}, hash chainhash.Hash, limit int) {
19872011
break
19882012
}
19892013
}
1990-
m[hash] = struct{}{}
2014+
m[hash] = peer
19912015
}
19922016

19932017
// stallHandler monitors the header sync process to detect stalls and disconnect
@@ -2066,32 +2090,28 @@ func (m *SyncManager) RequestFromPeer(peer *Peer, blocks, voteHashes,
20662090
err := gdMsg.AddInvVect(wire.NewInvVect(wire.InvTypeBlock, bh))
20672091
if err != nil {
20682092
return fmt.Errorf("unexpected error encountered building request "+
2069-
"for mining state block %v: %w", bh, err)
2093+
"for block %v: %w", bh, err)
20702094
}
20712095

2072-
peer.requestedBlocks[*bh] = struct{}{}
20732096
m.requestedBlocks[*bh] = peer
20742097
}
20752098

2076-
addTxsToRequest := func(txs []chainhash.Hash, txType stake.TxType) error {
2099+
addTxsToRequest := func(hashes []chainhash.Hash, txType stake.TxType) error {
20772100
// Return immediately if txs is nil.
2078-
if txs == nil {
2101+
if hashes == nil {
20792102
return nil
20802103
}
20812104

2082-
for i := range txs {
2083-
// If we've already requested this transaction, skip it.
2084-
tx := &txs[i]
2085-
_, alreadyReqP := peer.requestedTxns[*tx]
2086-
_, alreadyReqB := m.requestedTxns[*tx]
2087-
2088-
if alreadyReqP || alreadyReqB {
2105+
for i := range hashes {
2106+
// Skip the transaction when it has already been requested.
2107+
txHash := &hashes[i]
2108+
if _, ok := m.requestedTxns[*txHash]; ok {
20892109
continue
20902110
}
20912111

20922112
// Ask the transaction memory pool if the transaction is known
20932113
// to it in any form (main pool or orphan).
2094-
if m.cfg.TxMemPool.HaveTransaction(tx) {
2114+
if m.cfg.TxMemPool.HaveTransaction(txHash) {
20952115
continue
20962116
}
20972117

@@ -2101,7 +2121,7 @@ func (m *SyncManager) RequestFromPeer(peer *Peer, blocks, voteHashes,
21012121
// is to avoid requesting already known transactions.
21022122
//
21032123
// Check for a specific outpoint based on the tx type.
2104-
outpoint := wire.OutPoint{Hash: *tx}
2124+
outpoint := wire.OutPoint{Hash: *txHash}
21052125
switch txType {
21062126
case stake.TxTypeSSGen:
21072127
// The first two outputs of vote transactions are OP_RETURN <data>, and
@@ -2124,14 +2144,13 @@ func (m *SyncManager) RequestFromPeer(peer *Peer, blocks, voteHashes,
21242144
continue
21252145
}
21262146

2127-
err = gdMsg.AddInvVect(wire.NewInvVect(wire.InvTypeTx, tx))
2147+
err = gdMsg.AddInvVect(wire.NewInvVect(wire.InvTypeTx, txHash))
21282148
if err != nil {
21292149
return fmt.Errorf("unexpected error encountered building request "+
2130-
"for mining state vote %v: %w", tx, err)
2150+
"for tx %v: %w", txHash, err)
21312151
}
21322152

2133-
peer.requestedTxns[*tx] = struct{}{}
2134-
m.requestedTxns[*tx] = struct{}{}
2153+
m.requestedTxns[*txHash] = peer
21352154
}
21362155

21372156
return nil
@@ -2150,12 +2169,9 @@ func (m *SyncManager) RequestFromPeer(peer *Peer, blocks, voteHashes,
21502169
}
21512170

21522171
for i := range mixHashes {
2153-
// If we've already requested this mix message, skip it.
2172+
// Skip mix messages that have already been requested.
21542173
mh := &mixHashes[i]
2155-
_, alreadyReqP := peer.requestedMixMsgs[*mh]
2156-
_, alreadyReqB := m.requestedMixMsgs[*mh]
2157-
2158-
if alreadyReqP || alreadyReqB {
2174+
if _, ok := m.requestedMixMsgs[*mh]; ok {
21592175
continue
21602176
}
21612177

@@ -2170,8 +2186,7 @@ func (m *SyncManager) RequestFromPeer(peer *Peer, blocks, voteHashes,
21702186
"for inv vect mix hash %v: %w", mh, err)
21712187
}
21722188

2173-
peer.requestedMixMsgs[*mh] = struct{}{}
2174-
m.requestedMixMsgs[*mh] = struct{}{}
2189+
m.requestedMixMsgs[*mh] = peer
21752190
}
21762191

21772192
if len(gdMsg.InvList) > 0 {
@@ -2307,9 +2322,9 @@ func New(config *Config) *SyncManager {
23072322
cfg: *config,
23082323
rejectedTxns: apbf.NewFilter(maxRejectedTxns, rejectedTxnsFPRate),
23092324
rejectedMixMsgs: apbf.NewFilter(maxRejectedMixMsgs, rejectedMixMsgsFPRate),
2310-
requestedTxns: make(map[chainhash.Hash]struct{}),
2325+
requestedTxns: make(map[chainhash.Hash]*Peer),
23112326
requestedBlocks: make(map[chainhash.Hash]*Peer),
2312-
requestedMixMsgs: make(map[chainhash.Hash]struct{}),
2327+
requestedMixMsgs: make(map[chainhash.Hash]*Peer),
23132328
warnOnNoSync: true,
23142329
peers: make(map[*Peer]struct{}),
23152330
minKnownWork: minKnownWork,

0 commit comments

Comments
 (0)