Closed
Description
This is happening inside a docker container based on:
apache/airflow:2.2.2-python3.9
The version of python inside the container is 3.9.9. We are trying to run a datahub recipe that reads information about consumer groups and it runs just fine on my macbook pro m1 (latest model) using python 3.9.7.
I wanted to increase the wakeup timeout, but it's not possible to do that using the admin client as it's not one of the valid options.
This error happens after we get the list of all consumer groups. When we try to describe all of them, we get the timeout:
354 def _send_request_to_node(self, node_id, request):
(...)
364 while not self._client.ready(node_id):
365 # poll until the connection to broker is ready, otherwise send()
366 # will fail with NodeNotReadyError
367 self._client.poll()
--> 368 return self._client.send(node_id, request)
..................................................
self = <kafka.admin.client.KafkaAdminClient object at 0x4015388310>
node_id = 6
request = GroupCoordinatorRequest_v0(consumer_group='some-group')
self._client.ready = <method 'KafkaClient.ready' of <kafka.client_async.KafkaClient object at 0x4103564fa0> client_async.py:395>
self._client.poll = <method 'KafkaClient.poll' of <kafka.client_async.KafkaClient object at 0x4103564fa0> client_async.py:550>
self._client.send = <method 'KafkaClient.send' of <kafka.client_async.KafkaClient object at 0x4103564fa0> client_async.py:515>
..................................................
File "/home/airflow/.local/lib/python3.9/site-packages/kafka/client_async.py", line 546, in send
515 def send(self, node_id, request, wakeup=True):
(...)
542 # Wakeup signal is useful in case another thread is
543 # blocked waiting for incoming network traffic while holding
544 # the client lock in poll().
545 if wakeup:
--> 546 self.wakeup()
547
..................................................
self = <kafka.client_async.KafkaClient object at 0x4103564fa0>
node_id = 6
request = GroupCoordinatorRequest_v0(consumer_group='some-group')
wakeup = True
self.wakeup = <method 'KafkaClient.wakeup' of <kafka.client_async.KafkaClient object at 0x4103564fa0> client_async.py:929>
..................................................
File "/home/airflow/.local/lib/python3.9/site-packages/kafka/client_async.py", line 935, in wakeup
929 def wakeup(self):
(...)
931 try:
932 self._wake_w.sendall(b'x')
933 except socket.timeout:
934 log.warning('Timeout to send to wakeup socket!')
--> 935 raise Errors.KafkaTimeoutError()
936 except socket.error:
..................................................
self = <kafka.client_async.KafkaClient object at 0x4103564fa0>
socket.timeout = <class 'socket.timeout'>
log.warning = <method 'Logger.warning' of <Logger kafka.client (WARNING)> __init__.py:1448>
Errors.KafkaTimeoutError = <class 'kafka.errors.KafkaTimeoutError'>
socket.error = <class 'OSError'>
Metadata
Metadata
Assignees
Labels
No labels