Skip to content

Commit 650c4da

Browse files
committed
fix(grouper-metrics): docs, log context, pushgateway options, duplicate-retry test
1 parent 0d9f719 commit 650c4da

File tree

6 files changed

+119
-23
lines changed

6 files changed

+119
-23
lines changed

.env.sample

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,13 @@ PROMETHEUS_PUSHGATEWAY_URL=
3232
PROMETHEUS_PUSHGATEWAY_INTERVAL=10000
3333

3434
# Grouper memory log controls
35+
# Number of handled tasks between memory checkpoint logs
3536
GROUPER_MEMORY_LOG_EVERY_TASKS=50
37+
# Number of handled tasks in one sustained-growth evaluation window
3638
GROUPER_MEMORY_GROWTH_WINDOW_TASKS=200
39+
# Sustained-growth warning threshold in megabytes (MB, counted as 1024 * 1024 bytes)
3740
GROUPER_MEMORY_GROWTH_WARN_MB=64
41+
# Single-handle growth warning threshold in megabytes (MB, counted as 1024 * 1024 bytes)
3842
GROUPER_MEMORY_HANDLE_GROWTH_WARN_MB=16
3943

4044
# project token for error catching

lib/metrics.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ export function startMetricsPushing(workerName: string): () => void {
7272
const interval = getPushIntervalMs();
7373
const hostname = os.hostname();
7474
const id = nanoid(ID_SIZE);
75-
const gateway = new client.Pushgateway(url, [], register);
75+
const gateway = new client.Pushgateway(url, undefined, register);
7676

7777
currentWorkerName = workerName;
7878

workers/grouper/src/index.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ export default class GrouperWorker extends Worker {
163163
});
164164
} catch (error) {
165165
this.grouperMetrics.incrementErrorsTotal();
166-
this.memoryMonitor.logHandleError(this.handledTasksCount, task.payload?.title);
166+
this.memoryMonitor.logHandleError(this.handledTasksCount, task.payload?.title, task.projectId);
167167
throw error;
168168
}
169169
}
@@ -179,7 +179,7 @@ export default class GrouperWorker extends Worker {
179179
const memoryBeforeHandle = process.memoryUsage();
180180

181181
this.grouperMetrics.observePayloadSize(taskPayloadSize);
182-
this.memoryMonitor.logBeforeHandle(memoryBeforeHandle, handledTasksCount, taskPayloadSize);
182+
this.memoryMonitor.logBeforeHandle(memoryBeforeHandle, handledTasksCount, taskPayloadSize, task.projectId);
183183

184184
this.logger.info(`[handle] project=${task.projectId} catcher=${task.catcherType} title="${task.payload.title}" payloadSize=${taskPayloadSize}b backtraceFrames=${task.payload.backtrace?.length ?? 0}`);
185185

@@ -208,6 +208,10 @@ export default class GrouperWorker extends Worker {
208208
*/
209209
this.dataFilter.processEvent(task.payload);
210210

211+
/**
212+
* Retry loop for duplicate-key races:
213+
* another worker may save the first occurrence between `getEvent()` and `saveEvent()`.
214+
*/
211215
while (true) {
212216
uniqueEventHash = await this.getUniqueEventHash(task);
213217

@@ -217,7 +221,7 @@ export default class GrouperWorker extends Worker {
217221
const similarEvent = await this.findSimilarEvent(task.projectId, task.payload.title);
218222

219223
if (similarEvent) {
220-
this.logger.info(`[handle] similar event found, groupHash=${similarEvent.groupHash} totalCount=${similarEvent.totalCount}`);
224+
this.logger.info(`[handle] project=${task.projectId} title="${task.payload.title}" similar event found, groupHash=${similarEvent.groupHash} totalCount=${similarEvent.totalCount}`);
221225

222226
/**
223227
* Override group hash with found event's group hash
@@ -227,7 +231,7 @@ export default class GrouperWorker extends Worker {
227231
existedEvent = similarEvent;
228232
} else {
229233
/**
230-
* If we couldn't group by grouping pattern — try grouping by hash (title)
234+
* If we couldn't group by grouping pattern - try grouping by hash (title).
231235
*/
232236
/**
233237
* Find event by group hash.
@@ -244,7 +248,7 @@ export default class GrouperWorker extends Worker {
244248
try {
245249
const incrementAffectedUsers = !!task.payload.user;
246250

247-
this.logger.info(`[saveEvent] new event, payloadSize=${taskPayloadSize}b`);
251+
this.logger.info(`[saveEvent] project=${task.projectId} title="${task.payload.title}" new event, payloadSize=${taskPayloadSize}b`);
248252

249253
/**
250254
* Insert new event
@@ -277,7 +281,7 @@ export default class GrouperWorker extends Worker {
277281
*/
278282
if (e.code?.toString() === DB_DUPLICATE_KEY_ERROR) {
279283
this.grouperMetrics.incrementDuplicateRetriesTotal();
280-
this.logger.info(`[saveEvent] duplicate key, retrying as repetition`);
284+
this.logger.info(`[saveEvent] project=${task.projectId} title="${task.payload.title}" duplicate key, retrying as repetition`);
281285

282286
continue;
283287
}
@@ -306,7 +310,7 @@ export default class GrouperWorker extends Worker {
306310

307311
const existedPayloadSize = Buffer.byteLength(JSON.stringify(existedEvent.payload));
308312

309-
this.logger.info(`[computeDelta] existedPayloadSize=${existedPayloadSize}b taskPayloadSize=${taskPayloadSize}b`);
313+
this.logger.info(`[computeDelta] project=${task.projectId} title="${task.payload.title}" existedPayloadSize=${existedPayloadSize}b taskPayloadSize=${taskPayloadSize}b`);
310314

311315
try {
312316
/**
@@ -323,7 +327,7 @@ export default class GrouperWorker extends Worker {
323327

324328
this.grouperMetrics.observeDeltaSize(deltaSize);
325329

326-
this.logger.info(`[computeDelta] deltaSize=${deltaSize}b`);
330+
this.logger.info(`[computeDelta] project=${task.projectId} title="${task.payload.title}" deltaSize=${deltaSize}b`);
327331

328332
const newRepetition = {
329333
groupHash: uniqueEventHash,

workers/grouper/src/metrics/config.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,45 @@
22
* Parsed config for grouper memory monitoring.
33
*/
44
export interface GrouperMemoryConfig {
5+
/**
6+
* Write periodic memory checkpoint every N handled tasks.
7+
*/
58
logEveryTasks: number;
9+
10+
/**
11+
* Number of handled tasks in one sustained-growth evaluation window.
12+
*/
613
growthWindowTasks: number;
14+
15+
/**
16+
* Warn when heap growth in the evaluation window is greater than this amount in MB.
17+
*/
718
growthWarnMb: number;
19+
20+
/**
21+
* Warn when a single handle() call grows heap by more than this amount in MB.
22+
*/
823
handleGrowthWarnMb: number;
924
}
1025

26+
/**
27+
* Default memory checkpoint interval in handled tasks.
28+
*/
1129
const DEFAULT_MEMORY_LOG_EVERY_TASKS = 50;
30+
31+
/**
32+
* Default sustained-growth window size in handled tasks.
33+
*/
1234
const DEFAULT_MEMORY_GROWTH_WINDOW_TASKS = 200;
35+
36+
/**
37+
* Default sustained-growth warning threshold in MB.
38+
*/
1339
const DEFAULT_MEMORY_GROWTH_WARN_MB = 64;
40+
41+
/**
42+
* Default single-handle growth warning threshold in MB.
43+
*/
1444
const DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB = 16;
1545

1646
/**

workers/grouper/src/metrics/memoryMonitor.ts

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ interface LoggerLike {
66
}
77

88
const ROUND_PRECISION = 100;
9+
10+
/**
11+
* Number of bytes in one mebibyte.
12+
*/
913
// eslint-disable-next-line @typescript-eslint/no-magic-numbers
1014
const BYTES_IN_MEBIBYTE = 1024 * 1024;
1115

@@ -15,7 +19,15 @@ const BYTES_IN_MEBIBYTE = 1024 * 1024;
1519
export default class GrouperMemoryMonitor {
1620
private readonly logger: LoggerLike;
1721
private readonly config: GrouperMemoryConfig;
22+
23+
/**
24+
* Task number of the last sustained-growth checkpoint.
25+
*/
1826
private memoryCheckpointTask = 0;
27+
28+
/**
29+
* Heap usage (bytes) saved at the last sustained-growth checkpoint.
30+
*/
1931
private memoryCheckpointHeapUsed = 0;
2032

2133
/**
@@ -54,11 +66,12 @@ export default class GrouperMemoryMonitor {
5466
*
5567
* @param handledTasksCount - currently handled tasks count.
5668
* @param title - event title if available.
69+
* @param projectId - project identifier.
5770
*/
58-
public logHandleError(handledTasksCount: number, title: string | undefined): void {
71+
public logHandleError(handledTasksCount: number, title: string | undefined, projectId: string): void {
5972
const suffix = title ? `title="${title}"` : '';
6073

61-
this.logCheckpoint('handle-error', process.memoryUsage(), handledTasksCount, suffix);
74+
this.logCheckpoint('handle-error', process.memoryUsage(), handledTasksCount, projectId, suffix);
6275
}
6376

6477
/**
@@ -67,17 +80,18 @@ export default class GrouperMemoryMonitor {
6780
* @param memoryUsage - process memory usage.
6881
* @param handledTasksCount - currently handled tasks count.
6982
* @param payloadSizeBytes - task payload size.
83+
* @param projectId - project identifier.
7084
*/
71-
public logBeforeHandle(memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number, payloadSizeBytes: number): void {
85+
public logBeforeHandle(memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number, payloadSizeBytes: number, projectId: string): void {
7286
if (handledTasksCount !== 1 && handledTasksCount % this.config.logEveryTasks !== 0) {
7387
return;
7488
}
7589

76-
this.logCheckpoint('before-handle', memoryUsage, handledTasksCount, `payloadSize=${payloadSizeBytes}b`);
90+
this.logCheckpoint('before-handle', memoryUsage, handledTasksCount, projectId, `payloadSize=${payloadSizeBytes}b`);
7791
}
7892

7993
/**
80-
* Log handle() completion memory details and growth checks.
94+
* Log post-handle memory stats and emit warnings on suspicious growth.
8195
*
8296
* @param memoryBeforeHandle - memory usage before handling.
8397
* @param handledTasksCount - currently handled tasks count.
@@ -97,25 +111,26 @@ export default class GrouperMemoryMonitor {
97111
const heapDeltaMb = Math.round((heapDeltaBytes / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION;
98112

99113
this.logger.info(
100-
`[handle] done, ${this.formatMemoryUsage(memoryAfterHandle)} heapDelta=${heapDeltaMb}MB handled=${handledTasksCount}`
114+
`[memory][project=${projectId}] [handle] done, ${this.formatMemoryUsage(memoryAfterHandle)} heapDelta=${heapDeltaMb}MB handled=${handledTasksCount}`
101115
);
102116

103117
if (heapDeltaBytes > this.config.handleGrowthWarnMb * BYTES_IN_MEBIBYTE) {
104118
this.logger.warn(
105-
`[memory] high heap growth in single handle: heapDelta=${heapDeltaMb}MB payloadSize=${payloadSizeBytes}b title="${title}" project=${projectId}`
119+
`[memory][project=${projectId}] high heap growth in single handle: heapDelta=${heapDeltaMb}MB payloadSize=${payloadSizeBytes}b title="${title}"`
106120
);
107121
}
108122

109-
this.checkMemoryGrowthWindow(memoryAfterHandle, handledTasksCount);
123+
this.checkMemoryGrowthWindow(memoryAfterHandle, handledTasksCount, projectId);
110124
}
111125

112126
/**
113127
* Logs sustained heap growth over a configurable number of handled tasks.
114128
*
115129
* @param memoryUsage - current process memory usage.
116130
* @param handledTasksCount - currently handled tasks count.
131+
* @param projectId - project identifier.
117132
*/
118-
private checkMemoryGrowthWindow(memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number): void {
133+
private checkMemoryGrowthWindow(memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number, projectId: string): void {
119134
const tasksInWindow = handledTasksCount - this.memoryCheckpointTask;
120135

121136
if (tasksInWindow < this.config.growthWindowTasks) {
@@ -127,12 +142,12 @@ export default class GrouperMemoryMonitor {
127142
const heapUsedNowMb = Math.round((memoryUsage.heapUsed / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION;
128143

129144
this.logger.info(
130-
`[memory] growth window tasks=${tasksInWindow} handled=${this.memoryCheckpointTask + 1}-${handledTasksCount} heapGrowth=${heapGrowthMb}MB heapUsedNow=${heapUsedNowMb}MB`
145+
`[memory][project=${projectId}] growth window tasks=${tasksInWindow} handled=${this.memoryCheckpointTask + 1}-${handledTasksCount} heapGrowth=${heapGrowthMb}MB heapUsedNow=${heapUsedNowMb}MB`
131146
);
132147

133148
if (heapGrowthBytes > this.config.growthWarnMb * BYTES_IN_MEBIBYTE) {
134149
this.logger.warn(
135-
`[memory] possible leak detected: heap grew by ${heapGrowthMb}MB in ${tasksInWindow} handled tasks`
150+
`[memory][project=${projectId}] possible leak detected: heap grew by ${heapGrowthMb}MB in ${tasksInWindow} handled tasks`
136151
);
137152
}
138153

@@ -161,11 +176,13 @@ export default class GrouperMemoryMonitor {
161176
* @param stage - lifecycle stage.
162177
* @param memoryUsage - current process memory usage.
163178
* @param handledTasksCount - currently handled tasks count.
179+
* @param projectId - optional project identifier.
164180
* @param suffix - optional extra suffix.
165181
*/
166-
private logCheckpoint(stage: string, memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number, suffix = ''): void {
182+
private logCheckpoint(stage: string, memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number, projectId?: string, suffix = ''): void {
167183
const extra = suffix ? ` ${suffix}` : '';
184+
const prefix = projectId ? `[memory][project=${projectId}]` : '[memory]';
168185

169-
this.logger.info(`[memory] stage=${stage} handled=${handledTasksCount} ${this.formatMemoryUsage(memoryUsage)}${extra}`);
186+
this.logger.info(`${prefix} stage=${stage} handled=${handledTasksCount} ${this.formatMemoryUsage(memoryUsage)}${extra}`);
170187
}
171188
}

workers/grouper/tests/index.test.ts

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import type { RedisClientType } from 'redis';
55
import { createClient } from 'redis';
66
import type { Collection } from 'mongodb';
77
import { MongoClient } from 'mongodb';
8-
import type { ErrorsCatcherType, EventAddons, EventData } from '@hawk.so/types';
8+
import type { ErrorsCatcherType, EventAddons, EventData, GroupedEventDBScheme } from '@hawk.so/types';
99
import { MS_IN_SEC } from '../../../lib/utils/consts';
1010
import * as mongodb from 'mongodb';
1111
import { patch } from '@n1ru4l/json-patch-plus';
@@ -173,6 +173,47 @@ describe('GrouperWorker', () => {
173173
expect(await eventsCollection.find().count()).toBe(1);
174174
});
175175

176+
test('Should process event as repetition after duplicate key race', async () => {
177+
const duplicateKeyErrorCode = 11000;
178+
const task = generateTask();
179+
const workerWithSaveEvent = worker as unknown as {
180+
saveEvent: (
181+
projectId: string,
182+
groupedEventData: GroupedEventDBScheme
183+
) => Promise<mongodb.ObjectID>;
184+
};
185+
const originalSaveEvent = workerWithSaveEvent.saveEvent.bind(workerWithSaveEvent) as (
186+
projectId: string,
187+
groupedEventData: GroupedEventDBScheme
188+
) => Promise<mongodb.ObjectID>;
189+
let shouldThrowDuplicate = true;
190+
const saveEventSpy = jest.spyOn(workerWithSaveEvent, 'saveEvent');
191+
192+
saveEventSpy.mockImplementation(async (projectId: string, groupedEventData: GroupedEventDBScheme): Promise<mongodb.ObjectID> => {
193+
if (shouldThrowDuplicate) {
194+
shouldThrowDuplicate = false;
195+
196+
await originalSaveEvent(projectId, groupedEventData);
197+
198+
const duplicateError = new Error('duplicate key') as Error & { code: number };
199+
200+
duplicateError.code = duplicateKeyErrorCode;
201+
throw duplicateError;
202+
}
203+
204+
return originalSaveEvent(projectId, groupedEventData);
205+
});
206+
207+
await worker.handle(task);
208+
209+
expect(saveEventSpy).toHaveBeenCalledTimes(1);
210+
expect(await eventsCollection.find().count()).toBe(1);
211+
expect(await repetitionsCollection.find().count()).toBe(1);
212+
expect((await eventsCollection.findOne({})).totalCount).toBe(2);
213+
214+
saveEventSpy.mockRestore();
215+
});
216+
176217
test('Should increment total events count on each processing', async () => {
177218
await worker.handle(generateTask());
178219
await worker.handle(generateTask());

0 commit comments

Comments
 (0)