Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions src/clients/admin/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ import {
adminTopicsChannel,
createDiagnosticContext
} from '../../diagnostic.ts'
import { MultipleErrors } from '../../errors.ts'
import { MultipleErrors, UserError } from '../../errors.ts'
import { type Broker, type Connection } from '../../index.ts'
import { Reader } from '../../protocol/reader.ts'
import {
Expand Down Expand Up @@ -2350,15 +2350,26 @@ export class Admin extends Base<AdminOptions> {
return
}

// metadata must be defined at this point
const { topics, brokers } = metadata!

const requests = new Map<number, ListOffsetsRequestTopic[]>()

for (const topic of options.topics) {
for (const partition of topic.partitions) {
const { leader, leaderEpoch } = metadata!.topics.get(topic.name)!.partitions[partition.partitionIndex]
let leaderRequests = requests.get(leader)
// topics.get(topic.name) must be defined as the metadata request was successful
const topicData = topics.get(topic.name)!

const targetPartitionData = topicData.partitions[partition.partitionIndex]
if (!targetPartitionData) {
callback(new UserError(`Unknown partition ${partition.partitionIndex} for topic ${topic.name}.`))
return
}

let leaderRequests = requests.get(targetPartitionData.leader)
if (!leaderRequests) {
leaderRequests = []
requests.set(leader, leaderRequests)
requests.set(targetPartitionData.leader, leaderRequests)
}

let topicRequest = leaderRequests.find(t => t.name === topic.name)
Expand All @@ -2369,7 +2380,7 @@ export class Admin extends Base<AdminOptions> {

topicRequest.partitions.push({
partitionIndex: partition.partitionIndex,
currentLeaderEpoch: leaderEpoch,
currentLeaderEpoch: targetPartitionData.leaderEpoch,
/* c8 ignore next - Hard to test */
timestamp: partition.timestamp ?? -1n
})
Expand All @@ -2380,7 +2391,7 @@ export class Admin extends Base<AdminOptions> {
'Listing offsets failed.',
requests,
([leader, requests], concurrentCallback) => {
this[kGetConnection](metadata!.brokers.get(leader)!, (error, connection) => {
this[kGetConnection](brokers.get(leader)!, (error, connection) => {
if (error) {
concurrentCallback(error)
return
Expand Down
10 changes: 10 additions & 0 deletions src/clients/base/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,16 @@ export class Base<
},
(error, metadata) => {
if (error) {
const unknownTopicError = (error as GenericError).findBy('apiCode', 3)
if (unknownTopicError) {
const topicIndexMatch = unknownTopicError.path?.match(/\/topics\/(\d+)/)
const topicIndex = topicIndexMatch ? parseInt(topicIndexMatch[1]) : -1
const topicName =
topicIndex >= 0 && topicIndex < topicsToFetch.length ? topicsToFetch[topicIndex] : 'unknown'
deduplicateCallback(new UserError(`Unknown topic ${topicName}.`))
return
}

const hasStaleMetadata = (error as GenericError).findBy('hasStaleMetadata', true)

// Stale metadata, we need to fetch everything again
Expand Down
9 changes: 5 additions & 4 deletions test/clients/admin/admin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import {
sleep,
stringSerializers,
UnsupportedApiError,
UserError,
type Writer
} from '../../../src/index.ts'
import {
Expand Down Expand Up @@ -990,15 +991,15 @@ test('deleteTopics should not deduplicate deletion of different topics', async t
await admin.metadata({ topics: [topicNames[0]] })
throw Error('Topic still exists: ' + topicNames[0])
} catch (error) {
// ApiCode 3 = UnknownTopicOrPartition
ok(error.findBy?.('apiCode', 3))
// UserError is thrown for unknown topics
ok(error instanceof UserError)
}
try {
await admin.metadata({ topics: [topicNames[1]] })
throw Error('Topic still exists: ' + topicNames[1])
} catch (error) {
// ApiCode 3 = UnknownTopicOrPartition
ok(error.findBy?.('apiCode', 3))
// UserError is thrown for unknown topics
ok(error instanceof UserError)
}
})
})
Expand Down
6 changes: 3 additions & 3 deletions test/clients/base/base.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
UnsupportedApiError,
type Broker,
type CallbackWithPromise,
UserError,
type ClientDiagnosticEvent,
type ClusterMetadata
} from '../../../src/index.ts'
Expand Down Expand Up @@ -730,9 +731,8 @@ test('kPerformWithRetry should not leak timers', async t => {
// If the timeout was already resolved, the test runner would complain
const error = await promise

ok(error instanceof MultipleErrors)
strictEqual(error.errors.length, 2)
ok(error.errors[1].message.startsWith('Client closed while retrying'))
// UserError is thrown for unknown topics
ok(error instanceof UserError)
})

test('kPerformWithRetry should accept a custom function', async t => {
Expand Down
8 changes: 4 additions & 4 deletions test/clients/consumer/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2563,11 +2563,11 @@ test('startLagMonitoring should handle errors', async t => {
const topic = await createTopic(t, true, 3)
const consumer = createConsumer(t)

const { promise, resolve } = promiseWithResolvers<MultipleErrors>()
const { promise, resolve } = promiseWithResolvers<UserError>()
consumer.startLagMonitoring({ topics: ['invalid'] }, 1000)

consumer.on('consumer:lag:error', error => {
resolve(error as MultipleErrors)
resolve(error as UserError)
})

await consumer.consume({
Expand All @@ -2579,8 +2579,8 @@ test('startLagMonitoring should handle errors', async t => {
})

const error = await promise
ok(MultipleErrors.isMultipleErrors(error))
deepStrictEqual(error.errors[0].errors[0].message, 'This server does not host this topic-partition.')
ok(error instanceof UserError)
ok(error.message.startsWith('Unknown topic '))
})

test('findGroupCoordinator should return the coordinator nodeId and support diagnostic channels', async t => {
Expand Down