
"Timeout to send to wakeup socket!" when trying to describe consumer group #2286

Closed
@lvicentesanchez

Description

This happens inside a Docker container based on:

apache/airflow:2.2.2-python3.9

The Python version inside the container is 3.9.9. We are trying to run a DataHub recipe that reads information about consumer groups. The same recipe runs fine on my MacBook Pro M1 (latest model) with Python 3.9.7.

I wanted to increase the wakeup timeout, but the admin client does not accept it as a valid option, so it cannot be set there.

The error happens after we get the list of all consumer groups. When we then try to describe each of them, we hit the timeout:

    354  def _send_request_to_node(self, node_id, request):
 (...)
    364      while not self._client.ready(node_id):
    365          # poll until the connection to broker is ready, otherwise send()
    366          # will fail with NodeNotReadyError
    367          self._client.poll()
--> 368      return self._client.send(node_id, request)
    ..................................................
     self = <kafka.admin.client.KafkaAdminClient object at 0x4015388310>
     node_id = 6
     request = GroupCoordinatorRequest_v0(consumer_group='some-group')
     self._client.ready = <method 'KafkaClient.ready' of <kafka.client_async.KafkaClient object at 0x4103564fa0> client_async.py:395>
     self._client.poll = <method 'KafkaClient.poll' of <kafka.client_async.KafkaClient object at 0x4103564fa0> client_async.py:550>
     self._client.send = <method 'KafkaClient.send' of <kafka.client_async.KafkaClient object at 0x4103564fa0> client_async.py:515>
    ..................................................

File "/home/airflow/.local/lib/python3.9/site-packages/kafka/client_async.py", line 546, in send
    515  def send(self, node_id, request, wakeup=True):
 (...)
    542      # Wakeup signal is useful in case another thread is
    543      # blocked waiting for incoming network traffic while holding
    544      # the client lock in poll().
    545      if wakeup:
--> 546          self.wakeup()
    547
    ..................................................
     self = <kafka.client_async.KafkaClient object at 0x4103564fa0>
     node_id = 6
     request = GroupCoordinatorRequest_v0(consumer_group='some-group')
     wakeup = True
     self.wakeup = <method 'KafkaClient.wakeup' of <kafka.client_async.KafkaClient object at 0x4103564fa0> client_async.py:929>
    ..................................................

File "/home/airflow/.local/lib/python3.9/site-packages/kafka/client_async.py", line 935, in wakeup
    929  def wakeup(self):
 (...)
    931          try:
    932              self._wake_w.sendall(b'x')
    933          except socket.timeout:
    934              log.warning('Timeout to send to wakeup socket!')
--> 935              raise Errors.KafkaTimeoutError()
    936          except socket.error:
    ..................................................
     self = <kafka.client_async.KafkaClient object at 0x4103564fa0>
     socket.timeout = <class 'socket.timeout'>
     log.warning = <method 'Logger.warning' of <Logger kafka.client (WARNING)> __init__.py:1448>
     Errors.KafkaTimeoutError = <class 'kafka.errors.KafkaTimeoutError'>
     socket.error = <class 'OSError'>
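
For anyone trying to reproduce the failure mode outside Kafka, here is a minimal standard-library sketch of what the last frame appears to be doing. It assumes the wakeup channel is a `socket.socketpair()` whose write end has a timeout and whose read end is only drained while polling; that is my reading of the traceback, not a confirmed description of kafka-python internals. The function name, chunk size, and timeout value are made up for illustration. If nothing drains the read end, the buffer fills and `sendall` raises `socket.timeout`, which matches the `except socket.timeout` branch above.

```python
import socket


def demo_wakeup_timeout(timeout_s=0.1, chunk=b"x" * 65536, max_sends=10000):
    """Fill an undrained socketpair until sendall() times out, then drain it.

    Returns (timed_out, drained_bytes). All names and values here are
    hypothetical; this only imitates the pattern seen in the traceback.
    """
    r, w = socket.socketpair()
    try:
        r.setblocking(False)      # read end is only ever drained opportunistically
        w.settimeout(timeout_s)   # write end gives up after timeout_s seconds

        timed_out = False
        for _ in range(max_sends):
            try:
                w.sendall(chunk)  # nobody reads, so the buffer eventually fills
            except socket.timeout:
                timed_out = True  # same exception type as in wakeup()
                break

        # Drain the read end, as a poll loop would do between requests.
        drained = 0
        while True:
            try:
                data = r.recv(65536)
            except BlockingIOError:
                break             # nothing left to read
            if not data:
                break
            drained += len(data)

        # With the buffer empty again, a one-byte wakeup goes through.
        w.sendall(b"x")
        return timed_out, drained
    finally:
        r.close()
        w.close()


if __name__ == "__main__":
    print(demo_wakeup_timeout())
```

If this is the same mechanism, a longer timeout would only delay the error; the read end has to be drained for the write to succeed.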
