Skip to content

Commit ae01c55

Browse files
e11sy, Copilot, and neSpecc
authored
feat(grouper): add counters to the grouper worker (#521)
* chore(grouper): add counters to the grouper worker
* chore(): eslint fix
* chore(): clean up
* chore(grouper): remove redundant rate-limit increment logic
* chore(grouper): remove redundant mocks
* chore(): eslint fix
* chore(): change metric type
* Update workers/grouper/src/index.ts

  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* imp(): use lua for create if not exists, to avoid race-cond
* add logs to sentry worker
* chore(): remove redundant import

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Peter Savchenko <specc.dev@gmail.com>
1 parent f4342de commit ae01c55

File tree

4 files changed

+320
-5
lines changed

4 files changed

+320
-5
lines changed

workers/grouper/src/index.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,74 @@ export default class GrouperWorker extends Worker {
306306
});
307307
}
308308
}
309+
310+
await this.recordProjectMetrics(task.projectId, 'events-accepted');
311+
}
312+
313+
/**
 * Compose the RedisTimeSeries key under which a project metric is stored.
 *
 * @param projectId - id of the project
 * @param metricType - metric type identifier
 * @param granularity - time granularity
 * @returns key in the form `ts:project-<metricType>:<projectId>:<granularity>`
 */
private getTimeSeriesKey(
  projectId: string,
  metricType: string,
  granularity: 'minutely' | 'hourly' | 'daily'
): string {
  const segments = [`ts:project-${metricType}`, projectId, granularity];

  return segments.join(':');
}
327+
328+
/**
 * Write one sample (value 1) for the given metric into the minutely, hourly
 * and daily RedisTimeSeries keys of the project.
 *
 * All three writes are started together; a failed write is logged and does not
 * reject the returned promise.
 *
 * @param projectId - id of the project
 * @param metricType - metric type identifier
 */
private async recordProjectMetrics(projectId: string, metricType: string): Promise<void> {
  const labels: Record<string, string> = {
    type: 'error',
    status: metricType,
    project: projectId,
  };

  /**
   * Granularities to write, each with the retention passed for its series
   */
  const targets: Array<{ granularity: 'minutely' | 'hourly' | 'daily'; retentionMs: number }> = [
    { granularity: 'minutely', retentionMs: TimeMs.DAY },
    { granularity: 'hourly', retentionMs: TimeMs.WEEK },
    // eslint-disable-next-line @typescript-eslint/no-magic-numbers
    { granularity: 'daily', retentionMs: 90 * TimeMs.DAY },
  ];

  const outcomes = await Promise.allSettled(
    targets.map(({ granularity, retentionMs }) => {
      const key = this.getTimeSeriesKey(projectId, metricType, granularity);

      return this.redis.safeTsAdd(key, 1, labels, retentionMs);
    })
  );

  /**
   * Outcomes come back in the same order as targets, so the index identifies the granularity
   */
  for (let position = 0; position < outcomes.length; position++) {
    const outcome = outcomes[position];

    if (outcome.status !== 'rejected') {
      continue;
    }

    this.logger.error(`Failed to add ${targets[position].granularity} TS for ${metricType}`, outcome.reason);
  }
}
310378

311379
/**

workers/grouper/src/redisHelper.ts

Lines changed: 171 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,176 @@ export default class RedisHelper {
110110
return result === null;
111111
}
112112

113+
/**
 * Creates a RedisTimeSeries key unless a key with that name is already present.
 *
 * The existence check and TS.CREATE run inside a single Lua script.
 *
 * @param key - time series key
 * @param labels - labels to attach to the time series
 * @param retentionMs - optional retention in milliseconds; RETENTION is sent only for positive values
 */
public async tsCreateIfNotExists(
  key: string,
  labels: Record<string, string>,
  retentionMs = 0
): Promise<void> {
  /**
   * RETENTION is omitted when no positive retention was requested
   */
  const retentionArgs: string[] = retentionMs > 0
    ? ['RETENTION', Math.floor(retentionMs).toString()]
    : [];
  const createArgs: string[] = [...retentionArgs, ...this.buildLabelArguments(labels)];

  /**
   * Returns 0 when the key already exists, 1 when the series was created
   */
  const createIfMissingScript = `
    if redis.call('EXISTS', KEYS[1]) == 1 then
      return 0
    end

    redis.call('TS.CREATE', KEYS[1], unpack(ARGV))
    return 1
  `;

  await this.redisClient.eval(createIfMissingScript, {
    keys: [ key ],
    arguments: createArgs,
  });
}
150+
151+
/**
 * Sends TS.INCRBY for the given key with an explicit TIMESTAMP and LABELS.
 *
 * @param key - time series key
 * @param value - value to increment by
 * @param timestampMs - timestamp in milliseconds; 0 (the default) means "use the current time"
 * @param labels - labels to attach to the sample
 */
public async tsIncrBy(
  key: string,
  value: number,
  timestampMs = 0,
  labels: Record<string, string> = {}
): Promise<void> {
  /**
   * Zero is treated as "not provided"
   */
  const effectiveTimestamp = timestampMs !== 0 ? timestampMs : Date.now();

  const command: string[] = [
    'TS.INCRBY',
    key,
    String(value),
    'TIMESTAMP',
    String(Math.floor(effectiveTimestamp)),
  ].concat(this.buildLabelArguments(labels));

  await this.redisClient.sendCommand(command);
}
179+
180+
/**
 * Ensures that a RedisTimeSeries key exists (with the requested retention) and increments it.
 *
 * TS.INCRBY creates a missing series on its own, but without RETENTION, so relying on a
 * "key does not exist" error to trigger creation would leave `retentionMs` unapplied.
 * When a positive retention is requested the series is therefore created up front.
 *
 * @param key - time series key
 * @param value - value to increment by
 * @param labels - labels to attach to the time series
 * @param retentionMs - optional retention in milliseconds
 */
public async safeTsIncrBy(
  key: string,
  value: number,
  labels: Record<string, string>,
  retentionMs = 0
): Promise<void> {
  const timestamp = Date.now();

  /**
   * Without a retention there is nothing the explicit create would add
   * over the implicit creation done by TS.INCRBY, so skip the extra round trip
   */
  if (retentionMs > 0) {
    await this.tsCreateIfNotExists(key, labels, retentionMs);
  }

  try {
    await this.tsIncrBy(key, value, timestamp, labels);
  } catch (error) {
    /**
     * Defensive fallback kept from the previous implementation:
     * create the series and retry once if the server reports a missing key
     */
    if (error instanceof Error && error.message.includes('TSDB: key does not exist')) {
      this.logger.warn(`TS key ${key} does not exist, creating it...`);
      await this.tsCreateIfNotExists(key, labels, retentionMs);
      await this.tsIncrBy(key, value, timestamp, labels);
    } else {
      throw error;
    }
  }
}
208+
209+
/**
 * Sends TS.ADD for the given key with `ON_DUPLICATE SUM` and LABELS.
 *
 * @param key - time series key
 * @param value - value to add
 * @param timestampMs - timestamp in milliseconds; 0 (the default) means "use the current time"
 * @param labels - labels to attach to the sample
 */
public async tsAdd(
  key: string,
  value: number,
  timestampMs = 0,
  labels: Record<string, string> = {}
): Promise<void> {
  /**
   * Zero is treated as "not provided"
   */
  const effectiveTimestamp = timestampMs !== 0 ? timestampMs : Date.now();

  const command: string[] = [
    'TS.ADD',
    key,
    String(Math.floor(effectiveTimestamp)),
    String(value),
    'ON_DUPLICATE',
    'SUM',
  ].concat(this.buildLabelArguments(labels));

  await this.redisClient.sendCommand(command);
}
238+
239+
/**
 * Ensures that a RedisTimeSeries key exists (with the requested retention) and adds a sample.
 *
 * TS.ADD creates a missing series on its own, but without RETENTION, so relying on a
 * "key does not exist" error to trigger creation would leave `retentionMs` unapplied.
 * When a positive retention is requested the series is therefore created up front.
 *
 * @param key - time series key
 * @param value - value to add
 * @param labels - labels to attach to the time series
 * @param retentionMs - optional retention in milliseconds
 */
public async safeTsAdd(
  key: string,
  value: number,
  labels: Record<string, string>,
  retentionMs = 0
): Promise<void> {
  const timestamp = Date.now();

  /**
   * Without a retention there is nothing the explicit create would add
   * over the implicit creation done by TS.ADD, so skip the extra round trip
   */
  if (retentionMs > 0) {
    await this.tsCreateIfNotExists(key, labels, retentionMs);
  }

  try {
    await this.tsAdd(key, value, timestamp, labels);
  } catch (error) {
    /**
     * Defensive fallback kept from the previous implementation:
     * create the series and retry once if the server reports a missing key
     */
    if (error instanceof Error && error.message.includes('TSDB: key does not exist')) {
      this.logger.warn(`TS key ${key} does not exist, creating it...`);
      await this.tsCreateIfNotExists(key, labels, retentionMs);
      await this.tsAdd(key, value, timestamp, labels);
    } else {
      throw error;
    }
  }
}
267+
268+
/**
 * Flattens a labels object into RedisTimeSeries command arguments:
 * the `LABELS` keyword followed by alternating label names and values.
 *
 * @param labels - labels to attach to the time series
 * @returns argument list that always starts with `LABELS`, even when no labels are given
 */
private buildLabelArguments(labels: Record<string, string>): string[] {
  return Object.entries(labels).reduce<string[]>(
    (flattened, [name, content]) => {
      flattened.push(name, content);

      return flattened;
    },
    [ 'LABELS' ]
  );
}
282+
113283
/**
114284
* Creates callback function for Redis operations
115285
*
@@ -130,4 +300,4 @@ export default class RedisHelper {
130300
resolve(resp !== 'OK');
131301
};
132302
}
133-
}
303+
}

workers/grouper/tests/index.test.ts

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type { Collection } from 'mongodb';
77
import { MongoClient } from 'mongodb';
88
import type { ErrorsCatcherType, EventAddons, EventData } from '@hawk.so/types';
99
import { MS_IN_SEC } from '../../../lib/utils/consts';
10+
import TimeMs from '../../../lib/utils/time';
1011
import * as mongodb from 'mongodb';
1112
import { patch } from '@n1ru4l/json-patch-plus';
1213

@@ -115,7 +116,6 @@ describe('GrouperWorker', () => {
115116
let projectsCollection: Collection;
116117
let redisClient: RedisClientType;
117118
let worker: GrouperWorker;
118-
119119
beforeAll(async () => {
120120
worker = new GrouperWorker();
121121

@@ -743,6 +743,84 @@ describe('GrouperWorker', () => {
743743
});
744744
});
745745

746+
describe('Events-accepted metrics', () => {
  /**
   * Every handled event must produce one safeTsAdd call per granularity,
   * in minutely → hourly → daily order, each with its own retention
   */
  test('writes minutely, hourly, and daily samples after handling an event', async () => {
    const safeTsAddSpy = jest.spyOn((worker as any).redis, 'safeTsAdd');

    const expectedLabels = {
      type: 'error',
      status: 'events-accepted',
      project: projectIdMock,
    };

    const expectedSeries: Array<[string, number]> = [
      ['minutely', TimeMs.DAY],
      ['hourly', TimeMs.WEEK],
      ['daily', 90 * TimeMs.DAY],
    ];

    try {
      await worker.handle(generateTask());

      expect(safeTsAddSpy).toHaveBeenCalledTimes(expectedSeries.length);

      expectedSeries.forEach(([granularity, retentionMs], position) => {
        expect(safeTsAddSpy).toHaveBeenNthCalledWith(
          position + 1,
          `ts:project-events-accepted:${projectIdMock}:${granularity}`,
          1,
          expectedLabels,
          retentionMs
        );
      });
    } finally {
      safeTsAddSpy.mockRestore();
    }
  });

  /**
   * The second (hourly) write is forced to reject; the failure must be logged
   * and the event must still be saved
   */
  test('logs when a time-series write fails but continues processing', async () => {
    const failure = new Error('TS failure');
    const safeTsAddSpy = jest.spyOn((worker as any).redis, 'safeTsAdd');
    const loggerErrorSpy = jest.spyOn((worker as any).logger, 'error').mockImplementation(() => undefined);

    safeTsAddSpy
      .mockResolvedValueOnce(undefined)
      .mockRejectedValueOnce(failure)
      .mockResolvedValueOnce(undefined);

    try {
      await worker.handle(generateTask());

      expect(loggerErrorSpy).toHaveBeenCalledWith('Failed to add hourly TS for events-accepted', failure);

      const storedEventsCount = await eventsCollection.find().count();

      expect(storedEventsCount).toBe(1);
    } finally {
      safeTsAddSpy.mockRestore();
      loggerErrorSpy.mockRestore();
    }
  });

  /**
   * Guards against the metric being recorded more than once per handled event
   */
  test('records metrics exactly once per handled event', async () => {
    const recordMetricsSpy = jest.spyOn(worker as any, 'recordProjectMetrics');

    try {
      await worker.handle(generateTask());

      expect(recordMetricsSpy).toHaveBeenCalledTimes(1);
      expect(recordMetricsSpy).toHaveBeenCalledWith(projectIdMock, 'events-accepted');
    } finally {
      recordMetricsSpy.mockRestore();
    }
  });
});
823+
746824
afterAll(async () => {
747825
await redisClient.quit();
748826
await worker.finish();

workers/sentry/src/index.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ export default class SentryEventWorker extends Worker {
4141

4242
if (items.length === 0) {
4343
this.logger.warn('Received envelope with no items');
44-
4544
return;
4645
}
4746

@@ -50,7 +49,6 @@ export default class SentryEventWorker extends Worker {
5049

5150
for (const item of items) {
5251
const result = await this.handleEnvelopeItem(headers, item, event.projectId);
53-
5452
if (result === 'processed') {
5553
processedCount++;
5654
} else if (result === 'skipped') {
@@ -249,15 +247,16 @@ export default class SentryEventWorker extends Worker {
249247
if (!taskSent) {
250248
/**
251249
* If addTask returns false, the message was not queued (queue full or channel closed)
250+
* This is a critical error that should be logged and thrown
252251
*/
253252
const error = new Error(`Failed to queue event to ${workerName} worker. Queue may be full or channel closed.`);
254-
255253
this.logger.error(error.message);
256254
this.logger.info('👇 Here is the event that failed to queue:');
257255
this.logger.json(hawkEvent);
258256
throw error;
259257
}
260258

259+
this.logger.verbose(`Successfully queued event to ${workerName} worker`);
261260
return 'processed';
262261
} catch (error) {
263262
this.logger.error('Error handling envelope item:', error);

0 commit comments

Comments
 (0)