Skip to content

Commit 67fc954

Browse files
committed
Add background FSM compaction
1 parent e8d532c commit 67fc954

17 files changed

+1039
-20
lines changed

adapter/distribution_server.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type DistributionServer struct {
2121
engine *distribution.Engine
2222
catalog *distribution.CatalogStore
2323
coordinator kv.Coordinator
24+
readTracker *kv.ActiveTimestampTracker
2425
reloadRetry struct {
2526
attempts int
2627
interval time.Duration
@@ -39,6 +40,12 @@ func WithDistributionCoordinator(coordinator kv.Coordinator) DistributionServerO
3940
}
4041
}
4142

43+
func WithDistributionActiveTimestampTracker(tracker *kv.ActiveTimestampTracker) DistributionServerOption {
44+
return func(s *DistributionServer) {
45+
s.readTracker = tracker
46+
}
47+
}
48+
4249
// WithCatalogReloadRetryPolicy configures the retry policy used after split
4350
// commit when waiting for the local catalog snapshot to become visible.
4451
func WithCatalogReloadRetryPolicy(attempts int, interval time.Duration) DistributionServerOption {
@@ -140,6 +147,8 @@ func (s *DistributionServer) SplitRange(ctx context.Context, req *pb.SplitRangeR
140147
if err != nil {
141148
return nil, err
142149
}
150+
readPin := s.pinReadTS(snapshot.ReadTS)
151+
defer readPin.Release()
143152
if err := validateExpectedCatalogVersion(snapshot.Version, req.GetExpectedCatalogVersion()); err != nil {
144153
return nil, err
145154
}
@@ -175,6 +184,13 @@ func (s *DistributionServer) SplitRange(ctx context.Context, req *pb.SplitRangeR
175184
}, nil
176185
}
177186

187+
func (s *DistributionServer) pinReadTS(ts uint64) *kv.ActiveTimestampToken {
188+
if s == nil || s.readTracker == nil {
189+
return nil
190+
}
191+
return s.readTracker.Pin(ts)
192+
}
193+
178194
func (s *DistributionServer) verifyCatalogLeader() error {
179195
if s.coordinator == nil {
180196
return grpcStatusError(codes.FailedPrecondition, errDistributionCoordinatorRequired.Error())

adapter/grpc.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"github.com/bootjp/elastickv/store"
1313
"github.com/cockroachdb/errors"
1414
"github.com/spaolacci/murmur3"
15+
"google.golang.org/grpc/codes"
16+
"google.golang.org/grpc/status"
1517
)
1618

1719
var _ pb.RawKVServer = (*GRPCServer)(nil)
@@ -89,6 +91,9 @@ func (r *GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.Raw
8991
if errors.Is(err, store.ErrKeyNotFound) {
9092
return &pb.RawGetResponse{Value: nil, Exists: false}, nil
9193
}
94+
if errors.Is(err, store.ErrReadTSCompacted) {
95+
return nil, errors.WithStack(status.Error(codes.FailedPrecondition, store.ErrReadTSCompacted.Error()))
96+
}
9297
if err != nil {
9398
return nil, errors.WithStack(err)
9499
}
@@ -135,6 +140,9 @@ func (r *GRPCServer) RawScanAt(ctx context.Context, req *pb.RawScanAtRequest) (*
135140
res, err = r.store.ScanAt(ctx, req.StartKey, req.EndKey, limit, readTS)
136141
}
137142
if err != nil {
143+
if errors.Is(err, store.ErrReadTSCompacted) {
144+
return &pb.RawScanAtResponse{Kv: nil}, errors.WithStack(status.Error(codes.FailedPrecondition, store.ErrReadTSCompacted.Error()))
145+
}
138146
return &pb.RawScanAtResponse{Kv: nil}, errors.WithStack(err)
139147
}
140148

adapter/redis.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ type RedisServer struct {
243243
listen net.Listener
244244
store store.MVCCStore
245245
coordinator kv.Coordinator
246+
readTracker *kv.ActiveTimestampTracker
246247
redisTranscoder *redisTranscoder
247248
pubsub *redisPubSub
248249
scriptMu sync.RWMutex
@@ -258,6 +259,14 @@ type RedisServer struct {
258259
route map[string]func(conn redcon.Conn, cmd redcon.Command)
259260
}
260261

262+
type RedisServerOption func(*RedisServer)
263+
264+
func WithRedisActiveTimestampTracker(tracker *kv.ActiveTimestampTracker) RedisServerOption {
265+
return func(r *RedisServer) {
266+
r.readTracker = tracker
267+
}
268+
}
269+
261270
type connState struct {
262271
inTxn bool
263272
queue []redcon.Command
@@ -283,7 +292,7 @@ type redisResult struct {
283292
err error
284293
}
285294

286-
func NewRedisServer(listen net.Listener, redisAddr string, store store.MVCCStore, coordinate kv.Coordinator, leaderRedis map[raft.ServerAddress]string, relay *RedisPubSubRelay) *RedisServer {
295+
func NewRedisServer(listen net.Listener, redisAddr string, store store.MVCCStore, coordinate kv.Coordinator, leaderRedis map[raft.ServerAddress]string, relay *RedisPubSubRelay, opts ...RedisServerOption) *RedisServer {
287296
if relay == nil {
288297
relay = NewRedisPubSubRelay()
289298
}
@@ -383,6 +392,11 @@ func NewRedisServer(listen net.Listener, redisAddr string, store store.MVCCStore
383392
cmdZRevRangeByScore: r.zrevrangebyscore,
384393
cmdZScore: r.zscore,
385394
}
395+
for _, opt := range opts {
396+
if opt != nil {
397+
opt(r)
398+
}
399+
}
386400

387401
return r
388402
}
@@ -402,6 +416,13 @@ func (r *RedisServer) readTS() uint64 {
402416
return snapshotTS(r.coordinator.Clock(), r.store)
403417
}
404418

419+
func (r *RedisServer) pinReadTS(ts uint64) *kv.ActiveTimestampToken {
420+
if r == nil || r.readTracker == nil {
421+
return nil
422+
}
423+
return r.readTracker.Pin(ts)
424+
}
425+
405426
func (r *RedisServer) Run() error {
406427
err := redcon.Serve(r.listen,
407428
func(conn redcon.Conn, cmd redcon.Command) {
@@ -1905,6 +1926,8 @@ func (r *RedisServer) runTransaction(queue []redcon.Command) ([]redisResult, err
19051926
if err != nil {
19061927
return nil, err
19071928
}
1929+
readPin := r.pinReadTS(startTS)
1930+
defer readPin.Release()
19081931

19091932
ctx := &txnContext{
19101933
server: r,

adapter/redis_lua.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ func (r *RedisServer) runLuaScript(conn redcon.Conn, script string, evalArgs [][
118118
var reply luaReply
119119
err = r.retryRedisWrite(ctx, func() error {
120120
scriptCtx := newLuaScriptContext(r)
121+
defer scriptCtx.Close()
121122
state := newRedisLuaState()
122123
defer state.Close()
123124
state.SetContext(ctx)
@@ -847,6 +848,7 @@ func (r *RedisServer) execLuaCompat(conn redcon.Conn, command string, args [][]b
847848
var reply luaReply
848849
err := r.retryRedisWrite(ctx, func() error {
849850
scriptCtx := newLuaScriptContext(r)
851+
defer scriptCtx.Close()
850852
nextReply, err := scriptCtx.exec(command, stringArgs)
851853
if err != nil {
852854
return err

adapter/redis_lua_context.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
type luaScriptContext struct {
1717
server *RedisServer
1818
startTS uint64
19+
readPin *kv.ActiveTimestampToken
1920

2021
touched map[string]struct{}
2122
deleted map[string]bool
@@ -161,9 +162,11 @@ var luaRenameHandlers = map[redisValueType]luaRenameHandler{
161162
}
162163

163164
func newLuaScriptContext(server *RedisServer) *luaScriptContext {
165+
startTS := server.readTS()
164166
return &luaScriptContext{
165167
server: server,
166-
startTS: server.readTS(),
168+
startTS: startTS,
169+
readPin: server.pinReadTS(startTS),
167170
touched: map[string]struct{}{},
168171
deleted: map[string]bool{},
169172
strings: map[string]*luaStringState{},
@@ -176,6 +179,13 @@ func newLuaScriptContext(server *RedisServer) *luaScriptContext {
176179
}
177180
}
178181

182+
func (c *luaScriptContext) Close() {
183+
if c == nil || c.readPin == nil {
184+
return
185+
}
186+
c.readPin.Release()
187+
}
188+
179189
func (c *luaScriptContext) exec(command string, args []string) (luaReply, error) {
180190
if handler, ok := luaCommandHandlers[command]; ok {
181191
return handler(c, args)

cmd/server/demo.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ func setupGRPC(r *raft.Raft, st store.MVCCStore, tm *transport.Manager, coordina
335335
return s, gs
336336
}
337337

338-
func setupRedis(ctx context.Context, lc net.ListenConfig, st store.MVCCStore, coordinator *kv.Coordinate, addr, redisAddr, raftRedisMapStr string, relay *adapter.RedisPubSubRelay) (*adapter.RedisServer, error) {
338+
func setupRedis(ctx context.Context, lc net.ListenConfig, st store.MVCCStore, coordinator *kv.Coordinate, addr, redisAddr, raftRedisMapStr string, relay *adapter.RedisPubSubRelay, readTracker *kv.ActiveTimestampTracker) (*adapter.RedisServer, error) {
339339
leaderRedis := make(map[raft.ServerAddress]string)
340340
if raftRedisMapStr != "" {
341341
parts := strings.SplitSeq(raftRedisMapStr, ",")
@@ -354,7 +354,7 @@ func setupRedis(ctx context.Context, lc net.ListenConfig, st store.MVCCStore, co
354354
return nil, errors.WithStack(err)
355355
}
356356
routedStore := kv.NewLeaderRoutedStore(st, coordinator)
357-
return adapter.NewRedisServer(l, redisAddr, routedStore, coordinator, leaderRedis, relay), nil
357+
return adapter.NewRedisServer(l, redisAddr, routedStore, coordinator, leaderRedis, relay, adapter.WithRedisActiveTimestampTracker(readTracker)), nil
358358
}
359359

360360
func run(ctx context.Context, eg *errgroup.Group, cfg config) error {
@@ -369,6 +369,7 @@ func run(ctx context.Context, eg *errgroup.Group, cfg config) error {
369369

370370
st := store.NewMVCCStore()
371371
fsm := kv.NewKvFSM(st)
372+
readTracker := kv.NewActiveTimestampTracker()
372373

373374
// Config
374375
c := raft.DefaultConfig()
@@ -402,9 +403,18 @@ func run(ctx context.Context, eg *errgroup.Group, cfg config) error {
402403
distEngine,
403404
distCatalog,
404405
adapter.WithDistributionCoordinator(coordinator),
406+
adapter.WithDistributionActiveTimestampTracker(readTracker),
405407
)
406408
metricsRegistry := monitoring.NewRegistry(cfg.raftID, cfg.address)
407409
metricsRegistry.RaftObserver().Start(ctx, []monitoring.RaftRuntime{{GroupID: 1, Raft: r}}, raftObserveInterval)
410+
compactor := kv.NewFSMCompactor(
411+
[]kv.FSMCompactRuntime{{
412+
GroupID: 1,
413+
Raft: r,
414+
Store: st,
415+
}},
416+
kv.WithFSMCompactorActiveTimestampTracker(readTracker),
417+
)
408418
relay := adapter.NewRedisPubSubRelay()
409419

410420
s, grpcSvc := setupGRPC(r, st, tm, coordinator, distServer, relay)
@@ -417,7 +427,7 @@ func run(ctx context.Context, eg *errgroup.Group, cfg config) error {
417427
_ = grpcSock.Close()
418428
})
419429

420-
rd, err := setupRedis(ctx, lc, st, coordinator, cfg.address, cfg.redisAddress, cfg.raftRedisMap, relay)
430+
rd, err := setupRedis(ctx, lc, st, coordinator, cfg.address, cfg.redisAddress, cfg.raftRedisMap, relay, readTracker)
421431
if err != nil {
422432
return err
423433
}
@@ -444,6 +454,7 @@ func run(ctx context.Context, eg *errgroup.Group, cfg config) error {
444454
}
445455

446456
eg.Go(catalogWatcherTask(ctx, distCatalog, distEngine))
457+
eg.Go(func() error { return compactor.Run(ctx) })
447458
eg.Go(grpcShutdownTask(ctx, s, grpcSock, cfg.address, grpcSvc))
448459
eg.Go(grpcServeTask(s, grpcSock, cfg.address))
449460
eg.Go(redisShutdownTask(ctx, rd, cfg.redisAddress))

docs/fsm_compaction_design.md

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
# FSM Compaction Design
2+
3+
## Objective
4+
5+
Run MVCC compaction automatically for the FSM store without breaking:
6+
7+
1. Ongoing transactional reads.
8+
2. Read-modify-write flows that depend on a stable read timestamp.
9+
3. Raft apply progress during backlog replay or elections.
10+
11+
## Problem
12+
13+
The current FSM store used by `main.go` and `cmd/server/demo.go` is the
14+
in-memory `mvccStore`.
15+
16+
1. It keeps all MVCC versions in memory.
17+
2. `Compact(ctx, minTS)` exists, but nothing calls it in production startup.
18+
3. The API still supports explicit historical reads (`GetAt`, `ScanAt`), so
19+
naive compaction can silently change query semantics.
20+
21+
## Design
22+
23+
### 1. Background Compactor
24+
25+
Add a background `kv.FSMCompactor` that runs on a timer instead of the Raft
26+
apply path.
27+
28+
Why:
29+
30+
1. Compaction is potentially heavy.
31+
2. Raft apply must stay simple and deterministic.
32+
3. Compaction should be skipped while a node is still catching up.
33+
34+
### 2. Runtime Eligibility Checks
35+
36+
For each Raft runtime, compaction only runs when:
37+
38+
1. `fsm_pending == 0`
39+
2. `applied_index >= commit_index`
40+
3. Raft state is not `Candidate`
41+
42+
This avoids compacting while the local FSM is still behind or unstable.
43+
44+
### 3. Retention Watermark
45+
46+
The compactor computes a safe `minTS` from:
47+
48+
1. A retention window based on HLC wall time.
49+
2. The oldest actively pinned read timestamp.
50+
51+
Effective rule:
52+
53+
1. Start with `now - retentionWindow` in HLC form.
54+
2. If an older in-flight read exists, clamp the watermark to just before that
55+
read timestamp.
56+
57+
### 4. Active Timestamp Tracking
58+
59+
Longer read-modify-write flows pin their read timestamp through
60+
`kv.ActiveTimestampTracker`.
61+
62+
The current implementation pins:
63+
64+
1. Redis `MULTI` / `EXEC` transactions.
65+
2. Redis Lua script execution.
66+
3. Distribution `SplitRange`, which performs a catalog read-modify-write cycle.
67+
68+
### 5. Historical Read Contract
69+
70+
The in-memory store now tracks `minRetainedTS`.
71+
72+
1. Reads older than `minRetainedTS` fail with `store.ErrReadTSCompacted`.
73+
2. This makes compaction explicit instead of silently returning incorrect
74+
historical results.
75+
3. `minRetainedTS` is included in `mvccStore` snapshots so restart/restore keeps
76+
the same retention boundary.
77+
78+
### 6. Scope of the Current Implementation
79+
80+
Implemented now:
81+
82+
1. Background FSM compactor in `kv/compactor.go`
83+
2. Active timestamp tracker in `kv/active_timestamp_tracker.go`
84+
3. `minRetainedTS` enforcement for `mvccStore`
85+
4. Startup wiring in:
86+
- `main.go`
87+
- `cmd/server/demo.go`
88+
5. Redis and distribution server timestamp pinning
89+
90+
Not implemented yet:
91+
92+
1. Pebble FSM compaction logic
93+
2. Dedicated GC for transaction metadata keys such as commit/rollback records
94+
3. Prometheus metrics for compaction runs
95+
4. Full active timestamp pinning across every DynamoDB multi-step flow
96+
97+
## Current Defaults
98+
99+
The compactor currently uses:
100+
101+
1. Interval: `30s`
102+
2. Retention window: `30m`
103+
3. Per-run timeout: `5s`
104+
105+
These are code defaults and can be tuned later if we expose flags or config.
106+
107+
## Tradeoffs
108+
109+
### Pros
110+
111+
1. MVCC history no longer grows forever in the default in-memory FSM path.
112+
2. Ongoing Redis transactions and scripted operations are protected.
113+
3. Historical reads fail explicitly once compacted away.
114+
115+
### Cons
116+
117+
1. Reads at arbitrarily old raw timestamps are no longer guaranteed to succeed once compaction has passed them.
118+
2. DynamoDB paths currently rely mostly on the retention window rather than
119+
explicit timestamp pinning.
120+
3. Transaction metadata keys still need a dedicated GC pass.
121+
122+
## Follow-Up Work
123+
124+
1. Add compaction metrics and visibility in the monitoring registry.
125+
2. Add transaction-metadata GC for `!txn|cmt|...` and `!txn|rb|...` keys.
126+
3. Extend active timestamp pinning to the remaining long DynamoDB flows.
127+
4. Reuse the same contract when `PebbleStore` becomes the primary FSM store.

0 commit comments

Comments
 (0)