Skip to content

Commit f1d04ba

Browse files
committed
adapter: use structured redis txn errors
1 parent 939e785 commit f1d04ba

File tree

9 files changed

+104
-53
lines changed

9 files changed

+104
-53
lines changed

adapter/redis_lua_context.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -511,9 +511,6 @@ func (c *luaScriptContext) materializeList(key []byte, st *luaListState) error {
511511
func (c *luaScriptContext) materializedListForRead(key string) (*luaListState, bool, error) {
512512
st, err := c.listState([]byte(key))
513513
if err != nil {
514-
if errors.Is(err, store.ErrKeyNotFound) {
515-
return nil, false, nil
516-
}
517514
return nil, false, err
518515
}
519516
if !st.exists || st.currentLen() == 0 {

adapter/redis_retry.go

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"bytes"
55
"context"
66
"math/rand/v2"
7-
"strings"
87
"time"
98

109
"github.com/bootjp/elastickv/kv"
@@ -105,46 +104,24 @@ func (r *RedisServer) retryRedisWrite(ctx context.Context, fn func() error) erro
105104
}
106105

107106
func normalizeRetryableRedisTxnErr(err error) error {
108-
key, ok := extractRetryableRedisTxnKey(err.Error())
109-
if !ok {
110-
return err
111-
}
112-
logicalKey := normalizeRetryableRedisTxnKey(key)
113-
if len(logicalKey) == 0 || bytes.Equal(logicalKey, key) {
114-
return err
115-
}
116-
117-
switch {
118-
case errors.Is(err, kv.ErrTxnLocked):
119-
return errors.Wrapf(kv.ErrTxnLocked, "key: %s", string(logicalKey))
120-
case errors.Is(err, store.ErrWriteConflict):
121-
return errors.Wrapf(store.ErrWriteConflict, "key: %s", string(logicalKey))
122-
default:
123-
return err
124-
}
125-
}
126-
127-
func extractRetryableRedisTxnKey(msg string) ([]byte, bool) {
128-
const marker = "key: "
129-
idx := strings.Index(msg, marker)
130-
if idx < 0 {
131-
return nil, false
132-
}
133-
start := idx + len(marker)
134-
end := len(msg)
135-
for _, suffix := range []string{
136-
": txn locked",
137-
": write conflict",
138-
" (timestamp overflow)",
139-
} {
140-
if pos := strings.Index(msg[start:], suffix); pos >= 0 && start+pos < end {
141-
end = start + pos
107+
if key, detail, ok := kv.TxnLockedDetails(err); ok {
108+
logicalKey := normalizeRetryableRedisTxnKey(key)
109+
if len(logicalKey) == 0 || bytes.Equal(logicalKey, key) {
110+
return err
111+
}
112+
if detail != "" {
113+
return errors.WithStack(kv.NewTxnLockedErrorWithDetail(logicalKey, detail))
142114
}
115+
return errors.WithStack(kv.NewTxnLockedError(logicalKey))
143116
}
144-
if start >= end {
145-
return nil, false
117+
if key, ok := store.WriteConflictKey(err); ok {
118+
logicalKey := normalizeRetryableRedisTxnKey(key)
119+
if len(logicalKey) == 0 || bytes.Equal(logicalKey, key) {
120+
return err
121+
}
122+
return errors.WithStack(store.NewWriteConflictError(logicalKey))
146123
}
147-
return []byte(msg[start:end]), true
124+
return err
148125
}
149126

150127
func normalizeRetryableRedisTxnKey(key []byte) []byte {

adapter/redis_retry_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func TestNormalizeRetryableRedisTxnErrListKey(t *testing.T) {
235235
t.Parallel()
236236

237237
internalKey := store.ListItemKey([]byte("retry:list"), 1)
238-
err := errors.Wrapf(kv.ErrTxnLocked, "key: %s", string(internalKey))
238+
err := kv.NewTxnLockedError(internalKey)
239239

240240
normalized := normalizeRetryableRedisTxnErr(err)
241241

@@ -249,7 +249,7 @@ func TestNormalizeRetryableRedisTxnErrTxnTTLKey(t *testing.T) {
249249

250250
internalKey := append([]byte("!txn|cmt|"), redisTTLKey([]byte("retry:ttl"))...)
251251
internalKey = append(internalKey, make([]byte, 8)...)
252-
err := errors.Wrapf(store.ErrWriteConflict, "key: %s", string(internalKey))
252+
err := store.NewWriteConflictError(internalKey)
253253

254254
normalized := normalizeRetryableRedisTxnErr(err)
255255

@@ -259,6 +259,20 @@ func TestNormalizeRetryableRedisTxnErrTxnTTLKey(t *testing.T) {
259259
require.NotContains(t, normalized.Error(), redisTTLPrefix)
260260
}
261261

262+
func TestNormalizeRetryableRedisTxnErrPreservesTxnLockedDetail(t *testing.T) {
263+
t.Parallel()
264+
265+
internalKey := store.ListItemKey([]byte("retry:list"), 2)
266+
err := errors.WithStack(kv.NewTxnLockedErrorWithDetail(internalKey, "timestamp overflow"))
267+
268+
normalized := normalizeRetryableRedisTxnErr(err)
269+
270+
require.ErrorIs(t, normalized, kv.ErrTxnLocked)
271+
require.ErrorContains(t, normalized, "key: retry:list")
272+
require.ErrorContains(t, normalized, "timestamp overflow")
273+
require.NotContains(t, normalized.Error(), store.ListItemPrefix)
274+
}
275+
262276
func TestRetryPolicyForRedisTxnErr(t *testing.T) {
263277
t.Parallel()
264278

kv/fsm.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ func (f *kvFSM) validateConflicts(ctx context.Context, muts []*pb.Mutation, star
197197
return errors.WithStack(err)
198198
}
199199
if exists && latest > startTS {
200-
return errors.Wrapf(store.ErrWriteConflict, "key: %s", string(mut.Key))
200+
return errors.WithStack(store.NewWriteConflictError(mut.Key))
201201
}
202202
}
203203
return nil
@@ -543,7 +543,7 @@ func (f *kvFSM) commitTxnKeyMutations(ctx context.Context, key, primaryKey []byt
543543
return nil, nil
544544
}
545545
if lock.StartTS != startTS {
546-
return nil, errors.Wrapf(ErrTxnLocked, "key: %s", string(key))
546+
return nil, NewTxnLockedError(key)
547547
}
548548
if !bytes.Equal(lock.PrimaryKey, primaryKey) {
549549
return nil, errors.Wrapf(ErrTxnInvalidMeta, "lock primary_key mismatch for key %s", string(key))
@@ -605,7 +605,7 @@ func (f *kvFSM) assertNoConflictingTxnLock(ctx context.Context, key []byte, star
605605
if startTS != 0 && lock.StartTS == startTS {
606606
return nil
607607
}
608-
return errors.Wrapf(ErrTxnLocked, "key: %s", string(key))
608+
return NewTxnLockedError(key)
609609
}
610610

611611
func toStoreMutations(muts []*pb.Mutation) ([]*store.KVPairMutation, error) {

kv/shard_store.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -506,11 +506,11 @@ func (s *ShardStore) resolveTxnLockForKey(ctx context.Context, g *ShardGroup, ke
506506
// Defensive check: While uint64 overflow is not expected in normal operation,
507507
// this handles the edge case where startTS==^uint64(0) or a bug causes overflow.
508508
// Prevents violating the FSM invariant resolveTS > startTS (fsm.go:258).
509-
return errors.Wrapf(ErrTxnLocked, "key: %s (timestamp overflow)", string(key))
509+
return NewTxnLockedErrorWithDetail(key, "timestamp overflow")
510510
}
511511
return applyTxnResolution(g, pb.Phase_ABORT, lock.StartTS, abortTS, lock.PrimaryKey, [][]byte{key})
512512
case txnStatusPending:
513-
return errors.Wrapf(ErrTxnLocked, "key: %s", string(key))
513+
return NewTxnLockedError(key)
514514
default:
515515
return errors.Wrapf(ErrTxnInvalidMeta, "unknown txn status for key %s", string(key))
516516
}
@@ -669,13 +669,13 @@ func (s *ShardStore) cachedLockTxnStatus(ctx context.Context, plan *scanLockPlan
669669
func lockResolutionForStatus(state lockTxnStatus, lock txnLock, key []byte, cleanupNow uint64) (pb.Phase, uint64, error) {
670670
switch state.status {
671671
case txnStatusPending:
672-
return pb.Phase_NONE, 0, errors.Wrapf(ErrTxnLocked, "key: %s", string(key))
672+
return pb.Phase_NONE, 0, NewTxnLockedError(key)
673673
case txnStatusCommitted:
674674
return pb.Phase_COMMIT, state.commitTS, nil
675675
case txnStatusRolledBack:
676676
abortTS := cleanupTSWithNow(lock.StartTS, cleanupNow)
677677
if abortTS <= lock.StartTS {
678-
return pb.Phase_NONE, 0, errors.Wrapf(ErrTxnLocked, "key: %s (timestamp overflow)", string(key))
678+
return pb.Phase_NONE, 0, NewTxnLockedErrorWithDetail(key, "timestamp overflow")
679679
}
680680
return pb.Phase_ABORT, abortTS, nil
681681
default:

kv/txn_errors.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package kv
22

3-
import "github.com/cockroachdb/errors"
3+
import (
4+
"bytes"
5+
"fmt"
6+
7+
"github.com/cockroachdb/errors"
8+
)
49

510
var (
611
ErrTxnMetaMissing = errors.New("txn meta missing")
@@ -10,3 +15,35 @@ var (
1015
ErrTxnAlreadyCommitted = errors.New("txn already committed")
1116
ErrTxnPrimaryKeyRequired = errors.New("txn primary key required")
1217
)
18+
19+
type TxnLockedError struct {
20+
key []byte
21+
detail string
22+
}
23+
24+
func NewTxnLockedError(key []byte) error {
25+
return &TxnLockedError{key: bytes.Clone(key)}
26+
}
27+
28+
func NewTxnLockedErrorWithDetail(key []byte, detail string) error {
29+
return &TxnLockedError{key: bytes.Clone(key), detail: detail}
30+
}
31+
32+
func TxnLockedDetails(err error) ([]byte, string, bool) {
33+
var lockedErr *TxnLockedError
34+
if !errors.As(err, &lockedErr) {
35+
return nil, "", false
36+
}
37+
return bytes.Clone(lockedErr.key), lockedErr.detail, true
38+
}
39+
40+
func (e *TxnLockedError) Error() string {
41+
if e.detail != "" {
42+
return fmt.Sprintf("key: %s (%s): %s", string(e.key), e.detail, ErrTxnLocked)
43+
}
44+
return fmt.Sprintf("key: %s: %s", string(e.key), ErrTxnLocked)
45+
}
46+
47+
func (e *TxnLockedError) Unwrap() error {
48+
return ErrTxnLocked
49+
}

store/lsm_store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ func (s *pebbleStore) checkConflicts(ctx context.Context, mutations []*KVPairMut
560560
return err
561561
}
562562
if exists && ts > startTS {
563-
return errors.Wrapf(ErrWriteConflict, "key: %s", string(mut.Key))
563+
return NewWriteConflictError(mut.Key)
564564
}
565565
}
566566
return nil

store/mvcc_store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ func (s *mvccStore) ApplyMutations(ctx context.Context, mutations []*KVPairMutat
447447

448448
for _, mut := range mutations {
449449
if latestVer, ok := s.latestVersionLocked(mut.Key); ok && latestVer.TS > startTS {
450-
return errors.Wrapf(ErrWriteConflict, "key: %s", string(mut.Key))
450+
return NewWriteConflictError(mut.Key)
451451
}
452452
}
453453

store/store.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package store
22

33
import (
4+
"bytes"
45
"context"
6+
"fmt"
57
"io"
68

79
"github.com/cockroachdb/errors"
@@ -17,6 +19,30 @@ var ErrReadTSCompacted = errors.New("read timestamp has been compacted")
1719
var ErrSnapshotKeyTooLarge = errors.New("mvcc snapshot key too large")
1820
var ErrSnapshotVersionCountTooLarge = errors.New("mvcc snapshot version count too large")
1921

22+
type WriteConflictError struct {
23+
key []byte
24+
}
25+
26+
func NewWriteConflictError(key []byte) error {
27+
return &WriteConflictError{key: bytes.Clone(key)}
28+
}
29+
30+
func WriteConflictKey(err error) ([]byte, bool) {
31+
var conflictErr *WriteConflictError
32+
if !errors.As(err, &conflictErr) {
33+
return nil, false
34+
}
35+
return bytes.Clone(conflictErr.key), true
36+
}
37+
38+
func (e *WriteConflictError) Error() string {
39+
return fmt.Sprintf("key: %s: %s", string(e.key), ErrWriteConflict)
40+
}
41+
42+
func (e *WriteConflictError) Unwrap() error {
43+
return ErrWriteConflict
44+
}
45+
2046
type KVPair struct {
2147
Key []byte
2248
Value []byte

0 commit comments

Comments
 (0)