|
18 | 18 |
|
19 | 19 | import static io.mantisrx.server.core.utils.StatusConstants.STATUS_MESSAGE_FORMAT; |
20 | 20 |
|
| 21 | +import com.netflix.spectator.api.Tag; |
| 22 | +import io.mantisrx.common.metrics.Metrics; |
| 23 | +import io.mantisrx.common.metrics.MetricsRegistry; |
21 | 24 | import io.mantisrx.runtime.Context; |
22 | 25 | import io.mantisrx.runtime.Job; |
23 | 26 | import io.mantisrx.runtime.MantisJobState; |
|
28 | 31 | import io.mantisrx.server.core.JobSchedulingInfo; |
29 | 32 | import io.mantisrx.server.core.Status; |
30 | 33 | import io.mantisrx.server.core.Status.TYPE; |
31 | | -import java.util.Iterator; |
| 34 | + |
| 35 | +import java.util.*; |
32 | 36 | import java.util.concurrent.CountDownLatch; |
33 | 37 | import org.slf4j.Logger; |
34 | 38 | import org.slf4j.LoggerFactory; |
35 | 39 | import rx.Observable; |
36 | 40 | import rx.Observer; |
| 41 | +import rx.exceptions.OnErrorThrowable; |
37 | 42 | import rx.functions.Action0; |
38 | 43 | import rx.functions.Action1; |
39 | 44 | import rx.subjects.PublishSubject; |
|
43 | 48 | public class RunningWorker { |
44 | 49 |
|
45 | 50 | private static final Logger logger = LoggerFactory.getLogger(RunningWorker.class); |
| 51 | + private static final String workerFailureMetricName = "workerFailure"; |
| 52 | + private static final String workerMonitorMetricId = "runningWorkerMonitor"; |
46 | 53 | private final int totalStagesNet; |
47 | 54 | private Action0 onTerminateCallback; |
48 | 55 | private Action0 onCompleteCallback; |
@@ -150,7 +157,19 @@ public void signalCompleted() { |
150 | 157 | public void signalFailed(Throwable t) { |
151 | 158 | logger.info("JobId: " + jobId + ", stage: " + stageNum + " workerIndex: " + workerIndex + " workerNumber: " + workerNum + "," |
152 | 159 | + " signaling failed"); |
153 | | - logger.error("Worker failure detected, shutting down job", t); |
| 160 | + logger.error("Worker failure detected, shutting down job: {}", jobId, t); |
| 161 | + // Send failure metrics when data emission failed |
| 162 | + if (t instanceof OnErrorThrowable) { |
| 163 | + Metrics jobFailureMetrics = new Metrics.Builder() |
| 164 | + .id(workerMonitorMetricId, Tag.of("jobId", this.jobId), |
| 165 | + Tag.of("workerIndex", String.valueOf(this.workerIndex)), |
| 166 | + Tag.of("stageNum", String.valueOf(this.stageNum))) |
| 167 | + .addCounter(workerFailureMetricName) |
| 168 | + .build(); |
| 169 | + |
| 170 | + MetricsRegistry.getInstance().registerAndGet(jobFailureMetrics).getCounter(workerFailureMetricName).increment(); |
| 171 | + } |
| 172 | + |
154 | 173 | jobStatus.onNext(new Status(jobId, stageNum, workerIndex, workerNum, |
155 | 174 | TYPE.INFO, String.format(STATUS_MESSAGE_FORMAT, stageNum, workerIndex, workerNum, "failed. error: " + t.getMessage()), |
156 | 175 | MantisJobState.Failed)); |
|
0 commit comments