Skip to content

Commit f98141d

Browse files
committed
test fix
1 parent b3a5da2 commit f98141d

File tree

3 files changed

+19
-11
lines changed

3 files changed

+19
-11
lines changed

mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterScalerActor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ private void onGetClusterIdleInstancesResponse(GetClusterIdleInstancesResponse r
309309
private void onTriggerClusterUsageRequest(TriggerClusterUsageRequest req) {
310310
log.trace("Requesting cluster usage: {}", this.clusterId);
311311
if (this.skuToRuleMap.isEmpty()) {
312-
log.info("{} scaler is disabled due to no rules", this.clusterId);
312+
log.debug("{} scaler is disabled due to no rules", this.clusterId);
313313
return;
314314
}
315315

mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/TaskExecutorState.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
import io.mantisrx.master.resourcecluster.ResourceClusterActor.AvailabilityState;
2121
import io.mantisrx.master.resourcecluster.ResourceClusterActor.Pending;
2222
import io.mantisrx.master.resourcecluster.ResourceClusterActor.Running;
23+
import io.mantisrx.master.jobcluster.job.worker.WorkerState;
24+
import io.mantisrx.master.jobcluster.job.worker.WorkerTerminate;
2325
import io.mantisrx.server.core.domain.WorkerId;
26+
import io.mantisrx.server.core.JobCompletedReason;
2427
import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat;
2528
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
2629
import io.mantisrx.server.master.resourcecluster.TaskExecutorReport;
@@ -199,6 +202,14 @@ boolean onHeartbeat(TaskExecutorHeartbeat heartbeat)
199202
}
200203

201204
TaskExecutorReport report = heartbeat.getTaskExecutorReport();
205+
if (this.availabilityState instanceof Running && report instanceof Available) {
206+
WorkerId runningWorkerId = this.availabilityState.getWorkerId();
207+
if (runningWorkerId != null) {
208+
log.warn("Heartbeat indicates available while running {}. Marking worker as lost.", runningWorkerId);
209+
jobMessageRouter.routeWorkerEvent(
210+
new WorkerTerminate(runningWorkerId, WorkerState.Failed, JobCompletedReason.Lost));
211+
}
212+
}
202213
if (this.cancelledWorkerOnTask != null) {
203214
if (report instanceof Occupied && ((Occupied) report).getWorkerId().equals(this.cancelledWorkerOnTask)) {
204215
log.warn("{} cancelled, request cancel on heartbeat.", this.cancelledWorkerOnTask);

mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -269,9 +269,8 @@ public void testMarkTaskCancelled() throws Exception {
269269
doReturn(ImmutableList.of())
270270
.when(mantisJobStore)
271271
.getJobArtifactsToCache(CLUSTER_ID);
272-
doReturn(TASK_EXECUTOR_REGISTRATION)
273-
.when(mantisJobStore)
274-
.getTaskExecutor(TASK_EXECUTOR_ID);
272+
when(mantisJobStore.getTaskExecutor(TASK_EXECUTOR_ID))
273+
.thenReturn(TASK_EXECUTOR_REGISTRATION);
275274

276275
assertEquals(
277276
Ack.getInstance(),
@@ -316,9 +315,8 @@ public void testInitializationAfterRestart() throws Exception {
316315
doReturn(ImmutableList.of())
317316
.when(mantisJobStore)
318317
.getJobArtifactsToCache(CLUSTER_ID);
319-
doReturn(TASK_EXECUTOR_REGISTRATION)
320-
.when(mantisJobStore)
321-
.getTaskExecutor(TASK_EXECUTOR_ID);
318+
when(mantisJobStore.getTaskExecutor(ArgumentMatchers.any(TaskExecutorID.class)))
319+
.thenReturn(TASK_EXECUTOR_REGISTRATION);
322320
assertEquals(
323321
Ack.getInstance(),
324322
resourceCluster.initializeTaskExecutor(TASK_EXECUTOR_ID, WORKER_ID).get());
@@ -549,9 +547,8 @@ public void testAssignmentTimeout() throws Exception {
549547
assertEquals(ImmutableList.of(), resourceCluster.getAvailableTaskExecutors().get());
550548

551549
// re-register TE
552-
doReturn(new TaskExecutorRegistration(TASK_EXECUTOR_ID, CLUSTER_ID, "", "", null, MACHINE_DEFINITION, ATTRIBUTES))
553-
.when(mantisJobStore)
554-
.getTaskExecutor(TASK_EXECUTOR_ID);
550+
when(mantisJobStore.getTaskExecutor(TASK_EXECUTOR_ID))
551+
.thenReturn(new TaskExecutorRegistration(TASK_EXECUTOR_ID, CLUSTER_ID, "", "", null, MACHINE_DEFINITION, ATTRIBUTES));
555552
assertEquals(Ack.getInstance(),
556553
resourceCluster
557554
.heartBeatFromTaskExecutor(
@@ -802,7 +799,7 @@ public void testGetAssignedTaskExecutorAfterTaskCompletes() throws Throwable {
802799

803800
@Test
804801
public void testTaskExecutorIsDisabledEvenAfterRestart() throws Exception {
805-
doReturn(TASK_EXECUTOR_REGISTRATION).when(mantisJobStore).getTaskExecutor(TASK_EXECUTOR_ID);
802+
when(mantisJobStore.getTaskExecutor(TASK_EXECUTOR_ID)).thenReturn(TASK_EXECUTOR_REGISTRATION);
806803
doReturn(ImmutableList.of()).when(mantisJobStore).getJobArtifactsToCache(CLUSTER_ID);
807804

808805
resourceCluster.registerTaskExecutor(TASK_EXECUTOR_REGISTRATION);

0 commit comments

Comments
 (0)