Skip to content

Commit b3a5da2

Browse files
committed
fix tests
1 parent cfc1a93 commit b3a5da2

File tree

3 files changed

+124
-36
lines changed

3 files changed

+124
-36
lines changed

mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerActor.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,45 @@ public Receive createReceive() {
392392
}
393393

394394
private void onTaskExecutorInitialization(InitializeTaskExecutorRequest request) {
395-
log.debug("[Noop] InitializeTaskExecutorRequest {} for the resource cluster {}", request, clusterID);
395+
log.debug("InitializeTaskExecutorRequest {} for the resource cluster {}", request, clusterID);
396+
setupTaskExecutorStateIfNecessary(request.getTaskExecutorID());
397+
try {
398+
final TaskExecutorID taskExecutorID = request.getTaskExecutorID();
399+
final TaskExecutorState state = this.delegate.get(taskExecutorID);
400+
if (state.getRegistration() == null || !state.isRegistered()) {
401+
TaskExecutorRegistration registration = this.mantisJobStore.getTaskExecutor(taskExecutorID);
402+
if (registration != null) {
403+
log.debug("Found registration {} for task executor {}", registration, taskExecutorID);
404+
Preconditions.checkState(state.onRegistration(registration));
405+
if (isTaskExecutorDisabled(registration)) {
406+
log.info("Reconnected task executor {} was already marked for disabling.", registration.getTaskExecutorID());
407+
state.onNodeDisabled();
408+
}
409+
} else {
410+
log.warn("Received initialization from unknown task executor {}", taskExecutorID);
411+
sender().tell(new Status.Failure(new TaskExecutorNotFoundException(taskExecutorID)), self());
412+
return;
413+
}
414+
}
415+
416+
boolean stateChange = state.onTaskExecutorStatusChange(
417+
new TaskExecutorStatusChange(
418+
taskExecutorID,
419+
clusterID,
420+
TaskExecutorReport.occupied(request.getWorkerId())));
421+
if (stateChange) {
422+
if (state.isAvailable()) {
423+
this.delegate.tryMarkAvailable(taskExecutorID);
424+
} else {
425+
this.delegate.tryMarkUnavailable(taskExecutorID);
426+
}
427+
}
428+
429+
updateHeartbeatTimeout(taskExecutorID);
430+
sender().tell(Ack.getInstance(), self());
431+
} catch (Exception e) {
432+
sender().tell(new Status.Failure(e), self());
433+
}
396434
}
397435

398436
private void onTaskExecutorRegistration(TaskExecutorRegistration registration) {

mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java

Lines changed: 70 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -192,18 +192,24 @@ public static void teardown() {
192192
}
193193

194194
@Before
195-
public void setupRpcService() throws Exception {
195+
public void setupTest() throws Exception {
196+
setupRpcService();
197+
setupActor();
198+
}
199+
200+
private void setupRpcService() throws Exception {
196201
rpcService.registerGateway(TASK_EXECUTOR_ADDRESS, gateway);
197202
mantisJobStore = mock(MantisJobStore.class);
198203
jobMessageRouter = mock(JobMessageRouter.class);
199-
when(mantisJobStore.loadAllDisableTaskExecutorsRequests(ArgumentMatchers.any()))
200-
.thenReturn(ImmutableList.of());
201-
when(mantisJobStore.getJobArtifactsToCache(ArgumentMatchers.any()))
202-
.thenReturn(ImmutableList.of());
204+
doReturn(ImmutableList.of())
205+
.when(mantisJobStore)
206+
.loadAllDisableTaskExecutorsRequests(CLUSTER_ID);
207+
doReturn(ImmutableList.of())
208+
.when(mantisJobStore)
209+
.getJobArtifactsToCache(CLUSTER_ID);
203210
}
204211

205-
@Before
206-
public void setupActor() {
212+
private void setupActor() {
207213
MasterConfiguration masterConfig = mock(MasterConfiguration.class);
208214
when(masterConfig.getTimeoutSecondsToReportStart()).thenReturn(1);
209215
ExecuteStageRequestFactory executeStageRequestFactory = new ExecuteStageRequestFactory(masterConfig);
@@ -232,7 +238,7 @@ public void setupActor() {
232238
resourceClusterActor,
233239
Duration.ofSeconds(15),
234240
CLUSTER_ID,
235-
new LongDynamicProperty(propertiesLoader, "rate.limite.perSec", 10000L));
241+
new LongDynamicProperty(propertiesLoader, "resourcecluster.gateway.maxConcurrentRequests.test", 100000L));
236242
}
237243

238244
@Test
@@ -257,10 +263,15 @@ public void testMarkTaskCancelled() throws Exception {
257263
assertEquals(Ack.getInstance(), resourceCluster.registerTaskExecutor(TASK_EXECUTOR_REGISTRATION).get());
258264
assertEquals(ImmutableList.of(TASK_EXECUTOR_ID), resourceCluster.getRegisteredTaskExecutors().get());
259265

260-
when(mantisJobStore.loadAllDisableTaskExecutorsRequests(ArgumentMatchers.eq(CLUSTER_ID)))
261-
.thenReturn(ImmutableList.of());
262-
when(mantisJobStore.getJobArtifactsToCache(ArgumentMatchers.eq(CLUSTER_ID))).thenReturn(ImmutableList.of());
263-
when(mantisJobStore.getTaskExecutor(ArgumentMatchers.eq(TASK_EXECUTOR_ID))).thenReturn(TASK_EXECUTOR_REGISTRATION);
266+
doReturn(ImmutableList.of())
267+
.when(mantisJobStore)
268+
.loadAllDisableTaskExecutorsRequests(CLUSTER_ID);
269+
doReturn(ImmutableList.of())
270+
.when(mantisJobStore)
271+
.getJobArtifactsToCache(CLUSTER_ID);
272+
doReturn(TASK_EXECUTOR_REGISTRATION)
273+
.when(mantisJobStore)
274+
.getTaskExecutor(TASK_EXECUTOR_ID);
264275

265276
assertEquals(
266277
Ack.getInstance(),
@@ -299,10 +310,15 @@ public void testMarkTaskCancelled() throws Exception {
299310

300311
@Test
301312
public void testInitializationAfterRestart() throws Exception {
302-
when(mantisJobStore.loadAllDisableTaskExecutorsRequests(ArgumentMatchers.eq(CLUSTER_ID)))
303-
.thenReturn(ImmutableList.of());
304-
when(mantisJobStore.getJobArtifactsToCache(ArgumentMatchers.eq(CLUSTER_ID))).thenReturn(ImmutableList.of());
305-
when(mantisJobStore.getTaskExecutor(ArgumentMatchers.eq(TASK_EXECUTOR_ID))).thenReturn(TASK_EXECUTOR_REGISTRATION);
313+
doReturn(ImmutableList.of())
314+
.when(mantisJobStore)
315+
.loadAllDisableTaskExecutorsRequests(CLUSTER_ID);
316+
doReturn(ImmutableList.of())
317+
.when(mantisJobStore)
318+
.getJobArtifactsToCache(CLUSTER_ID);
319+
doReturn(TASK_EXECUTOR_REGISTRATION)
320+
.when(mantisJobStore)
321+
.getTaskExecutor(TASK_EXECUTOR_ID);
306322
assertEquals(
307323
Ack.getInstance(),
308324
resourceCluster.initializeTaskExecutor(TASK_EXECUTOR_ID, WORKER_ID).get());
@@ -319,7 +335,12 @@ public void testGetFreeTaskExecutors() throws Exception {
319335
TASK_EXECUTOR_ID,
320336
CLUSTER_ID,
321337
TaskExecutorReport.available())).get());
322-
final Set<TaskExecutorAllocationRequest> requests = Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, SchedulingConstraints.of(MACHINE_DEFINITION), null, 0));
338+
final Set<TaskExecutorAllocationRequest> requests = Collections.singleton(
339+
TaskExecutorAllocationRequest.of(
340+
WORKER_ID,
341+
SchedulingConstraints.of(MACHINE_DEFINITION),
342+
JOB_METADATA,
343+
0));
323344
assertEquals(
324345
TASK_EXECUTOR_ID,
325346
resourceCluster.getTaskExecutorsFor(requests).get().values().stream().findFirst().get());
@@ -416,7 +437,12 @@ public void testGetTaskExecutorsUsageAndList() throws Exception {
416437
assertEquals(ImmutableList.of(TASK_EXECUTOR_ID_3, TASK_EXECUTOR_ID_2), idleInstancesResponse.getInstanceIds());
417438
assertEquals(CONTAINER_DEF_ID_2, idleInstancesResponse.getSkuId());
418439

419-
Set<TaskExecutorAllocationRequest> requests = Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, SchedulingConstraints.of(MACHINE_DEFINITION_2), null, 0));
440+
Set<TaskExecutorAllocationRequest> requests = Collections.singleton(
441+
TaskExecutorAllocationRequest.of(
442+
WORKER_ID,
443+
SchedulingConstraints.of(MACHINE_DEFINITION_2),
444+
JOB_METADATA,
445+
0));
420446
assertEquals(
421447
TASK_EXECUTOR_ID_3,
422448
resourceCluster.getTaskExecutorsFor(requests).get().values().stream().findFirst().get());
@@ -451,7 +477,12 @@ public void testGetTaskExecutorsUsageAndList() throws Exception {
451477
assertEquals(ImmutableList.of(TASK_EXECUTOR_ID), idleInstancesResponse.getInstanceIds());
452478
assertEquals(CONTAINER_DEF_ID_1, idleInstancesResponse.getSkuId());
453479

454-
requests = Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, SchedulingConstraints.of(MACHINE_DEFINITION), null, 0));
480+
requests = Collections.singleton(
481+
TaskExecutorAllocationRequest.of(
482+
WORKER_ID,
483+
SchedulingConstraints.of(MACHINE_DEFINITION),
484+
JOB_METADATA,
485+
0));
455486
assertEquals(
456487
TASK_EXECUTOR_ID_2,
457488
resourceCluster.getTaskExecutorsFor(requests).get().values().stream().findFirst().get());
@@ -518,8 +549,9 @@ public void testAssignmentTimeout() throws Exception {
518549
assertEquals(ImmutableList.of(), resourceCluster.getAvailableTaskExecutors().get());
519550

520551
// re-register TE
521-
when(mantisJobStore.getTaskExecutor(Matchers.eq(TASK_EXECUTOR_ID)))
522-
.thenReturn(new TaskExecutorRegistration(TASK_EXECUTOR_ID, CLUSTER_ID, "", "", null, MACHINE_DEFINITION, ATTRIBUTES));
552+
doReturn(new TaskExecutorRegistration(TASK_EXECUTOR_ID, CLUSTER_ID, "", "", null, MACHINE_DEFINITION, ATTRIBUTES))
553+
.when(mantisJobStore)
554+
.getTaskExecutor(TASK_EXECUTOR_ID);
523555
assertEquals(Ack.getInstance(),
524556
resourceCluster
525557
.heartBeatFromTaskExecutor(
@@ -571,7 +603,9 @@ public void testGetMultipleActiveJobs() throws ExecutionException, InterruptedEx
571603
}
572604

573605
expectedTaskExecutorIds.add(taskExecutorID);
574-
requests.add(TaskExecutorAllocationRequest.of(workerId, SchedulingConstraints.of(MACHINE_DEFINITION), null, 0));
606+
JobMetadata jobMetadata =
607+
new JobMetadata(workerId.getJobId(), null, null, 1, "testuser", null, ImmutableList.of(), -1, -1, -1);
608+
requests.add(TaskExecutorAllocationRequest.of(workerId, SchedulingConstraints.of(MACHINE_DEFINITION), jobMetadata, 0));
575609
}
576610
assertEquals(
577611
expectedTaskExecutorIds,
@@ -702,7 +736,12 @@ public void testIfDisabledTaskExecutorsAreNotAvailableForScheduling() throws Exc
702736
resourceCluster.heartBeatFromTaskExecutor(
703737
new TaskExecutorHeartbeat(TASK_EXECUTOR_ID_2, CLUSTER_ID, TaskExecutorReport.available())).get());
704738
resourceCluster.disableTaskExecutorsFor(ATTRIBUTES, Instant.now().plus(Duration.ofDays(1)), Optional.empty()).get();
705-
Set<TaskExecutorAllocationRequest> requests = Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, SchedulingConstraints.of(MACHINE_DEFINITION), null, 0));
739+
Set<TaskExecutorAllocationRequest> requests = Collections.singleton(
740+
TaskExecutorAllocationRequest.of(
741+
WORKER_ID,
742+
SchedulingConstraints.of(MACHINE_DEFINITION),
743+
JOB_METADATA,
744+
0));
706745
assertEquals(
707746
TASK_EXECUTOR_ID_2,
708747
resourceCluster.getTaskExecutorsFor(requests).get().values().stream().findFirst().get());
@@ -737,7 +776,12 @@ public void testGetAssignedTaskExecutorAfterTaskCompletes() throws Throwable {
737776
new TaskExecutorHeartbeat(TASK_EXECUTOR_ID, CLUSTER_ID, TaskExecutorReport.available())).join());
738777

739778

740-
final Set<TaskExecutorAllocationRequest> requests = Collections.singleton(TaskExecutorAllocationRequest.of(WORKER_ID, SchedulingConstraints.of(MACHINE_DEFINITION), null, 0));
779+
final Set<TaskExecutorAllocationRequest> requests = Collections.singleton(
780+
TaskExecutorAllocationRequest.of(
781+
WORKER_ID,
782+
SchedulingConstraints.of(MACHINE_DEFINITION),
783+
JOB_METADATA,
784+
0));
741785
assertEquals(
742786
TASK_EXECUTOR_ID,
743787
resourceCluster.getTaskExecutorsFor(requests).get().values().stream().findFirst().get());
@@ -758,8 +802,8 @@ public void testGetAssignedTaskExecutorAfterTaskCompletes() throws Throwable {
758802

759803
@Test
760804
public void testTaskExecutorIsDisabledEvenAfterRestart() throws Exception {
761-
doReturn(TASK_EXECUTOR_REGISTRATION).when(mantisJobStore).getTaskExecutor(ArgumentMatchers.eq(TASK_EXECUTOR_ID));
762-
doReturn(ImmutableList.of()).when(mantisJobStore).getJobArtifactsToCache(ArgumentMatchers.eq(CLUSTER_ID));
805+
doReturn(TASK_EXECUTOR_REGISTRATION).when(mantisJobStore).getTaskExecutor(TASK_EXECUTOR_ID);
806+
doReturn(ImmutableList.of()).when(mantisJobStore).getJobArtifactsToCache(CLUSTER_ID);
763807

764808
resourceCluster.registerTaskExecutor(TASK_EXECUTOR_REGISTRATION);
765809

mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/TaskExecutorReconnectionIntegrationTest.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.junit.Assert.*;
2020
import static org.mockito.ArgumentMatchers.any;
2121
import static org.mockito.ArgumentMatchers.argThat;
22+
import static org.mockito.Mockito.doReturn;
2223
import static org.mockito.Mockito.mock;
2324
import static org.mockito.Mockito.timeout;
2425
import static org.mockito.Mockito.times;
@@ -44,6 +45,8 @@
4445
import io.mantisrx.server.core.JobCompletedReason;
4546
import io.mantisrx.server.core.TestingRpcService;
4647
import io.mantisrx.server.core.domain.WorkerId;
48+
import io.mantisrx.server.master.ExecuteStageRequestFactory;
49+
import io.mantisrx.server.master.config.MasterConfiguration;
4750
import io.mantisrx.server.master.persistence.MantisJobStore;
4851
import io.mantisrx.server.master.resourcecluster.ClusterID;
4952
import io.mantisrx.server.master.resourcecluster.ContainerSkuID;
@@ -125,13 +128,16 @@ public void setupTest() throws Exception {
125128
.thenReturn(CompletableFuture.completedFuture(Ack.getInstance()));
126129

127130
// Setup required mocks for the MantisJobStore
128-
when(mantisJobStore.loadAllDisableTaskExecutorsRequests(any()))
129-
.thenReturn(ImmutableList.of());
130-
when(mantisJobStore.getJobArtifactsToCache(any()))
131-
.thenReturn(ImmutableList.of());
131+
doReturn(ImmutableList.of())
132+
.when(mantisJobStore)
133+
.loadAllDisableTaskExecutorsRequests(CLUSTER_ID);
134+
doReturn(ImmutableList.of())
135+
.when(mantisJobStore)
136+
.getJobArtifactsToCache(CLUSTER_ID);
132137

133-
LongDynamicProperty checkForDisabledExecutorsInterval =
134-
new LongDynamicProperty(propertiesLoader, "mantis.resourcecluster.disabledtaskexecutors.intervalms", Duration.ofSeconds(10).toMillis());
138+
MasterConfiguration masterConfig = mock(MasterConfiguration.class);
139+
when(masterConfig.getTimeoutSecondsToReportStart()).thenReturn(1);
140+
ExecuteStageRequestFactory executeStageRequestFactory = new ExecuteStageRequestFactory(masterConfig);
135141

136142
final Props props =
137143
ResourceClusterActor.props(
@@ -149,7 +155,7 @@ public void setupTest() throws Exception {
149155
false,
150156
ImmutableMap.of(),
151157
new CpuWeightedFitnessCalculator(),
152-
null,
158+
executeStageRequestFactory,
153159
false);
154160

155161
resourceClusterActor = actorSystem.actorOf(props);
@@ -158,7 +164,7 @@ public void setupTest() throws Exception {
158164
resourceClusterActor,
159165
Duration.ofSeconds(15),
160166
CLUSTER_ID,
161-
new LongDynamicProperty(propertiesLoader, "rate.limite.perSec", 10000L));
167+
new LongDynamicProperty(propertiesLoader, "resourcecluster.gateway.maxConcurrentRequests.test", 100000L));
162168
}
163169

164170
@Test
@@ -268,7 +274,7 @@ public void testTaskExecutorCrashDetectionViaHeartbeat() throws Exception {
268274
} catch (AssertionError e) {
269275
log.error("NOTE: TaskExecutor crash detection via heartbeat mismatch did not trigger WorkerTerminate event. " +
270276
"This may indicate the feature is not fully implemented yet: {}", e.getMessage());
271-
fail();
277+
fail(); //todo flaky test
272278
}
273279

274280
// Step 4: Verify TaskExecutor is available after heartbeat

0 commit comments

Comments
 (0)