forked from ipfs/rainbow
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsetup_bitswap.go
More file actions
132 lines (113 loc) · 4.45 KB
/
setup_bitswap.go
File metadata and controls
132 lines (113 loc) · 4.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package main
import (
"context"
"time"
"github.com/ipfs/boxo/routing/providerquerymanager"
"github.com/ipfs/boxo/bitswap"
bsclient "github.com/ipfs/boxo/bitswap/client"
"github.com/ipfs/boxo/bitswap/network"
bsnet "github.com/ipfs/boxo/bitswap/network/bsnet"
"github.com/ipfs/boxo/bitswap/network/httpnet"
bsserver "github.com/ipfs/boxo/bitswap/server"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
metri "github.com/ipfs/go-metrics-interface"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
)
func setupBitswapExchange(ctx context.Context, cfg Config, h host.Host, cr routing.ContentRouting, bstore blockstore.Blockstore) exchange.Interface {
bsctx := metri.CtxScope(ctx, "ipfs_bitswap")
connEvtMgr := network.NewConnectEventManager()
var exnet network.BitSwapNetwork
bn := bsnet.NewFromIpfsHost(h, bsnet.WithConnectEventManager(connEvtMgr))
if cfg.HTTPRetrievalEnable {
htnet := httpnet.New(h,
httpnet.WithHTTPWorkers(cfg.HTTPRetrievalWorkers),
httpnet.WithMaxDontHaveErrors(cfg.HTTPRetrievalMaxDontHaveErrors),
httpnet.WithAllowlist(cfg.HTTPRetrievalAllowlist),
httpnet.WithDenylist(cfg.HTTPRetrievalDenylist),
httpnet.WithUserAgent("rainbow/"+buildVersion()),
httpnet.WithMetricsLabelsForEndpoints(cfg.HTTPRetrievalMetricsLabelsForEndpoints),
httpnet.WithConnectEventManager(connEvtMgr),
)
exnet = network.New(h.Peerstore(), bn, htnet)
} else {
exnet = bn
}
// Custom query manager with the content router and the host
// and our custom options to overwrite the default.
pqm, err := providerquerymanager.New(exnet, cr,
providerquerymanager.WithMaxInProcessRequests(cfg.RoutingMaxRequests),
providerquerymanager.WithMaxProviders(cfg.RoutingMaxProviders),
providerquerymanager.WithMaxTimeout(cfg.RoutingMaxTimeout),
providerquerymanager.WithIgnoreProviders(cfg.RoutingIgnoreProviders...),
)
if err != nil {
panic(err)
}
context.AfterFunc(ctx, func() {
pqm.Close()
})
// --- Client Options
// bitswap.RebroadcastDelay: default is 1 minute to search for a random
// live-want (1 CID). I think we want to search for random live-wants more
// often although probably it overlaps with general rebroadcasts.
const rebroadcastDelay = 10 * time.Second
// bitswap.ProviderSearchDelay: default is 1 second.
const providerSearchDelay = 1 * time.Second
// --- Bitswap Client Options
clientOpts := []bsclient.Option{
bsclient.RebroadcastDelay(rebroadcastDelay),
bsclient.ProviderSearchDelay(providerSearchDelay),
bsclient.WithDefaultProviderQueryManager(false), // we pass it in manually
}
if !cfg.BitswapEnableDuplicateBlockStats {
clientOpts = append(clientOpts, bsclient.WithoutDuplicatedBlockStats())
}
// If peering and shared cache are both enabled, we initialize both a
// Client and a Server with custom request filter and custom options.
// client+server is more expensive but necessary when deployment requires
// serving cached blocks to safelisted peerids
if cfg.PeeringSharedCache && len(cfg.Peering) > 0 {
var peerBlockRequestFilter bsserver.PeerBlockRequestFilter
// Set up request filter to only respond to request for safelisted (peered) nodes
peers := make(map[peer.ID]struct{}, len(cfg.Peering))
for _, a := range cfg.Peering {
peers[a.ID] = struct{}{}
}
peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool {
_, ok := peers[p]
return ok
}
// turn bitswap clients option into bitswap options
var opts []bitswap.Option
for _, o := range clientOpts {
opts = append(opts, bitswap.WithClientOption(o))
}
// ---- Server Options
opts = append(opts,
bitswap.WithPeerBlockRequestFilter(peerBlockRequestFilter),
// When we don't have a block, don't reply. This reduces processment.
bitswap.SetSendDontHaves(false),
bitswap.WithWantHaveReplaceSize(cfg.BitswapWantHaveReplaceSize),
)
// Initialize client+server
bswap := bitswap.New(bsctx, exnet, pqm, bstore, opts...)
exnet.Start(bswap)
return &noNotifyExchange{bswap}
}
// By default, rainbow runs with bitswap client alone
bswap := bsclient.New(bsctx, exnet, pqm, bstore, clientOpts...)
exnet.Start(bswap)
return bswap
}
type noNotifyExchange struct {
exchange.Interface
}
func (e *noNotifyExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
// Rainbow does not notify when we get new blocks in our Blockservice.
return nil
}