|
18 | 18 |
|
19 | 19 | import static io.mantisrx.server.core.utils.StatusConstants.STATUS_MESSAGE_FORMAT; |
20 | 20 |
|
21 | | -import com.netflix.spectator.api.Tag; |
22 | | -import io.mantisrx.common.metrics.Metrics; |
23 | | -import io.mantisrx.common.metrics.MetricsRegistry; |
| 21 | +import io.mantisrx.common.metrics.spectator.SpectatorRegistryFactory; |
24 | 22 | import io.mantisrx.runtime.Context; |
25 | 23 | import io.mantisrx.runtime.Job; |
26 | 24 | import io.mantisrx.runtime.MantisJobState; |
@@ -160,14 +158,13 @@ public void signalFailed(Throwable t) { |
160 | 158 | logger.error("Worker failure detected, shutting down job: {}", jobId, t); |
161 | 159 | // Send failure metrics when data emission failed |
162 | 160 | if (t instanceof OnErrorThrowable) { |
163 | | - Metrics jobFailureMetrics = new Metrics.Builder() |
164 | | - .id(workerMonitorMetricId, Tag.of("jobId", this.jobId), |
165 | | - Tag.of("workerIndex", String.valueOf(this.workerIndex)), |
166 | | - Tag.of("stageNum", String.valueOf(this.stageNum))) |
167 | | - .addCounter(workerFailureMetricName) |
168 | | - .build(); |
169 | | - |
170 | | - MetricsRegistry.getInstance().registerAndGet(jobFailureMetrics).getCounter(workerFailureMetricName).increment(); |
| 161 | + SpectatorRegistryFactory.getRegistry() |
| 162 | + .counter("runningWorker_failure", |
| 163 | + "jobId", jobId, |
| 164 | + "workerIndex", String.valueOf(this.workerIndex), |
| 165 | + "stageNum", String.valueOf(this.stageNum) |
| 166 | + ) |
| 167 | + .increment(); |
171 | 168 | } |
172 | 169 |
|
173 | 170 | jobStatus.onNext(new Status(jobId, stageNum, workerIndex, workerNum, |
|
0 commit comments