diff --git a/src/clients/admin/admin.ts b/src/clients/admin/admin.ts index 6e4e8389..e9568a08 100644 --- a/src/clients/admin/admin.ts +++ b/src/clients/admin/admin.ts @@ -95,7 +95,7 @@ import { adminTopicsChannel, createDiagnosticContext } from '../../diagnostic.ts' -import { MultipleErrors } from '../../errors.ts' +import { MultipleErrors, UserError } from '../../errors.ts' import { type Broker, type Connection } from '../../index.ts' import { Reader } from '../../protocol/reader.ts' import { @@ -2350,15 +2350,26 @@ export class Admin extends Base { return } + // metadata must be defined at this point + const { topics, brokers } = metadata! + const requests = new Map() for (const topic of options.topics) { for (const partition of topic.partitions) { - const { leader, leaderEpoch } = metadata!.topics.get(topic.name)!.partitions[partition.partitionIndex] - let leaderRequests = requests.get(leader) + // topics.get(topic.name) must be defined as the metadata request was successful + const topicData = topics.get(topic.name)! + + const targetPartitionData = topicData.partitions[partition.partitionIndex] + if (!targetPartitionData) { + callback(new UserError(`Unknown partition ${partition.partitionIndex} for topic ${topic.name}.`)) + return + } + + let leaderRequests = requests.get(targetPartitionData.leader) if (!leaderRequests) { leaderRequests = [] - requests.set(leader, leaderRequests) + requests.set(targetPartitionData.leader, leaderRequests) } let topicRequest = leaderRequests.find(t => t.name === topic.name) @@ -2369,7 +2380,7 @@ export class Admin extends Base { topicRequest.partitions.push({ partitionIndex: partition.partitionIndex, - currentLeaderEpoch: leaderEpoch, + currentLeaderEpoch: targetPartitionData.leaderEpoch, /* c8 ignore next - Hard to test */ timestamp: partition.timestamp ?? -1n }) @@ -2380,7 +2391,7 @@ export class Admin extends Base { 'Listing offsets failed.', requests, ([leader, requests], concurrentCallback) => { - this[kGetConnection](metadata!.brokers.get(leader)!, (error, connection) => { + this[kGetConnection](brokers.get(leader)!, (error, connection) => { if (error) { concurrentCallback(error) return diff --git a/src/clients/base/base.ts b/src/clients/base/base.ts index f6915c57..dec486a0 100644 --- a/src/clients/base/base.ts +++ b/src/clients/base/base.ts @@ -594,6 +594,16 @@ export class Base< }, (error, metadata) => { if (error) { + const unknownTopicError = (error as GenericError).findBy('apiCode', 3) + if (unknownTopicError) { + const topicIndexMatch = unknownTopicError.path?.match(/\/topics\/(\d+)/) + const topicIndex = topicIndexMatch ? parseInt(topicIndexMatch[1]) : -1 + const topicName = + topicIndex >= 0 && topicIndex < topicsToFetch.length ? topicsToFetch[topicIndex] : 'unknown' + deduplicateCallback(new UserError(`Unknown topic ${topicName}.`)) + return + } + const hasStaleMetadata = (error as GenericError).findBy('hasStaleMetadata', true) // Stale metadata, we need to fetch everything again diff --git a/test/clients/admin/admin.test.ts b/test/clients/admin/admin.test.ts index 30642f8e..433a54b5 100644 --- a/test/clients/admin/admin.test.ts +++ b/test/clients/admin/admin.test.ts @@ -65,6 +65,7 @@ import { sleep, stringSerializers, UnsupportedApiError, + UserError, type Writer } from '../../../src/index.ts' import { @@ -990,15 +991,15 @@ test('deleteTopics should not deduplicate deletion of different topics', async t await admin.metadata({ topics: [topicNames[0]] }) throw Error('Topic still exists: ' + topicNames[0]) } catch (error) { - // ApiCode 3 = UnknownTopicOrPartition - ok(error.findBy?.('apiCode', 3)) + // UserError is thrown for unknown topics + ok(error instanceof UserError) } try { await admin.metadata({ topics: [topicNames[1]] }) throw Error('Topic still exists: ' + topicNames[1]) } catch (error) { - // ApiCode 3 = UnknownTopicOrPartition - ok(error.findBy?.('apiCode', 3)) + // UserError is thrown for unknown topics + ok(error instanceof UserError) } }) }) diff --git a/test/clients/base/base.test.ts b/test/clients/base/base.test.ts index d96de7e1..9c6f747e 100644 --- a/test/clients/base/base.test.ts +++ b/test/clients/base/base.test.ts @@ -15,6 +15,7 @@ import { UnsupportedApiError, type Broker, type CallbackWithPromise, + UserError, type ClientDiagnosticEvent, type ClusterMetadata } from '../../../src/index.ts' @@ -730,9 +731,8 @@ test('kPerformWithRetry should not leak timers', async t => { // If the timeout was already resolved, the test runner would complain const error = await promise - ok(error instanceof MultipleErrors) - strictEqual(error.errors.length, 2) - ok(error.errors[1].message.startsWith('Client closed while retrying')) + // UserError is thrown for unknown topics + ok(error instanceof UserError) }) test('kPerformWithRetry should accept a custom function', async t => { diff --git a/test/clients/consumer/consumer.test.ts b/test/clients/consumer/consumer.test.ts index ebd45bf9..0fbc3c55 100644 --- a/test/clients/consumer/consumer.test.ts +++ b/test/clients/consumer/consumer.test.ts @@ -2563,11 +2563,11 @@ test('startLagMonitoring should handle errors', async t => { const topic = await createTopic(t, true, 3) const consumer = createConsumer(t) - const { promise, resolve } = promiseWithResolvers() + const { promise, resolve } = promiseWithResolvers() consumer.startLagMonitoring({ topics: ['invalid'] }, 1000) consumer.on('consumer:lag:error', error => { - resolve(error as MultipleErrors) + resolve(error as UserError) }) await consumer.consume({ @@ -2579,8 +2579,8 @@ test('startLagMonitoring should handle errors', async t => { }) const error = await promise - ok(MultipleErrors.isMultipleErrors(error)) - deepStrictEqual(error.errors[0].errors[0].message, 'This server does not host this topic-partition.') + ok(error instanceof UserError) + ok(error.message.startsWith('Unknown topic ')) }) test('findGroupCoordinator should return the coordinator nodeId and support diagnostic channels', async t => {