@@ -105,3 +105,47 @@ func TestShardedCoordinatorDispatch(t *testing.T) {
105105 t .Fatalf ("expected key b missing in group2, got %v" , err )
106106 }
107107}
108+
109+ func TestShardedCoordinatorDispatch_RejectsCrossShardTxn (t * testing.T ) {
110+ ctx := context .Background ()
111+
112+ engine := distribution .NewEngine ()
113+ engine .UpdateRoute ([]byte ("a" ), []byte ("m" ), 1 )
114+ engine .UpdateRoute ([]byte ("m" ), nil , 2 )
115+
116+ s1 := store .NewMVCCStore ()
117+ r1 , stop1 := newSingleRaft (t , "g1" , NewKvFSM (s1 ))
118+ defer stop1 ()
119+
120+ s2 := store .NewMVCCStore ()
121+ r2 , stop2 := newSingleRaft (t , "g2" , NewKvFSM (s2 ))
122+ defer stop2 ()
123+
124+ groups := map [uint64 ]* ShardGroup {
125+ 1 : {Raft : r1 , Store : s1 , Txn : NewLeaderProxy (r1 )},
126+ 2 : {Raft : r2 , Store : s2 , Txn : NewLeaderProxy (r2 )},
127+ }
128+
129+ shardStore := NewShardStore (engine , groups )
130+ coord := NewShardedCoordinator (engine , groups , 1 , NewHLC (), shardStore )
131+
132+ ops := & OperationGroup [OP ]{
133+ IsTxn : true ,
134+ Elems : []* Elem [OP ]{
135+ {Op : Put , Key : []byte ("b" ), Value : []byte ("v1" )},
136+ {Op : Put , Key : []byte ("x" ), Value : []byte ("v2" )},
137+ },
138+ }
139+ if _ , err := coord .Dispatch (ctx , ops ); err == nil || ! errors .Is (err , ErrCrossShardTransactionNotSupported ) {
140+ t .Fatalf ("expected ErrCrossShardTransactionNotSupported, got %v" , err )
141+ }
142+
143+ // Ensure the rejected transaction didn't write anything.
144+ readTS := ^ uint64 (0 )
145+ if _ , err := shardStore .GetAt (ctx , []byte ("b" ), readTS ); ! errors .Is (err , store .ErrKeyNotFound ) {
146+ t .Fatalf ("expected key b missing, got %v" , err )
147+ }
148+ if _ , err := shardStore .GetAt (ctx , []byte ("x" ), readTS ); ! errors .Is (err , store .ErrKeyNotFound ) {
149+ t .Fatalf ("expected key x missing, got %v" , err )
150+ }
151+ }
0 commit comments