Skip to content

Commit 1232311

Browse files
committed
netsync: Separate mix message request.
The current logic for requesting mix messages from peers was added to the generic request from peer paths because the old async model required a lot of additional plumbing. Now that the sync manager has been converted to a synchronous model, no additional plumbing is needed and therefore it is much simpler and more efficient to implement the request logic for mix messages independently. Also, only a single mix message for a missing pair request is ever needed at a time, so the additional overhead of taking a slice is not needed. Consequently, this separates the logic for requesting a mix message from a peer from the generic request from peer path into its own specialized method that is much more efficient. Finally, it also changes the logic for determining if the mix message should be requested to make use of the same method as other paths that handle requesting mix messages. In particular, this means it will no longer attempt to request recently rejected or removed messages.
1 parent 429f244 commit 1232311

File tree

2 files changed

+33
-30
lines changed

2 files changed

+33
-30
lines changed

internal/netsync/manager.go

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2057,14 +2057,13 @@ func (m *SyncManager) SyncPeerID() int32 {
20572057
return 0
20582058
}
20592059

2060-
// RequestFromPeer requests any combination of blocks, votes, treasury spends,
2061-
// and mix messages from the given peer. It ensures all of the requests are
2062-
// tracked so the peer is not banned for sending unrequested data when it
2063-
// responds.
2060+
// RequestFromPeer requests any combination of blocks, votes, and treasury
2061+
// spends from the given peer. It ensures all of the requests are tracked so
2062+
// the peer is not banned for sending unrequested data when it responds.
20642063
//
20652064
// This function is safe for concurrent access.
20662065
func (m *SyncManager) RequestFromPeer(peer *Peer, blocks, voteHashes,
2067-
tSpendHashes, mixHashes []chainhash.Hash) error {
2066+
tSpendHashes []chainhash.Hash) error {
20682067

20692068
if m.shutdownRequested() {
20702069
return nil
@@ -2168,32 +2167,37 @@ func (m *SyncManager) RequestFromPeer(peer *Peer, blocks, voteHashes,
21682167
return err
21692168
}
21702169

2171-
for i := range mixHashes {
2172-
// Skip mix messages that have already been requested.
2173-
mh := &mixHashes[i]
2174-
if _, ok := m.requestedMixMsgs[*mh]; ok {
2175-
continue
2176-
}
2177-
2178-
// Skip the message when it is already known.
2179-
if m.cfg.MixPool.HaveMessage(mh) {
2180-
continue
2181-
}
2170+
if len(gdMsg.InvList) > 0 {
2171+
peer.QueueMessage(gdMsg, nil)
2172+
}
21822173

2183-
err := gdMsg.AddInvVect(wire.NewInvVect(wire.InvTypeMix, mh))
2184-
if err != nil {
2185-
return fmt.Errorf("unexpected error encountered building request "+
2186-
"for inv vect mix hash %v: %w", mh, err)
2187-
}
2174+
return nil
2175+
}
21882176

2189-
m.requestedMixMsgs[*mh] = peer
2177+
// RequestMixMsgFromPeer requests the specified mix message from the given peer.
2178+
// It ensures all of the requests are tracked so the peer is not banned for
2179+
// sending unrequested data when it responds.
2180+
//
2181+
// This function is safe for concurrent access.
2182+
func (m *SyncManager) RequestMixMsgFromPeer(peer *Peer, mixHash *chainhash.Hash) {
2183+
if m.shutdownRequested() {
2184+
return
21902185
}
21912186

2192-
if len(gdMsg.InvList) > 0 {
2193-
peer.QueueMessage(gdMsg, nil)
2187+
defer m.requestMtx.Unlock()
2188+
m.requestMtx.Lock()
2189+
2190+
// Skip mix messages that have already been requested or are otherwise not
2191+
// needed.
2192+
_, alreadyRequested := m.requestedMixMsgs[*mixHash]
2193+
if alreadyRequested || !m.needMixMsg(mixHash) {
2194+
return
21942195
}
21952196

2196-
return nil
2197+
gdMsg := wire.NewMsgGetDataSizeHint(1)
2198+
gdMsg.AddInvVect(wire.NewInvVect(wire.InvTypeMix, mixHash))
2199+
m.requestedMixMsgs[*mixHash] = peer
2200+
peer.QueueMessage(gdMsg, nil)
21972201
}
21982202

21992203
// ProcessBlock processes the provided block using the chain instance associated

server.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,7 +1344,7 @@ func (sp *serverPeer) OnMiningState(_ *peer.Peer, msg *wire.MsgMiningState) {
13441344
}
13451345

13461346
err := sp.server.syncManager.RequestFromPeer(sp.syncMgrPeer, blockHashes,
1347-
voteHashes, nil, nil)
1347+
voteHashes, nil)
13481348
if err != nil {
13491349
peerLog.Warnf("couldn't handle mining state message: %v", err)
13501350
}
@@ -1430,7 +1430,7 @@ func (sp *serverPeer) OnGetInitState(_ *peer.Peer, msg *wire.MsgGetInitState) {
14301430
// requests the data advertised in the message from the peer.
14311431
func (sp *serverPeer) OnInitState(_ *peer.Peer, msg *wire.MsgInitState) {
14321432
err := sp.server.syncManager.RequestFromPeer(sp.syncMgrPeer,
1433-
msg.BlockHashes, msg.VoteHashes, msg.TSpendHashes, nil)
1433+
msg.BlockHashes, msg.VoteHashes, msg.TSpendHashes)
14341434
if err != nil {
14351435
peerLog.Warnf("couldn't handle init state message: %v", err)
14361436
}
@@ -1852,9 +1852,8 @@ func (sp *serverPeer) onMixMessage(msg mixing.Message) {
18521852
}
18531853
var missingOwnPRErr *mixpool.MissingOwnPRError
18541854
if errors.As(err, &missingOwnPRErr) {
1855-
mixHashes := []chainhash.Hash{missingOwnPRErr.MissingPR}
1856-
sp.server.syncManager.RequestFromPeer(sp.syncMgrPeer, nil, nil, nil,
1857-
mixHashes)
1855+
mixHash := &missingOwnPRErr.MissingPR
1856+
sp.server.syncManager.RequestMixMsgFromPeer(sp.syncMgrPeer, mixHash)
18581857
return
18591858
}
18601859
if mixpool.IsBannable(err, sp.Services()) {

0 commit comments

Comments
 (0)