Skip to content

Commit d6cd8f6

Browse files
authored
Merge branch 'main' into feature/compaction
2 parents 67fc954 + 0f2a871 commit d6cd8f6

File tree

12 files changed

+338
-53
lines changed

12 files changed

+338
-53
lines changed

docs/redis-proxy-deployment.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ ghcr.io/bootjp/elastickv/redis-proxy:latest
1111
ghcr.io/bootjp/elastickv/redis-proxy:sha-<commit>
1212
```
1313

14-
The CI workflow (`.github/workflows/redis-proxy-docker.yml`) builds the image automatically when files under `cmd/redis-proxy/`, `proxy/`, or `Dockerfile.redis-proxy` change.
14+
The CI workflow (`.github/workflows/redis-proxy-docker.yml`) builds the image automatically when files under `cmd/redis-proxy/`, `proxy/`, `Dockerfile.redis-proxy`, `go.mod`, `go.sum`, or the workflow file itself change.
1515

1616
### Building locally
1717

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ require (
1919
github.com/emirpasic/gods v1.18.1
2020
github.com/getsentry/sentry-go v0.27.0
2121
github.com/hashicorp/go-hclog v1.6.3
22+
github.com/hashicorp/go-msgpack/v2 v2.1.2
2223
github.com/hashicorp/raft v1.7.3
2324
github.com/pkg/errors v0.9.1
2425
github.com/prometheus/client_golang v1.23.2
@@ -66,7 +67,6 @@ require (
6667
github.com/hashicorp/errwrap v1.0.0 // indirect
6768
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
6869
github.com/hashicorp/go-metrics v0.5.4 // indirect
69-
github.com/hashicorp/go-msgpack/v2 v2.1.2 // indirect
7070
github.com/hashicorp/go-multierror v1.1.1 // indirect
7171
github.com/hashicorp/golang-lru v1.0.2 // indirect
7272
github.com/klauspost/compress v1.18.0 // indirect

internal/raftstore/migrate.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ func prepareMigrationPaths(logsPath, stablePath, destDir string) (string, error)
5757
return "", errors.New("destination dir is required")
5858
}
5959

60+
destDir = filepath.Clean(destDir)
61+
6062
if err := requireExistingFile(logsPath); err != nil {
6163
return "", err
6264
}

proxy/dualwrite.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ type DualWriter struct {
3737
closed bool
3838
scriptMu sync.RWMutex
3939
scripts map[string]string
40+
// scriptOrder tracks insertion order for FIFO eviction of the bounded script cache.
41+
scriptOrder []string
4042
}
4143

4244
// NewDualWriter creates a DualWriter with the given backends.
@@ -193,13 +195,13 @@ func (d *DualWriter) Script(ctx context.Context, cmd string, args [][]byte) (any
193195
result := d.primary.Do(ctx, iArgs...)
194196
resp, err := result.Result()
195197
d.metrics.CommandDuration.WithLabelValues(cmd, d.primary.Name()).Observe(time.Since(start).Seconds())
196-
d.rememberScript(cmd, args)
197198

198199
if err != nil && !errors.Is(err, redis.Nil) {
199200
d.metrics.CommandTotal.WithLabelValues(cmd, d.primary.Name(), "error").Inc()
200201
return nil, fmt.Errorf("primary script %s: %w", cmd, err)
201202
}
202203
d.metrics.CommandTotal.WithLabelValues(cmd, d.primary.Name(), "ok").Inc()
204+
d.rememberScript(cmd, args)
203205

204206
if d.hasSecondaryWrite() {
205207
d.goWrite(func() { d.writeSecondary(cmd, iArgs) })

proxy/proxy.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,8 +334,17 @@ func (p *ProxyServer) handleAdmin(conn redcon.Conn, name string, args [][]byte)
334334
// SELECT: accept only the configured DB; reject others since the proxy
335335
// uses a shared connection pool and cannot maintain per-client DB state.
336336
if name == "SELECT" {
337-
if len(args) > 1 && string(args[1]) != "0" && string(args[1]) != fmt.Sprintf("%d", p.cfg.PrimaryDB) {
338-
conn.WriteError(fmt.Sprintf("ERR proxy does not support SELECT %s (configured DB: %d)", string(args[1]), p.cfg.PrimaryDB))
337+
// Redis arity: SELECT <db>. Require exactly one DB argument.
338+
if len(args) != 2 {
339+
conn.WriteError("ERR wrong number of arguments for 'select' command")
340+
return
341+
}
342+
343+
requestedDB := string(args[1])
344+
configuredDB := fmt.Sprintf("%d", p.cfg.PrimaryDB)
345+
346+
if requestedDB != configuredDB {
347+
conn.WriteError(fmt.Sprintf("ERR proxy does not support SELECT %s (configured DB: %d)", requestedDB, p.cfg.PrimaryDB))
339348
return
340349
}
341350
conn.WriteString("OK")

proxy/proxy_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package proxy
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"io"
78
"log/slog"
89
"sync"
@@ -829,6 +830,107 @@ func TestDualWriter_Script_CachesEvalForEvalSHAFallback(t *testing.T) {
829830
assert.InDelta(t, 0, testutil.ToFloat64(metrics.SecondaryWriteErrors), 0.001)
830831
}
831832

833+
func TestDualWriter_Script_EvalSHARO_FallsBackToEvalRO(t *testing.T) {
834+
primary := newMockBackend("primary")
835+
primary.doFunc = makeCmd("OK", nil)
836+
837+
secondary := newMockBackend("secondary")
838+
script := "return KEYS[1]"
839+
sha := scriptSHA(script)
840+
var calls int
841+
secondary.doFunc = func(ctx context.Context, args ...any) *redis.Cmd {
842+
calls++
843+
cmd := redis.NewCmd(ctx, args...)
844+
switch calls {
845+
case 1:
846+
assert.Equal(t, []byte("EVALSHA_RO"), args[0])
847+
cmd.SetErr(testRedisErr("NOSCRIPT No matching script. Please use EVAL."))
848+
case 2:
849+
// Must fall back to EVAL_RO, not EVAL, to preserve read-only semantics.
850+
assert.Equal(t, []byte("EVAL_RO"), args[0])
851+
assert.Equal(t, []byte(script), args[1])
852+
cmd.SetVal("mykey")
853+
default:
854+
t.Fatalf("unexpected secondary call %d", calls)
855+
}
856+
return cmd
857+
}
858+
859+
metrics := newTestMetrics()
860+
d := NewDualWriter(primary, secondary, ProxyConfig{Mode: ModeRedisOnly, SecondaryTimeout: time.Second}, metrics, newTestSentry(), testLogger)
861+
862+
// Register the script body via EVAL_RO so the proxy can fall back.
863+
_, err := d.Script(context.Background(), "EVAL_RO", [][]byte{[]byte("EVAL_RO"), []byte(script), []byte("1"), []byte("mykey")})
864+
assert.NoError(t, err)
865+
866+
d.cfg.Mode = ModeDualWrite
867+
d.writeSecondary("EVALSHA_RO", []any{[]byte("EVALSHA_RO"), []byte(sha), []byte("1"), []byte("mykey")})
868+
869+
assert.Equal(t, 2, calls)
870+
assert.InDelta(t, 0, testutil.ToFloat64(metrics.SecondaryWriteErrors), 0.001)
871+
}
872+
873+
func TestDualWriter_Script_NoRememberOnPrimaryError(t *testing.T) {
874+
// Verify that a failed SCRIPT FLUSH on the primary does NOT clear the proxy
875+
// script cache, so that subsequent EVALSHA → EVAL fallbacks still work.
876+
primary := newMockBackend("primary")
877+
primary.doFunc = makeCmd(nil, testRedisErr("ERR flush failed"))
878+
879+
secondary := newMockBackend("secondary")
880+
secondary.doFunc = makeCmd("OK", nil)
881+
882+
metrics := newTestMetrics()
883+
d := NewDualWriter(primary, secondary, ProxyConfig{Mode: ModeRedisOnly, SecondaryTimeout: time.Second}, metrics, newTestSentry(), testLogger)
884+
885+
// Seed the script cache directly.
886+
script := "return 1"
887+
d.storeScript(script)
888+
sha := scriptSHA(script)
889+
_, cached := d.lookupScript(sha)
890+
assert.True(t, cached, "script should be cached before flush attempt")
891+
892+
// Attempt SCRIPT FLUSH — primary returns an error so the cache must be untouched.
893+
_, err := d.Script(context.Background(), "SCRIPT", [][]byte{[]byte("SCRIPT"), []byte("FLUSH")})
894+
assert.Error(t, err)
895+
896+
_, stillCached := d.lookupScript(sha)
897+
assert.True(t, stillCached, "cache must not be cleared when primary SCRIPT FLUSH fails")
898+
}
899+
900+
func TestDualWriter_ScriptCache_BoundedEviction(t *testing.T) {
901+
// Fill the cache beyond maxScriptCacheSize and verify it stays bounded.
902+
primary := newMockBackend("primary")
903+
primary.doFunc = makeCmd("OK", nil)
904+
905+
metrics := newTestMetrics()
906+
d := NewDualWriter(primary, nil, ProxyConfig{Mode: ModeRedisOnly, SecondaryTimeout: time.Second}, metrics, newTestSentry(), testLogger)
907+
908+
// Insert maxScriptCacheSize+10 unique scripts.
909+
total := maxScriptCacheSize + 10
910+
for i := range total {
911+
d.storeScript(fmt.Sprintf("return %d", i))
912+
}
913+
914+
d.scriptMu.RLock()
915+
size := len(d.scripts)
916+
d.scriptMu.RUnlock()
917+
918+
assert.Equal(t, maxScriptCacheSize, size, "cache must not exceed maxScriptCacheSize")
919+
920+
// The first 10 scripts (insertion order) must have been evicted.
921+
for i := range 10 {
922+
sha := scriptSHA(fmt.Sprintf("return %d", i))
923+
_, ok := d.lookupScript(sha)
924+
assert.False(t, ok, "script %d should have been evicted", i)
925+
}
926+
// The last maxScriptCacheSize scripts must still be present.
927+
for i := 10; i < total; i++ {
928+
sha := scriptSHA(fmt.Sprintf("return %d", i))
929+
_, ok := d.lookupScript(sha)
930+
assert.True(t, ok, "script %d should still be cached", i)
931+
}
932+
}
933+
832934
// ========== writeRedisValue tests ==========
833935

834936
// testRedisErr satisfies the redis.Error interface for testing.

proxy/pubsub.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"log/slog"
8+
"sort"
89
"strings"
910
"sync"
1011
"time"
@@ -41,7 +42,7 @@ const (
4142
// enabling the client to execute regular Redis commands without reconnecting.
4243
type pubsubSession struct {
4344
mu sync.Mutex // protects upstream, closed, and shadow; channelSet/patternSet/txn are goroutine-confined to commandLoop
44-
writeMu sync.Mutex // serializes writes to dconn; never held across state operations
45+
writeMu sync.Mutex // serializes writes to dconn; may be held while acquiring mu (lock ordering: writeMu then mu)
4546
dconn redcon.DetachedConn
4647
upstream *redis.PubSub // nil when not in pub/sub mode
4748
proxy *ProxyServer
@@ -332,8 +333,14 @@ func (s *pubsubSession) handleProxySpecialCommand(name string, args [][]byte) bo
332333
if name != "SELECT" {
333334
return false
334335
}
335-
if len(args) > 1 && string(args[1]) != "0" && string(args[1]) != fmt.Sprintf("%d", s.proxy.cfg.PrimaryDB) {
336-
s.writeError(fmt.Sprintf("ERR proxy does not support SELECT %s (configured DB: %d)", string(args[1]), s.proxy.cfg.PrimaryDB))
336+
// Enforce Redis arity: SELECT requires exactly one DB index argument.
337+
if len(args) != 2 {
338+
s.writeError("ERR wrong number of arguments for 'select' command")
339+
return true
340+
}
341+
dbStr := string(args[1])
342+
if dbStr != fmt.Sprintf("%d", s.proxy.cfg.PrimaryDB) {
343+
s.writeError(fmt.Sprintf("ERR proxy configured for DB %d, but SELECT %s requested", s.proxy.cfg.PrimaryDB, dbStr))
337344
return true
338345
}
339346
s.writeString("OK")
@@ -597,11 +604,13 @@ func (s *pubsubSession) writeUnsubAll(kind string, isPattern bool) {
597604
return
598605
}
599606

600-
// Collect names and pre-compute decreasing counts (state is goroutine-confined).
607+
// Collect names, sort for deterministic reply ordering, and pre-compute
608+
// decreasing counts (state is goroutine-confined).
601609
names := make([]string, 0, len(set))
602610
for n := range set {
603611
names = append(names, n)
604612
}
613+
sort.Strings(names)
605614
counts := make([]int, len(names))
606615
for i, n := range names {
607616
if isPattern {

proxy/pubsub_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,9 +198,7 @@ func TestPubSub_WriteUnsubAll_PerChannelReplies(t *testing.T) {
198198
s.channelSet["ch2"] = struct{}{}
199199
s.patternSet["pat1"] = struct{}{}
200200

201-
s.mu.Lock()
202201
s.writeUnsubAll("unsubscribe", false)
203-
s.mu.Unlock()
204202

205203
writes := dconn.getWrites()
206204

@@ -487,6 +485,9 @@ func TestPubSub_SelectAuthSilentlyAccepted(t *testing.T) {
487485
t.Run(cmd, func(t *testing.T) {
488486
dconn := newMockDetachedConn()
489487
s := newTestSession(dconn)
488+
// handleProxySpecialCommand accesses s.proxy.cfg for SELECT; provide a
489+
// minimal ProxyServer so the SELECT path does not panic.
490+
s.proxy = &ProxyServer{cfg: ProxyConfig{PrimaryDB: 0}}
490491

491492
s.dispatchRegularCommand(cmd, [][]byte{[]byte(cmd), []byte("0")})
492493

proxy/script_cache.go

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,27 @@ import (
1010
)
1111

1212
const (
13-
cmdEval = "EVAL"
14-
cmdEvalSHA = "EVALSHA"
15-
cmdScript = "SCRIPT"
13+
cmdEval = "EVAL"
14+
cmdEvalRO = "EVAL_RO"
15+
cmdEvalSHA = "EVALSHA"
16+
cmdEvalSHARO = "EVALSHA_RO"
17+
cmdScript = "SCRIPT"
1618

1719
minScriptSubcommandArgs = 2
1820
scriptLoadArgIndex = 2
1921
minEvalSHAArgs = 2
22+
23+
// maxScriptCacheSize is the maximum number of scripts retained in the
24+
// proxy-side script cache. When the limit is reached, the oldest entry
25+
// (by insertion order) is evicted to prevent unbounded memory growth.
26+
maxScriptCacheSize = 512
2027
)
2128

2229
func (d *DualWriter) rememberScript(cmd string, args [][]byte) {
2330
upper := strings.ToUpper(cmd)
2431

2532
switch upper {
26-
case cmdEval, "EVAL_RO":
33+
case cmdEval, cmdEvalRO:
2734
if len(args) > 1 {
2835
d.storeScript(string(args[1]))
2936
}
@@ -47,13 +54,30 @@ func (d *DualWriter) storeScript(script string) {
4754

4855
d.scriptMu.Lock()
4956
defer d.scriptMu.Unlock()
57+
58+
if _, exists := d.scripts[sha]; !exists {
59+
// Evict the oldest entry when at capacity.
60+
// scriptOrder and scripts always have the same length, so the guard is
61+
// sufficient; the additional len(scriptOrder) check is not needed.
62+
if len(d.scripts) >= maxScriptCacheSize {
63+
oldest := d.scriptOrder[0]
64+
d.scriptOrder = d.scriptOrder[1:]
65+
delete(d.scripts, oldest)
66+
}
67+
// Append to the end so that eviction follows strict FIFO insertion order.
68+
// Re-storing an already-cached script does not move its position; the
69+
// entry retains its original eviction priority, which is acceptable for
70+
// this use-case because script bodies are immutable (same SHA ⇒ same body).
71+
d.scriptOrder = append(d.scriptOrder, sha)
72+
}
5073
d.scripts[sha] = script
5174
}
5275

5376
func (d *DualWriter) clearScripts() {
5477
d.scriptMu.Lock()
5578
defer d.scriptMu.Unlock()
5679
clear(d.scripts)
80+
d.scriptOrder = d.scriptOrder[:0]
5781
}
5882

5983
func (d *DualWriter) lookupScript(sha string) (string, bool) {
@@ -65,7 +89,7 @@ func (d *DualWriter) lookupScript(sha string) (string, bool) {
6589

6690
func (d *DualWriter) evalFallbackArgs(cmd string, iArgs []any) ([]any, bool) {
6791
upper := strings.ToUpper(cmd)
68-
if upper != cmdEvalSHA && upper != "EVALSHA_RO" {
92+
if upper != cmdEvalSHA && upper != cmdEvalSHARO {
6993
return nil, false
7094
}
7195
if len(iArgs) < minEvalSHAArgs {
@@ -78,8 +102,14 @@ func (d *DualWriter) evalFallbackArgs(cmd string, iArgs []any) ([]any, bool) {
78102
return nil, false
79103
}
80104

105+
// Preserve the read-only semantics: EVALSHA_RO falls back to EVAL_RO.
106+
fallbackCmd := cmdEval
107+
if upper == cmdEvalSHARO {
108+
fallbackCmd = cmdEvalRO
109+
}
110+
81111
fallback := make([]any, len(iArgs))
82-
fallback[0] = []byte(cmdEval)
112+
fallback[0] = []byte(fallbackCmd)
83113
fallback[1] = []byte(script)
84114
copy(fallback[2:], iArgs[2:])
85115
return fallback, true

0 commit comments

Comments
 (0)