Skip to content

Commit ca691b7

Browse files
committed
refactor(api): enhance metrics polling mechanism and simplify subscription handling
- Removed legacy polling methods from `MetricsResolver` and integrated polling logic into `SubscriptionPollingService` for better separation of concerns. - Updated `SubscriptionTrackerService` to support polling configuration directly, allowing for cleaner topic registration. - Adjusted unit tests to accommodate changes in the subscription handling and polling logic. - Introduced `SubscriptionPollingService` to manage polling intervals and ensure efficient execution of polling tasks.
1 parent 99da8bf commit ca691b7

File tree

7 files changed

+151
-55
lines changed

7 files changed

+151
-55
lines changed

api/src/unraid-api/graph/resolvers/metrics/metrics.resolver.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,12 +174,12 @@ describe('MetricsResolver', () => {
174174
expect(subscriptionTracker.registerTopic).toHaveBeenCalledWith(
175175
'CPU_UTILIZATION',
176176
expect.any(Function),
177-
expect.any(Function)
177+
1000
178178
);
179179
expect(subscriptionTracker.registerTopic).toHaveBeenCalledWith(
180180
'MEMORY_UTILIZATION',
181181
expect.any(Function),
182-
expect.any(Function)
182+
2000
183183
);
184184
});
185185
});

api/src/unraid-api/graph/resolvers/metrics/metrics.resolver.ts

Lines changed: 10 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,6 @@ import { SubscriptionTrackerService } from '@app/unraid-api/graph/services/subsc
1919

2020
@Resolver(() => Metrics)
2121
export class MetricsResolver implements OnModuleInit {
22-
private cpuPollingTimer: NodeJS.Timeout | undefined;
23-
private memoryPollingTimer: NodeJS.Timeout | undefined;
24-
private isCpuPollingInProgress = false;
25-
private isMemoryPollingInProgress = false;
26-
2722
constructor(
2823
private readonly cpuService: CpuService,
2924
private readonly memoryService: MemoryService,
@@ -32,59 +27,27 @@ export class MetricsResolver implements OnModuleInit {
3227
) {}
3328

3429
onModuleInit() {
30+
// Register CPU polling with 1 second interval
3531
this.subscriptionTracker.registerTopic(
3632
PUBSUB_CHANNEL.CPU_UTILIZATION,
37-
() => {
38-
this.pollCpuUtilization();
39-
this.cpuPollingTimer = setInterval(() => this.pollCpuUtilization(), 1000);
33+
async () => {
34+
const payload = await this.cpuService.generateCpuLoad();
35+
pubsub.publish(PUBSUB_CHANNEL.CPU_UTILIZATION, { systemMetricsCpu: payload });
4036
},
41-
() => {
42-
clearInterval(this.cpuPollingTimer);
43-
this.isCpuPollingInProgress = false;
44-
}
37+
1000
4538
);
4639

40+
// Register memory polling with 2 second interval
4741
this.subscriptionTracker.registerTopic(
4842
PUBSUB_CHANNEL.MEMORY_UTILIZATION,
49-
() => {
50-
this.pollMemoryUtilization();
51-
this.memoryPollingTimer = setInterval(() => this.pollMemoryUtilization(), 2000);
43+
async () => {
44+
const payload = await this.memoryService.generateMemoryLoad();
45+
pubsub.publish(PUBSUB_CHANNEL.MEMORY_UTILIZATION, { systemMetricsMemory: payload });
5246
},
53-
() => {
54-
clearInterval(this.memoryPollingTimer);
55-
this.isMemoryPollingInProgress = false;
56-
}
47+
2000
5748
);
5849
}
5950

60-
private async pollCpuUtilization(): Promise<void> {
61-
if (this.isCpuPollingInProgress) return;
62-
63-
this.isCpuPollingInProgress = true;
64-
try {
65-
const payload = await this.cpuService.generateCpuLoad();
66-
pubsub.publish(PUBSUB_CHANNEL.CPU_UTILIZATION, { systemMetricsCpu: payload });
67-
} catch (error) {
68-
console.error('Error polling CPU utilization:', error);
69-
} finally {
70-
this.isCpuPollingInProgress = false;
71-
}
72-
}
73-
74-
private async pollMemoryUtilization(): Promise<void> {
75-
if (this.isMemoryPollingInProgress) return;
76-
77-
this.isMemoryPollingInProgress = true;
78-
try {
79-
const payload = await this.memoryService.generateMemoryLoad();
80-
pubsub.publish(PUBSUB_CHANNEL.MEMORY_UTILIZATION, { systemMetricsMemory: payload });
81-
} catch (error) {
82-
console.error('Error polling memory utilization:', error);
83-
} finally {
84-
this.isMemoryPollingInProgress = false;
85-
}
86-
}
87-
8851
@Query(() => Metrics)
8952
@UsePermissions({
9053
action: AuthActionVerb.READ,
Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import { Module } from '@nestjs/common';
2+
import { ScheduleModule } from '@nestjs/schedule';
23

34
import { SubscriptionHelperService } from '@app/unraid-api/graph/services/subscription-helper.service.js';
5+
import { SubscriptionPollingService } from '@app/unraid-api/graph/services/subscription-polling.service.js';
46
import { SubscriptionTrackerService } from '@app/unraid-api/graph/services/subscription-tracker.service.js';
57

68
@Module({
7-
providers: [SubscriptionTrackerService, SubscriptionHelperService],
8-
exports: [SubscriptionTrackerService, SubscriptionHelperService],
9+
imports: [ScheduleModule.forRoot()],
10+
providers: [SubscriptionTrackerService, SubscriptionHelperService, SubscriptionPollingService],
11+
exports: [SubscriptionTrackerService, SubscriptionHelperService, SubscriptionPollingService],
912
})
1013
export class ServicesModule {}

api/src/unraid-api/graph/services/subscription-helper.service.spec.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@ describe('SubscriptionHelperService', () => {
1313
let loggerSpy: any;
1414

1515
beforeEach(() => {
16-
trackerService = new SubscriptionTrackerService();
16+
const mockPollingService = {
17+
startPolling: vi.fn(),
18+
stopPolling: vi.fn(),
19+
};
20+
trackerService = new SubscriptionTrackerService(mockPollingService as any);
1721
helperService = new SubscriptionHelperService(trackerService);
1822
loggerSpy = vi.spyOn(Logger.prototype, 'debug').mockImplementation(() => {});
1923
});
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import { Injectable, Logger, OnModuleDestroy } from '@nestjs/common';
2+
import { SchedulerRegistry } from '@nestjs/schedule';
3+
4+
export interface PollingConfig {
5+
name: string;
6+
intervalMs: number;
7+
callback: () => Promise<void>;
8+
}
9+
10+
@Injectable()
11+
export class SubscriptionPollingService implements OnModuleDestroy {
12+
private readonly logger = new Logger(SubscriptionPollingService.name);
13+
private readonly activePollers = new Map<string, { isPolling: boolean }>();
14+
15+
constructor(private readonly schedulerRegistry: SchedulerRegistry) {}
16+
17+
onModuleDestroy() {
18+
this.stopAll();
19+
}
20+
21+
/**
22+
* Start polling for a specific subscription topic
23+
*/
24+
startPolling(config: PollingConfig): void {
25+
const { name, intervalMs, callback } = config;
26+
27+
// Clean up any existing interval
28+
this.stopPolling(name);
29+
30+
// Initialize polling state
31+
this.activePollers.set(name, { isPolling: false });
32+
33+
// Create the polling function with guard against overlapping executions
34+
const pollFunction = async () => {
35+
const poller = this.activePollers.get(name);
36+
if (!poller || poller.isPolling) {
37+
return;
38+
}
39+
40+
poller.isPolling = true;
41+
try {
42+
await callback();
43+
} catch (error) {
44+
this.logger.error(`Error in polling task '${name}'`, error);
45+
} finally {
46+
if (poller) {
47+
poller.isPolling = false;
48+
}
49+
}
50+
};
51+
52+
// Create and register the interval
53+
const interval = setInterval(pollFunction, intervalMs);
54+
this.schedulerRegistry.addInterval(name, interval);
55+
56+
this.logger.debug(`Started polling for '${name}' every ${intervalMs}ms`);
57+
}
58+
59+
/**
60+
* Stop polling for a specific subscription topic
61+
*/
62+
stopPolling(name: string): void {
63+
try {
64+
if (this.schedulerRegistry.doesExist('interval', name)) {
65+
this.schedulerRegistry.deleteInterval(name);
66+
this.logger.debug(`Stopped polling for '${name}'`);
67+
}
68+
} catch (error) {
69+
// Interval doesn't exist, which is fine
70+
}
71+
72+
// Clean up polling state
73+
this.activePollers.delete(name);
74+
}
75+
76+
/**
77+
* Stop all active polling tasks
78+
*/
79+
stopAll(): void {
80+
const intervals = this.schedulerRegistry.getIntervals();
81+
intervals.forEach((key) => this.stopPolling(key));
82+
this.activePollers.clear();
83+
}
84+
85+
/**
86+
* Check if polling is active for a given name
87+
*/
88+
isPolling(name: string): boolean {
89+
return this.schedulerRegistry.doesExist('interval', name);
90+
}
91+
}

api/src/unraid-api/graph/services/subscription-tracker.service.spec.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@ describe('SubscriptionTrackerService', () => {
99
let loggerSpy: any;
1010

1111
beforeEach(() => {
12-
service = new SubscriptionTrackerService();
12+
const mockPollingService = {
13+
startPolling: vi.fn(),
14+
stopPolling: vi.fn(),
15+
};
16+
service = new SubscriptionTrackerService(mockPollingService as any);
1317
// Spy on logger methods
1418
loggerSpy = vi.spyOn(Logger.prototype, 'debug').mockImplementation(() => {});
1519
});

api/src/unraid-api/graph/services/subscription-tracker.service.ts

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,44 @@
11
import { Injectable, Logger } from '@nestjs/common';
22

3+
import { SubscriptionPollingService } from '@app/unraid-api/graph/services/subscription-polling.service.js';
4+
35
@Injectable()
46
export class SubscriptionTrackerService {
57
private readonly logger = new Logger(SubscriptionTrackerService.name);
68
private subscriberCounts = new Map<string, number>();
79
private topicHandlers = new Map<string, { onStart: () => void; onStop: () => void }>();
810

9-
public registerTopic(topic: string, onStart: () => void, onStop: () => void): void {
10-
this.topicHandlers.set(topic, { onStart, onStop });
11+
constructor(private readonly pollingService: SubscriptionPollingService) {}
12+
13+
/**
14+
* Register a topic with optional polling support
15+
* @param topic The topic identifier
16+
* @param callbackOrOnStart The callback function to execute (can be async) OR onStart handler for legacy support
17+
* @param intervalMsOrOnStop Optional interval in ms for polling OR onStop handler for legacy support
18+
*/
19+
public registerTopic(
20+
topic: string,
21+
callbackOrOnStart: () => void | Promise<void>,
22+
intervalMsOrOnStop?: number | (() => void)
23+
): void {
24+
if (typeof intervalMsOrOnStop === 'number') {
25+
// New API: callback with polling interval
26+
const pollingConfig = {
27+
name: topic,
28+
intervalMs: intervalMsOrOnStop,
29+
callback: async () => callbackOrOnStart(),
30+
};
31+
this.topicHandlers.set(topic, {
32+
onStart: () => this.pollingService.startPolling(pollingConfig),
33+
onStop: () => this.pollingService.stopPolling(topic),
34+
});
35+
} else {
36+
// Legacy API: onStart and onStop handlers
37+
this.topicHandlers.set(topic, {
38+
onStart: callbackOrOnStart,
39+
onStop: intervalMsOrOnStop || (() => {}),
40+
});
41+
}
1142
}
1243

1344
public subscribe(topic: string): void {

0 commit comments

Comments
 (0)