diff --git a/kafka/client_async.py b/kafka/client_async.py index ea5e606cb..f8919e028 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -204,8 +204,9 @@ def __init__(self, **configs): # these properties need to be set on top of the initialization pipeline # because they are used when __del__ method is called self._closed = False - self._wake_r, self._wake_w = socket.socketpair() self._selector = self.config['selector']() + self._init_wakeup_socketpair() + self._wake_lock = threading.Lock() self.cluster = ClusterMetadata(**self.config) self._topics = set() # empty set will fetch all topic metadata @@ -217,9 +218,6 @@ def __init__(self, **configs): self._refresh_on_disconnects = True self._last_bootstrap = 0 self._bootstrap_fails = 0 - self._wake_r.setblocking(False) - self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0) - self._wake_lock = threading.Lock() self._lock = threading.RLock() @@ -228,7 +226,6 @@ def __init__(self, **configs): # lock above. self._pending_completion = collections.deque() - self._selector.register(self._wake_r, selectors.EVENT_READ) self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms']) self._sensors = None if self.config['metrics']: @@ -243,6 +240,25 @@ def __init__(self, **configs): check_timeout = self.config['api_version_auto_timeout_ms'] / 1000 self.config['api_version'] = self.check_version(timeout=check_timeout) + def _init_wakeup_socketpair(self): + self._wake_r, self._wake_w = socket.socketpair() + self._wake_r.setblocking(False) + self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0) + self._waking = False + self._selector.register(self._wake_r, selectors.EVENT_READ) + + def _close_wakeup_socketpair(self): + if self._wake_r is not None: + try: + self._selector.unregister(self._wake_r) + except KeyError: + pass + self._wake_r.close() + if self._wake_w is not None: + self._wake_w.close() + self._wake_r = None + self._wake_w = None + def _can_bootstrap(self): effective_failures = self._bootstrap_fails // self._num_bootstrap_hosts backoff_factor = 2 ** effective_failures @@ -416,9 +432,8 @@ def connected(self, node_id): def _close(self): if not self._closed: self._closed = True - self._wake_r.close() - self._wake_w.close() self._selector.close() + self._close_wakeup_socketpair() def close(self, node_id=None): """Close one or all broker connections. @@ -944,22 +959,34 @@ def check_version(self, node_id=None, timeout=2, strict=False): raise Errors.NoBrokersAvailable() def wakeup(self): + if self._waking or self._wake_w is None: + return with self._wake_lock: try: self._wake_w.sendall(b'x') - except socket.timeout: + self._waking = True + except socket.timeout as e: log.warning('Timeout to send to wakeup socket!') - raise Errors.KafkaTimeoutError() - except socket.error: - log.warning('Unable to send to wakeup socket!') + raise Errors.KafkaTimeoutError(e) + except socket.error as e: + log.warning('Unable to send to wakeup socket! %s', e) + raise e def _clear_wake_fd(self): # reading from wake socket should only happen in a single thread - while True: - try: - self._wake_r.recv(1024) - except socket.error: - break + with self._wake_lock: + self._waking = False + while True: + try: + if not self._wake_r.recv(1024): + # Non-blocking socket returns empty on error + log.warning("Error reading wakeup socket. Rebuilding socketpair.") + self._close_wakeup_socketpair() + self._init_wakeup_socketpair() + break + except socket.error: + # Non-blocking socket raises when socket is ok but no data available to read + break def _maybe_close_oldest_connection(self): expired_connection = self._idle_expiry_manager.poll_expired_connection()