Expand metrics docs #1243


Merged: 4 commits, Oct 8, 2017
10 changes: 7 additions & 3 deletions README.rst
@@ -70,6 +70,8 @@ that expose basic message attributes: topic, partition, offset, key, and value:
>>> for msg in consumer:
... assert isinstance(msg.value, dict)

>>> # Get consumer metrics
>>> metrics = consumer.metrics()
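The dict returned by `metrics()` is keyed by metric group, then by metric name. A minimal sketch of flattening it for logging — the group and metric names in the sample below are illustrative, not a schema guaranteed by the library:

```python
# Illustrative shape of consumer.metrics(): {metric_group: {metric_name: value}}.
# The group/metric names here are examples only.
sample = {
    "consumer-fetch-manager-metrics": {
        "fetch-rate": 2.5,
        "records-consumed-rate": 120.0,
    },
}

def flatten(metrics):
    # Turn the nested dict into "group.name" -> value pairs for easy logging.
    return {
        "%s.%s" % (group, name): value
        for group, names in metrics.items()
        for name, value in names.items()
    }

flat = flatten(sample)
print(flat["consumer-fetch-manager-metrics.fetch-rate"])  # -> 2.5
```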

KafkaProducer
*************
@@ -110,6 +112,9 @@ for more details.
>>> for i in range(1000):
... producer.send('foobar', b'msg %d' % i)

>>> # Get producer performance metrics
>>> metrics = producer.metrics()
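With `raw=True`, `metrics()` returns the internal metric objects rather than plain values. A hedged sketch of the reshaping the default mode performs, using stand-in classes — `MetricName` and `Metric` here are illustrative stand-ins, not the library's real types:

```python
from collections import namedtuple

# Stand-ins for the internal objects a raw metrics dict might hold.
MetricName = namedtuple("MetricName", ["group", "name"])

class Metric(object):
    def __init__(self, value):
        self._value = value
    def value(self):
        return self._value

raw = {MetricName("producer-metrics", "record-send-rate"): Metric(50.0)}

# The same reshaping metrics() performs: nested plain dict, group -> name -> value.
metrics = {}
for k, v in raw.items():
    metrics.setdefault(k.group, {})[k.name] = v.value()
```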

Thread safety
*************

@@ -122,8 +127,8 @@ multiprocessing is recommended.
Compression
***********

kafka-python supports gzip compression/decompression natively. To produce or consume lz4
compressed messages, you should install python-lz4 (pip install lz4).
To enable snappy compression/decompression install python-snappy (also requires snappy library).
See <https://kafka-python.readthedocs.io/en/master/install.html#optional-snappy-install>
for more information.
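As a rough illustration of what the gzip codec buys on the wire — this uses the stdlib gzip module directly and ignores Kafka's actual record-batch framing:

```python
import gzip

payload = b"msg 0" * 100  # repetitive payloads compress well

compressed = gzip.compress(payload)
assert gzip.decompress(compressed) == payload

# Repetitive message bodies typically shrink substantially.
print(len(payload), len(compressed))
```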
@@ -138,7 +143,6 @@ leveraged to enable a KafkaClient.check_version() method that
probes a kafka broker and attempts to identify which version it is running
(0.8.0 to 0.11).
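check_version() reports the broker version as a tuple (e.g. (0, 10, 1)), which client code can compare to gate version-dependent features. A minimal sketch — the 0.10.1 threshold mirrors the offsets_for_times requirement but should be treated as an assumption:

```python
# Sketch: gate a feature on the version tuple a broker probe might return.
# The (0, 10, 1) threshold for offsets_for_times is an assumption here.
def supports_offsets_for_times(api_version):
    return api_version >= (0, 10, 1)

assert supports_offsets_for_times((0, 11, 0))
assert not supports_offsets_for_times((0, 8, 0))
```

Tuple comparison makes the check read naturally: Python compares element by element, so (0, 11, 0) >= (0, 10, 1) holds.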


Low-level
*********

15 changes: 11 additions & 4 deletions kafka/consumer/group.py
@@ -846,13 +846,20 @@ def unsubscribe(self):
log.debug("Unsubscribed all topics or patterns and assigned partitions")

def metrics(self, raw=False):
"""Get metrics on consumer performance.

This is ported from the Java Consumer, for details see:
https://kafka.apache.org/documentation/#new_consumer_monitoring

Warning:
This is an unstable interface. It may change in future
releases without warning.
"""
if raw:
return self._metrics.metrics

metrics = {}
for k, v in six.iteritems(self._metrics.metrics):
if k.group not in metrics:
metrics[k.group] = {}
if k.name not in metrics[k.group]:
@@ -897,7 +904,7 @@ def offsets_for_times(self, timestamps):
raise UnsupportedVersionError(
"offsets_for_times API not supported for cluster version {}"
.format(self.config['api_version']))
for tp, ts in six.iteritems(timestamps):
timestamps[tp] = int(ts)
if ts < 0:
raise ValueError(
21 changes: 15 additions & 6 deletions kafka/producer/kafka.py
@@ -8,6 +8,8 @@
import time
import weakref

from ..vendor import six
Contributor (author):
I noticed the import style was inconsistent between kafka/producer/kafka.py and kafka/consumer/group.py, not sure why...?

Owner:
I think I must have started kafka/consumer/group.py before deciding to use relative imports. I like this style better.
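For context on the six.iteritems calls in this diff: six.iteritems yields a lazy iterator on py2 and the (already lazy) items view on py3. A rough stand-alone equivalent — an illustrative shim, not the vendored six implementation:

```python
import sys

def iteritems(d):
    # Roughly what six.iteritems does: lazy iteration on py2,
    # the items view on py3.
    if sys.version_info[0] >= 3:
        return iter(d.items())
    return d.iteritems()

pairs = sorted(iteritems({"a": 1, "b": 2}))
```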


from .. import errors as Errors
from ..client_async import KafkaClient, selectors
from ..metrics import MetricConfig, Metrics
@@ -566,10 +568,10 @@ def flush(self, timeout=None):

Arguments:
timeout (float, optional): timeout in seconds to wait for completion.

Raises:
KafkaTimeoutError: failure to flush buffered records within the
provided timeout
"""
log.debug("Flushing accumulated records in producer.") # trace
self._accumulator.begin_flush()
@@ -655,13 +657,20 @@ def _partition(self, topic, partition, key, value,
available)

def metrics(self, raw=False):
"""Get metrics on producer performance.

This is ported from the Java Producer, for details see:
https://kafka.apache.org/documentation/#producer_monitoring

Warning:
This is an unstable interface. It may change in future
releases without warning.
"""
if raw:
return self._metrics.metrics

metrics = {}
for k, v in six.iteritems(self._metrics.metrics):
if k.group not in metrics:
metrics[k.group] = {}
if k.name not in metrics[k.group]: