Skip to content

Commit 0d9f719

Browse files
committed
fix(metrics): validate push interval, add push cleanup, and prevent retry double-counting in grouper
1 parent ba3f4b9 commit 0d9f719

File tree

6 files changed

+638
-337
lines changed

6 files changed

+638
-337
lines changed

lib/metrics.ts

Lines changed: 69 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,97 @@
11
import * as client from 'prom-client';
22
import os from 'os';
33
import { nanoid } from 'nanoid';
4+
import createLogger from './logger';
45

56
// Fallback push period, used when PROMETHEUS_PUSHGATEWAY_INTERVAL is unset or invalid.
const DEFAULT_PUSH_INTERVAL_MS = 10_000;
// Length of the random id generated for each pushing session.
const ID_SIZE = 5;
// Job name under which metrics are pushed to the pushgateway.
const METRICS_JOB_NAME = 'workers';

// Registry that holds every metric exposed by this module.
const register = new client.Registry();
const logger = createLogger();

// Handle of the active push timer; null while nothing is being pushed.
let pushInterval: NodeJS.Timeout | null = null;
// Worker name the active push timer was started for; empty while idle.
let currentWorkerName = '';

// Register the default prom-client metrics in the module registry.
client.collectDefaultMetrics({ register });

export { register, client };
1019

1120
/**
 * Resolve the pushgateway push interval from PROMETHEUS_PUSHGATEWAY_INTERVAL.
 *
 * The value is validated as the delay the timer will really use: Node.js timers
 * truncate fractional delays and coerce any delay below 1 or above 2147483647
 * to 1 ms, which would turn a misconfigured value into a push every millisecond.
 * Such values are rejected, a warning is logged and the default is used instead.
 *
 * @returns push interval in whole milliseconds; DEFAULT_PUSH_INTERVAL_MS when the
 * variable is unset or does not describe a usable timer delay.
 */
function getPushIntervalMs(): number {
  // Largest delay accepted by Node.js timers (2^31 - 1).
  const MAX_TIMER_DELAY_MS = 2_147_483_647;
  const rawInterval = process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL;

  if (rawInterval === undefined) {
    return DEFAULT_PUSH_INTERVAL_MS;
  }

  // Truncate first so that e.g. "0.5" is judged as the 0 ms delay it would become.
  const interval = Math.trunc(Number(rawInterval));

  if (Number.isFinite(interval) && interval >= 1 && interval <= MAX_TIMER_DELAY_MS) {
    return interval;
  }

  logger.warn(`[metrics] invalid PROMETHEUS_PUSHGATEWAY_INTERVAL="${rawInterval}", fallback to ${DEFAULT_PUSH_INTERVAL_MS}ms`);

  return DEFAULT_PUSH_INTERVAL_MS;
}
39+
40+
/**
 * Cancel the periodic push to the pushgateway.
 *
 * Does nothing when no push timer is active. Otherwise the timer is cleared,
 * the stop is logged with the worker name it was started for, and the
 * module-level timer handle and worker name are reset.
 */
export function stopMetricsPushing(): void {
  if (pushInterval) {
    clearInterval(pushInterval);
    pushInterval = null;

    // Log before the name is wiped so the message still identifies the worker.
    logger.info(`[metrics] stopped pushing metrics for worker=${currentWorkerName}`);
    currentWorkerName = '';
  }
}
53+
54+
/**
 * Begin pushing the module registry to the pushgateway on a fixed interval.
 *
 * Nothing is started when PROMETHEUS_PUSHGATEWAY_URL is not set, or when a push
 * timer is already running (a warning is logged in that case). Each session is
 * grouped by worker name, host name and a freshly generated random id.
 *
 * @param workerName - name of the worker used as the `worker` grouping label.
 * @returns the `stopMetricsPushing` function, returned on every path.
 */
export function startMetricsPushing(workerName: string): () => void {
  const gatewayUrl = process.env.PROMETHEUS_PUSHGATEWAY_URL;

  if (!gatewayUrl) {
    return stopMetricsPushing;
  }

  if (pushInterval) {
    logger.warn(`[metrics] pushing is already started for worker=${currentWorkerName}, skip duplicate start for worker=${workerName}`);

    return stopMetricsPushing;
  }

  const periodMs = getPushIntervalMs();
  const host = os.hostname();
  const sessionId = nanoid(ID_SIZE);
  const gateway = new client.Pushgateway(gatewayUrl, [], register);

  currentWorkerName = workerName;

  logger.info(`Start pushing metrics to ${gatewayUrl} every ${periodMs}ms (host: ${host}, id: ${sessionId}, worker: ${workerName})`);

  // One push attempt; failures are only logged so the timer keeps running.
  const pushOnce = (): void => {
    const target = {
      jobName: METRICS_JOB_NAME,
      groupings: {
        worker: workerName,
        host,
        id: sessionId,
      },
    };

    gateway.pushAdd(target, (err) => {
      if (err) {
        logger.error(`Metrics push error: ${err.message || err}`);
      }
    });
  };

  pushInterval = setInterval(pushOnce, periodMs);

  return stopMetricsPushing;
}

runner.ts

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ class WorkerRunner {
4141
// private gateway?: promClient.Pushgateway;
4242

4343
/**
44-
* number returned by setInterval() of metrics push function
44+
* Metrics push cleanup callback.
4545
*/
46-
private pushIntervalNumber?: ReturnType<typeof setInterval>;
46+
private stopMetricsPushing?: () => void;
4747

4848
/**
4949
* Create runner instance
@@ -86,9 +86,21 @@ class WorkerRunner {
8686
return;
8787
}
8888

89-
this.workers.forEach((worker) => {
90-
startMetricsPushing(worker.type.replace('/', '_'));
91-
});
89+
if (this.workers.length === 0) {
90+
return;
91+
}
92+
93+
const workerTypes = Array.from(new Set(this.workers.map((worker) => {
94+
return worker.type.replace('/', '_');
95+
})));
96+
97+
const workerTypeForMetrics = workerTypes.length === 1 ? workerTypes[0] : 'multi_worker_process';
98+
99+
if (workerTypes.length > 1) {
100+
console.warn(`[metrics] ${workerTypes.length} workers are running in one process; pushing metrics as "${workerTypeForMetrics}" to avoid duplicated attribution`);
101+
}
102+
103+
this.stopMetricsPushing = startMetricsPushing(workerTypeForMetrics);
92104
}
93105

94106
/**
@@ -224,7 +236,8 @@ class WorkerRunner {
224236
private async stopWorker(worker: Worker): Promise<void> {
225237
try {
226238
// stop pushing metrics
227-
clearInterval(this.pushIntervalNumber);
239+
this.stopMetricsPushing?.();
240+
this.stopMetricsPushing = undefined;
228241
await worker.finish();
229242

230243
console.log(

0 commit comments

Comments
 (0)