diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index ce950c987127e..e6f9e6e95ecc4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -3355,6 +3355,12 @@ protected CompletableFuture internalEnableMigrationAsync(boolean migrated) .log("Successfully updated migration on namespace")); } + protected CompletableFuture internalGetMigrationAsync() { + return validateSuperUserAccessAsync() + .thenCompose(__ -> getLocalPolicies().getLocalPoliciesAsync(namespaceName)) + .thenApply(policiesOpt -> policiesOpt.map(localPolicies -> localPolicies.migrated).orElse(false)); + } + protected Policies getDefaultPolicesIfNull(Policies policies) { if (policies == null) { policies = new Policies(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 626c4b314a08d..3a50abf89e24d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -3419,7 +3419,7 @@ public void removeNamespaceEntryFilters(@Suspended AsyncResponse asyncResponse, @POST @Path("/{tenant}/{namespace}/migration") - @ApiOperation(hidden = true, value = "Update migration for all topics in a namespace") + @ApiOperation(hidden = true, value = "Update the migration state for a namespace") @ApiResponses(value = { @ApiResponse(code = 200, message = "Operation successful"), @ApiResponse(code = 403, message = "Don't have admin permission"), @@ -3442,6 +3442,31 @@ public void enableMigration(@Suspended AsyncResponse asyncResponse, }); } + @GET + @Path("/{tenant}/{namespace}/migration") + @ApiOperation(hidden = true, value = "Get the migration state for a namespace", + response = Boolean.class) + @ApiResponses(value = { + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist")}) + public void getMigration(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + internalGetMigrationAsync() + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + log.error() + .attr("tenant", tenant) + .attr("namespace", namespace) + .exception(ex) + .log("Failed to get migration"); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + @POST @Path("/{tenant}/{namespace}/dispatcherPauseOnAckStatePersistent") @ApiOperation(value = "Set dispatcher pause on ack state persistent configuration for specified namespace.") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index c6f66e1498236..afd52914819cc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -2312,25 +2312,25 @@ public void testMigratedInfoIsNotLostDuringOtherLocalPoliciesUpdate() throws Exc admin.namespaces().createNamespace(namespace, Set.of(testLocalCluster)); admin.namespaces().updateMigrationState(namespace, true); - assertTrue(admin.namespaces().getPolicies(namespace).migrated); + assertTrue(admin.namespaces().getMigrationState(namespace)); String bookieAffinityGroupPrimary = "group1"; admin.namespaces().setBookieAffinityGroup(namespace, BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(bookieAffinityGroupPrimary).build()); assertEquals(admin.namespaces().getBookieAffinityGroup(namespace).getBookkeeperAffinityGroupPrimary(), bookieAffinityGroupPrimary); - assertTrue(admin.namespaces().getPolicies(namespace).migrated); + assertTrue(admin.namespaces().getMigrationState(namespace)); String namespaceAntiAffinityGroup = "group2"; admin.namespaces().setNamespaceAntiAffinityGroup(namespace, namespaceAntiAffinityGroup); assertEquals(admin.namespaces().getNamespaceAntiAffinityGroup(namespace), namespaceAntiAffinityGroup); - assertTrue(admin.namespaces().getPolicies(namespace).migrated); + assertTrue(admin.namespaces().getMigrationState(namespace)); admin.namespaces().deleteBookieAffinityGroup(namespace); - assertTrue(admin.namespaces().getPolicies(namespace).migrated); + assertTrue(admin.namespaces().getMigrationState(namespace)); admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace); - assertTrue(admin.namespaces().getPolicies(namespace).migrated); + assertTrue(admin.namespaces().getMigrationState(namespace)); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java index 913e395c2bf46..f5d7e7e64aa48 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java @@ -364,9 +364,9 @@ public void testEnableMigrationWithEmptyPolicies() throws Exception { assertEquals(bundlesData.getNumBundles(), conf.getDefaultNumberOfNamespaceBundles()); // 4.assert namespace enable migration - Policies policiesResp = (Policies) asyncRequests( - response -> namespaces.getPolicies(response, testTenant, enableMigrationGroupNs)); - assertEquals(policiesResp.migrated, enableMigrationReq); + Boolean migratedResp = (Boolean) asyncRequests( + response -> namespaces.getMigration(response, testTenant, enableMigrationGroupNs)); + assertEquals(migratedResp, enableMigrationReq); } @Test @@ -389,9 +389,9 @@ public void testEnableMigrationWithExistBundlePolicies() throws Exception { assertEquals(bundlesData, policiesReq.bundles); // 4.assert namespace enable migration - Policies policiesResp = (Policies) asyncRequests( - response -> namespaces.getPolicies(response, testTenant, enableMigrationGroupNs)); - assertEquals(policiesResp.migrated, enableMigrationReq); + Boolean migratedResp = (Boolean) asyncRequests( + response -> namespaces.getMigration(response, testTenant, enableMigrationGroupNs)); + assertEquals(migratedResp, enableMigrationReq); } @Test @@ -508,12 +508,18 @@ public void testEnableMigrationAndDisableMigration() throws Exception { // Enable migration asyncRequests(response -> namespaces.enableMigration(response, testTenant, enableMigrationGroupNs, true)); + Boolean migratedResp = (Boolean) asyncRequests( + response -> namespaces.getMigration(response, testTenant, enableMigrationGroupNs)); + assertTrue(migratedResp); Policies policiesResp = (Policies) asyncRequests( response -> namespaces.getPolicies(response, testTenant, enableMigrationGroupNs)); assertTrue(policiesResp.migrated); // Disable migration asyncRequests(response -> namespaces.enableMigration(response, testTenant, enableMigrationGroupNs, false)); + migratedResp = (Boolean) asyncRequests( + response -> namespaces.getMigration(response, testTenant, enableMigrationGroupNs)); + assertFalse(migratedResp); policiesResp = (Policies) asyncRequests( response -> namespaces.getPolicies(response, testTenant, enableMigrationGroupNs)); assertFalse(policiesResp.migrated); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java index d75a9d3bda668..697efa5a83026 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java @@ -1134,7 +1134,7 @@ public void testNamespaceMigrationWithReplicationBacklog(SubscriptionType subTyp pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()); admin1.clusters().updateClusterMigration("r1", isClusterMigrate, migratedUrl); admin1.namespaces().updateMigrationState(namespace, isNamespaceMigrate); - assertEquals(admin1.namespaces().getPolicies(namespace).migrated, isNamespaceMigrate); + assertEquals(admin1.namespaces().getMigrationState(namespace), isNamespaceMigrate); log.info("update cluster migration called"); retryStrategically((test) -> { diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 85a8d1d744ee0..f1127ef26cb8d 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -4725,9 +4725,7 @@ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchem CompletableFuture removeNamespaceEntryFiltersAsync(String namespace); /** - * Enable migration for all topics within a namespace. - *

- * Migrate all topics of a namespace to new broker. + * Update the migration state for a namespace. *

* Request example: * @@ -4748,6 +4746,30 @@ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchem */ void updateMigrationState(String namespace, boolean migrated) throws PulsarAdminException; + /** + * Get the migration state for a namespace. + * + * @param namespace + * Namespace name + * @return whether the namespace is marked as migrated + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PulsarAdminException + * Unexpected error + */ + boolean getMigrationState(String namespace) throws PulsarAdminException; + + /** + * Get the migration state for a namespace asynchronously. + * + * @param namespace + * Namespace name + * @return whether the namespace is marked as migrated + */ + CompletableFuture getMigrationStateAsync(String namespace); + /** * Set DispatcherPauseOnAckStatePersistent for a namespace asynchronously. */ diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index c62ecd0b205c2..1ae2c281fda4d 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -1990,12 +1990,25 @@ public void updateMigrationState(String namespace, boolean migrated) throws Puls sync(() -> updateMigrationStateAsync(namespace, migrated)); } + @Override + public boolean getMigrationState(String namespace) throws PulsarAdminException { + return sync(() -> getMigrationStateAsync(namespace)); + } + public CompletableFuture updateMigrationStateAsync(String namespace, boolean migrated) { NamespaceName ns = NamespaceName.get(namespace); WebTarget path = namespacePath(ns, "migration"); return asyncPostRequest(path, Entity.entity(migrated, MediaType.APPLICATION_JSON)); } + @Override + public CompletableFuture getMigrationStateAsync(String namespace) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "migration"); + return asyncGetRequest(path, new FutureCallback() { + }); + } + private WebTarget namespacePath(NamespaceName namespace, String... parts) { WebTarget namespacePath = adminV2Namespaces.path(namespace.toString()); namespacePath = WebTargets.addParts(namespacePath, parts);