-
-
Notifications
You must be signed in to change notification settings - Fork 270
Description
Kfake currently returns INVALID_TXN_STATE when it receives a non-transactional idempotent produce request and there is no current producer epoch. Code:
franz-go/pkg/kfake/00_produce.go
Line 211 in 8ad4514
| case window == nil && b.ProducerEpoch != -1: |
This diverges from Apache Kafka's behavior, which accepts such requests and implicitly creates producer state. Kafka accepts the first produce request for a pid/epoch with ANY sequence number: https://github.com/apache/kafka/blob/3e7eddecd6a63ea6a9793d3270bef6d0be5c9021/core/src/main/scala/kafka/log/ProducerStateManager.scala#L235-L236. Redpanda also behaves the same way: https://github.com/redpanda-data/redpanda/blob/dev/src/v/cluster/rm_stm.cc#L1149
While I agree that this behavior is a bit weird, I think kfake should behave the same as Kafka and accept any sequence number if there is no current producer epoch. It could also be useful for testing how franz-go handles this behavior (the okOnSink logic - code).
If you are OK with this direction, I can implement a fix.
Code to reproduce issue
package test
import (
"context"
"hash/crc32"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/modules/kafka"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kfake"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/kversion"
)
// makeRecordBatch serializes a record batch (magic 2) that holds exactly one
// record whose value is value, stamped with the supplied producer ID,
// producer epoch and first sequence number.
//
// The record and the batch are each encoded twice: a draft encoding is used
// to measure sizes, the Length (and, for the batch, CRC) fields are filled in
// from that draft, and the final encoding is returned.
func makeRecordBatch(value []byte, producerID int64, producerEpoch int16, firstSequence int32) []byte {
	var rec kmsg.Record
	rec.Value = value
	rec.Headers = []kmsg.Header{}

	// The record length excludes the first byte of the draft encoding.
	// NOTE(review): presumably that byte is the placeholder (zero) length
	// prefix — confirm against the kmsg.Record encoding.
	recDraft := rec.AppendTo(nil)
	rec.Length = int32(len(recDraft) - 1)

	var rb kmsg.RecordBatch
	rb.Magic = 2
	rb.ProducerID = producerID
	rb.ProducerEpoch = producerEpoch
	rb.FirstSequence = firstSequence
	rb.NumRecords = 1
	rb.Records = rec.AppendTo(nil)

	// NOTE(review): offsets 12 and 21 look like the Kafka v2 batch header
	// boundaries (bytes counted by Length start at 12, bytes covered by the
	// CRC start at 21) — confirm against the record batch format docs.
	batchDraft := rb.AppendTo(nil)
	lengthCovered := batchDraft[12:]
	crcCovered := batchDraft[21:]

	castagnoli := crc32.MakeTable(crc32.Castagnoli)
	rb.Length = int32(len(lengthCovered))
	rb.CRC = int32(crc32.Checksum(crcCovered, castagnoli))

	return rb.AppendTo(nil)
}
// produceWithUnknownPID issues a raw ProduceRequest (acks=-1, 5000ms timeout)
// through client, writing one hand-built record batch to partition 0 of
// topic. The batch carries the given producerID, producerEpoch and
// firstSequence.
//
// The helper fails the test if the request errors or if the response does
// not contain exactly one topic with exactly one partition; otherwise it
// returns a pointer to that single partition response so the caller can
// inspect its error code.
func produceWithUnknownPID(
	t *testing.T,
	ctx context.Context,
	client *kgo.Client,
	topic string,
	producerID int64,
	producerEpoch int16,
	firstSequence int32,
) *kmsg.ProduceResponseTopicPartition {
	t.Helper()

	payload := makeRecordBatch([]byte("hello from unknown PID"), producerID, producerEpoch, firstSequence)

	var part kmsg.ProduceRequestTopicPartition
	part.Partition = 0
	part.Records = payload

	var reqTopic kmsg.ProduceRequestTopic
	reqTopic.Topic = topic
	reqTopic.Partitions = []kmsg.ProduceRequestTopicPartition{part}

	req := kmsg.NewProduceRequest()
	req.Acks = -1
	req.TimeoutMillis = 5000
	req.Topics = append(req.Topics, reqTopic)

	resp, err := client.Request(ctx, &req)
	require.NoError(t, err)

	produced := resp.(*kmsg.ProduceResponse)
	require.Len(t, produced.Topics, 1)

	topicResp := &produced.Topics[0]
	require.Len(t, topicResp.Partitions, 1)

	return &topicResp.Partitions[0]
}
// TestKfake_RejectsUnknownProducerID starts a single-broker kfake cluster
// seeded with "test-topic" and produces one batch using a producer ID/epoch
// the cluster has never seen (pid 1000, epoch 2, first sequence 10). It
// asserts that the partition response carries the INVALID_TXN_STATE error
// code.
func TestKfake_RejectsUnknownProducerID(t *testing.T) {
	ctx := t.Context()

	fake, err := kfake.NewCluster(
		kfake.NumBrokers(1),
		kfake.SeedTopics(1, "test-topic"),
	)
	require.NoError(t, err)
	defer fake.Close()

	clientOpts := []kgo.Opt{
		kgo.SeedBrokers(fake.ListenAddrs()...),
		kgo.MaxVersions(kversion.V3_6_0()),
	}
	cl, err := kgo.NewClient(clientOpts...)
	require.NoError(t, err)
	defer cl.Close()

	got := produceWithUnknownPID(t, ctx, cl, "test-topic", 1000, 2, 10)
	require.EqualValues(t, kerr.InvalidTxnState.Code, got.ErrorCode)
}
// TestRealKafka_AcceptsUnknownProducerID demonstrates how a real broker
// (the confluentinc/confluent-local:7.8.7 image, run via testcontainers)
// responds to a produce request from an unknown producer ID with a non-zero
// first sequence number: the test expects a zero error code.
//
// The whole test, including container startup and teardown, shares one
// two-minute context.
func TestRealKafka_AcceptsUnknownProducerID(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	container, err := kafka.Run(ctx, "confluentinc/confluent-local:7.8.7")
	require.NoError(t, err)
	terminate := func() {
		require.NoError(t, container.Terminate(ctx))
	}
	defer terminate()

	seeds, err := container.Brokers(ctx)
	require.NoError(t, err)

	// Create the test topic with a dedicated admin client.
	adminCl, err := kgo.NewClient(kgo.SeedBrokers(seeds...))
	require.NoError(t, err)
	defer adminCl.Close()

	adm := kadm.NewClient(adminCl)
	_, err = adm.CreateTopic(ctx, 1, 1, nil, "test-topic")
	require.NoError(t, err)

	// The producing client has idempotent writes disabled; the producer
	// ID/epoch/sequence come solely from the hand-built batch.
	producerOpts := []kgo.Opt{
		kgo.SeedBrokers(seeds...),
		kgo.DisableIdempotentWrite(),
	}
	producerCl, err := kgo.NewClient(producerOpts...)
	require.NoError(t, err)
	defer producerCl.Close()

	got := produceWithUnknownPID(t, ctx, producerCl, "test-topic", 1000, 2, 10)
	require.Zero(t, got.ErrorCode)
}