Description
kafka-python version: 1.4.5
Steps to reproduce:
from kafka import KafkaConsumer
KafkaConsumer('topic', bootstrap_servers=['not_resolvable_kafka_hostname:9092'])
Expected: NoBrokersAvailable exception
Actual: the constructor hangs indefinitely and keeps logging the DNS warning below.
Log:
WARNING:kafka.conn:DNS lookup failed for kafkak:9092, exception was [Errno -2] Name or service not known. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?
WARNING:kafka.conn:DNS lookup failed for kafkak:9092, exception was [Errno -2] Name or service not known. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?
WARNING:kafka.conn:DNS lookup failed for kafkak:9092, exception was [Errno -2] Name or service not known. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?
...
Backtrace on ctrl+C:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/vasya/prj/sedr_idea_test/kata-dev4.0-poc/tests/packages/kata-robot/src/kafka_client/robot_kafka_consumer.py", line 25, in __init__
    auto_offset_reset='latest')
  File "/home/vasya/prj/venv.sedr/lib/python3.6/site-packages/kafka/consumer/group.py", line 344, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/home/vasya/prj/venv.sedr/lib/python3.6/site-packages/kafka/client_async.py", line 230, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/home/vasya/prj/venv.sedr/lib/python3.6/site-packages/kafka/client_async.py", line 858, in check_version
    self._maybe_connect(try_node)
  File "/home/vasya/prj/venv.sedr/lib/python3.6/site-packages/kafka/client_async.py", line 362, in _maybe_connect
    broker = self.cluster.broker_metadata(node_id)
  File "/home/vasya/prj/venv.sedr/lib/python3.6/site-packages/kafka/cluster.py", line 96, in broker_metadata
    return next(self._bootstrap_brokers)
  File "/home/vasya/prj/venv.sedr/lib/python3.6/site-packages/kafka/cluster.py", line 75, in _generate_bootstrap_brokers
    for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi):
  File "/home/vasya/prj/venv.sedr/lib/python3.6/site-packages/kafka/conn.py", line 1276, in dns_lookup
    socket.SOCK_STREAM)))
KeyboardInterrupt
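The cause is an infinite loop in _generate_bootstrap_brokers (cluster.py:69). When dns_lookup returns no addresses for every bootstrap host, the inner loops never reach the yield, so the while True loop spins forever and next() never returns: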
def _generate_bootstrap_brokers(self):
    # collect_hosts does not perform DNS, so we should be fine to re-use
    bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
    while True:
        for host, port, afi in bootstrap_hosts:
            for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi):
                yield BrokerMetadata('bootstrap', sockaddr[0], sockaddr[1], None)
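
One possible way to break the loop, shown here only as a standalone sketch and not as the library's actual fix: end the generator when a full pass over the bootstrap hosts resolves nothing. The free function generate_bootstrap_brokers, its dns_lookup parameter, and the local BrokerMetadata namedtuple are stand-ins introduced for illustration, assumed to mirror the shapes used in cluster.py.

```python
import collections

# Stand-in for kafka-python's BrokerMetadata (field names are an assumption).
BrokerMetadata = collections.namedtuple(
    'BrokerMetadata', ['nodeId', 'host', 'port', 'rack'])


def generate_bootstrap_brokers(bootstrap_hosts, dns_lookup):
    """Cycle through resolvable bootstrap brokers; stop if none resolve.

    bootstrap_hosts: list of (host, port, afi) tuples.
    dns_lookup: hypothetical callable returning getaddrinfo-style 5-tuples,
        or an empty list when resolution fails.
    """
    while True:
        resolved_any = False
        for host, port, afi in bootstrap_hosts:
            for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi):
                resolved_any = True
                yield BrokerMetadata('bootstrap', sockaddr[0], sockaddr[1], None)
        if not resolved_any:
            # A full pass yielded nothing: finish instead of spinning forever.
            return
```

With this shape, next() on the generator raises StopIteration when no host resolves, so the caller (broker_metadata in the traceback above) would have to catch it and report the failure, for example by letting check_version time out with NoBrokersAvailable.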