Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions module_utils/kafka_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,34 @@ def __init__(self, **configs):
self.connect_max_retry = configs.pop('connect_max_retry', 50)
kafka.client_async.BrokerConnection = CustomBrokerConnection
self.client = kafka.client_async.KafkaClient(**configs)

# Check connection to at least one broker before proceeding
# This prevents indefinite hanging when broker is unreachable
brokers = self.client.cluster.brokers()
if not brokers:
self.close()
raise UnableToRefreshState(
'No brokers available. Is your Kafka server running and '
'available on \'%s\'?' % configs.get('bootstrap_servers')
)

connected = False
for broker in brokers:
if self.connection_check(broker.nodeId):
connected = True
break

if not connected:
self.close()
raise UnableToRefreshState(
'Unable to connect to any broker. Is your Kafka server '
'running and available on \'%s\' with security protocol '
'\'%s\'?' % (
configs.get('bootstrap_servers'),
configs.get('security_protocol', 'PLAINTEXT')
)
)

self.refresh()

def init_zk_client(self):
Expand Down
Loading