Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,16 @@ public interface PulsarClientBuilder {
PulsarClient build() throws PulsarClientException;

/**
* Set the Pulsar service URL (e.g., {@code pulsar://localhost:6650}).
* Set the Pulsar service URL — the broker's binary-protocol endpoint.
*
* @param serviceUrl the Pulsar service URL to connect to
* <p>Must use {@code pulsar://} or {@code pulsar+ssl://}. The admin/web
* service URL ({@code http://...} / {@code https://...}) is NOT accepted.
*
* @param serviceUrl the Pulsar broker service URL to connect to,
* e.g. {@code pulsar://localhost:6650}
* @return this builder instance for chaining
* @throws IllegalArgumentException if {@code serviceUrl} is null, blank, or
* does not use {@code pulsar://} / {@code pulsar+ssl://}
*/
PulsarClientBuilder serviceUrl(String serviceUrl);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public PulsarClient build() throws PulsarClientException {

@Override
public PulsarClientBuilder serviceUrl(String serviceUrl) {
validatePulsarServiceUrl(serviceUrl, "serviceUrl");
conf.setServiceUrl(serviceUrl);
return this;
}
Expand Down Expand Up @@ -91,6 +92,7 @@ public PulsarClientBuilder connectionPolicy(ConnectionPolicy policy) {
conf.setNumIoThreads(policy.ioThreads());
conf.setNumListenerThreads(policy.callbackThreads());
if (policy.proxyServiceUrl() != null) {
validatePulsarServiceUrl(policy.proxyServiceUrl(), "ConnectionPolicy.proxyServiceUrl");
conf.setProxyServiceUrl(policy.proxyServiceUrl());
if (policy.proxyProtocol() != null) {
conf.setProxyProtocol(
Expand Down Expand Up @@ -142,4 +144,28 @@ public PulsarClientBuilder description(String description) {
conf.setDescription(description);
return this;
}

/**
* Reject anything that isn't the broker binary protocol. The most common
* mistake is passing the admin/web service URL ({@code http://...}) where a
* broker URL is expected — call that out specifically. The v4 client used to
* silently fail far downstream with cryptic connection errors; here we fail
* fast at configure time with a message the user can act on.
*/
private static void validatePulsarServiceUrl(String url, String fieldName) {
if (url == null || url.isBlank()) {
throw new IllegalArgumentException(fieldName + " must not be null or blank");
}
if (url.startsWith("pulsar://") || url.startsWith("pulsar+ssl://")) {
return;
}
if (url.startsWith("http://") || url.startsWith("https://")) {
throw new IllegalArgumentException(fieldName + " must use the broker binary protocol "
+ "(pulsar:// or pulsar+ssl://); got '" + url + "'. This looks like the admin/web "
+ "service URL — pass the broker service URL instead (typically port 6650, or "
+ "6651 for TLS).");
}
throw new IllegalArgumentException(fieldName + " must use the broker binary protocol "
+ "(pulsar:// or pulsar+ssl://); got '" + url + "'.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.v5;

import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import org.apache.pulsar.client.api.v5.PulsarClient;
import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
import org.apache.pulsar.client.api.v5.config.ConnectionPolicy;
import org.testng.annotations.Test;

/**
* Service-URL validation on the V5 client builder. The v5 client only speaks the
* broker binary protocol, so {@code pulsar://} / {@code pulsar+ssl://} are the
* only valid schemes — anything else (especially the admin/web service URL) gets
* rejected at configure-time with a message that points to the right URL.
*/
public class PulsarClientBuilderV5Test {

@Test
public void testAcceptsPulsarScheme() {
// Must not throw — these are the valid forms.
PulsarClient.builder().serviceUrl("pulsar://localhost:6650");
PulsarClient.builder().serviceUrl("pulsar+ssl://localhost:6651");
PulsarClient.builder().serviceUrl("pulsar://h1:6650,h2:6650,h3:6650");
}

@Test
public void testRejectsHttpWithGuidance() {
IllegalArgumentException e = assertThrowsIAE(() ->
PulsarClient.builder().serviceUrl("http://localhost:8080"));
assertTrue(e.getMessage().contains("pulsar://"),
"error must point at the correct scheme: " + e.getMessage());
assertTrue(e.getMessage().toLowerCase().contains("admin")
|| e.getMessage().toLowerCase().contains("web"),
"error must call out the http→admin-URL confusion: " + e.getMessage());
assertTrue(e.getMessage().contains("6650"),
"error must hint at the broker port: " + e.getMessage());
}

@Test
public void testRejectsHttpsWithGuidance() {
IllegalArgumentException e = assertThrowsIAE(() ->
PulsarClient.builder().serviceUrl("https://localhost:8443"));
assertTrue(e.getMessage().contains("pulsar+ssl://"),
"error must mention the TLS broker scheme: " + e.getMessage());
}

@Test
public void testRejectsUnknownScheme() {
IllegalArgumentException e = assertThrowsIAE(() ->
PulsarClient.builder().serviceUrl("ws://localhost:6650"));
assertTrue(e.getMessage().contains("pulsar://"),
"error must point at the correct scheme: " + e.getMessage());
}

@Test
public void testRejectsNullAndBlank() {
assertThrows(IllegalArgumentException.class,
() -> PulsarClient.builder().serviceUrl(null));
assertThrows(IllegalArgumentException.class,
() -> PulsarClient.builder().serviceUrl(""));
assertThrows(IllegalArgumentException.class,
() -> PulsarClient.builder().serviceUrl(" "));
}

@Test
public void testProxyServiceUrlIsValidatedToo() {
PulsarClientBuilder builder = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650");

ConnectionPolicy badProxy = ConnectionPolicy.builder()
.proxy("http://proxy:8080", null)
.build();

IllegalArgumentException e = assertThrowsIAE(() -> builder.connectionPolicy(badProxy));
assertTrue(e.getMessage().contains("proxyServiceUrl"),
"error must name the offending field: " + e.getMessage());
}

private static IllegalArgumentException assertThrowsIAE(Runnable r) {
try {
r.run();
fail("expected IllegalArgumentException");
return null; // unreachable
} catch (IllegalArgumentException e) {
return e;
}
}
}
Loading