Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3355,6 +3355,12 @@ protected CompletableFuture<Void> internalEnableMigrationAsync(boolean migrated)
.log("Successfully updated migration on namespace"));
}

protected CompletableFuture<Boolean> internalGetMigrationAsync() {
return validateSuperUserAccessAsync()
.thenCompose(__ -> getLocalPolicies().getLocalPoliciesAsync(namespaceName))
.thenApply(policiesOpt -> policiesOpt.map(localPolicies -> localPolicies.migrated).orElse(false));
}

protected Policies getDefaultPolicesIfNull(Policies policies) {
if (policies == null) {
policies = new Policies();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3419,7 +3419,7 @@ public void removeNamespaceEntryFilters(@Suspended AsyncResponse asyncResponse,

@POST
@Path("/{tenant}/{namespace}/migration")
@ApiOperation(hidden = true, value = "Update migration for all topics in a namespace")
@ApiOperation(hidden = true, value = "Update the migration state for a namespace")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Operation successful"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
Expand All @@ -3442,6 +3442,31 @@ public void enableMigration(@Suspended AsyncResponse asyncResponse,
});
}

@GET
@Path("/{tenant}/{namespace}/migration")
@ApiOperation(hidden = true, value = "Get the migration state for a namespace",
response = Boolean.class)
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist")})
public void getMigration(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalGetMigrationAsync()
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error()
.attr("tenant", tenant)
.attr("namespace", namespace)
.exception(ex)
.log("Failed to get migration");
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
@Path("/{tenant}/{namespace}/dispatcherPauseOnAckStatePersistent")
@ApiOperation(value = "Set dispatcher pause on ack state persistent configuration for specified namespace.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2312,25 +2312,25 @@ public void testMigratedInfoIsNotLostDuringOtherLocalPoliciesUpdate() throws Exc
admin.namespaces().createNamespace(namespace, Set.of(testLocalCluster));

admin.namespaces().updateMigrationState(namespace, true);
assertTrue(admin.namespaces().getPolicies(namespace).migrated);
assertTrue(admin.namespaces().getMigrationState(namespace));

String bookieAffinityGroupPrimary = "group1";
admin.namespaces().setBookieAffinityGroup(namespace,
BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(bookieAffinityGroupPrimary).build());
assertEquals(admin.namespaces().getBookieAffinityGroup(namespace).getBookkeeperAffinityGroupPrimary(),
bookieAffinityGroupPrimary);
assertTrue(admin.namespaces().getPolicies(namespace).migrated);
assertTrue(admin.namespaces().getMigrationState(namespace));

String namespaceAntiAffinityGroup = "group2";
admin.namespaces().setNamespaceAntiAffinityGroup(namespace, namespaceAntiAffinityGroup);
assertEquals(admin.namespaces().getNamespaceAntiAffinityGroup(namespace), namespaceAntiAffinityGroup);
assertTrue(admin.namespaces().getPolicies(namespace).migrated);
assertTrue(admin.namespaces().getMigrationState(namespace));

admin.namespaces().deleteBookieAffinityGroup(namespace);
assertTrue(admin.namespaces().getPolicies(namespace).migrated);
assertTrue(admin.namespaces().getMigrationState(namespace));

admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace);
assertTrue(admin.namespaces().getPolicies(namespace).migrated);
assertTrue(admin.namespaces().getMigrationState(namespace));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,9 @@ public void testEnableMigrationWithEmptyPolicies() throws Exception {
assertEquals(bundlesData.getNumBundles(), conf.getDefaultNumberOfNamespaceBundles());

// 4.assert namespace enable migration
Policies policiesResp = (Policies) asyncRequests(
response -> namespaces.getPolicies(response, testTenant, enableMigrationGroupNs));
assertEquals(policiesResp.migrated, enableMigrationReq);
Boolean migratedResp = (Boolean) asyncRequests(
response -> namespaces.getMigration(response, testTenant, enableMigrationGroupNs));
assertEquals(migratedResp, enableMigrationReq);
}

@Test
Expand All @@ -389,9 +389,9 @@ public void testEnableMigrationWithExistBundlePolicies() throws Exception {
assertEquals(bundlesData, policiesReq.bundles);

// 4.assert namespace enable migration
Policies policiesResp = (Policies) asyncRequests(
response -> namespaces.getPolicies(response, testTenant, enableMigrationGroupNs));
assertEquals(policiesResp.migrated, enableMigrationReq);
Boolean migratedResp = (Boolean) asyncRequests(
response -> namespaces.getMigration(response, testTenant, enableMigrationGroupNs));
assertEquals(migratedResp, enableMigrationReq);
}

@Test
Expand Down Expand Up @@ -508,12 +508,18 @@ public void testEnableMigrationAndDisableMigration() throws Exception {

// Enable migration
asyncRequests(response -> namespaces.enableMigration(response, testTenant, enableMigrationGroupNs, true));
Boolean migratedResp = (Boolean) asyncRequests(
response -> namespaces.getMigration(response, testTenant, enableMigrationGroupNs));
assertTrue(migratedResp);
Policies policiesResp = (Policies) asyncRequests(
response -> namespaces.getPolicies(response, testTenant, enableMigrationGroupNs));
assertTrue(policiesResp.migrated);

// Disable migration
asyncRequests(response -> namespaces.enableMigration(response, testTenant, enableMigrationGroupNs, false));
migratedResp = (Boolean) asyncRequests(
response -> namespaces.getMigration(response, testTenant, enableMigrationGroupNs));
assertFalse(migratedResp);
policiesResp = (Policies) asyncRequests(
response -> namespaces.getPolicies(response, testTenant, enableMigrationGroupNs));
assertFalse(policiesResp.migrated);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,7 @@ public void testNamespaceMigrationWithReplicationBacklog(SubscriptionType subTyp
pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
admin1.clusters().updateClusterMigration("r1", isClusterMigrate, migratedUrl);
admin1.namespaces().updateMigrationState(namespace, isNamespaceMigrate);
assertEquals(admin1.namespaces().getPolicies(namespace).migrated, isNamespaceMigrate);
assertEquals(admin1.namespaces().getMigrationState(namespace), isNamespaceMigrate);
log.info("update cluster migration called");

retryStrategically((test) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4725,9 +4725,7 @@ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchem
CompletableFuture<Void> removeNamespaceEntryFiltersAsync(String namespace);

/**
* Enable migration for all topics within a namespace.
* <p/>
* Migrate all topics of a namespace to new broker.
* Update the migration state for a namespace.
* <p/>
* Request example:
*
Expand All @@ -4748,6 +4746,30 @@ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchem
*/
void updateMigrationState(String namespace, boolean migrated) throws PulsarAdminException;

/**
* Get the migration state for a namespace.
*
* @param namespace
* Namespace name
* @return whether the namespace is marked as migrated
* @throws NotAuthorizedException
* Don't have admin permission
* @throws NotFoundException
* Namespace does not exist
* @throws PulsarAdminException
* Unexpected error
*/
boolean getMigrationState(String namespace) throws PulsarAdminException;

/**
* Get the migration state for a namespace asynchronously.
*
* @param namespace
* Namespace name
* @return whether the namespace is marked as migrated
*/
CompletableFuture<Boolean> getMigrationStateAsync(String namespace);

/**
* Set DispatcherPauseOnAckStatePersistent for a namespace asynchronously.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1990,12 +1990,25 @@ public void updateMigrationState(String namespace, boolean migrated) throws Puls
sync(() -> updateMigrationStateAsync(namespace, migrated));
}

@Override
public boolean getMigrationState(String namespace) throws PulsarAdminException {
return sync(() -> getMigrationStateAsync(namespace));
}

public CompletableFuture<Void> updateMigrationStateAsync(String namespace, boolean migrated) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "migration");
return asyncPostRequest(path, Entity.entity(migrated, MediaType.APPLICATION_JSON));
}

@Override
public CompletableFuture<Boolean> getMigrationStateAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "migration");
return asyncGetRequest(path, new FutureCallback<Boolean>() {
});
}

private WebTarget namespacePath(NamespaceName namespace, String... parts) {
WebTarget namespacePath = adminV2Namespaces.path(namespace.toString());
namespacePath = WebTargets.addParts(namespacePath, parts);
Expand Down