KafkaConsumer ctor hangs in case it can't resolve bootstrap server hostname #1751

Closed
@vasily-novikov

Description

kafka-python version: 1.4.5

Steps to reproduce:

from kafka import KafkaConsumer
KafkaConsumer('topic', bootstrap_servers=['not_resolvable_kafka_hostname:9092'])

Expected: NoBrokersAvailable exception
Actual: hangs

log:
WARNING:kafka.conn:DNS lookup failed for kafkak:9092, exception was [Errno -2] Name or service not known. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?
WARNING:kafka.conn:DNS lookup failed for kafkak:9092, exception was [Errno -2] Name or service not known. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?
WARNING:kafka.conn:DNS lookup failed for kafkak:9092, exception was [Errno -2] Name or service not known. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?
...

Backtrace on ctrl+C:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/vasya/prj/sedr_idea_test/kata-dev4.0-poc/tests/packages/kata-robot/src/kafka_client/robot_kafka_consumer.py", line 25, in __init__
    auto_offset_reset='latest')
  File "/home/vasya/prj/venv.sedr/lib/python3.6/site-packages/kafka/consumer/group.py", line 344, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/home/vasya/prj/venv.sedr/lib/python3.6/site-packages/kafka/client_async.py", line 230, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/home/vasya/prj/venv.sedr/lib/python3.6/site-packages/kafka/client_async.py", line 858, in check_version
    self._maybe_connect(try_node)
  File "/home/vasya/prj/venv.sedr/lib/python3.6/site-packages/kafka/client_async.py", line 362, in _maybe_connect
    broker = self.cluster.broker_metadata(node_id)
  File "/home/vasya/prj/venv.sedr/lib/python3.6/site-packages/kafka/cluster.py", line 96, in broker_metadata
    return next(self._bootstrap_brokers)
  File "/home/vasya/prj/venv.sedr/lib/python3.6/site-packages/kafka/cluster.py", line 75, in _generate_bootstrap_brokers
    for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi):
  File "/home/vasya/prj/venv.sedr/lib/python3.6/site-packages/kafka/conn.py", line 1276, in dns_lookup
    socket.SOCK_STREAM)))
KeyboardInterrupt

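The cause is an infinite loop in _generate_bootstrap_brokers (cluster.py:69). As the repeated warnings in the log suggest, a failed dns_lookup yields no addresses, so when no bootstrap host resolves the inner loops never yield and the while True loop spins forever: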

    def _generate_bootstrap_brokers(self):
        # collect_hosts does not perform DNS, so we should be fine to re-use
        bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])

        while True:
            for host, port, afi in bootstrap_hosts:
                for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi):
                    yield BrokerMetadata('bootstrap', sockaddr[0], sockaddr[1], None)
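
One possible way to break the loop, shown here only as a minimal standalone sketch and not as the fix adopted by kafka-python: end the generator once a full pass over the bootstrap hosts resolves nothing. The names fake_dns_lookup and generate_bootstrap_brokers are hypothetical stand-ins for the library's dns_lookup and _generate_bootstrap_brokers, and the sketch assumes that a failed lookup returns an empty list.

```python
from itertools import islice


def fake_dns_lookup(host, port):
    """Hypothetical stand-in for dns_lookup: returns [] when resolution fails."""
    known = {"localhost": "127.0.0.1"}  # assumed name table, for illustration only
    return [(known[host], port)] if host in known else []


def generate_bootstrap_brokers(bootstrap_hosts, lookup=fake_dns_lookup):
    """Cycle over resolved addresses, but stop if a whole pass resolves nothing."""
    while True:
        resolved_any = False
        for host, port in bootstrap_hosts:
            for sockaddr in lookup(host, port):
                resolved_any = True
                yield sockaddr
        if not resolved_any:
            # No host resolved in this pass: finish instead of spinning forever.
            return


# An unresolvable host now exhausts the generator instead of hanging.
unresolvable = list(
    generate_bootstrap_brokers([("not_resolvable_kafka_hostname", 9092)])
)

# A resolvable host still cycles indefinitely, so take only the first three.
resolvable = list(islice(generate_bootstrap_brokers([("localhost", 9092)]), 3))
```

With a generator that can finish, a caller using next() would see StopIteration and could translate it into the expected NoBrokersAvailable exception.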
