Skip to content

Commit 1a4256f

Browse files
committed
netsync: Separate mix message request.
The current logic for requesting mix messages from peers was added to the generic request from peer paths because the old async model required a lot of additional plumbing. Now that the sync manager has been converted to a synchronous model, no additional plumbing is needed and therefore it is much simpler and more efficient to implement the request logic for mix messages independently. Also, only a single mix message for a missing pair request is ever needed at a time, so the additional overhead of taking a slice is not needed. Consequently, this separates the logic for requesting a mix message from a peer from the generic request from peer path into its own specialized method that is much more efficient.
1 parent d26d8fe commit 1a4256f

File tree

2 files changed

+29
-26
lines changed

2 files changed

+29
-26
lines changed

internal/netsync/manager.go

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2063,7 +2063,7 @@ func (m *SyncManager) SyncPeerID() int32 {
20632063
//
20642064
// This function is safe for concurrent access.
20652065
func (m *SyncManager) RequestFromPeer(peer *Peer, blocks, voteHashes,
2066-
tSpendHashes, mixHashes []chainhash.Hash) error {
2066+
tSpendHashes []chainhash.Hash) error {
20672067

20682068
if m.shutdownRequested() {
20692069
return nil
@@ -2167,32 +2167,36 @@ func (m *SyncManager) RequestFromPeer(peer *Peer, blocks, voteHashes,
21672167
return err
21682168
}
21692169

2170-
for i := range mixHashes {
2171-
// Skip mix messages that have already been requested.
2172-
mh := &mixHashes[i]
2173-
if _, ok := m.requestedMixMsgs[*mh]; ok {
2174-
continue
2175-
}
2176-
2177-
// Skip the message when it is already known.
2178-
if m.cfg.MixPool.HaveMessage(mh) {
2179-
continue
2180-
}
2170+
if len(gdMsg.InvList) > 0 {
2171+
peer.QueueMessage(gdMsg, nil)
2172+
}
21812173

2182-
err := gdMsg.AddInvVect(wire.NewInvVect(wire.InvTypeMix, mh))
2183-
if err != nil {
2184-
return fmt.Errorf("unexpected error encountered building request "+
2185-
"for inv vect mix hash %v: %w", mh, err)
2186-
}
2174+
return nil
2175+
}
21872176

2188-
m.requestedMixMsgs[*mh] = peer
2177+
// RequestMixMsgFromPeer the specified mix message from the given peer. It
2178+
// ensures all of the requests are tracked so the peer is not banned for sending
2179+
// unrequested data when it responds.
2180+
//
2181+
// This function is safe for concurrent access.
2182+
func (m *SyncManager) RequestMixMsgFromPeer(peer *Peer, mixHash *chainhash.Hash) {
2183+
if m.shutdownRequested() {
2184+
return
21892185
}
21902186

2191-
if len(gdMsg.InvList) > 0 {
2192-
peer.QueueMessage(gdMsg, nil)
2187+
// Skip mix messages that have already been requested or are already known.
2188+
m.requestMtx.Lock()
2189+
_, ok := m.requestedMixMsgs[*mixHash]
2190+
m.requestMtx.Unlock()
2191+
alreadyHave := ok || m.cfg.MixPool.HaveMessage(mixHash)
2192+
if alreadyHave {
2193+
return
21932194
}
21942195

2195-
return nil
2196+
gdMsg := wire.NewMsgGetDataSizeHint(1)
2197+
gdMsg.AddInvVect(wire.NewInvVect(wire.InvTypeMix, mixHash))
2198+
m.requestedMixMsgs[*mixHash] = peer
2199+
peer.QueueMessage(gdMsg, nil)
21962200
}
21972201

21982202
// ProcessBlock processes the provided block using the chain instance associated

server.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,7 +1344,7 @@ func (sp *serverPeer) OnMiningState(_ *peer.Peer, msg *wire.MsgMiningState) {
13441344
}
13451345

13461346
err := sp.server.syncManager.RequestFromPeer(sp.syncMgrPeer, blockHashes,
1347-
voteHashes, nil, nil)
1347+
voteHashes, nil)
13481348
if err != nil {
13491349
peerLog.Warnf("couldn't handle mining state message: %v", err)
13501350
}
@@ -1430,7 +1430,7 @@ func (sp *serverPeer) OnGetInitState(_ *peer.Peer, msg *wire.MsgGetInitState) {
14301430
// requests the data advertised in the message from the peer.
14311431
func (sp *serverPeer) OnInitState(_ *peer.Peer, msg *wire.MsgInitState) {
14321432
err := sp.server.syncManager.RequestFromPeer(sp.syncMgrPeer,
1433-
msg.BlockHashes, msg.VoteHashes, msg.TSpendHashes, nil)
1433+
msg.BlockHashes, msg.VoteHashes, msg.TSpendHashes)
14341434
if err != nil {
14351435
peerLog.Warnf("couldn't handle init state message: %v", err)
14361436
}
@@ -1844,9 +1844,8 @@ func (sp *serverPeer) onMixMessage(msg mixing.Message) {
18441844
}
18451845
var missingOwnPRErr *mixpool.MissingOwnPRError
18461846
if errors.As(err, &missingOwnPRErr) {
1847-
mixHashes := []chainhash.Hash{missingOwnPRErr.MissingPR}
1848-
sp.server.syncManager.RequestFromPeer(sp.syncMgrPeer, nil, nil, nil,
1849-
mixHashes)
1847+
mixHash := &missingOwnPRErr.MissingPR
1848+
sp.server.syncManager.RequestMixMsgFromPeer(sp.syncMgrPeer, mixHash)
18501849
return
18511850
}
18521851
if mixpool.IsBannable(err, sp.Services()) {

0 commit comments

Comments
 (0)