Closed
Description
There seems to be a thread-safety issue when retrieving producer metrics from a separate thread.
We did not encounter this behavior on consumers, but judging from the library code they should be affected too.
This is the error we got when reading metrics for a producer:
  File "...\lib\site-packages\kafka\producer\kafka.py", line 722, in metrics
    for k, v in six.iteritems(self._metrics.metrics):
RuntimeError: dictionary changed size during iteration
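The error itself can be reproduced without threads, which may help when reasoning about the race. The sketch below is only an illustration: the function name and the metric keys are made up, and the in-loop write stands in for what a second thread would do while the reader is iterating.

```python
def iterate_while_growing(registry):
    """Iterate a dict while a key is added mid-loop, as a concurrent writer might."""
    try:
        for _key in registry:
            # Simulates another thread registering a new metric during the read.
            registry["added-during-iteration"] = 0.0
    except RuntimeError as exc:
        return str(exc)
    return None


# Hypothetical metric names, used only as dictionary keys here.
message = iterate_while_growing({"record-send-rate": 1.0, "request-latency-avg": 2.0})
print(message)  # dictionary changed size during iteration
```

In the real producer the write happens on another thread, so the failure is intermittent rather than guaranteed.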
Here is the current faulty code in the producer (note the direct use of self._metrics.metrics):
def metrics(self, raw=False):
    [...]
    metrics = {}
    for k, v in six.iteritems(self._metrics.metrics):
        if k.group not in metrics:
            metrics[k.group] = {}
        if k.name not in metrics[k.group]:
            metrics[k.group][k.name] = {}
        metrics[k.group][k.name] = v.value()
    return metrics
One solution is to iterate over a copy of the actual metrics, so that metrics added by another thread cannot change the dictionary being iterated:
def metrics(self, raw=False):
    [...]
    metrics = {}
    for k, v in six.iteritems(self._metrics.metrics.copy()):
        if k.group not in metrics:
            metrics[k.group] = {}
        if k.name not in metrics[k.group]:
            metrics[k.group][k.name] = {}
        metrics[k.group][k.name] = v.value()
    return metrics
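The effect of the copy can be checked in isolation. The following is a minimal sketch, not the library's code: `MetricName`, `StaticMetric`, `GrowingMetric`, and `group_metrics` are hypothetical stand-ins, and the metric whose `value()` registers another metric simulates a concurrent writer deterministically. Whether `dict.copy()` itself can race with a writer depends on the interpreter and is not addressed here.

```python
from collections import namedtuple

# Hypothetical stand-in for the library's metric name objects.
MetricName = namedtuple("MetricName", ["group", "name"])


class StaticMetric:
    """Metric that simply returns a fixed reading."""

    def __init__(self, reading):
        self._reading = reading

    def value(self):
        return self._reading


class GrowingMetric:
    """Metric whose value() registers another metric, mimicking a concurrent writer."""

    def __init__(self, registry, reading):
        self._registry = registry
        self._reading = reading

    def value(self):
        late = MetricName("producer-metrics", "late-%d" % len(self._registry))
        self._registry[late] = StaticMetric(0.0)
        return self._reading


def group_metrics(registry):
    """Group metric values by group and name, reading from a snapshot."""
    grouped = {}
    # Iterate a shallow copy so additions to `registry` cannot break the loop.
    for k, v in registry.copy().items():
        grouped.setdefault(k.group, {})[k.name] = v.value()
    return grouped


registry = {}
registry[MetricName("producer-metrics", "record-send-rate")] = GrowingMetric(registry, 1.5)
result = group_metrics(registry)
```

The snapshot only contains the metrics that existed when the copy was taken; a metric registered during the loop shows up on the next call.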
I will create a PR for this.