Skip to content

Commit 8d7753a

Browse files
authored
Merge pull request #313 from bootjp/feature/multi-shard-transaction
Refactor transaction timestamp management for cross shard data
2 parents 1b134ed + 32dc5ef commit 8d7753a

30 files changed

+2884
-124
lines changed

adapter/grpc.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -87,7 +87,7 @@ func (r *GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.Raw
8787

8888
v, err := r.store.GetAt(ctx, req.Key, readTS)
8989
if errors.Is(err, store.ErrKeyNotFound) {
90-
return &pb.RawGetResponse{Value: nil}, nil
90+
return &pb.RawGetResponse{Value: nil, Exists: false}, nil
9191
}
9292
if err != nil {
9393
return nil, errors.WithStack(err)
@@ -97,7 +97,7 @@ func (r *GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.Raw
9797
slog.String("key", string(req.Key)),
9898
slog.String("value", string(v)))
9999

100-
return &pb.RawGetResponse{Value: v}, nil
100+
return &pb.RawGetResponse{Value: v, Exists: true}, nil
101101
}
102102

103103
func (r *GRPCServer) RawLatestCommitTS(ctx context.Context, req *pb.RawLatestCommitTSRequest) (*pb.RawLatestCommitTSResponse, error) {

adapter/grpc_test.go

Lines changed: 21 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -36,13 +36,33 @@ func Test_value_can_be_deleted(t *testing.T) {
3636
resp, err := c.RawGet(context.TODO(), &pb.RawGetRequest{Key: key})
3737
assert.NoError(t, err, "Get RPC failed")
3838
assert.Nil(t, err)
39+
assert.True(t, resp.Exists)
3940
assert.Equal(t, want, resp.Value)
4041

4142
_, err = c.RawDelete(context.TODO(), &pb.RawDeleteRequest{Key: key})
4243
assert.NoError(t, err, "Delete RPC failed")
4344

44-
_, err = c.RawGet(context.TODO(), &pb.RawGetRequest{Key: key})
45+
resp, err = c.RawGet(context.TODO(), &pb.RawGetRequest{Key: key})
4546
assert.NoError(t, err, "Get RPC failed")
47+
assert.False(t, resp.Exists)
48+
}
49+
50+
func Test_grpc_raw_get_empty_value(t *testing.T) {
51+
t.Parallel()
52+
nodes, adders, _ := createNode(t, 3)
53+
c := rawKVClient(t, adders)
54+
defer shutdown(nodes)
55+
56+
key := []byte("empty-key")
57+
empty := []byte{}
58+
59+
_, err := c.RawPut(context.Background(), &pb.RawPutRequest{Key: key, Value: empty})
60+
assert.NoError(t, err, "Put RPC failed")
61+
62+
resp, err := c.RawGet(context.TODO(), &pb.RawGetRequest{Key: key})
63+
assert.NoError(t, err, "Get RPC failed")
64+
assert.True(t, resp.Exists)
65+
assert.Equal(t, 0, len(resp.Value))
4666
}
4767

4868
func Test_grpc_scan(t *testing.T) {

adapter/internal.go

Lines changed: 117 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
package adapter
22

33
import (
4+
"bytes"
45
"context"
56

67
"github.com/bootjp/elastickv/kv"
@@ -29,13 +30,19 @@ var _ pb.InternalServer = (*Internal)(nil)
2930

3031
var ErrNotLeader = errors.New("not leader")
3132
var ErrLeaderNotFound = errors.New("leader not found")
33+
var ErrTxnTimestampOverflow = errors.New("txn timestamp overflow")
3234

3335
func (i *Internal) Forward(_ context.Context, req *pb.ForwardRequest) (*pb.ForwardResponse, error) {
3436
if i.raft.State() != raft.Leader {
3537
return nil, errors.WithStack(ErrNotLeader)
3638
}
3739

38-
i.stampTimestamps(req)
40+
if err := i.stampTimestamps(req); err != nil {
41+
return &pb.ForwardResponse{
42+
Success: false,
43+
CommitIndex: 0,
44+
}, errors.WithStack(err)
45+
}
3946

4047
r, err := i.transactionManager.Commit(req.Requests)
4148
if err != nil {
@@ -51,35 +58,125 @@ func (i *Internal) Forward(_ context.Context, req *pb.ForwardRequest) (*pb.Forwa
5158
}, nil
5259
}
5360

54-
func (i *Internal) stampTimestamps(req *pb.ForwardRequest) {
61+
func (i *Internal) stampTimestamps(req *pb.ForwardRequest) error {
5562
if req == nil {
56-
return
63+
return nil
5764
}
5865
if req.IsTxn {
59-
var startTs uint64
60-
// All requests in a transaction must have the same timestamp.
61-
// Find a timestamp from the requests, or generate a new one if none exist.
62-
for _, r := range req.Requests {
63-
if r.Ts != 0 {
64-
startTs = r.Ts
65-
break
66-
}
66+
return i.stampTxnTimestamps(req.Requests)
67+
}
68+
69+
i.stampRawTimestamps(req.Requests)
70+
return nil
71+
}
72+
73+
func (i *Internal) stampRawTimestamps(reqs []*pb.Request) {
74+
for _, r := range reqs {
75+
if r == nil {
76+
continue
77+
}
78+
if r.Ts != 0 {
79+
continue
6780
}
81+
if i.clock == nil {
82+
r.Ts = 1
83+
continue
84+
}
85+
r.Ts = i.clock.Next()
86+
}
87+
}
6888

69-
if startTs == 0 && len(req.Requests) > 0 {
70-
startTs = i.clock.Next()
89+
func (i *Internal) stampTxnTimestamps(reqs []*pb.Request) error {
90+
startTS := forwardedTxnStartTS(reqs)
91+
if startTS == 0 {
92+
if i.clock == nil {
93+
startTS = 1
94+
} else {
95+
startTS = i.clock.Next()
7196
}
97+
}
98+
if startTS == ^uint64(0) {
99+
return errors.WithStack(ErrTxnTimestampOverflow)
100+
}
101+
102+
// Assign the unified timestamp to all requests in the transaction.
103+
for _, r := range reqs {
104+
if r != nil {
105+
r.Ts = startTS
106+
}
107+
}
108+
109+
return i.fillForwardedTxnCommitTS(reqs, startTS)
110+
}
72111

73-
// Assign the unified timestamp to all requests in the transaction.
74-
for _, r := range req.Requests {
75-
r.Ts = startTs
112+
func forwardedTxnStartTS(reqs []*pb.Request) uint64 {
113+
for _, r := range reqs {
114+
if r != nil && r.Ts != 0 {
115+
return r.Ts
76116
}
77-
return
117+
}
118+
return 0
119+
}
120+
121+
func forwardedTxnMetaMutation(r *pb.Request, metaPrefix []byte) (*pb.Mutation, bool) {
122+
if r == nil {
123+
return nil, false
124+
}
125+
if r.Phase != pb.Phase_COMMIT && r.Phase != pb.Phase_ABORT {
126+
return nil, false
127+
}
128+
if len(r.Mutations) == 0 || r.Mutations[0] == nil {
129+
return nil, false
130+
}
131+
if !bytes.HasPrefix(r.Mutations[0].Key, metaPrefix) {
132+
return nil, false
133+
}
134+
return r.Mutations[0], true
135+
}
136+
137+
func (i *Internal) fillForwardedTxnCommitTS(reqs []*pb.Request, startTS uint64) error {
138+
type metaToUpdate struct {
139+
m *pb.Mutation
140+
meta kv.TxnMeta
78141
}
79142

80-
for _, r := range req.Requests {
81-
if r.Ts == 0 {
82-
r.Ts = i.clock.Next()
143+
metaMutations := make([]metaToUpdate, 0, len(reqs))
144+
prefix := []byte(kv.TxnMetaPrefix)
145+
for _, r := range reqs {
146+
m, ok := forwardedTxnMetaMutation(r, prefix)
147+
if !ok {
148+
continue
83149
}
150+
meta, err := kv.DecodeTxnMeta(m.Value)
151+
if err != nil {
152+
continue
153+
}
154+
if meta.CommitTS != 0 {
155+
continue
156+
}
157+
metaMutations = append(metaMutations, metaToUpdate{m: m, meta: meta})
158+
}
159+
if len(metaMutations) == 0 {
160+
return nil
161+
}
162+
163+
commitTS := startTS + 1
164+
if commitTS == 0 {
165+
// Overflow: can't choose a commit timestamp strictly greater than startTS.
166+
return errors.WithStack(ErrTxnTimestampOverflow)
167+
}
168+
if i.clock != nil {
169+
i.clock.Observe(startTS)
170+
commitTS = i.clock.Next()
171+
}
172+
if commitTS <= startTS {
173+
// Defensive: avoid writing an invalid CommitTS.
174+
return errors.WithStack(ErrTxnTimestampOverflow)
175+
}
176+
177+
for _, item := range metaMutations {
178+
item.meta.CommitTS = commitTS
179+
item.m.Value = kv.EncodeTxnMeta(item.meta)
84180
}
181+
return nil
85182
}

adapter/internal_test.go

Lines changed: 140 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,140 @@
1+
package adapter
2+
3+
import (
4+
"testing"
5+
6+
"github.com/bootjp/elastickv/kv"
7+
pb "github.com/bootjp/elastickv/proto"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestStampTxnTimestamps_RejectsMaxStartTS(t *testing.T) {
12+
t.Parallel()
13+
14+
i := &Internal{}
15+
reqs := []*pb.Request{
16+
{
17+
IsTxn: true,
18+
Phase: pb.Phase_COMMIT,
19+
Ts: ^uint64(0),
20+
Mutations: []*pb.Mutation{
21+
{
22+
Op: pb.Op_PUT,
23+
Key: []byte(kv.TxnMetaPrefix),
24+
Value: kv.EncodeTxnMeta(kv.TxnMeta{PrimaryKey: []byte("k"), CommitTS: 0}),
25+
},
26+
},
27+
},
28+
}
29+
30+
err := i.stampTxnTimestamps(reqs)
31+
require.ErrorIs(t, err, ErrTxnTimestampOverflow)
32+
}
33+
34+
func TestFillForwardedTxnCommitTS_RejectsOverflow(t *testing.T) {
35+
t.Parallel()
36+
37+
i := &Internal{}
38+
reqs := []*pb.Request{
39+
{
40+
IsTxn: true,
41+
Phase: pb.Phase_COMMIT,
42+
Mutations: []*pb.Mutation{
43+
{
44+
Op: pb.Op_PUT,
45+
Key: []byte(kv.TxnMetaPrefix),
46+
Value: kv.EncodeTxnMeta(kv.TxnMeta{PrimaryKey: []byte("k"), CommitTS: 0}),
47+
},
48+
},
49+
},
50+
}
51+
52+
err := i.fillForwardedTxnCommitTS(reqs, ^uint64(0))
53+
require.ErrorIs(t, err, ErrTxnTimestampOverflow)
54+
}
55+
56+
func TestFillForwardedTxnCommitTS_AssignsCommitTS(t *testing.T) {
57+
t.Parallel()
58+
59+
i := &Internal{}
60+
startTS := uint64(10)
61+
reqs := []*pb.Request{
62+
{
63+
IsTxn: true,
64+
Phase: pb.Phase_COMMIT,
65+
Mutations: []*pb.Mutation{
66+
{
67+
Op: pb.Op_PUT,
68+
Key: []byte(kv.TxnMetaPrefix),
69+
Value: kv.EncodeTxnMeta(kv.TxnMeta{PrimaryKey: []byte("k"), CommitTS: 0}),
70+
},
71+
},
72+
},
73+
}
74+
75+
require.NoError(t, i.fillForwardedTxnCommitTS(reqs, startTS))
76+
77+
meta, err := kv.DecodeTxnMeta(reqs[0].Mutations[0].Value)
78+
require.NoError(t, err)
79+
require.Equal(t, startTS+1, meta.CommitTS)
80+
}
81+
82+
func TestFillForwardedTxnCommitTS_PreservesExistingCommitTS(t *testing.T) {
83+
t.Parallel()
84+
85+
i := &Internal{}
86+
reqs := []*pb.Request{
87+
{
88+
IsTxn: true,
89+
Phase: pb.Phase_COMMIT,
90+
Mutations: []*pb.Mutation{
91+
{
92+
Op: pb.Op_PUT,
93+
Key: []byte(kv.TxnMetaPrefix),
94+
Value: kv.EncodeTxnMeta(kv.TxnMeta{PrimaryKey: []byte("k"), CommitTS: 42}),
95+
},
96+
},
97+
},
98+
}
99+
100+
require.NoError(t, i.fillForwardedTxnCommitTS(reqs, 10))
101+
meta, err := kv.DecodeTxnMeta(reqs[0].Mutations[0].Value)
102+
require.NoError(t, err)
103+
require.Equal(t, uint64(42), meta.CommitTS)
104+
}
105+
106+
func TestStampTxnTimestamps_UsesSingleTxnStartTS(t *testing.T) {
107+
t.Parallel()
108+
109+
i := &Internal{}
110+
prepare := &pb.Request{
111+
IsTxn: true,
112+
Phase: pb.Phase_PREPARE,
113+
Ts: 0,
114+
Mutations: []*pb.Mutation{
115+
{Op: pb.Op_PUT, Key: []byte("k"), Value: []byte("v")},
116+
},
117+
}
118+
commit := &pb.Request{
119+
IsTxn: true,
120+
Phase: pb.Phase_COMMIT,
121+
Ts: 9,
122+
Mutations: []*pb.Mutation{
123+
{
124+
Op: pb.Op_PUT,
125+
Key: []byte(kv.TxnMetaPrefix),
126+
Value: kv.EncodeTxnMeta(kv.TxnMeta{PrimaryKey: []byte("k"), CommitTS: 0}),
127+
},
128+
{Op: pb.Op_PUT, Key: []byte("k")},
129+
},
130+
}
131+
reqs := []*pb.Request{prepare, commit}
132+
133+
require.NoError(t, i.stampTxnTimestamps(reqs))
134+
require.Equal(t, uint64(9), prepare.Ts)
135+
require.Equal(t, uint64(9), commit.Ts)
136+
137+
meta, err := kv.DecodeTxnMeta(commit.Mutations[0].Value)
138+
require.NoError(t, err)
139+
require.Greater(t, meta.CommitTS, uint64(9))
140+
}

0 commit comments

Comments
 (0)