Skip to content

Commit 099b2f2

Browse files
authored
Merge branch 'main' into feature/reduce-lock
2 parents 3f9fde4 + 3b1e71a commit 099b2f2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+2027
-218
lines changed

adapter/distribution_server.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type DistributionServer struct {
2121
engine *distribution.Engine
2222
catalog *distribution.CatalogStore
2323
coordinator kv.Coordinator
24+
readTracker *kv.ActiveTimestampTracker
2425
reloadRetry struct {
2526
attempts int
2627
interval time.Duration
@@ -39,6 +40,12 @@ func WithDistributionCoordinator(coordinator kv.Coordinator) DistributionServerO
3940
}
4041
}
4142

43+
func WithDistributionActiveTimestampTracker(tracker *kv.ActiveTimestampTracker) DistributionServerOption {
44+
return func(s *DistributionServer) {
45+
s.readTracker = tracker
46+
}
47+
}
48+
4249
// WithCatalogReloadRetryPolicy configures the retry policy used after split
4350
// commit when waiting for the local catalog snapshot to become visible.
4451
func WithCatalogReloadRetryPolicy(attempts int, interval time.Duration) DistributionServerOption {
@@ -140,6 +147,8 @@ func (s *DistributionServer) SplitRange(ctx context.Context, req *pb.SplitRangeR
140147
if err != nil {
141148
return nil, err
142149
}
150+
readPin := s.pinReadTS(snapshot.ReadTS)
151+
defer readPin.Release()
143152
if err := validateExpectedCatalogVersion(snapshot.Version, req.GetExpectedCatalogVersion()); err != nil {
144153
return nil, err
145154
}
@@ -175,6 +184,13 @@ func (s *DistributionServer) SplitRange(ctx context.Context, req *pb.SplitRangeR
175184
}, nil
176185
}
177186

187+
func (s *DistributionServer) pinReadTS(ts uint64) *kv.ActiveTimestampToken {
188+
if s == nil || s.readTracker == nil {
189+
return nil
190+
}
191+
return s.readTracker.Pin(ts)
192+
}
193+
178194
func (s *DistributionServer) verifyCatalogLeader() error {
179195
if s.coordinator == nil {
180196
return grpcStatusError(codes.FailedPrecondition, errDistributionCoordinatorRequired.Error())

adapter/grpc.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"github.com/bootjp/elastickv/store"
1313
"github.com/cockroachdb/errors"
1414
"github.com/spaolacci/murmur3"
15+
"google.golang.org/grpc/codes"
16+
"google.golang.org/grpc/status"
1517
)
1618

1719
var _ pb.RawKVServer = (*GRPCServer)(nil)
@@ -89,6 +91,9 @@ func (r *GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.Raw
8991
if errors.Is(err, store.ErrKeyNotFound) {
9092
return &pb.RawGetResponse{Value: nil, Exists: false}, nil
9193
}
94+
if errors.Is(err, store.ErrReadTSCompacted) {
95+
return nil, errors.WithStack(status.Error(codes.FailedPrecondition, store.ErrReadTSCompacted.Error()))
96+
}
9297
if err != nil {
9398
return nil, errors.WithStack(err)
9499
}
@@ -135,6 +140,9 @@ func (r *GRPCServer) RawScanAt(ctx context.Context, req *pb.RawScanAtRequest) (*
135140
res, err = r.store.ScanAt(ctx, req.StartKey, req.EndKey, limit, readTS)
136141
}
137142
if err != nil {
143+
if errors.Is(err, store.ErrReadTSCompacted) {
144+
return &pb.RawScanAtResponse{Kv: nil}, errors.WithStack(status.Error(codes.FailedPrecondition, store.ErrReadTSCompacted.Error()))
145+
}
138146
return &pb.RawScanAtResponse{Kv: nil}, errors.WithStack(err)
139147
}
140148

adapter/redis.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ type RedisServer struct {
243243
listen net.Listener
244244
store store.MVCCStore
245245
coordinator kv.Coordinator
246+
readTracker *kv.ActiveTimestampTracker
246247
redisTranscoder *redisTranscoder
247248
pubsub *redisPubSub
248249
scriptMu sync.RWMutex
@@ -258,6 +259,14 @@ type RedisServer struct {
258259
route map[string]func(conn redcon.Conn, cmd redcon.Command)
259260
}
260261

262+
type RedisServerOption func(*RedisServer)
263+
264+
func WithRedisActiveTimestampTracker(tracker *kv.ActiveTimestampTracker) RedisServerOption {
265+
return func(r *RedisServer) {
266+
r.readTracker = tracker
267+
}
268+
}
269+
261270
type connState struct {
262271
inTxn bool
263272
queue []redcon.Command
@@ -283,7 +292,7 @@ type redisResult struct {
283292
err error
284293
}
285294

286-
func NewRedisServer(listen net.Listener, redisAddr string, store store.MVCCStore, coordinate kv.Coordinator, leaderRedis map[raft.ServerAddress]string, relay *RedisPubSubRelay) *RedisServer {
295+
func NewRedisServer(listen net.Listener, redisAddr string, store store.MVCCStore, coordinate kv.Coordinator, leaderRedis map[raft.ServerAddress]string, relay *RedisPubSubRelay, opts ...RedisServerOption) *RedisServer {
287296
if relay == nil {
288297
relay = NewRedisPubSubRelay()
289298
}
@@ -383,6 +392,11 @@ func NewRedisServer(listen net.Listener, redisAddr string, store store.MVCCStore
383392
cmdZRevRangeByScore: r.zrevrangebyscore,
384393
cmdZScore: r.zscore,
385394
}
395+
for _, opt := range opts {
396+
if opt != nil {
397+
opt(r)
398+
}
399+
}
386400

387401
return r
388402
}
@@ -402,6 +416,13 @@ func (r *RedisServer) readTS() uint64 {
402416
return snapshotTS(r.coordinator.Clock(), r.store)
403417
}
404418

419+
func (r *RedisServer) pinReadTS(ts uint64) *kv.ActiveTimestampToken {
420+
if r == nil || r.readTracker == nil {
421+
return nil
422+
}
423+
return r.readTracker.Pin(ts)
424+
}
425+
405426
func (r *RedisServer) Run() error {
406427
err := redcon.Serve(r.listen,
407428
func(conn redcon.Conn, cmd redcon.Command) {
@@ -1905,6 +1926,8 @@ func (r *RedisServer) runTransaction(queue []redcon.Command) ([]redisResult, err
19051926
if err != nil {
19061927
return nil, err
19071928
}
1929+
readPin := r.pinReadTS(startTS)
1930+
defer readPin.Release()
19081931

19091932
ctx := &txnContext{
19101933
server: r,

adapter/redis_lua.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ func (r *RedisServer) runLuaScript(conn redcon.Conn, script string, evalArgs [][
118118
var reply luaReply
119119
err = r.retryRedisWrite(ctx, func() error {
120120
scriptCtx := newLuaScriptContext(r)
121+
defer scriptCtx.Close()
121122
state := newRedisLuaState()
122123
defer state.Close()
123124
state.SetContext(ctx)
@@ -847,6 +848,7 @@ func (r *RedisServer) execLuaCompat(conn redcon.Conn, command string, args [][]b
847848
var reply luaReply
848849
err := r.retryRedisWrite(ctx, func() error {
849850
scriptCtx := newLuaScriptContext(r)
851+
defer scriptCtx.Close()
850852
nextReply, err := scriptCtx.exec(command, stringArgs)
851853
if err != nil {
852854
return err

adapter/redis_lua_context.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
type luaScriptContext struct {
1717
server *RedisServer
1818
startTS uint64
19+
readPin *kv.ActiveTimestampToken
1920

2021
touched map[string]struct{}
2122
deleted map[string]bool
@@ -161,9 +162,11 @@ var luaRenameHandlers = map[redisValueType]luaRenameHandler{
161162
}
162163

163164
func newLuaScriptContext(server *RedisServer) *luaScriptContext {
165+
startTS := server.readTS()
164166
return &luaScriptContext{
165167
server: server,
166-
startTS: server.readTS(),
168+
startTS: startTS,
169+
readPin: server.pinReadTS(startTS),
167170
touched: map[string]struct{}{},
168171
deleted: map[string]bool{},
169172
strings: map[string]*luaStringState{},
@@ -176,6 +179,13 @@ func newLuaScriptContext(server *RedisServer) *luaScriptContext {
176179
}
177180
}
178181

182+
func (c *luaScriptContext) Close() {
183+
if c == nil || c.readPin == nil {
184+
return
185+
}
186+
c.readPin.Release()
187+
}
188+
179189
func (c *luaScriptContext) exec(command string, args []string) (luaReply, error) {
180190
if handler, ok := luaCommandHandlers[command]; ok {
181191
return handler(c, args)

adapter/test_util.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/Jille/raft-grpc-leader-rpc/leaderhealth"
1414
transport "github.com/Jille/raft-grpc-transport"
1515
"github.com/Jille/raftadmin"
16+
internalutil "github.com/bootjp/elastickv/internal"
1617
"github.com/bootjp/elastickv/kv"
1718
pb "github.com/bootjp/elastickv/proto"
1819
"github.com/bootjp/elastickv/store"
@@ -23,7 +24,6 @@ import (
2324
"github.com/stretchr/testify/require"
2425
"golang.org/x/sys/unix"
2526
"google.golang.org/grpc"
26-
"google.golang.org/grpc/credentials/insecure"
2727
)
2828

2929
func shutdown(nodes []Node) {
@@ -350,7 +350,7 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress) (
350350
r, tm, err := newRaft(strconv.Itoa(i), port.raftAddress, fsm, i == 0, cfg, electionTimeout)
351351
assert.NoError(t, err)
352352

353-
s := grpc.NewServer()
353+
s := grpc.NewServer(internalutil.GRPCServerOptions()...)
354354
trx := kv.NewTransaction(r)
355355
coordinator := kv.NewCoordinator(trx, r)
356356
relay := NewRedisPubSubRelay()
@@ -416,9 +416,7 @@ func newRaft(myID string, myAddress string, fsm raft.FSM, bootstrap bool, cfg ra
416416
Level: hclog.LevelFromString("WARN"),
417417
})
418418

419-
tm := transport.New(raft.ServerAddress(myAddress), []grpc.DialOption{
420-
grpc.WithTransportCredentials(insecure.NewCredentials()),
421-
})
419+
tm := transport.New(raft.ServerAddress(myAddress), internalutil.GRPCDialOptions())
422420

423421
r, err := raft.NewRaft(c, fsm, ldb, sdb, fss, tm.Transport())
424422
if err != nil {

cmd/server/demo.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"github.com/hashicorp/raft"
3131
"golang.org/x/sync/errgroup"
3232
"google.golang.org/grpc"
33-
"google.golang.org/grpc/credentials/insecure"
3433
)
3534

3635
var (
@@ -206,7 +205,7 @@ func joinCluster(ctx context.Context, nodes []config) error {
206205
}
207206

208207
// Connect to leader
209-
conn, err := grpc.NewClient(leader.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
208+
conn, err := grpc.NewClient(leader.address, internalutil.GRPCDialOptions()...)
210209
if err != nil {
211210
return fmt.Errorf("failed to dial leader: %w", err)
212211
}
@@ -322,7 +321,7 @@ func setupStorage(dir string) (raft.LogStore, raft.StableStore, raft.SnapshotSto
322321
}
323322

324323
func setupGRPC(r *raft.Raft, st store.MVCCStore, tm *transport.Manager, coordinator *kv.Coordinate, distServer *adapter.DistributionServer, relay *adapter.RedisPubSubRelay) (*grpc.Server, *adapter.GRPCServer) {
325-
s := grpc.NewServer()
324+
s := grpc.NewServer(internalutil.GRPCServerOptions()...)
326325
trx := kv.NewTransaction(r)
327326
routedStore := kv.NewLeaderRoutedStore(st, coordinator)
328327
gs := adapter.NewGRPCServer(routedStore, coordinator, adapter.WithCloseStore())
@@ -336,7 +335,7 @@ func setupGRPC(r *raft.Raft, st store.MVCCStore, tm *transport.Manager, coordina
336335
return s, gs
337336
}
338337

339-
func setupRedis(ctx context.Context, lc net.ListenConfig, st store.MVCCStore, coordinator *kv.Coordinate, addr, redisAddr, raftRedisMapStr string, relay *adapter.RedisPubSubRelay) (*adapter.RedisServer, error) {
338+
func setupRedis(ctx context.Context, lc net.ListenConfig, st store.MVCCStore, coordinator *kv.Coordinate, addr, redisAddr, raftRedisMapStr string, relay *adapter.RedisPubSubRelay, readTracker *kv.ActiveTimestampTracker) (*adapter.RedisServer, error) {
340339
leaderRedis := make(map[raft.ServerAddress]string)
341340
if raftRedisMapStr != "" {
342341
parts := strings.SplitSeq(raftRedisMapStr, ",")
@@ -355,7 +354,7 @@ func setupRedis(ctx context.Context, lc net.ListenConfig, st store.MVCCStore, co
355354
return nil, errors.WithStack(err)
356355
}
357356
routedStore := kv.NewLeaderRoutedStore(st, coordinator)
358-
return adapter.NewRedisServer(l, redisAddr, routedStore, coordinator, leaderRedis, relay), nil
357+
return adapter.NewRedisServer(l, redisAddr, routedStore, coordinator, leaderRedis, relay, adapter.WithRedisActiveTimestampTracker(readTracker)), nil
359358
}
360359

361360
func run(ctx context.Context, eg *errgroup.Group, cfg config) error {
@@ -370,6 +369,7 @@ func run(ctx context.Context, eg *errgroup.Group, cfg config) error {
370369

371370
st := store.NewMVCCStore()
372371
fsm := kv.NewKvFSM(st)
372+
readTracker := kv.NewActiveTimestampTracker()
373373

374374
// Config
375375
c := raft.DefaultConfig()
@@ -381,9 +381,7 @@ func run(ctx context.Context, eg *errgroup.Group, cfg config) error {
381381
})
382382

383383
// Transport
384-
tm := transport.New(raft.ServerAddress(cfg.address), []grpc.DialOption{
385-
grpc.WithTransportCredentials(insecure.NewCredentials()),
386-
})
384+
tm := transport.New(raft.ServerAddress(cfg.address), internalutil.GRPCDialOptions())
387385

388386
r, err := raft.NewRaft(c, fsm, ldb, sdb, fss, tm.Transport())
389387
if err != nil {
@@ -405,9 +403,18 @@ func run(ctx context.Context, eg *errgroup.Group, cfg config) error {
405403
distEngine,
406404
distCatalog,
407405
adapter.WithDistributionCoordinator(coordinator),
406+
adapter.WithDistributionActiveTimestampTracker(readTracker),
408407
)
409408
metricsRegistry := monitoring.NewRegistry(cfg.raftID, cfg.address)
410409
metricsRegistry.RaftObserver().Start(ctx, []monitoring.RaftRuntime{{GroupID: 1, Raft: r}}, raftObserveInterval)
410+
compactor := kv.NewFSMCompactor(
411+
[]kv.FSMCompactRuntime{{
412+
GroupID: 1,
413+
Raft: r,
414+
Store: st,
415+
}},
416+
kv.WithFSMCompactorActiveTimestampTracker(readTracker),
417+
)
411418
relay := adapter.NewRedisPubSubRelay()
412419

413420
s, grpcSvc := setupGRPC(r, st, tm, coordinator, distServer, relay)
@@ -420,7 +427,7 @@ func run(ctx context.Context, eg *errgroup.Group, cfg config) error {
420427
_ = grpcSock.Close()
421428
})
422429

423-
rd, err := setupRedis(ctx, lc, st, coordinator, cfg.address, cfg.redisAddress, cfg.raftRedisMap, relay)
430+
rd, err := setupRedis(ctx, lc, st, coordinator, cfg.address, cfg.redisAddress, cfg.raftRedisMap, relay, readTracker)
424431
if err != nil {
425432
return err
426433
}
@@ -447,6 +454,7 @@ func run(ctx context.Context, eg *errgroup.Group, cfg config) error {
447454
}
448455

449456
eg.Go(catalogWatcherTask(ctx, distCatalog, distEngine))
457+
eg.Go(func() error { return compactor.Run(ctx) })
450458
eg.Go(grpcShutdownTask(ctx, s, grpcSock, cfg.address, grpcSvc))
451459
eg.Go(grpcServeTask(s, grpcSock, cfg.address))
452460
eg.Go(redisShutdownTask(ctx, rd, cfg.redisAddress))

0 commit comments

Comments
 (0)