Skip to content

Commit 8d98c59

Browse files
authored
Merge pull request #351 from bootjp/feature/redis-proxy
Add redis-proxy: dual-write Redis proxy with pub/sub forwarding
2 parents 2c48837 + 1cff1e8 commit 8d98c59

22 files changed

+5445
-6
lines changed

adapter/add_voter_join_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func startAddVoterJoinNode(
182182
return err
183183
})
184184

185-
rd := NewRedisServer(lis.redis, port.redisAddress, st, coordinator, leaderRedisMap, relay)
185+
rd := NewRedisServer(lis.redis, port.redisAddress, routedStore, coordinator, leaderRedisMap, relay)
186186
workers.Go(func() error {
187187
err := rd.Run()
188188
if errors.Is(err, net.ErrClosed) {

adapter/redis_server_setup_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package adapter
2+
3+
import (
4+
"testing"
5+
6+
"github.com/bootjp/elastickv/kv"
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestRedisServerUsesLeaderRoutedStoreInSingleGroupCluster(t *testing.T) {
11+
t.Parallel()
12+
13+
nodes, _, _ := createNode(t, 3)
14+
defer shutdown(nodes)
15+
16+
require.IsType(t, &kv.LeaderRoutedStore{}, nodes[0].redisServer.store)
17+
require.IsType(t, &kv.LeaderRoutedStore{}, nodes[1].redisServer.store)
18+
require.IsType(t, &kv.LeaderRoutedStore{}, nodes[2].redisServer.store)
19+
}

adapter/test_util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress) (
370370
assert.NoError(t, srv.Serve(lis))
371371
}(s, grpcSock)
372372

373-
rd := NewRedisServer(redisSock, port.redisAddress, st, coordinator, leaderRedisMap, relay)
373+
rd := NewRedisServer(redisSock, port.redisAddress, routedStore, coordinator, leaderRedisMap, relay)
374374
go func(server *RedisServer) {
375375
assert.NoError(t, server.Run())
376376
}(rd)

cmd/redis-proxy/main.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"flag"
6+
"fmt"
7+
"log/slog"
8+
"net"
9+
"net/http"
10+
"os"
11+
"os/signal"
12+
"syscall"
13+
"time"
14+
15+
"github.com/bootjp/elastickv/proxy"
16+
"github.com/prometheus/client_golang/prometheus"
17+
"github.com/prometheus/client_golang/prometheus/promhttp"
18+
)
19+
20+
const (
21+
// sentryFlushTimeout is the timeout handed to the Sentry reporter's Flush
// when run returns.
sentryFlushTimeout = 2 * time.Second
22+
// metricsShutdownTimeout bounds the graceful Shutdown of the metrics HTTP
// server once the run context has been cancelled.
metricsShutdownTimeout = 5 * time.Second
23+
)
24+
25+
func main() {
26+
if err := run(); err != nil {
27+
fmt.Fprintf(os.Stderr, "error: %v\n", err)
28+
os.Exit(1)
29+
}
30+
}
31+
32+
func run() error {
33+
cfg := proxy.DefaultConfig()
34+
var modeStr string
35+
36+
flag.StringVar(&cfg.ListenAddr, "listen", cfg.ListenAddr, "Proxy listen address")
37+
flag.StringVar(&cfg.PrimaryAddr, "primary", cfg.PrimaryAddr, "Primary (Redis) address")
38+
flag.IntVar(&cfg.PrimaryDB, "primary-db", cfg.PrimaryDB, "Primary Redis DB number")
39+
flag.StringVar(&cfg.PrimaryPassword, "primary-password", cfg.PrimaryPassword, "Primary Redis password")
40+
flag.StringVar(&cfg.SecondaryAddr, "secondary", cfg.SecondaryAddr, "Secondary (ElasticKV) address")
41+
flag.IntVar(&cfg.SecondaryDB, "secondary-db", cfg.SecondaryDB, "Secondary Redis DB number")
42+
flag.StringVar(&cfg.SecondaryPassword, "secondary-password", cfg.SecondaryPassword, "Secondary Redis password")
43+
flag.StringVar(&modeStr, "mode", "dual-write", "Proxy mode: redis-only, dual-write, dual-write-shadow, elastickv-primary, elastickv-only")
44+
flag.DurationVar(&cfg.SecondaryTimeout, "secondary-timeout", cfg.SecondaryTimeout, "Secondary write timeout")
45+
flag.DurationVar(&cfg.ShadowTimeout, "shadow-timeout", cfg.ShadowTimeout, "Shadow read timeout")
46+
flag.StringVar(&cfg.SentryDSN, "sentry-dsn", cfg.SentryDSN, "Sentry DSN (empty = disabled)")
47+
flag.StringVar(&cfg.SentryEnv, "sentry-env", cfg.SentryEnv, "Sentry environment")
48+
flag.Float64Var(&cfg.SentrySampleRate, "sentry-sample", cfg.SentrySampleRate, "Sentry sample rate")
49+
flag.StringVar(&cfg.MetricsAddr, "metrics", cfg.MetricsAddr, "Prometheus metrics address")
50+
flag.Parse()
51+
52+
mode, ok := proxy.ParseProxyMode(modeStr)
53+
if !ok {
54+
return fmt.Errorf("unknown mode: %s", modeStr)
55+
}
56+
cfg.Mode = mode
57+
58+
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
59+
60+
// Sentry
61+
sentryReporter := proxy.NewSentryReporter(cfg.SentryDSN, cfg.SentryEnv, cfg.SentrySampleRate, logger)
62+
defer sentryReporter.Flush(sentryFlushTimeout)
63+
64+
// Prometheus
65+
reg := prometheus.NewRegistry()
66+
metrics := proxy.NewProxyMetrics(reg)
67+
68+
// Backends
69+
primaryOpts := proxy.DefaultBackendOptions()
70+
primaryOpts.DB = cfg.PrimaryDB
71+
primaryOpts.Password = cfg.PrimaryPassword
72+
secondaryOpts := proxy.DefaultBackendOptions()
73+
secondaryOpts.DB = cfg.SecondaryDB
74+
secondaryOpts.Password = cfg.SecondaryPassword
75+
76+
var primary, secondary proxy.Backend
77+
switch cfg.Mode {
78+
case proxy.ModeElasticKVPrimary, proxy.ModeElasticKVOnly:
79+
primary = proxy.NewRedisBackendWithOptions(cfg.SecondaryAddr, "elastickv", secondaryOpts)
80+
secondary = proxy.NewRedisBackendWithOptions(cfg.PrimaryAddr, "redis", primaryOpts)
81+
case proxy.ModeRedisOnly, proxy.ModeDualWrite, proxy.ModeDualWriteShadow:
82+
primary = proxy.NewRedisBackendWithOptions(cfg.PrimaryAddr, "redis", primaryOpts)
83+
secondary = proxy.NewRedisBackendWithOptions(cfg.SecondaryAddr, "elastickv", secondaryOpts)
84+
}
85+
defer primary.Close()
86+
defer secondary.Close()
87+
88+
dual := proxy.NewDualWriter(primary, secondary, cfg, metrics, sentryReporter, logger)
89+
defer dual.Close() // wait for in-flight async goroutines
90+
srv := proxy.NewProxyServer(cfg, dual, metrics, sentryReporter, logger)
91+
92+
// Context for graceful shutdown
93+
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
94+
defer cancel()
95+
96+
// Start metrics server
97+
go func() {
98+
mux := http.NewServeMux()
99+
mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
100+
var lc net.ListenConfig
101+
ln, err := lc.Listen(ctx, "tcp", cfg.MetricsAddr)
102+
if err != nil {
103+
logger.Error("metrics listen failed", "addr", cfg.MetricsAddr, "err", err)
104+
return
105+
}
106+
metricsSrv := &http.Server{Handler: mux, ReadHeaderTimeout: time.Second}
107+
go func() {
108+
<-ctx.Done()
109+
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), metricsShutdownTimeout)
110+
defer shutdownCancel()
111+
if err := metricsSrv.Shutdown(shutdownCtx); err != nil {
112+
logger.Warn("metrics server shutdown error", "err", err)
113+
}
114+
}()
115+
logger.Info("metrics server starting", "addr", cfg.MetricsAddr)
116+
if err := metricsSrv.Serve(ln); err != nil && err != http.ErrServerClosed {
117+
logger.Error("metrics server error", "err", err)
118+
}
119+
}()
120+
121+
// Start proxy
122+
if err := srv.ListenAndServe(ctx); err != nil {
123+
return fmt.Errorf("proxy server: %w", err)
124+
}
125+
return nil
126+
}

cmd/server/demo.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,8 @@ func setupRedis(ctx context.Context, lc net.ListenConfig, st store.MVCCStore, co
354354
if err != nil {
355355
return nil, errors.WithStack(err)
356356
}
357-
return adapter.NewRedisServer(l, redisAddr, st, coordinator, leaderRedis, relay), nil
357+
routedStore := kv.NewLeaderRoutedStore(st, coordinator)
358+
return adapter.NewRedisServer(l, redisAddr, routedStore, coordinator, leaderRedis, relay), nil
358359
}
359360

360361
func run(ctx context.Context, eg *errgroup.Group, cfg config) error {

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ require (
1717
github.com/cockroachdb/errors v1.12.0
1818
github.com/cockroachdb/pebble v1.1.5
1919
github.com/emirpasic/gods v1.18.1
20+
github.com/getsentry/sentry-go v0.27.0
2021
github.com/hashicorp/go-hclog v1.6.3
2122
github.com/hashicorp/raft v1.7.3
2223
github.com/pkg/errors v0.9.1
@@ -58,7 +59,6 @@ require (
5859
github.com/davecgh/go-spew v1.1.1 // indirect
5960
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
6061
github.com/fatih/color v1.15.0 // indirect
61-
github.com/getsentry/sentry-go v0.27.0 // indirect
6262
github.com/gogo/protobuf v1.3.2 // indirect
6363
github.com/golang/protobuf v1.5.4 // indirect
6464
github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e // indirect

jepsen/src/elastickv/redis_workload.clj

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,13 @@
175175
:parse-fn #(Integer/parseInt %)]
176176
["-h" "--help"]])
177177

178+
(defn fail-on-invalid!
  "Returns `result` untouched unless its :valid? entry is exactly `false`.
  In that case throws an ex-info with the message \"Jepsen analysis invalid\"
  and the result attached under :result. Any other :valid? value, including
  a missing one, passes through."
  [result]
  (if (false? (:valid? result))
    (throw (ex-info "Jepsen analysis invalid" {:result result}))
    result))
184+
178185
(defn -main
179186
[& args]
180187
(let [{:keys [options errors summary]} (tools.cli/parse-opts args cli-opts)
@@ -211,5 +218,5 @@
211218
(seq errors) (binding [*out* *err*]
212219
(println "Error parsing options:" (str/join "; " errors)))
213220
(:local options) (binding [control/*dummy* true]
214-
(jepsen/run! (elastickv-redis-test options)))
215-
:else (jepsen/run! (elastickv-redis-test options)))))
221+
(fail-on-invalid! (jepsen/run! (elastickv-redis-test options))))
222+
:else (fail-on-invalid! (jepsen/run! (elastickv-redis-test options))))))
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
(ns elastickv.redis-workload-test
2+
(:require [clojure.test :refer :all]
3+
[elastickv.redis-workload :as workload]))
4+
5+
;; A result whose :valid? is true must come back unchanged.
(deftest fail-on-invalid-passes-through-valid-results
6+
(let [result {:valid? true}]
7+
(is (= result (workload/fail-on-invalid! result)))))
8+
9+
;; A result whose :valid? is false must raise an ExceptionInfo whose message
;; matches "Jepsen analysis invalid".
(deftest fail-on-invalid-throws-for-invalid-results
10+
(is (thrown-with-msg?
11+
clojure.lang.ExceptionInfo
12+
#"Jepsen analysis invalid"
13+
(workload/fail-on-invalid! {:valid? false}))))

proxy/backend.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package proxy
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"time"
8+
9+
"github.com/redis/go-redis/v9"
10+
)
11+
12+
// Values that DefaultBackendOptions places into BackendOptions.
const (
13+
// defaultPoolSize is the default BackendOptions.PoolSize.
defaultPoolSize = 128
14+
// defaultDialTimeout is the default BackendOptions.DialTimeout.
defaultDialTimeout = 5 * time.Second
15+
// defaultReadTimeout is the default BackendOptions.ReadTimeout.
defaultReadTimeout = 3 * time.Second
16+
// defaultWriteTimeout is the default BackendOptions.WriteTimeout.
defaultWriteTimeout = 3 * time.Second
17+
)
18+
19+
// Backend abstracts a Redis-protocol endpoint (real Redis or ElasticKV).
20+
type Backend interface {
21+
// Do sends a single command, given as its argument list, and returns its result.
22+
Do(ctx context.Context, args ...any) *redis.Cmd
23+
// Pipeline sends multiple commands in a pipeline; cmds holds one argument
// list per command.
24+
Pipeline(ctx context.Context, cmds [][]any) ([]*redis.Cmd, error)
25+
// Close releases the underlying connection.
26+
Close() error
27+
// Name identifies this backend for logging and metrics.
28+
Name() string
29+
}
30+
31+
// BackendOptions configures the underlying go-redis connection pool.
// NewRedisBackendWithOptions copies each field into the redis.Options field
// of the same name.
32+
type BackendOptions struct {
33+
// DB is the Redis database number.
DB int
34+
// Password is the password sent to the server.
Password string
35+
// PoolSize is the connection pool size.
PoolSize int
36+
// DialTimeout is the go-redis dial timeout.
DialTimeout time.Duration
37+
// ReadTimeout is the go-redis read timeout.
ReadTimeout time.Duration
38+
// WriteTimeout is the go-redis write timeout.
WriteTimeout time.Duration
39+
}
40+
41+
// DefaultBackendOptions returns reasonable defaults for a proxy backend.
42+
func DefaultBackendOptions() BackendOptions {
43+
return BackendOptions{
44+
PoolSize: defaultPoolSize,
45+
DialTimeout: defaultDialTimeout,
46+
ReadTimeout: defaultReadTimeout,
47+
WriteTimeout: defaultWriteTimeout,
48+
}
49+
}
50+
51+
// PubSubBackend is an optional interface for backends that support
52+
// creating dedicated PubSub connections.
53+
type PubSubBackend interface {
54+
// NewPubSub returns a go-redis PubSub handle created for the caller.
NewPubSub(ctx context.Context) *redis.PubSub
55+
}
56+
57+
// RedisBackend connects to an upstream Redis instance via go-redis.
58+
type RedisBackend struct {
59+
// client is the go-redis client that every method delegates to.
client *redis.Client
60+
// name is the label returned by Name and included in Close's error message.
name string
61+
}
62+
63+
// NewRedisBackend creates a Backend targeting a Redis server with default pool options.
// It is shorthand for NewRedisBackendWithOptions with DefaultBackendOptions().
64+
func NewRedisBackend(addr string, name string) *RedisBackend {
65+
return NewRedisBackendWithOptions(addr, name, DefaultBackendOptions())
66+
}
67+
68+
// NewRedisBackendWithOptions creates a Backend with explicit pool configuration.
69+
func NewRedisBackendWithOptions(addr string, name string, opts BackendOptions) *RedisBackend {
70+
return &RedisBackend{
71+
client: redis.NewClient(&redis.Options{
72+
Addr: addr,
73+
DB: opts.DB,
74+
Password: opts.Password,
75+
PoolSize: opts.PoolSize,
76+
DialTimeout: opts.DialTimeout,
77+
ReadTimeout: opts.ReadTimeout,
78+
WriteTimeout: opts.WriteTimeout,
79+
}),
80+
name: name,
81+
}
82+
}
83+
84+
// Do forwards a single command to the underlying go-redis client and returns
// the resulting *redis.Cmd as-is.
func (b *RedisBackend) Do(ctx context.Context, args ...any) *redis.Cmd {
85+
return b.client.Do(ctx, args...)
86+
}
87+
88+
func (b *RedisBackend) Pipeline(ctx context.Context, cmds [][]any) ([]*redis.Cmd, error) {
89+
pipe := b.client.Pipeline()
90+
results := make([]*redis.Cmd, len(cmds))
91+
for i, args := range cmds {
92+
results[i] = pipe.Do(ctx, args...)
93+
}
94+
_, err := pipe.Exec(ctx)
95+
if err != nil {
96+
// go-redis pipelines return redis.Error for Redis reply errors (e.g., EXECABORT).
97+
// Return results with nil error so callers can read per-command results (especially EXEC).
98+
// Only propagate true transport/context errors.
99+
var redisErr redis.Error
100+
if errors.As(err, &redisErr) || errors.Is(err, redis.Nil) {
101+
return results, nil
102+
}
103+
return results, fmt.Errorf("pipeline exec: %w", err)
104+
}
105+
return results, nil
106+
}
107+
108+
func (b *RedisBackend) Close() error {
109+
if err := b.client.Close(); err != nil {
110+
return fmt.Errorf("close %s backend: %w", b.name, err)
111+
}
112+
return nil
113+
}
114+
115+
// Name returns the label this backend was constructed with.
func (b *RedisBackend) Name() string {
116+
return b.name
117+
}
118+
119+
// NewPubSub creates a dedicated PubSub connection (not from the pool).
// It calls the client's Subscribe with no channel names, so the returned
// PubSub starts without any subscriptions.
120+
func (b *RedisBackend) NewPubSub(ctx context.Context) *redis.PubSub {
121+
return b.client.Subscribe(ctx)
122+
}

0 commit comments

Comments
 (0)