Skip to content

Commit be22997

Browse files
authored
feat: Added support for Kafka 4.1.0. (#247)
* feat: Added support for Kafka 4.1.0. Signed-off-by: Paolo Insogna <paolo@cowtech.it> * fixup Signed-off-by: Paolo Insogna <paolo@cowtech.it> --------- Signed-off-by: Paolo Insogna <paolo@cowtech.it>
1 parent 627c2ed commit be22997

21 files changed

+655
-23
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ jobs:
1212
strategy:
1313
matrix:
1414
node-version: [20.x, 22.x, 24.x]
15-
confluent-kafka-version: [7.5.0, 7.6.0, 7.7.0, 7.8.0, 7.9.0, 8.0.0]
15+
confluent-kafka-version: [7.5.0, 7.6.0, 7.7.0, 7.8.0, 7.9.0, 8.0.0, 8.1.0]
1616
name: 'Node.js ${{ matrix.node-version }} / Confluent Kafka ${{ matrix.confluent-kafka-version }}'
1717
runs-on: ubuntu-latest
1818
timeout-minutes: 20

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ A modern, high-performance, pure TypeScript/JavaScript type safe client for Apac
1616

1717
## Supported Kafka Version
1818

19-
Supported Kafka version are from **3.5.0** to **4.0.0** and equivalent, edges included.
19+
Supported Kafka version are from **3.5.0** to **4.1.0** and equivalent, edges included.
2020

2121
## Supported KIPs
2222

docs/internals/apis-status.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
| Heartbeat | 12 | 4 |
2727
| LeaveGroup | 13 | 5 |
2828
| SyncGroup | 14 | 5 |
29-
| ConsumerGroupHeartbeat | 68 | 0 |
29+
| ConsumerGroupHeartbeat | 68 | 1 |
3030

3131
## Admin API
3232

@@ -51,7 +51,7 @@
5151
| DescribeDelegationToken | 41 | 3 |
5252
| DeleteGroups | 42 | 2 |
5353
| IncrementalAlterConfigs | 44 | 1 |
54-
| AlterPartitionReassignments | 45 | 0 |
54+
| AlterPartitionReassignments | 45 | 1 |
5555
| ListPartitionReassignments | 46 | 0 |
5656
| OffsetDelete | 47 | 0 |
5757
| DescribeClientQuotas | 48 | 1 |
@@ -66,7 +66,7 @@
6666
| DescribeProducers | 61 | 0 |
6767
| UnregisterBroker | 64 | 0 |
6868
| DescribeTransactions | 65 | 0 |
69-
| ListTransactions | 66 | 1 |
69+
| ListTransactions | 66 | 2 |
7070
| ConsumerGroupDescribe | 69 | 0 |
7171
| DescribeTopicPartitions | 75 | 0 |
7272

docs/kips.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
This page lists the KIPs we have explicitly verified in `@platformatic/kafka`.
44

5-
Other KIPs up to Kafka 3.9.0 are likely supported, or at least have protocol/API support internally, but may not yet have a public-facing client method.
5+
Other KIPs up to Kafka 4.1.0 are likely supported, or at least have protocol/API support internally, but may not yet have a public-facing client method.
66

77
| KIP | Status |
88
| -------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------- |

src/apis/admin/alter-partition-reassignments-v0.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,11 @@ export interface AlterPartitionReassignmentsResponse {
4343
partition_index => INT32
4444
replicas => INT32
4545
*/
46-
export function createRequest (timeoutMs: number, topics: AlterPartitionReassignmentsRequestTopic[]): Writer {
46+
export function createRequest (
47+
timeoutMs: number,
48+
_allowReplicationFactorChange: boolean,
49+
topics: AlterPartitionReassignmentsRequestTopic[]
50+
): Writer {
4751
return Writer.create()
4852
.appendInt32(timeoutMs)
4953
.appendArray(topics, (w, t) => {
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
import { ResponseError } from '../../errors.ts'
2+
import { type NullableString } from '../../protocol/definitions.ts'
3+
import { type Reader } from '../../protocol/reader.ts'
4+
import { Writer } from '../../protocol/writer.ts'
5+
import { createAPI, type ResponseErrorWithLocation } from '../definitions.ts'
6+
7+
export interface AlterPartitionReassignmentsRequestPartition {
8+
partitionIndex: number
9+
replicas: number[]
10+
}
11+
12+
export interface AlterPartitionReassignmentsRequestTopic {
13+
name: string
14+
partitions: AlterPartitionReassignmentsRequestPartition[]
15+
}
16+
17+
export type AlterPartitionReassignmentsRequest = Parameters<typeof createRequest>
18+
19+
export interface AlterPartitionReassignmentsResponsePartition {
20+
partitionIndex: number
21+
errorCode: number
22+
errorMessage: NullableString
23+
}
24+
25+
export interface AlterPartitionReassignmentsResponseResponse {
26+
name: string
27+
partitions: AlterPartitionReassignmentsResponsePartition[]
28+
}
29+
30+
export interface AlterPartitionReassignmentsResponse {
31+
throttleTimeMs: number
32+
errorCode: number
33+
errorMessage: NullableString
34+
responses: AlterPartitionReassignmentsResponseResponse[]
35+
}
36+
37+
/*
38+
AlterPartitionReassignments Request (Version: 1) => timeout_ms allow_replication_factor_change [topics] TAG_BUFFER
39+
timeout_ms => INT32
40+
allow_replication_factor_change => BOOLEAN
41+
topics => name [partitions] TAG_BUFFER
42+
name => COMPACT_STRING
43+
partitions => partition_index [replicas] TAG_BUFFER
44+
partition_index => INT32
45+
replicas => INT32
46+
*/
47+
export function createRequest (
48+
timeoutMs: number,
49+
allowReplicationFactorChange: boolean,
50+
topics: AlterPartitionReassignmentsRequestTopic[]
51+
): Writer {
52+
return Writer.create()
53+
.appendInt32(timeoutMs)
54+
.appendBoolean(allowReplicationFactorChange)
55+
.appendArray(topics, (w, t) => {
56+
w.appendString(t.name).appendArray(t.partitions, (w, p) => {
57+
w.appendInt32(p.partitionIndex).appendArray(p.replicas, (w, r) => w.appendInt32(r), true, false)
58+
})
59+
})
60+
.appendTaggedFields()
61+
}
62+
63+
/*
64+
AlterPartitionReassignments Response (Version: 1) => throttle_time_ms error_code error_message [responses] TAG_BUFFER
65+
throttle_time_ms => INT32
66+
error_code => INT16
67+
error_message => COMPACT_NULLABLE_STRING
68+
responses => name [partitions] TAG_BUFFER
69+
name => COMPACT_STRING
70+
partitions => partition_index error_code error_message TAG_BUFFER
71+
partition_index => INT32
72+
error_code => INT16
73+
error_message => COMPACT_NULLABLE_STRING
74+
*/
75+
export function parseResponse (
76+
_correlationId: number,
77+
apiKey: number,
78+
apiVersion: number,
79+
reader: Reader
80+
): AlterPartitionReassignmentsResponse {
81+
const errors: ResponseErrorWithLocation[] = []
82+
83+
const throttleTimeMs = reader.readInt32()
84+
const errorCode = reader.readInt16()
85+
const errorMessage = reader.readNullableString()
86+
87+
if (errorCode !== 0) {
88+
errors.push(['', [errorCode, errorMessage]])
89+
}
90+
91+
const response: AlterPartitionReassignmentsResponse = {
92+
throttleTimeMs,
93+
errorCode,
94+
errorMessage,
95+
responses: reader.readArray((r, i) => {
96+
return {
97+
name: r.readString(),
98+
partitions: r.readArray((r, j) => {
99+
const partition = {
100+
partitionIndex: r.readInt32(),
101+
errorCode: r.readInt16(),
102+
errorMessage: r.readNullableString()
103+
}
104+
105+
if (partition.errorCode !== 0) {
106+
errors.push([`responses/${i}/partitions/${j}`, [partition.errorCode, partition.errorMessage]])
107+
}
108+
109+
return partition
110+
})
111+
}
112+
})
113+
}
114+
115+
if (errors.length) {
116+
throw new ResponseError(apiKey, apiVersion, Object.fromEntries(errors), response)
117+
}
118+
119+
return response
120+
}
121+
122+
export const api = createAPI<AlterPartitionReassignmentsRequest, AlterPartitionReassignmentsResponse>(
123+
45,
124+
1,
125+
createRequest,
126+
parseResponse
127+
)

src/apis/admin/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
export * as alterClientQuotasV1 from './alter-client-quotas-v1.ts'
22
export * as alterConfigsV2 from './alter-configs-v2.ts'
33
export * as alterPartitionReassignmentsV0 from './alter-partition-reassignments-v0.ts'
4+
export * as alterPartitionReassignmentsV1 from './alter-partition-reassignments-v1.ts'
45
export * as alterPartitionV3 from './alter-partition-v3.ts'
56
export * as alterReplicaLogDirsV2 from './alter-replica-log-dirs-v2.ts'
67
export * as alterUserScramCredentialsV0 from './alter-user-scram-credentials-v0.ts'
@@ -43,6 +44,7 @@ export * as listGroupsV5 from './list-groups-v5.ts'
4344
export * as listPartitionReassignmentsV0 from './list-partition-reassignments-v0.ts'
4445
export * as listTransactionsV0 from './list-transactions-v0.ts'
4546
export * as listTransactionsV1 from './list-transactions-v1.ts'
47+
export * as listTransactionsV2 from './list-transactions-v2.ts'
4648
export * as offsetDeleteV0 from './offset-delete-v0.ts'
4749
export * as renewDelegationTokenV2 from './renew-delegation-token-v2.ts'
4850
export * as unregisterBrokerV0 from './unregister-broker-v0.ts'

src/apis/admin/list-transactions-v0.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { ResponseError } from '../../errors.ts'
2+
import { type NullableString } from '../../protocol/definitions.ts'
23
import { type Reader } from '../../protocol/reader.ts'
34
import { Writer } from '../../protocol/writer.ts'
45
import { createAPI } from '../definitions.ts'
@@ -27,7 +28,8 @@ export interface ListTransactionsResponse {
2728
export function createRequest (
2829
stateFilters: TransactionState[],
2930
producerIdFilters: bigint[],
30-
_durationFilter: bigint
31+
_durationFilter: bigint,
32+
_transactionalIdPattern: NullableString
3133
): Writer {
3234
return Writer.create()
3335
.appendArray(stateFilters, (w, t) => w.appendString(t), true, false)

src/apis/admin/list-transactions-v1.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { ResponseError } from '../../errors.ts'
2+
import { type NullableString } from '../../protocol/definitions.ts'
23
import { type Reader } from '../../protocol/reader.ts'
34
import { Writer } from '../../protocol/writer.ts'
45
import { createAPI } from '../definitions.ts'
@@ -28,7 +29,8 @@ export interface ListTransactionsResponse {
2829
export function createRequest (
2930
stateFilters: TransactionState[],
3031
producerIdFilters: bigint[],
31-
durationFilter: bigint
32+
durationFilter: bigint,
33+
_transactionalIdPattern: NullableString
3234
): Writer {
3335
return Writer.create()
3436
.appendArray(stateFilters, (w, t) => w.appendString(t), true, false)
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import { ResponseError } from '../../errors.ts'
2+
import { type NullableString } from '../../protocol/definitions.ts'
3+
import { type Reader } from '../../protocol/reader.ts'
4+
import { Writer } from '../../protocol/writer.ts'
5+
import { createAPI } from '../definitions.ts'
6+
import { type TransactionState } from '../enumerations.ts'
7+
8+
export type ListTransactionsRequest = Parameters<typeof createRequest>
9+
10+
export interface ListTransactionsResponseTransactionState {
11+
transactionalId: string
12+
producerId: bigint
13+
transactionState: string
14+
}
15+
16+
export interface ListTransactionsResponse {
17+
throttleTimeMs: number
18+
errorCode: number
19+
errorMessage: NullableString
20+
unknownStateFilters: string[]
21+
transactionStates: ListTransactionsResponseTransactionState[]
22+
}
23+
24+
/*
25+
ListTransactions Request (Version: 2) => [state_filters] [producer_id_filters] duration_filter transactional_id_pattern TAG_BUFFER
26+
state_filters => COMPACT_STRING
27+
producer_id_filters => INT64
28+
duration_filter => INT64
29+
transactional_id_pattern => COMPACT_NULLABLE_STRING
30+
*/
31+
export function createRequest (
32+
stateFilters: TransactionState[],
33+
producerIdFilters: bigint[],
34+
durationFilter: bigint,
35+
transactionalIdPattern: NullableString
36+
): Writer {
37+
return Writer.create()
38+
.appendArray(stateFilters, (w, t) => w.appendString(t), true, false)
39+
.appendArray(producerIdFilters, (w, p) => w.appendInt64(p), true, false)
40+
.appendInt64(durationFilter)
41+
.appendString(transactionalIdPattern)
42+
.appendTaggedFields()
43+
}
44+
45+
/*
46+
ListTransactions Response (Version: 2) => throttle_time_ms error_code error_message [unknown_state_filters] [transaction_states] TAG_BUFFER
47+
throttle_time_ms => INT32
48+
error_code => INT16
49+
error_message => COMPACT_NULLABLE_STRING
50+
unknown_state_filters => COMPACT_STRING
51+
transaction_states => transactional_id producer_id transaction_state TAG_BUFFER
52+
transactional_id => COMPACT_STRING
53+
producer_id => INT64
54+
transaction_state => COMPACT_STRING
55+
*/
56+
export function parseResponse (
57+
_correlationId: number,
58+
apiKey: number,
59+
apiVersion: number,
60+
reader: Reader
61+
): ListTransactionsResponse {
62+
const response: ListTransactionsResponse = {
63+
throttleTimeMs: reader.readInt32(),
64+
errorCode: reader.readInt16(),
65+
errorMessage: reader.readNullableString(),
66+
unknownStateFilters: reader.readArray(r => r.readString(), true, false)!,
67+
transactionStates: reader.readArray(r => {
68+
return {
69+
transactionalId: r.readString(),
70+
producerId: r.readInt64(),
71+
transactionState: r.readString()
72+
}
73+
})
74+
}
75+
76+
if (response.errorCode !== 0) {
77+
throw new ResponseError(apiKey, apiVersion, { '/': [response.errorCode, response.errorMessage] }, response)
78+
}
79+
80+
return response
81+
}
82+
83+
export const api = createAPI<ListTransactionsRequest, ListTransactionsResponse>(66, 2, createRequest, parseResponse)

0 commit comments

Comments
 (0)