Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .grype.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ ignore:
- vulnerability: CVE-2026-22184
- vulnerability: GHSA-9ppj-qmqm-q256
- vulnerability: CVE-2026-2673 # libcrypto3,libssl3
- vulnerability: GHSA-44fc-8fm5-q62h # convict 6.2.4 (Critical)
- vulnerability: GHSA-hf2r-9gf9-rwch # convict 6.2.4 (Critical)
- vulnerability: GHSA-c2c7-rcm5-vvqj # picomatch 4.0.3 (High)
- vulnerability: GHSA-3v7f-55p6-f55p # picomatch 4.0.3 (Medium)
- vulnerability: GHSA-48c2-rrv3-qjmp # yaml 2.8.2 (Medium)

Medium severity vulnerabilities:
- zlib 1.3.1-r2: CVE-2026-27171
Expand Down
5 changes: 4 additions & 1 deletion audit-ci.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
"allowlist": [
"GHSA-37qj-frw5-hhjh",
"GHSA-43fc-jf86-j433",
"GHSA-wf6x-7x77-mvgw"
"GHSA-wf6x-7x77-mvgw",
"GHSA-hf2r-9gf9-rwch",
"GHSA-48c2-rrv3-qjmp",
"GHSA-44fc-8fm5-q62h"
]
}
2 changes: 1 addition & 1 deletion config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
"config": {
"options": {
"mode": 2,
"batchSize": 1,
"batchSize": 10,
"pollFrequency": 10,
"recursiveTimeout": 100,
"messageCharset": "utf8",
Expand Down
2 changes: 1 addition & 1 deletion docker/ml-api-adapter/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
"config": {
"options": {
"mode": 2,
"batchSize": 1,
"batchSize": 10,
"pollFrequency": 10,
"recursiveTimeout": 100,
"messageCharset": "utf8",
Expand Down
2 changes: 1 addition & 1 deletion docker/ml-api-adapter/default_iso.json
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
"config": {
"options": {
"mode": 2,
"batchSize": 1,
"batchSize": 10,
"pollFrequency": 10,
"recursiveTimeout": 100,
"messageCharset": "utf8",
Expand Down
214 changes: 107 additions & 107 deletions package-lock.json

Large diffs are not rendered by default.

43 changes: 35 additions & 8 deletions src/handlers/notification/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -174,20 +174,25 @@

const isBatch = Array.isArray(message)
message = Array.isArray(message) ? message : [message]
let combinedResult = true

const processOneMessage = async (msg) => {
log.debug('Notification::consumeMessage - processOneMessage...')
const contextFromMessage = EventSdk.Tracer.extractContextFromMessage(msg.value)
const span = EventSdk.Tracer.createChildSpanFromContext('ml_notification_event', contextFromMessage)
const traceTags = span.getTracestateTags()
if (traceTags.timeApiPrepare && parseInt(traceTags.timeApiPrepare)) timeApiPrepare = parseInt(traceTags.timeApiPrepare)
if (traceTags.timeApiFulfil && parseInt(traceTags.timeApiFulfil)) timeApiFulfil = parseInt(traceTags.timeApiFulfil)

// Capture timestamps locally to avoid a write-race on the shared outer variables in batch mode.
const msgTimeApiPrepare = traceTags.timeApiPrepare && Number.parseInt(traceTags.timeApiPrepare)
? Number.parseInt(traceTags.timeApiPrepare)
: null
const msgTimeApiFulfil = traceTags.timeApiFulfil && Number.parseInt(traceTags.timeApiFulfil)
? Number.parseInt(traceTags.timeApiFulfil)
: null

try {
const result = await processMessage(msg, span)
log.verbose('Notification:consumeMessage - message processed')
combinedResult = (combinedResult && result)
return { result, msg, msgTimeApiPrepare, msgTimeApiFulfil }
} catch (err) {
const errMessage = 'Notification message processing error: '
log.error(errMessage, err)
Expand All @@ -198,17 +203,39 @@
await span.finish(fspiopError.message, state)

if (!isBatch) throw fspiopError // do not throw in batch mode, so that other messages can be processed
combinedResult = false
return { result: false, msg, msgTimeApiPrepare, msgTimeApiFulfil }
} finally {
if (!autoCommitEnabled) notificationConsumer.commitMessageSync(msg)
// In batch mode defer commits until all messages are processed (see ordered commit below).
// In single-message mode commit here so a failed message is not redelivered.
if (!autoCommitEnabled && !isBatch) notificationConsumer.commitMessageSync(msg)
if (!span.isFinished) await span.finish()
}
}

for (const msg of message) {
await processOneMessage(msg)
const outcomes = await Promise.all(message.map(msg => processOneMessage(msg)))

// Use the earliest timestamp across the batch to represent the batch's latency metric.
for (const { msgTimeApiPrepare, msgTimeApiFulfil } of outcomes) {
if (msgTimeApiPrepare !== null && (timeApiPrepare === undefined || msgTimeApiPrepare < timeApiPrepare)) {
timeApiPrepare = msgTimeApiPrepare
}
if (msgTimeApiFulfil !== null && (timeApiFulfil === undefined || msgTimeApiFulfil < timeApiFulfil)) {
timeApiFulfil = msgTimeApiFulfil
}
}

// Commit in ascending partition→offset order so a restart never skips an unprocessed offset.
if (!autoCommitEnabled && isBatch) {
const sortedMsgs = outcomes
.map(({ msg }) => msg)
.sort((a, b) => a.partition !== b.partition ? a.partition - b.partition : a.offset - b.offset)

Check warning on line 231 in src/handlers/notification/index.js

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Unexpected negated condition.

See more on https://sonarcloud.io/project/issues?id=mojaloop_ml-api-adapter&issues=AZ1Gxs_NBqdBuPqsP4X8&open=AZ1Gxs_NBqdBuPqsP4X8&pullRequest=650
for (const msg of sortedMsgs) {
notificationConsumer.commitMessageSync(msg)
}
}

const combinedResult = outcomes.every(({ result }) => Boolean(result))

recordTxMetrics(timeApiPrepare, timeApiFulfil, true)
histTimerEnd({ success: true })
return combinedResult
Expand Down
4 changes: 2 additions & 2 deletions test/integration/api/transfers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ const testKafkaConsumerConfig = {
config: {
options: {
mode: 2,
batchSize: 1,
batchSize: 10,
pollFrequency: 10,
recursiveTimeout: 100,
messageCharset: 'utf8',
Expand All @@ -98,7 +98,7 @@ const testKafkaConsumerConfig = {
config: {
options: {
mode: 2,
batchSize: 1,
batchSize: 10,
pollFrequency: 10,
recursiveTimeout: 100,
messageCharset: 'utf8',
Expand Down
42 changes: 42 additions & 0 deletions test/unit/handlers/notification/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5089,6 +5089,48 @@ Test('Notification Service tests', async notificationTest => {
test.end()
})

await consumeMessageTest.test('process multiple valid messages in parallel batch mode', async test => {
const makeMsg = (id) => ({
value: {
metadata: {
event: {
type: 'prepare',
action: 'prepare',
state: {
status: 'success',
code: 0
}
}
},
content: {
headers: {},
payload: {},
context: {
originalRequestId: id
}
},
to: 'dfsp2',
from: 'dfsp1',
id
}
})
const msgs = [
makeMsg('aaaaaaaa-eeee-4575-b6a9-ead2955b0001'),
makeMsg('aaaaaaaa-eeee-4575-b6a9-ead2955b0002'),
makeMsg('aaaaaaaa-eeee-4575-b6a9-ead2955b0003')
]
mockPayloadCache.getPayload.returns(Promise.resolve({}))
test.ok(await Notification.startConsumer({ payloadCache: mockPayloadCache }))

const result = await Notification.consumeMessage(null, msgs)

// All three messages should be processed successfully
test.ok(result, 'combinedResult should be true when all messages succeed')
// Callback.sendRequest should have been called once per message
test.equal(Callback.sendRequest.callCount, 3, 'sendRequest should be called once per message')
test.end()
})

await consumeMessageTest.end()
})

Expand Down