Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions adapter/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/Jille/raft-grpc-leader-rpc/leaderhealth"
transport "github.com/Jille/raft-grpc-transport"
"github.com/Jille/raftadmin"
internalutil "github.com/bootjp/elastickv/internal"
"github.com/bootjp/elastickv/kv"
pb "github.com/bootjp/elastickv/proto"
"github.com/bootjp/elastickv/store"
Expand All @@ -23,7 +24,6 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sys/unix"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

func shutdown(nodes []Node) {
Expand Down Expand Up @@ -350,7 +350,7 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress) (
r, tm, err := newRaft(strconv.Itoa(i), port.raftAddress, fsm, i == 0, cfg, electionTimeout)
assert.NoError(t, err)

s := grpc.NewServer()
s := grpc.NewServer(internalutil.GRPCServerOptions()...)
trx := kv.NewTransaction(r)
coordinator := kv.NewCoordinator(trx, r)
relay := NewRedisPubSubRelay()
Expand Down Expand Up @@ -416,9 +416,7 @@ func newRaft(myID string, myAddress string, fsm raft.FSM, bootstrap bool, cfg ra
Level: hclog.LevelFromString("WARN"),
})

tm := transport.New(raft.ServerAddress(myAddress), []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
})
tm := transport.New(raft.ServerAddress(myAddress), internalutil.GRPCDialOptions())

r, err := raft.NewRaft(c, fsm, ldb, sdb, fss, tm.Transport())
if err != nil {
Expand Down
9 changes: 3 additions & 6 deletions cmd/server/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/hashicorp/raft"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

var (
Expand Down Expand Up @@ -206,7 +205,7 @@ func joinCluster(ctx context.Context, nodes []config) error {
}

// Connect to leader
conn, err := grpc.NewClient(leader.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(leader.address, internalutil.GRPCDialOptions()...)
if err != nil {
return fmt.Errorf("failed to dial leader: %w", err)
}
Expand Down Expand Up @@ -322,7 +321,7 @@ func setupStorage(dir string) (raft.LogStore, raft.StableStore, raft.SnapshotSto
}

func setupGRPC(r *raft.Raft, st store.MVCCStore, tm *transport.Manager, coordinator *kv.Coordinate, distServer *adapter.DistributionServer, relay *adapter.RedisPubSubRelay) (*grpc.Server, *adapter.GRPCServer) {
s := grpc.NewServer()
s := grpc.NewServer(internalutil.GRPCServerOptions()...)
trx := kv.NewTransaction(r)
routedStore := kv.NewLeaderRoutedStore(st, coordinator)
gs := adapter.NewGRPCServer(routedStore, coordinator, adapter.WithCloseStore())
Expand Down Expand Up @@ -381,9 +380,7 @@ func run(ctx context.Context, eg *errgroup.Group, cfg config) error {
})

// Transport
tm := transport.New(raft.ServerAddress(cfg.address), []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
})
tm := transport.New(raft.ServerAddress(cfg.address), internalutil.GRPCDialOptions())

r, err := raft.NewRaft(c, fsm, ldb, sdb, fss, tm.Transport())
if err != nil {
Expand Down
29 changes: 29 additions & 0 deletions internal/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package internal

import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// GRPCMaxMessageBytes is the gRPC message size limit in bytes (64 << 20,
// i.e. 64 MiB) applied by both the server options and the dial options
// defined in this package.
const GRPCMaxMessageBytes = 64 << 20

// GRPCServerOptions returns the server-side options that cap both received
// and sent messages at GRPCMaxMessageBytes, so Raft replication and the
// public/internal APIs share one message-size budget.
//
// A new slice is built on every call.
func GRPCServerOptions() []grpc.ServerOption {
	recvLimit := grpc.MaxRecvMsgSize(GRPCMaxMessageBytes)
	sendLimit := grpc.MaxSendMsgSize(GRPCMaxMessageBytes)
	return []grpc.ServerOption{recvLimit, sendLimit}
}

// GRPCDialOptions returns the shared dial options for node-local and
// node-to-node traffic: insecure transport credentials plus default call
// options that cap sent and received messages at GRPCMaxMessageBytes.
//
// A new slice is built on every call, so callers may append to the result.
func GRPCDialOptions() []grpc.DialOption {
	creds := grpc.WithTransportCredentials(insecure.NewCredentials())
	sizeLimits := grpc.WithDefaultCallOptions(
		grpc.MaxCallRecvMsgSize(GRPCMaxMessageBytes),
		grpc.MaxCallSendMsgSize(GRPCMaxMessageBytes),
	)
	return []grpc.DialOption{creds, sizeLimits}
}
8 changes: 4 additions & 4 deletions kv/grpc_conn_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package kv
import (
"sync"

internalutil "github.com/bootjp/elastickv/internal"
"github.com/cockroachdb/errors"
"github.com/hashicorp/raft"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
)

// GRPCConnCache reuses gRPC connections per address. gRPC itself handles
Expand Down Expand Up @@ -75,9 +75,9 @@ func (c *GRPCConnCache) ConnFor(addr raft.ServerAddress) (*grpc.ClientConn, erro
return conn, nil
}

conn, err := grpc.NewClient(string(addr),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
conn, err := grpc.NewClient(
string(addr),
append(internalutil.GRPCDialOptions(), grpc.WithDefaultCallOptions(grpc.WaitForReady(true)))...,
)
if err != nil {
return nil, errors.WithStack(err)
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func raftMonitorRuntimes(runtimes []*raftGroupRuntime) []monitoring.RaftRuntime

func startRaftServers(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, runtimes []*raftGroupRuntime, shardStore *kv.ShardStore, coordinate kv.Coordinator, distServer *adapter.DistributionServer, relay *adapter.RedisPubSubRelay) error {
for _, rt := range runtimes {
gs := grpc.NewServer()
gs := grpc.NewServer(internalutil.GRPCServerOptions()...)
trx := kv.NewTransaction(rt.raft)
grpcSvc := adapter.NewGRPCServer(shardStore, coordinate)
pb.RegisterRawKVServer(gs, grpcSvc)
Expand Down
7 changes: 2 additions & 5 deletions multiraft_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ import (
"time"

transport "github.com/Jille/raft-grpc-transport"
internalutil "github.com/bootjp/elastickv/internal"
"github.com/bootjp/elastickv/internal/raftstore"
"github.com/bootjp/elastickv/store"
"github.com/cockroachdb/errors"
"github.com/hashicorp/raft"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type raftGroupRuntime struct {
Expand Down Expand Up @@ -117,9 +116,7 @@ func newRaftGroup(raftID string, group groupSpec, baseDir string, multi bool, bo
return nil, nil, nil, errors.WithStack(err)
}

tm = transport.New(raft.ServerAddress(group.address), []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
})
tm = transport.New(raft.ServerAddress(group.address), internalutil.GRPCDialOptions())

r, err := raft.NewRaft(c, fsm, raftStore, raftStore, fss, tm.Transport())
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions proxy/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
defaultDialTimeout = 5 * time.Second
defaultReadTimeout = 3 * time.Second
defaultWriteTimeout = 3 * time.Second
respProtocolV2 = 2
)

// Backend abstracts a Redis-protocol endpoint (real Redis or ElasticKV).
Expand Down Expand Up @@ -72,6 +73,7 @@ func NewRedisBackendWithOptions(addr string, name string, opts BackendOptions) *
Addr: addr,
DB: opts.DB,
Password: opts.Password,
Protocol: respProtocolV2,
PoolSize: opts.PoolSize,
DialTimeout: opts.DialTimeout,
ReadTimeout: opts.ReadTimeout,
Expand All @@ -85,6 +87,13 @@ func (b *RedisBackend) Do(ctx context.Context, args ...any) *redis.Cmd {
return b.client.Do(ctx, args...)
}

// DoWithTimeout runs one command with timeout applied as the socket timeout
// for this call only, instead of the backend's configured default. It exists
// for blocking commands that may wait longer than the default read timeout.
func (b *RedisBackend) DoWithTimeout(ctx context.Context, timeout time.Duration, args ...any) *redis.Cmd {
	scoped := b.client.WithTimeout(timeout)
	return scoped.Do(ctx, args...)
}

func (b *RedisBackend) Pipeline(ctx context.Context, cmds [][]any) ([]*redis.Cmd, error) {
pipe := b.client.Pipeline()
results := make([]*redis.Cmd, len(cmds))
Expand Down
54 changes: 54 additions & 0 deletions proxy/blocking.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package proxy

import (
"context"
"strconv"
"strings"
"time"

"github.com/redis/go-redis/v9"
)

// blockingMultiPopMinArgs is the minimum number of entries the argument
// vector must hold before the BLMPOP timeout at index 1 can be read.
const blockingMultiPopMinArgs = 2

// blockingTimeoutBackend is implemented by backends that can execute a
// command with a caller-supplied timeout instead of their default one.
type blockingTimeoutBackend interface {
	// DoWithTimeout executes args as a single command using timeout for this call.
	DoWithTimeout(ctx context.Context, timeout time.Duration, args ...any) *redis.Cmd
}

// blockingCommandTimeout returns how long the given blocking command asks the
// server to wait. It returns 0 when no wait time can be derived: the command
// is not a known blocking command, the timeout argument is missing or
// malformed, or the command explicitly asks for a timeout of 0.
//
// cmd is matched case-insensitively. args is expected to be the full command
// vector with the command name at index 0, which is why BLMPOP/BZMPOP read
// the timeout from args[1] and the stream option scan starts at index 1.
func blockingCommandTimeout(cmd string, args [][]byte) time.Duration {
	switch strings.ToUpper(cmd) {
	case "BLPOP", "BRPOP", "BRPOPLPUSH", "BLMOVE", "BZPOPMIN", "BZPOPMAX":
		// The timeout, in seconds, is the final argument.
		if len(args) == 0 {
			return 0
		}
		return parseBlockingSecondsArg(args[len(args)-1])
	case "BLMPOP", "BZMPOP":
		// The timeout, in seconds, immediately follows the command name.
		if len(args) < blockingMultiPopMinArgs {
			return 0
		}
		return parseBlockingSecondsArg(args[1])
	case "XREAD", "XREADGROUP":
		// BLOCK <ms> is an option that can only appear before STREAMS.
		for i := 1; i+1 < len(args); i++ {
			switch opt := string(args[i]); {
			case strings.EqualFold(opt, "STREAMS"):
				// Everything after STREAMS is keys and IDs; a key named
				// "block" must not be mistaken for the option.
				return 0
			case strings.EqualFold(opt, "GROUP"):
				// Skip the group and consumer operands so that a group or
				// consumer named "block" is not read as the option.
				i += 2
			case strings.EqualFold(opt, "BLOCK"):
				return parseBlockingMillisecondsArg(args[i+1])
			}
		}
	}
	return 0
}

// parseBlockingSecondsArg converts a timeout argument given in (possibly
// fractional) seconds into a time.Duration.
//
// It returns 0 when raw is not a valid float, is negative, or is NaN. Values
// too large for a time.Duration (including +Inf) are clamped to the maximum
// duration, so the result is never negative.
func parseBlockingSecondsArg(raw []byte) time.Duration {
	const maxDuration = time.Duration(1<<63 - 1)

	seconds, err := strconv.ParseFloat(string(raw), 64)
	// ParseFloat accepts "NaN" without error; seconds != seconds holds only
	// for NaN, which would otherwise slip past the sign check.
	if err != nil || seconds < 0 || seconds != seconds {
		return 0
	}

	// Converting an out-of-range float to an integer is implementation-defined
	// in Go, so clamp before converting.
	nanos := seconds * float64(time.Second)
	if nanos >= float64(maxDuration) {
		return maxDuration
	}
	return time.Duration(nanos)
}

// parseBlockingMillisecondsArg converts a timeout argument given in whole
// milliseconds into a time.Duration.
//
// It returns 0 when raw is not a valid base-10 integer or is negative. Values
// whose nanosecond count would overflow a time.Duration are clamped to the
// maximum duration instead of wrapping around to a bogus (possibly negative)
// value.
func parseBlockingMillisecondsArg(raw []byte) time.Duration {
	const (
		maxDuration = time.Duration(1<<63 - 1)
		maxMillis   = int64(maxDuration / time.Millisecond)
	)

	millis, err := strconv.ParseInt(string(raw), 10, 64)
	if err != nil || millis < 0 {
		return 0
	}
	if millis > maxMillis {
		return maxDuration
	}
	return time.Duration(millis) * time.Millisecond
}
26 changes: 22 additions & 4 deletions proxy/dualwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ type DualWriter struct {
writeSem chan struct{} // bounds concurrent secondary write goroutines
shadowSem chan struct{} // bounds concurrent shadow read goroutines

wg sync.WaitGroup
mu sync.Mutex // protects closed; held briefly to make wg.Add atomic with close check
closed bool
wg sync.WaitGroup
mu sync.Mutex // protects closed; held briefly to make wg.Add atomic with close check
closed bool
scriptMu sync.RWMutex
scripts map[string]string
// scriptOrder tracks insertion order for FIFO eviction of the bounded script cache.
scriptOrder []string
}

// NewDualWriter creates a DualWriter with the given backends.
Expand All @@ -48,6 +52,7 @@ func NewDualWriter(primary, secondary Backend, cfg ProxyConfig, metrics *ProxyMe
logger: logger,
writeSem: make(chan struct{}, maxWriteGoroutines),
shadowSem: make(chan struct{}, maxShadowGoroutines),
scripts: make(map[string]string),
}

if cfg.Mode == ModeDualWriteShadow || cfg.Mode == ModeElasticKVPrimary {
Expand Down Expand Up @@ -133,9 +138,15 @@ func (d *DualWriter) Read(ctx context.Context, cmd string, args [][]byte) (any,
// cmd must be the pre-uppercased command name.
func (d *DualWriter) Blocking(ctx context.Context, cmd string, args [][]byte) (any, error) {
iArgs := bytesArgsToInterfaces(args)
timeout := blockingCommandTimeout(cmd, args)

start := time.Now()
result := d.primary.Do(ctx, iArgs...)
var result *redis.Cmd
if blockingBackend, ok := d.primary.(blockingTimeoutBackend); ok {
result = blockingBackend.DoWithTimeout(ctx, timeout, iArgs...)
} else {
result = d.primary.Do(ctx, iArgs...)
}
resp, err := result.Result()
d.metrics.CommandDuration.WithLabelValues(cmd, d.primary.Name()).Observe(time.Since(start).Seconds())

Expand Down Expand Up @@ -190,6 +201,7 @@ func (d *DualWriter) Script(ctx context.Context, cmd string, args [][]byte) (any
return nil, fmt.Errorf("primary script %s: %w", cmd, err)
}
d.metrics.CommandTotal.WithLabelValues(cmd, d.primary.Name(), "ok").Inc()
d.rememberScript(cmd, args)

if d.hasSecondaryWrite() {
d.goWrite(func() { d.writeSecondary(cmd, iArgs) })
Expand All @@ -205,6 +217,12 @@ func (d *DualWriter) writeSecondary(cmd string, iArgs []any) {
start := time.Now()
result := d.secondary.Do(sCtx, iArgs...)
_, sErr := result.Result()
if isNoScriptError(sErr) {
if fallbackArgs, ok := d.evalFallbackArgs(cmd, iArgs); ok {
result = d.secondary.Do(sCtx, fallbackArgs...)
_, sErr = result.Result()
}
}
d.metrics.CommandDuration.WithLabelValues(cmd, d.secondary.Name()).Observe(time.Since(start).Seconds())

if sErr != nil && !errors.Is(sErr, redis.Nil) {
Expand Down
Loading
Loading