Skip to content

Commit 0d17ad5

Browse files
authored
Merge branch 'main' into renovate/docker-login-action-4.x
2 parents 62b64a2 + 698fe8a commit 0d17ad5

File tree

5 files changed

+301
-1
lines changed

5 files changed

+301
-1
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ Elastickv is an experimental project undertaking the challenge of creating a dis
99
- **Raft-based Data Replication**: KV state replication is implemented on Raft, with leader-based commit and follower forwarding paths.
1010
- **Shard-aware Data Plane**: Static shard ranges across multiple Raft groups with shard routing/coordinator are implemented.
1111
- **Durable Route Control Plane (Milestone 1)**: Durable route catalog, versioned route snapshot apply, watcher-based route refresh, and manual `ListRoutes`/`SplitRange` (same-group split) are implemented.
12-
- **Protocol Adapters**: gRPC (`RawKV`/`TransactionalKV`), Redis (core commands + `MULTI/EXEC` and list operations), and DynamoDB-compatible API (`PutItem`/`GetItem`/`UpdateItem`/`TransactWriteItems`) implementations are available (runtime exposure depends on the selected server entrypoint/configuration).
12+
- **Protocol Adapters**: gRPC (`RawKV`/`TransactionalKV`), Redis (core commands + `MULTI/EXEC` and list operations), and DynamoDB-compatible API (`PutItem`/`GetItem`/`DeleteItem`/`UpdateItem`/`TransactWriteItems`) implementations are available (runtime exposure depends on the selected server entrypoint/configuration).
13+
- **DynamoDB Compatibility Scope**: `CreateTable`/`DeleteTable`/`DescribeTable`/`ListTables`/`PutItem`/`GetItem`/`DeleteItem`/`UpdateItem`/`Query`/`TransactWriteItems` are implemented. `Scan` and `BatchWriteItem` are currently unsupported.
1314
- **Basic Consistency Behaviors**: Write-after-read checks, leader redirection/forwarding paths, and OCC conflict detection for transactional writes are covered by tests.
1415

1516
## Planned Features

adapter/dynamodb.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ const (
2929
describeTableTarget = targetPrefix + "DescribeTable"
3030
listTablesTarget = targetPrefix + "ListTables"
3131
putItemTarget = targetPrefix + "PutItem"
32+
deleteItemTarget = targetPrefix + "DeleteItem"
3233
getItemTarget = targetPrefix + "GetItem"
3334
queryTarget = targetPrefix + "Query"
3435
updateItemTarget = targetPrefix + "UpdateItem"
@@ -51,6 +52,7 @@ const (
5152
tableLockStripeCount = 128
5253
queryDefaultLimit = 100
5354
tableCleanupDeleteBatchSize = 256
55+
dynamoMaxRequestBodyBytes = 1 << 20
5456

5557
dynamoTableMetaPrefix = "!ddb|meta|table|"
5658
dynamoTableGenerationPrefix = "!ddb|meta|gen|"
@@ -89,6 +91,7 @@ func NewDynamoDBServer(listen net.Listener, st store.MVCCStore, coordinate kv.Co
8991
describeTableTarget: d.describeTable,
9092
listTablesTarget: d.listTables,
9193
putItemTarget: d.putItem,
94+
deleteItemTarget: d.deleteItem,
9295
getItemTarget: d.getItem,
9396
queryTarget: d.query,
9497
updateItemTarget: d.updateItem,
@@ -671,6 +674,10 @@ func (d *DynamoDBServer) retryItemWriteWithGeneration(
671674
if err != nil {
672675
return err
673676
}
677+
if plan.req == nil {
678+
return nil
679+
}
680+
plan.req.StartTS = readTS
674681
if err = d.commitItemWrite(ctx, plan.req); err != nil {
675682
if !isRetryableTransactWriteError(err) {
676683
return errors.WithStack(err)
@@ -778,6 +785,129 @@ func (d *DynamoDBServer) getItem(w http.ResponseWriter, r *http.Request) {
778785
writeDynamoJSON(w, map[string]any{"Item": item})
779786
}
780787

788+
func (d *DynamoDBServer) deleteItem(w http.ResponseWriter, r *http.Request) {
789+
in, shouldReturnOld, err := decodeDeleteItemInput(http.MaxBytesReader(w, r.Body, dynamoMaxRequestBodyBytes))
790+
if err != nil {
791+
writeDynamoErrorFromErr(w, err)
792+
return
793+
}
794+
lockKey, err := dynamoItemUpdateLockKey(in.TableName, in.Key)
795+
if err != nil {
796+
writeDynamoError(w, http.StatusBadRequest, dynamoErrValidation, err.Error())
797+
return
798+
}
799+
unlock := d.lockItemUpdate(lockKey)
800+
defer unlock()
801+
oldItem, err := d.deleteItemWithRetry(r.Context(), in)
802+
if err != nil {
803+
writeDynamoErrorFromErr(w, err)
804+
return
805+
}
806+
resp := map[string]any{}
807+
if shouldReturnOld && len(oldItem) > 0 {
808+
resp["Attributes"] = oldItem
809+
}
810+
writeDynamoJSON(w, resp)
811+
}
812+
813+
func decodeDeleteItemInput(bodyReader io.Reader) (deleteItemInput, bool, error) {
814+
body, err := io.ReadAll(bodyReader)
815+
if err != nil {
816+
return deleteItemInput{}, false, newDynamoAPIError(http.StatusBadRequest, dynamoErrValidation, err.Error())
817+
}
818+
var in deleteItemInput
819+
if err := json.Unmarshal(body, &in); err != nil {
820+
return deleteItemInput{}, false, newDynamoAPIError(http.StatusBadRequest, dynamoErrValidation, err.Error())
821+
}
822+
if strings.TrimSpace(in.TableName) == "" {
823+
return deleteItemInput{}, false, newDynamoAPIError(http.StatusBadRequest, dynamoErrValidation, "missing table name")
824+
}
825+
shouldReturnOld, err := parseDeleteItemReturnValues(in.ReturnValues)
826+
if err != nil {
827+
return deleteItemInput{}, false, err
828+
}
829+
return in, shouldReturnOld, nil
830+
}
831+
832+
func parseDeleteItemReturnValues(returnValues string) (bool, error) {
833+
switch strings.TrimSpace(returnValues) {
834+
case "", "NONE":
835+
return false, nil
836+
case "ALL_OLD":
837+
return true, nil
838+
default:
839+
return false, newDynamoAPIError(http.StatusBadRequest, dynamoErrValidation, "unsupported ReturnValues")
840+
}
841+
}
842+
843+
type deleteItemPlan struct {
844+
req *kv.OperationGroup[kv.OP]
845+
generation uint64
846+
oldItem map[string]attributeValue
847+
}
848+
849+
func (d *DynamoDBServer) deleteItemWithRetry(ctx context.Context, in deleteItemInput) (map[string]attributeValue, error) {
850+
var oldItem map[string]attributeValue
851+
err := d.retryItemWriteWithGeneration(
852+
ctx,
853+
in.TableName,
854+
"delete retry attempts exhausted",
855+
func(readTS uint64) (*itemWritePlan, error) {
856+
plan, err := d.prepareDeleteItemWrite(ctx, in, readTS)
857+
if err != nil {
858+
return nil, err
859+
}
860+
oldItem = plan.oldItem
861+
return &itemWritePlan{
862+
req: plan.req,
863+
generation: plan.generation,
864+
}, nil
865+
},
866+
)
867+
if err != nil {
868+
return nil, err
869+
}
870+
return oldItem, nil
871+
}
872+
873+
func (d *DynamoDBServer) prepareDeleteItemWrite(ctx context.Context, in deleteItemInput, readTS uint64) (*deleteItemPlan, error) {
874+
schema, exists, err := d.loadTableSchemaAt(ctx, in.TableName, readTS)
875+
if err != nil {
876+
return nil, errors.WithStack(err)
877+
}
878+
if !exists {
879+
return nil, newDynamoAPIError(http.StatusBadRequest, dynamoErrResourceNotFound, "table not found")
880+
}
881+
itemKey, err := schema.itemKeyFromAttributes(in.Key)
882+
if err != nil {
883+
return nil, newDynamoAPIError(http.StatusBadRequest, dynamoErrValidation, err.Error())
884+
}
885+
current, found, err := d.readItemAtKeyAt(ctx, itemKey, readTS)
886+
if err != nil {
887+
return nil, errors.WithStack(err)
888+
}
889+
if err := validateConditionOnItem(
890+
in.ConditionExpression,
891+
in.ExpressionAttributeNames,
892+
in.ExpressionAttributeValues,
893+
valueOrEmptyMap(current, found),
894+
); err != nil {
895+
return nil, newDynamoAPIError(http.StatusBadRequest, dynamoErrConditionalFailed, err.Error())
896+
}
897+
if !found {
898+
return &deleteItemPlan{oldItem: nil}, nil
899+
}
900+
req, err := buildItemDeleteRequest(schema, itemKey, current)
901+
if err != nil {
902+
return nil, err
903+
}
904+
return &deleteItemPlan{
905+
req: req,
906+
generation: schema.Generation,
907+
oldItem: cloneAttributeValueMap(current),
908+
}, nil
909+
}
910+
781911
func (d *DynamoDBServer) updateItem(w http.ResponseWriter, r *http.Request) {
782912
in, err := decodeUpdateItemInput(r.Body)
783913
if err != nil {

adapter/dynamodb_test.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package adapter
22

33
import (
44
"context"
5+
"io"
6+
"net/http"
57
"strconv"
68
"strings"
79
"sync"
@@ -76,6 +78,142 @@ func TestDynamoDB_PutItem_GetItem(t *testing.T) {
7678
assert.Equal(t, "v", valueAttr.Value)
7779
}
7880

81+
func TestDynamoDB_DeleteItem(t *testing.T) {
82+
t.Parallel()
83+
nodes, _, _ := createNode(t, 1)
84+
defer shutdown(nodes)
85+
86+
cfg, err := config.LoadDefaultConfig(context.Background(),
87+
config.WithRegion("us-west-2"),
88+
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("dummy", "dummy", "")),
89+
)
90+
require.NoError(t, err)
91+
92+
client := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) {
93+
o.BaseEndpoint = aws.String("http://" + nodes[0].dynamoAddress)
94+
})
95+
createSimpleKeyTable(t, context.Background(), client)
96+
97+
_, err = client.PutItem(context.Background(), &dynamodb.PutItemInput{
98+
TableName: aws.String("t"),
99+
Item: map[string]types.AttributeValue{
100+
"key": &types.AttributeValueMemberS{Value: "delete-target"},
101+
"value": &types.AttributeValueMemberS{Value: "v"},
102+
},
103+
})
104+
require.NoError(t, err)
105+
106+
delOut, err := client.DeleteItem(context.Background(), &dynamodb.DeleteItemInput{
107+
TableName: aws.String("t"),
108+
Key: map[string]types.AttributeValue{
109+
"key": &types.AttributeValueMemberS{Value: "delete-target"},
110+
},
111+
ReturnValues: types.ReturnValueAllOld,
112+
})
113+
require.NoError(t, err)
114+
require.NotEmpty(t, delOut.Attributes)
115+
oldValue, ok := delOut.Attributes["value"].(*types.AttributeValueMemberS)
116+
require.True(t, ok)
117+
require.Equal(t, "v", oldValue.Value)
118+
119+
getOut, err := client.GetItem(context.Background(), &dynamodb.GetItemInput{
120+
TableName: aws.String("t"),
121+
Key: map[string]types.AttributeValue{
122+
"key": &types.AttributeValueMemberS{Value: "delete-target"},
123+
},
124+
})
125+
require.NoError(t, err)
126+
require.Empty(t, getOut.Item)
127+
}
128+
129+
func TestDynamoDB_DeleteItem_Condition(t *testing.T) {
130+
t.Parallel()
131+
nodes, _, _ := createNode(t, 1)
132+
defer shutdown(nodes)
133+
134+
cfg, err := config.LoadDefaultConfig(context.Background(),
135+
config.WithRegion("us-west-2"),
136+
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("dummy", "dummy", "")),
137+
)
138+
require.NoError(t, err)
139+
140+
client := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) {
141+
o.BaseEndpoint = aws.String("http://" + nodes[0].dynamoAddress)
142+
})
143+
createSimpleKeyTable(t, context.Background(), client)
144+
145+
_, err = client.PutItem(context.Background(), &dynamodb.PutItemInput{
146+
TableName: aws.String("t"),
147+
Item: map[string]types.AttributeValue{
148+
"key": &types.AttributeValueMemberS{Value: "cond-target"},
149+
"value": &types.AttributeValueMemberS{Value: "v"},
150+
},
151+
})
152+
require.NoError(t, err)
153+
154+
_, err = client.DeleteItem(context.Background(), &dynamodb.DeleteItemInput{
155+
TableName: aws.String("t"),
156+
Key: map[string]types.AttributeValue{
157+
"key": &types.AttributeValueMemberS{Value: "cond-target"},
158+
},
159+
ConditionExpression: aws.String("attribute_not_exists(#k)"),
160+
ExpressionAttributeNames: map[string]string{
161+
"#k": "key",
162+
},
163+
})
164+
require.Error(t, err)
165+
var condErr *types.ConditionalCheckFailedException
166+
require.ErrorAs(t, err, &condErr)
167+
168+
getOut, err := client.GetItem(context.Background(), &dynamodb.GetItemInput{
169+
TableName: aws.String("t"),
170+
Key: map[string]types.AttributeValue{
171+
"key": &types.AttributeValueMemberS{Value: "cond-target"},
172+
},
173+
})
174+
require.NoError(t, err)
175+
require.NotEmpty(t, getOut.Item)
176+
177+
_, err = client.DeleteItem(context.Background(), &dynamodb.DeleteItemInput{
178+
TableName: aws.String("t"),
179+
Key: map[string]types.AttributeValue{
180+
"key": &types.AttributeValueMemberS{Value: "cond-target"},
181+
},
182+
ConditionExpression: aws.String("attribute_exists(#k)"),
183+
ExpressionAttributeNames: map[string]string{
184+
"#k": "key",
185+
},
186+
})
187+
require.NoError(t, err)
188+
}
189+
190+
func TestDynamoDB_DeleteItem_RequestBodyTooLarge(t *testing.T) {
191+
t.Parallel()
192+
nodes, _, _ := createNode(t, 1)
193+
defer shutdown(nodes)
194+
195+
reqBody := strings.Repeat("a", dynamoMaxRequestBodyBytes+1)
196+
req, err := http.NewRequestWithContext(
197+
context.Background(),
198+
http.MethodPost,
199+
"http://"+nodes[0].dynamoAddress+"/",
200+
strings.NewReader(reqBody),
201+
)
202+
require.NoError(t, err)
203+
req.Header.Set("X-Amz-Target", deleteItemTarget)
204+
req.Header.Set("Content-Type", "application/x-amz-json-1.0")
205+
206+
resp, err := http.DefaultClient.Do(req)
207+
require.NoError(t, err)
208+
defer resp.Body.Close()
209+
210+
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
211+
body, err := io.ReadAll(resp.Body)
212+
require.NoError(t, err)
213+
require.Contains(t, string(body), dynamoErrValidation)
214+
require.Contains(t, string(body), "too large")
215+
}
216+
79217
func TestDynamoDB_TransactWriteItems(t *testing.T) {
80218
t.Parallel()
81219
nodes, _, _ := createNode(t, 1)

adapter/dynamodb_types.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,15 @@ type putItemInput struct {
2121
Item map[string]attributeValue `json:"Item"`
2222
}
2323

24+
type deleteItemInput struct {
25+
TableName string `json:"TableName"`
26+
Key map[string]attributeValue `json:"Key"`
27+
ConditionExpression string `json:"ConditionExpression"`
28+
ExpressionAttributeNames map[string]string `json:"ExpressionAttributeNames"`
29+
ExpressionAttributeValues map[string]attributeValue `json:"ExpressionAttributeValues"`
30+
ReturnValues string `json:"ReturnValues"`
31+
}
32+
2433
type transactWriteItemsInput struct {
2534
TransactItems []transactWriteItem `json:"TransactItems"`
2635
}

docs/docker_multinode_manual_run.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,28 @@ redis-cli -h 10.0.0.11 -p 6379 SET survive yes
220220
redis-cli -h 10.0.0.12 -p 6379 GET survive
221221
```
222222

223+
## DynamoDB Compatibility Notes
224+
225+
Current DynamoDB-compatible API coverage includes:
226+
227+
- `CreateTable`
228+
- `DeleteTable`
229+
- `DescribeTable`
230+
- `ListTables`
231+
- `PutItem`
232+
- `GetItem`
233+
- `DeleteItem`
234+
- `UpdateItem`
235+
- `Query`
236+
- `TransactWriteItems`
237+
238+
Currently unsupported commands:
239+
240+
- `Scan`
241+
- `BatchWriteItem`
242+
243+
If you migrate existing DynamoDB data, use key-based reads (`GetItem`/`Query`) and write with `PutItem`/`TransactWriteItems` instead of `Scan`/`BatchWriteItem`.
244+
223245
## Stop and Cleanup
224246

225247
Stop/remove on each VM:

0 commit comments

Comments
 (0)