Skip to content

Commit 3f9fde4

Browse files
committed
kv: use one-phase commit for single-group txns
1 parent ff2678b commit 3f9fde4

File tree

7 files changed

+218
-64
lines changed

7 files changed

+218
-64
lines changed

adapter/internal.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,10 @@ func forwardedTxnStartTS(reqs []*pb.Request) uint64 {
130130
}
131131

132132
func forwardedTxnMetaMutation(r *pb.Request, metaPrefix []byte) (*pb.Mutation, bool) {
133-
if r == nil {
133+
if r == nil || !r.IsTxn {
134134
return nil, false
135135
}
136-
if r.Phase != pb.Phase_COMMIT && r.Phase != pb.Phase_ABORT {
136+
if r.Phase != pb.Phase_COMMIT && r.Phase != pb.Phase_ABORT && r.Phase != pb.Phase_NONE {
137137
return nil, false
138138
}
139139
if len(r.Mutations) == 0 || r.Mutations[0] == nil {

adapter/internal_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,37 @@ func TestFillForwardedTxnCommitTS_PreservesExistingCommitTS(t *testing.T) {
103103
require.Equal(t, uint64(42), meta.CommitTS)
104104
}
105105

106+
func TestFillForwardedTxnCommitTS_AssignsCommitTSForOnePhaseTxn(t *testing.T) {
107+
t.Parallel()
108+
109+
i := &Internal{}
110+
startTS := uint64(10)
111+
reqs := []*pb.Request{
112+
{
113+
IsTxn: true,
114+
Phase: pb.Phase_NONE,
115+
Mutations: []*pb.Mutation{
116+
{
117+
Op: pb.Op_PUT,
118+
Key: []byte(kv.TxnMetaPrefix),
119+
Value: kv.EncodeTxnMeta(kv.TxnMeta{PrimaryKey: []byte("k"), CommitTS: 0}),
120+
},
121+
{
122+
Op: pb.Op_PUT,
123+
Key: []byte("k"),
124+
Value: []byte("v"),
125+
},
126+
},
127+
},
128+
}
129+
130+
require.NoError(t, i.fillForwardedTxnCommitTS(reqs, startTS))
131+
132+
meta, err := kv.DecodeTxnMeta(reqs[0].Mutations[0].Value)
133+
require.NoError(t, err)
134+
require.Equal(t, startTS+1, meta.CommitTS)
135+
}
136+
106137
func TestStampTxnTimestamps_UsesSingleTxnStartTS(t *testing.T) {
107138
t.Parallel()
108139

kv/coordinator.go

Lines changed: 15 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,9 @@ func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64) (*CoordinateR
116116
return nil, errors.WithStack(ErrTxnCommitTSRequired)
117117
}
118118

119-
logs := txnRequests(startTS, commitTS, defaultTxnLockTTLms, primary, reqs)
120-
121-
r, err := c.transactionManager.Commit(logs)
119+
r, err := c.transactionManager.Commit([]*pb.Request{
120+
onePhaseTxnRequest(startTS, commitTS, primary, reqs),
121+
})
122122
if err != nil {
123123
return nil, errors.WithStack(err)
124124
}
@@ -209,7 +209,9 @@ func (c *Coordinate) redirect(ctx context.Context, reqs *OperationGroup[OP]) (*C
209209
if len(primary) == 0 {
210210
return nil, errors.WithStack(ErrTxnPrimaryKeyRequired)
211211
}
212-
requests = txnRequests(reqs.StartTS, 0, defaultTxnLockTTLms, primary, reqs.Elems)
212+
requests = []*pb.Request{
213+
onePhaseTxnRequest(reqs.StartTS, 0, primary, reqs.Elems),
214+
}
213215
} else {
214216
for _, req := range reqs.Elems {
215217
requests = append(requests, c.toRawRequest(req))
@@ -261,33 +263,17 @@ func elemToMutation(req *Elem[OP]) *pb.Mutation {
261263
panic("unreachable")
262264
}
263265

264-
func txnRequests(startTS, commitTS, lockTTLms uint64, primaryKey []byte, reqs []*Elem[OP]) []*pb.Request {
265-
meta := &pb.Mutation{
266-
Op: pb.Op_PUT,
267-
Key: []byte(txnMetaPrefix),
268-
Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primaryKey, LockTTLms: lockTTLms, CommitTS: 0}),
269-
}
270-
271-
prepareMuts := make([]*pb.Mutation, 0, len(reqs)+1)
272-
prepareMuts = append(prepareMuts, meta)
273-
for _, req := range reqs {
274-
prepareMuts = append(prepareMuts, elemToMutation(req))
275-
}
276-
277-
commitMeta := &pb.Mutation{
278-
Op: pb.Op_PUT,
279-
Key: []byte(txnMetaPrefix),
280-
Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primaryKey, LockTTLms: 0, CommitTS: commitTS}),
281-
}
282-
commitMuts := make([]*pb.Mutation, 0, len(reqs)+1)
283-
commitMuts = append(commitMuts, commitMeta)
266+
func onePhaseTxnRequest(startTS, commitTS uint64, primaryKey []byte, reqs []*Elem[OP]) *pb.Request {
267+
muts := make([]*pb.Mutation, 0, len(reqs)+1)
268+
muts = append(muts, txnMetaMutation(primaryKey, 0, commitTS))
284269
for _, req := range reqs {
285-
commitMuts = append(commitMuts, &pb.Mutation{Op: pb.Op_PUT, Key: req.Key})
270+
muts = append(muts, elemToMutation(req))
286271
}
287-
288-
return []*pb.Request{
289-
{IsTxn: true, Phase: pb.Phase_PREPARE, Ts: startTS, Mutations: prepareMuts},
290-
{IsTxn: true, Phase: pb.Phase_COMMIT, Ts: startTS, Mutations: commitMuts},
272+
return &pb.Request{
273+
IsTxn: true,
274+
Phase: pb.Phase_NONE,
275+
Ts: startTS,
276+
Mutations: muts,
291277
}
292278
}
293279

kv/coordinator_txn_test.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ import (
99

1010
type stubTransactional struct {
1111
commits int
12+
reqs [][]*pb.Request
1213
}
1314

14-
func (s *stubTransactional) Commit(_ []*pb.Request) (*TransactionResponse, error) {
15+
func (s *stubTransactional) Commit(reqs []*pb.Request) (*TransactionResponse, error) {
1516
s.commits++
17+
s.reqs = append(s.reqs, reqs)
1618
return &TransactionResponse{}, nil
1719
}
1820

@@ -54,3 +56,40 @@ func TestCoordinateDispatchTxn_RejectsMissingPrimaryKey(t *testing.T) {
5456
require.ErrorIs(t, err, ErrTxnPrimaryKeyRequired)
5557
require.Equal(t, 0, tx.commits)
5658
}
59+
60+
func TestCoordinateDispatchTxn_UsesOnePhaseRequest(t *testing.T) {
61+
t.Parallel()
62+
63+
tx := &stubTransactional{}
64+
c := &Coordinate{
65+
transactionManager: tx,
66+
clock: NewHLC(),
67+
}
68+
69+
startTS := uint64(10)
70+
_, err := c.dispatchTxn([]*Elem[OP]{
71+
{Op: Put, Key: []byte("b"), Value: []byte("v1")},
72+
{Op: Del, Key: []byte("x")},
73+
}, startTS)
74+
require.NoError(t, err)
75+
require.Equal(t, 1, tx.commits)
76+
require.Len(t, tx.reqs, 1)
77+
require.Len(t, tx.reqs[0], 1)
78+
79+
req := tx.reqs[0][0]
80+
require.NotNil(t, req)
81+
require.True(t, req.IsTxn)
82+
require.Equal(t, pb.Phase_NONE, req.Phase)
83+
require.Equal(t, startTS, req.Ts)
84+
require.Len(t, req.Mutations, 3)
85+
require.Equal(t, []byte(txnMetaPrefix), req.Mutations[0].Key)
86+
require.Equal(t, []byte("b"), req.Mutations[1].Key)
87+
require.Equal(t, []byte("v1"), req.Mutations[1].Value)
88+
require.Equal(t, pb.Op_DEL, req.Mutations[2].Op)
89+
require.Equal(t, []byte("x"), req.Mutations[2].Key)
90+
91+
meta, err := DecodeTxnMeta(req.Mutations[0].Value)
92+
require.NoError(t, err)
93+
require.Equal(t, []byte("b"), meta.PrimaryKey)
94+
require.Greater(t, meta.CommitTS, startTS)
95+
}

kv/fsm.go

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func requestCommitTS(r *pb.Request) (uint64, error) {
8787
}
8888

8989
commitTS := r.Ts
90-
if r.IsTxn && (r.Phase == pb.Phase_COMMIT || r.Phase == pb.Phase_ABORT) {
90+
if r.IsTxn && (r.Phase == pb.Phase_COMMIT || r.Phase == pb.Phase_ABORT || r.Phase == pb.Phase_NONE) {
9191
meta, _, err := extractTxnMeta(r.Mutations)
9292
if err != nil {
9393
return 0, errors.WithStack(err)
@@ -177,8 +177,7 @@ func (f *kvFSM) handleTxnRequest(ctx context.Context, r *pb.Request, commitTS ui
177177
case pb.Phase_ABORT:
178178
return f.handleAbortRequest(ctx, r, commitTS)
179179
case pb.Phase_NONE:
180-
// not reached
181-
return errors.WithStack(ErrUnknownRequestType)
180+
return f.handleOnePhaseTxnRequest(ctx, r, commitTS)
182181
default:
183182
return errors.WithStack(ErrUnknownRequestType)
184183
}
@@ -266,6 +265,37 @@ func (f *kvFSM) handlePrepareRequest(ctx context.Context, r *pb.Request) error {
266265
return nil
267266
}
268267

268+
func (f *kvFSM) handleOnePhaseTxnRequest(ctx context.Context, r *pb.Request, commitTS uint64) error {
269+
meta, muts, err := extractTxnMeta(r.Mutations)
270+
if err != nil {
271+
return err
272+
}
273+
if len(meta.PrimaryKey) == 0 {
274+
return errors.WithStack(ErrTxnPrimaryKeyRequired)
275+
}
276+
if len(muts) == 0 {
277+
return errors.WithStack(ErrInvalidRequest)
278+
}
279+
startTS := r.Ts
280+
if commitTS <= startTS {
281+
return errors.WithStack(ErrTxnCommitTSRequired)
282+
}
283+
284+
uniq, err := uniqueMutations(muts)
285+
if err != nil {
286+
return err
287+
}
288+
if err := f.validateConflicts(ctx, uniq, startTS); err != nil {
289+
return errors.WithStack(err)
290+
}
291+
292+
storeMuts, err := f.buildOnePhaseStoreMutations(ctx, uniq)
293+
if err != nil {
294+
return err
295+
}
296+
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, startTS, commitTS))
297+
}
298+
269299
func (f *kvFSM) handleCommitRequest(ctx context.Context, r *pb.Request) error {
270300
meta, muts, err := extractTxnMeta(r.Mutations)
271301
if err != nil {
@@ -343,6 +373,22 @@ func (f *kvFSM) buildPrepareStoreMutations(ctx context.Context, muts []*pb.Mutat
343373
return storeMuts, nil
344374
}
345375

376+
func (f *kvFSM) buildOnePhaseStoreMutations(ctx context.Context, muts []*pb.Mutation) ([]*store.KVPairMutation, error) {
377+
for _, mut := range muts {
378+
if isTxnInternalKey(mut.Key) {
379+
return nil, errors.WithStack(ErrInvalidRequest)
380+
}
381+
if err := f.assertNoConflictingTxnLock(ctx, mut.Key, 0); err != nil {
382+
return nil, err
383+
}
384+
}
385+
storeMuts, err := toStoreMutations(muts)
386+
if err != nil {
387+
return nil, errors.WithStack(err)
388+
}
389+
return storeMuts, nil
390+
}
391+
346392
func (f *kvFSM) buildCommitStoreMutations(ctx context.Context, muts []*pb.Mutation, meta TxnMeta, startTS, commitTS uint64) ([]*store.KVPairMutation, error) {
347393
storeMuts := make([]*store.KVPairMutation, 0, len(muts)*txnCommitStoreMutationFactor+txnCommitStoreMutationSlack)
348394

kv/fsm_txn_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,38 @@ func TestPrepareClampsHugeLockTTL(t *testing.T) {
143143
maxDelta := maxTxnLockTTLms << hlcLogicalBits
144144
require.LessOrEqual(t, lock.TTLExpireAt-now, maxDelta)
145145
}
146+
147+
func TestOnePhaseTxnCommitsWithoutTxnArtifacts(t *testing.T) {
148+
t.Parallel()
149+
150+
ctx := context.Background()
151+
st := store.NewMVCCStore()
152+
fsm, ok := NewKvFSM(st).(*kvFSM)
153+
require.True(t, ok)
154+
155+
startTS := uint64(15)
156+
commitTS := uint64(25)
157+
key := []byte("k")
158+
req := &pb.Request{
159+
IsTxn: true,
160+
Phase: pb.Phase_NONE,
161+
Ts: startTS,
162+
Mutations: []*pb.Mutation{
163+
{Op: pb.Op_PUT, Key: []byte(txnMetaPrefix), Value: EncodeTxnMeta(TxnMeta{PrimaryKey: key, CommitTS: commitTS})},
164+
{Op: pb.Op_PUT, Key: key, Value: []byte("v")},
165+
},
166+
}
167+
168+
require.NoError(t, applyFSMRequest(t, fsm, req))
169+
170+
value, err := st.GetAt(ctx, key, ^uint64(0))
171+
require.NoError(t, err)
172+
require.Equal(t, []byte("v"), value)
173+
174+
_, err = st.GetAt(ctx, txnLockKey(key), ^uint64(0))
175+
require.ErrorIs(t, err, store.ErrKeyNotFound)
176+
_, err = st.GetAt(ctx, txnIntentKey(key), ^uint64(0))
177+
require.ErrorIs(t, err, store.ErrKeyNotFound)
178+
_, err = st.GetAt(ctx, txnCommitKey(key, startTS), ^uint64(0))
179+
require.ErrorIs(t, err, store.ErrKeyNotFound)
180+
}

kv/transaction.go

Lines changed: 46 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,10 @@ func (t *TransactionManager) commitSequential(reqs []*pb.Request) (*TransactionR
159159

160160
if err != nil {
161161
// Only attempt transactional cleanup for transactional batches. Raw request
162-
// batches may partially succeed across shards by design.
163-
if len(reqs) > 0 && reqs[0] != nil && reqs[0].IsTxn {
162+
// batches may partially succeed across shards by design. One-phase
163+
// transactional requests do not leave intents behind, so they do not need
164+
// abort cleanup on failure.
165+
if needsTxnCleanup(reqs) {
164166
_, _err := t.Abort(reqs)
165167
if _err != nil {
166168
return nil, errors.WithStack(errors.CombineErrors(err, _err))
@@ -174,6 +176,15 @@ func (t *TransactionManager) commitSequential(reqs []*pb.Request) (*TransactionR
174176
}, nil
175177
}
176178

179+
func needsTxnCleanup(reqs []*pb.Request) bool {
180+
for _, req := range reqs {
181+
if req != nil && req.IsTxn && req.Phase != pb.Phase_NONE {
182+
return true
183+
}
184+
}
185+
return false
186+
}
187+
177188
func (t *TransactionManager) commitRaw(reqs []*pb.Request) (*TransactionResponse, error) {
178189
item := &rawCommitItem{
179190
reqs: reqs,
@@ -315,34 +326,9 @@ func combineApplyErrors(errs []error) error {
315326
func (t *TransactionManager) Abort(reqs []*pb.Request) (*TransactionResponse, error) {
316327
var abortReqs []*pb.Request
317328
for _, req := range reqs {
318-
if req == nil || !req.IsTxn {
319-
continue
320-
}
321-
meta, muts, err := extractTxnMeta(req.Mutations)
322-
if err != nil {
323-
// Best-effort cleanup; skip requests we can't interpret.
324-
continue
329+
if abortReq := abortRequestFor(req); abortReq != nil {
330+
abortReqs = append(abortReqs, abortReq)
325331
}
326-
startTS := req.Ts
327-
abortTS := abortTSFrom(startTS, startTS)
328-
if abortTS <= startTS {
329-
// Overflow: can't choose an abort timestamp strictly greater than startTS.
330-
continue
331-
}
332-
meta.CommitTS = abortTS
333-
334-
abortReqs = append(abortReqs, &pb.Request{
335-
IsTxn: true,
336-
Phase: pb.Phase_ABORT,
337-
Ts: startTS,
338-
Mutations: append([]*pb.Mutation{
339-
{
340-
Op: pb.Op_PUT,
341-
Key: []byte(txnMetaPrefix),
342-
Value: EncodeTxnMeta(meta),
343-
},
344-
}, muts...),
345-
})
346332
}
347333

348334
var commitIndex uint64
@@ -362,6 +348,37 @@ func (t *TransactionManager) Abort(reqs []*pb.Request) (*TransactionResponse, er
362348
}, nil
363349
}
364350

351+
func abortRequestFor(req *pb.Request) *pb.Request {
352+
if req == nil || !req.IsTxn || req.Phase == pb.Phase_NONE {
353+
return nil
354+
}
355+
meta, muts, err := extractTxnMeta(req.Mutations)
356+
if err != nil {
357+
// Best-effort cleanup; skip requests we can't interpret.
358+
return nil
359+
}
360+
startTS := req.Ts
361+
abortTS := abortTSFrom(startTS, startTS)
362+
if abortTS <= startTS {
363+
// Overflow: can't choose an abort timestamp strictly greater than startTS.
364+
return nil
365+
}
366+
meta.CommitTS = abortTS
367+
368+
return &pb.Request{
369+
IsTxn: true,
370+
Phase: pb.Phase_ABORT,
371+
Ts: startTS,
372+
Mutations: append([]*pb.Mutation{
373+
{
374+
Op: pb.Op_PUT,
375+
Key: []byte(txnMetaPrefix),
376+
Value: EncodeTxnMeta(meta),
377+
},
378+
}, muts...),
379+
}
380+
}
381+
365382
func extractTxnMeta(muts []*pb.Mutation) (TxnMeta, []*pb.Mutation, error) {
366383
if len(muts) == 0 || muts[0] == nil || !isTxnMetaKey(muts[0].Key) {
367384
return TxnMeta{}, nil, errors.WithStack(ErrTxnMetaMissing)

0 commit comments

Comments
 (0)