Skip to content

DP-238 Test a fix for race condition with closed connection #1746

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 32 additions & 19 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ def __init__(self, host, port, afi, **configs):
# including tracking request futures and timestamps, we
# can use a simple dictionary of correlation_id => request data
self.in_flight_requests = dict()
# Ensures that all in_flight_requests futures are properly closed on disconnect.
self._ifr_lock = threading.Lock()

self._protocol = KafkaProtocol(
client_id=self.config['client_id'],
Expand Down Expand Up @@ -741,15 +743,19 @@ def close(self, error=None):
self.config['state_change_callback'](self)
self._update_reconnect_backoff()
self._close_socket()
self.state = ConnectionStates.DISCONNECTED
self._sasl_auth_future = None
self._protocol = KafkaProtocol(
client_id=self.config['client_id'],
api_version=self.config['api_version'])

with self._ifr_lock:
self.state = ConnectionStates.DISCONNECTED
self._sasl_auth_future = None
self._protocol = KafkaProtocol(
client_id=self.config['client_id'],
api_version=self.config['api_version'])
fail_ifrs = dict(self.in_flight_requests)
self.in_flight_requests.clear()

if error is None:
error = Errors.Cancelled(str(self))
while self.in_flight_requests:
(_correlation_id, (future, _timestamp)) = self.in_flight_requests.popitem()
for future, _timestamp in fail_ifrs.values():
future.failure(error)
self.config['state_change_callback'](self)

Expand All @@ -772,9 +778,14 @@ def _send(self, request, blocking=True):

log.debug('%s Request %d: %s', self, correlation_id, request)
if request.expect_response():
sent_time = time.time()
assert correlation_id not in self.in_flight_requests, 'Correlation ID already in-flight!'
self.in_flight_requests[correlation_id] = (future, sent_time)
with self._ifr_lock:
if self.disconnected():
log.warning("%s: Race condition: connection already closed.")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are actually 2 problems:

  1. The connection is closed with "Protocol out of sync". I suspect I know the root cause, but I am intentionally not fixing it yet. Once this version is deployed, this log message will tell me for sure whether I'm right about it, and that's when I will fix it.
  2. When the connection is closed, it leaves in_flight_requests hanging.
    The timeline is as follows:
  • send() is called, sends a message
  • before the future is saved to in_flight_requests, another thread calls close() (for example, because of problem number 1)
  • close() terminates all futures in in_flight_requests, but there are none there
  • the thread which called send() stores the future in the disconnected connection's in_flight_requests
  • the future remains unresolved forever = worker deadlocks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will deploy this on production and check the behavior. If my theory is right, I will eventually get to see this sequence of events:

  1. "protocol out of sync error"
  2. Connection close
  3. Race condition warning
  4. Worker re-establishing the connection successfully

future.failure(Errors.Cancelled(str(self)))
else:
sent_time = time.time()
assert correlation_id not in self.in_flight_requests, 'Correlation ID already in-flight!'
self.in_flight_requests[correlation_id] = (future, sent_time)
else:
future.success(None)

Expand All @@ -790,7 +801,8 @@ def send_pending_requests(self):
if self.state not in (ConnectionStates.AUTHENTICATING,
ConnectionStates.CONNECTED):
return Errors.NodeNotReadyError(str(self))
data = self._protocol.send_bytes()
with self._lock:
data = self._protocol.send_bytes()
try:
# In the future we might manage an internal write buffer
# and send bytes asynchronously. For now, just block
Expand Down Expand Up @@ -896,14 +908,15 @@ def _recv(self):
return responses

def requests_timed_out(self):
if self.in_flight_requests:
get_timestamp = lambda v: v[1]
oldest_at = min(map(get_timestamp,
self.in_flight_requests.values()))
timeout = self.config['request_timeout_ms'] / 1000.0
if time.time() >= oldest_at + timeout:
return True
return False
with self._ifr_lock:
if self.in_flight_requests:
get_timestamp = lambda v: v[1]
oldest_at = min(map(get_timestamp,
self.in_flight_requests.values()))
timeout = self.config['request_timeout_ms'] / 1000.0
if time.time() >= oldest_at + timeout:
return True
return False

def _handle_api_version_response(self, response):
error_type = Errors.for_code(response.error_code)
Expand Down