Skip to content

Commit 68a9760

Browse files
committed
netsync: Rework external request from peer.
This reworks the logic that allows code external to the sync manager to request blocks, votes, and treasury spend transactions to both simplify and optimize it. The method now makes use of the same logic the sync manager itself uses when requesting data and no longer potentially returns an error. Of particular note is that it now uses the same much more efficient code path that the sync manager itself uses to determine if a transaction is needed as opposed to the rather expensive legacy utxo-based query approach. Finally, this also modifies the logic to potentially make more than one request if needed since it is technically more accurate and avoids any possibility of error even though the current request sizes are nowhere near the limits.
1 parent 67c037e commit 68a9760

File tree

2 files changed

+35
-96
lines changed

2 files changed

+35
-96
lines changed

internal/netsync/manager.go

Lines changed: 32 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,11 @@ package netsync
88
import (
99
"context"
1010
"errors"
11-
"fmt"
1211
"math"
1312
"sync"
1413
"sync/atomic"
1514
"time"
1615

17-
"github.com/decred/dcrd/blockchain/stake/v5"
1816
"github.com/decred/dcrd/chaincfg/chainhash"
1917
"github.com/decred/dcrd/chaincfg/v3"
2018
"github.com/decred/dcrd/container/apbf"
@@ -2057,120 +2055,67 @@ func (m *SyncManager) SyncPeerID() int32 {
20572055
}
20582056

20592057
// RequestFromPeer requests any combination of blocks, votes, and treasury
2060-
// spends, from the given peer. It ensures all of the requests are tracked so
2058+
// spends from the given peer. It ensures all of the requests are tracked so
20612059
// the peer is not banned for sending unrequested data when it responds.
20622060
//
20632061
// This function is safe for concurrent access.
2064-
func (m *SyncManager) RequestFromPeer(peer *Peer, blocks, voteHashes,
2065-
tSpendHashes []chainhash.Hash) error {
2066-
2062+
func (m *SyncManager) RequestFromPeer(peer *Peer, blocks, voteHashes, tSpendHashes []chainhash.Hash) {
20672063
if m.shutdownRequested() {
2068-
return nil
2064+
return
20692065
}
20702066

20712067
defer m.requestMtx.Unlock()
20722068
m.requestMtx.Lock()
20732069

2074-
// Add the blocks to the request.
2070+
// Request as many needed blocks as possible at once.
2071+
var numRequested uint32
20752072
gdMsg := wire.NewMsgGetData()
20762073
for i := range blocks {
2077-
// Skip the block when it has already been requested.
2078-
bh := &blocks[i]
2079-
if m.isRequestedBlock(bh) {
2080-
continue
2081-
}
2082-
2083-
// Skip the block when it is already known.
2084-
if m.cfg.Chain.HaveBlock(bh) {
2074+
// Skip the block when it has already been requested or is already
2075+
// known.
2076+
blockHash := &blocks[i]
2077+
if m.isRequestedBlock(blockHash) || m.cfg.Chain.HaveBlock(blockHash) {
20852078
continue
20862079
}
20872080

2088-
err := gdMsg.AddInvVect(wire.NewInvVect(wire.InvTypeBlock, bh))
2089-
if err != nil {
2090-
return fmt.Errorf("unexpected error encountered building request "+
2091-
"for block %v: %w", bh, err)
2081+
gdMsg.AddInvVect(wire.NewInvVect(wire.InvTypeBlock, blockHash))
2082+
m.requestedBlocks[*blockHash] = peer
2083+
numRequested++
2084+
if numRequested == wire.MaxInvPerMsg {
2085+
// Send full getdata message and reset.
2086+
peer.QueueMessage(gdMsg, nil)
2087+
gdMsg = wire.NewMsgGetData()
2088+
numRequested = 0
20922089
}
2093-
2094-
m.requestedBlocks[*bh] = peer
20952090
}
20962091

2097-
addTxsToRequest := func(hashes []chainhash.Hash, txType stake.TxType) error {
2098-
// Return immediately if txs is nil.
2099-
if hashes == nil {
2100-
return nil
2101-
}
2102-
2092+
// Request as many needed votes and treasury spend transactions as possible
2093+
// at once.
2094+
for _, hashes := range [][]chainhash.Hash{voteHashes, tSpendHashes} {
21032095
for i := range hashes {
2104-
// Skip the transaction when it has already been requested.
2096+
// Skip the transaction when it has already been requested or is
2097+
// otherwise not needed.
21052098
txHash := &hashes[i]
2106-
if _, ok := m.requestedTxns[*txHash]; ok {
2107-
continue
2108-
}
2109-
2110-
// Ask the transaction memory pool if the transaction is known
2111-
// to it in any form (main pool or orphan).
2112-
if m.cfg.TxMemPool.HaveTransaction(txHash) {
2113-
continue
2114-
}
2115-
2116-
// Check if the transaction exists from the point of view of the main
2117-
// chain tip. Note that this is only a best effort since it is expensive
2118-
// to check existence of every output and the only purpose of this check
2119-
// is to avoid requesting already known transactions.
2120-
//
2121-
// Check for a specific outpoint based on the tx type.
2122-
outpoint := wire.OutPoint{Hash: *txHash}
2123-
switch txType {
2124-
case stake.TxTypeSSGen:
2125-
// The first two outputs of vote transactions are OP_RETURN <data>, and
2126-
// therefore never exist as an unspent txo. Use the third output, as
2127-
// the third output (and subsequent outputs) are OP_SSGEN outputs.
2128-
outpoint.Index = 2
2129-
outpoint.Tree = wire.TxTreeStake
2130-
case stake.TxTypeTSpend:
2131-
// The first output of a tSpend transaction is OP_RETURN <data>, and
2132-
// therefore never exists as an unspent txo. Use the second output, as
2133-
// the second output (and subsequent outputs) are OP_TGEN outputs.
2134-
outpoint.Index = 1
2135-
outpoint.Tree = wire.TxTreeStake
2136-
}
2137-
entry, err := m.cfg.Chain.FetchUtxoEntry(outpoint)
2138-
if err != nil {
2139-
return err
2140-
}
2141-
if entry != nil {
2099+
_, alreadyRequested := m.requestedTxns[*txHash]
2100+
if alreadyRequested || !m.needTx(txHash) {
21422101
continue
21432102
}
21442103

2145-
err = gdMsg.AddInvVect(wire.NewInvVect(wire.InvTypeTx, txHash))
2146-
if err != nil {
2147-
return fmt.Errorf("unexpected error encountered building request "+
2148-
"for vote %v: %w", txHash, err)
2149-
}
2150-
2104+
gdMsg.AddInvVect(wire.NewInvVect(wire.InvTypeTx, txHash))
21512105
m.requestedTxns[*txHash] = peer
2106+
numRequested++
2107+
if numRequested == wire.MaxInvPerMsg {
2108+
// Send full getdata message and reset.
2109+
peer.QueueMessage(gdMsg, nil)
2110+
gdMsg = wire.NewMsgGetData()
2111+
numRequested = 0
2112+
}
21522113
}
2153-
2154-
return nil
2155-
}
2156-
2157-
// Add the vote transactions to the request.
2158-
err := addTxsToRequest(voteHashes, stake.TxTypeSSGen)
2159-
if err != nil {
2160-
return err
2161-
}
2162-
2163-
// Add the tspend transactions to the request.
2164-
err = addTxsToRequest(tSpendHashes, stake.TxTypeTSpend)
2165-
if err != nil {
2166-
return err
21672114
}
21682115

21692116
if len(gdMsg.InvList) > 0 {
21702117
peer.QueueMessage(gdMsg, nil)
21712118
}
2172-
2173-
return nil
21742119
}
21752120

21762121
// RequestMixMsgFromPeer the specified mix message from the given peer. It

server.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1343,11 +1343,8 @@ func (sp *serverPeer) OnMiningState(_ *peer.Peer, msg *wire.MsgMiningState) {
13431343
}
13441344
}
13451345

1346-
err := sp.server.syncManager.RequestFromPeer(sp.syncMgrPeer, blockHashes,
1346+
sp.server.syncManager.RequestFromPeer(sp.syncMgrPeer, blockHashes,
13471347
voteHashes, nil)
1348-
if err != nil {
1349-
peerLog.Warnf("couldn't handle mining state message: %v", err)
1350-
}
13511348
}
13521349

13531350
// OnGetInitState is invoked when a peer receives a getinitstate wire message.
@@ -1429,11 +1426,8 @@ func (sp *serverPeer) OnGetInitState(_ *peer.Peer, msg *wire.MsgGetInitState) {
14291426
// OnInitState is invoked when a peer receives a initstate wire message. It
14301427
// requests the data advertised in the message from the peer.
14311428
func (sp *serverPeer) OnInitState(_ *peer.Peer, msg *wire.MsgInitState) {
1432-
err := sp.server.syncManager.RequestFromPeer(sp.syncMgrPeer,
1433-
msg.BlockHashes, msg.VoteHashes, msg.TSpendHashes)
1434-
if err != nil {
1435-
peerLog.Warnf("couldn't handle init state message: %v", err)
1436-
}
1429+
sp.server.syncManager.RequestFromPeer(sp.syncMgrPeer, msg.BlockHashes,
1430+
msg.VoteHashes, msg.TSpendHashes)
14371431
}
14381432

14391433
// OnTx is invoked when a peer receives a tx wire message. It blocks until the

0 commit comments

Comments
 (0)