
Commit 87e5d16

jeffwidman authored and dpkp committed
Expand metrics docs (#1243)
* Expand metrics docstrings
* Document metrics interface in readme
* Use six.iteritems(d) rather than d.items()
* Use Sphinx warning syntax
1 parent 8f21180 commit 87e5d16

3 files changed: +33 additions, -13 deletions


README.rst

Lines changed: 7 additions & 3 deletions
@@ -70,6 +70,8 @@ that expose basic message attributes: topic, partition, offset, key, and value:
 >>> for msg in consumer:
 ...     assert isinstance(msg.value, dict)
 
+>>> # Get consumer metrics
+>>> metrics = consumer.metrics()
 
 KafkaProducer
 *************
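The snapshot returned by metrics() is a plain nested dict, keyed by metric group and then metric name (see the group.py change below). A minimal sketch of walking it; the bootstrap address and topic are illustrative:

# Sketch only: dump every consumer metric in the snapshot.
from kafka import KafkaConsumer

consumer = KafkaConsumer('my_favorite_topic',
                         bootstrap_servers='localhost:9092')  # assumed address
metrics = consumer.metrics()   # {group: {name: value}}
for group, by_name in metrics.items():
    for name, value in by_name.items():
        print('%s / %s = %s' % (group, name, value))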
@@ -110,6 +112,9 @@ for more details.
 >>> for i in range(1000):
 ...     producer.send('foobar', b'msg %d' % i)
 
+>>> # Get producer performance metrics
+>>> metrics = producer.metrics()
+
 Thread safety
 *************
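The producer snapshot has the same nested shape. A short sketch of pulling out one metric group; the 'producer-metrics' group name is an assumption and may vary by client version:

# Sketch only: inspect one group from the producer metrics snapshot.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')  # assumed address
metrics = producer.metrics()
print(sorted(metrics.keys()))                  # available metric groups
print(metrics.get('producer-metrics', {}))     # group name assumed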

@@ -122,8 +127,8 @@ multiprocessing is recommended.
 Compression
 ***********
 
-kafka-python supports gzip compression/decompression natively. To produce or consume lz4
-compressed messages, you should install python-lz4 (pip install lz4).
+kafka-python supports gzip compression/decompression natively. To produce or consume lz4
+compressed messages, you should install python-lz4 (pip install lz4).
 To enable snappy compression/decompression install python-snappy (also requires snappy library).
 See <https://kafka-python.readthedocs.io/en/master/install.html#optional-snappy-install>
 for more information.
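Once the optional codec packages are installed, compression is selected per producer via the compression_type config. A minimal sketch, assuming a local broker:

# Sketch only: produce gzip-compressed messages (no extra package needed);
# swap in 'lz4' or 'snappy' after installing python-lz4 / python-snappy.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         compression_type='gzip')
producer.send('foobar', b'compressed message')
producer.flush()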
@@ -138,7 +143,6 @@ leveraged to enable a KafkaClient.check_version() method that
 probes a kafka broker and attempts to identify which version it is running
 (0.8.0 to 0.11).
 
-
 Low-level
 *********
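The broker probing mentioned above can also be invoked directly; a rough sketch, assuming a reachable broker (constructor arguments and the exact return format are assumptions):

# Sketch only: ask a broker which protocol version it speaks; this mirrors
# what api_version=None does when constructing a consumer or producer.
from kafka.client_async import KafkaClient

client = KafkaClient(bootstrap_servers='localhost:9092')  # assumed address
print(client.check_version())   # e.g. a version tuple such as (0, 10, 1)
client.close()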

kafka/consumer/group.py

Lines changed: 11 additions & 4 deletions
@@ -846,13 +846,20 @@ def unsubscribe(self):
         log.debug("Unsubscribed all topics or patterns and assigned partitions")
 
     def metrics(self, raw=False):
-        """Warning: this is an unstable interface.
-        It may change in future releases without warning"""
+        """Get metrics on consumer performance.
+
+        This is ported from the Java Consumer, for details see:
+        https://kafka.apache.org/documentation/#new_consumer_monitoring
+
+        Warning:
+            This is an unstable interface. It may change in future
+            releases without warning.
+        """
         if raw:
             return self._metrics.metrics
 
         metrics = {}
-        for k, v in self._metrics.metrics.items():
+        for k, v in six.iteritems(self._metrics.metrics):
             if k.group not in metrics:
                 metrics[k.group] = {}
             if k.name not in metrics[k.group]:
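To make the raw flag concrete: raw=True returns the internal metrics dict keyed by metric-name objects (the k.group / k.name used in the loop above), while the default flattens it into plain nested dicts. A hedged usage sketch:

# Sketch only: compare the two shapes KafkaConsumer.metrics() can return.
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')  # assumed address

flat = consumer.metrics()          # {group: {name: value}} of plain values
raw = consumer.metrics(raw=True)   # internal dict keyed by metric-name objects

for metric_key in raw:
    # the same .group / .name attributes the flattening loop relies on
    print(metric_key.group, metric_key.name)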
@@ -897,7 +904,7 @@ def offsets_for_times(self, timestamps):
             raise UnsupportedVersionError(
                 "offsets_for_times API not supported for cluster version {}"
                 .format(self.config['api_version']))
-        for tp, ts in timestamps.items():
+        for tp, ts in six.iteritems(timestamps):
             timestamps[tp] = int(ts)
             if ts < 0:
                 raise ValueError(
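The six.iteritems switch above sits inside offsets_for_times, which expects non-negative epoch-millisecond timestamps per partition (hence the int() coercion and the ValueError). A rough usage sketch; topic, partition, and broker address are illustrative:

# Sketch only: find the first offset at or after a timestamp one hour ago.
import time
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')  # assumed address
tp = TopicPartition('my_favorite_topic', 0)
one_hour_ago_ms = int(time.time() * 1000) - 60 * 60 * 1000

offsets = consumer.offsets_for_times({tp: one_hour_ago_ms})
print(offsets[tp])   # offset/timestamp result, or None if no such message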

kafka/producer/kafka.py

Lines changed: 15 additions & 6 deletions
@@ -8,6 +8,8 @@
 import time
 import weakref
 
+from ..vendor import six
+
 from .. import errors as Errors
 from ..client_async import KafkaClient, selectors
 from ..metrics import MetricConfig, Metrics
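The new import pulls in kafka-python's vendored copy of six so the same dict-iteration call works on Python 2 and 3. A tiny sketch of the behaviour, using the standalone six package for illustration:

# Sketch only: six.iteritems(d) calls d.iteritems() on Python 2 (lazy, no
# intermediate list) and d.items() on Python 3.
import six

d = {'batch-size-avg': 42.0, 'record-send-rate': 7.5}
for name, value in six.iteritems(d):
    print(name, value)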
@@ -566,10 +568,10 @@ def flush(self, timeout=None):
 
         Arguments:
             timeout (float, optional): timeout in seconds to wait for completion.
-
+
         Raises:
-            KafkaTimeoutError: failure to flush buffered records within the
-                provided timeout
+            KafkaTimeoutError: failure to flush buffered records within the
+                provided timeout
         """
         log.debug("Flushing accumulated records in producer.") # trace
         self._accumulator.begin_flush()
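For reference, the KafkaTimeoutError documented above is raised when buffered records cannot be delivered within the timeout; a minimal sketch of handling it (broker address and timeout value are illustrative):

# Sketch only: bound how long flush() may block and handle the timeout.
from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError

producer = KafkaProducer(bootstrap_servers='localhost:9092')  # assumed address
for i in range(1000):
    producer.send('foobar', b'msg %d' % i)

try:
    producer.flush(timeout=10)   # seconds
except KafkaTimeoutError:
    print('not all buffered records were flushed within 10 seconds')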
@@ -655,13 +657,20 @@ def _partition(self, topic, partition, key, value,
                                            available)
 
     def metrics(self, raw=False):
-        """Warning: this is an unstable interface.
-        It may change in future releases without warning"""
+        """Get metrics on producer performance.
+
+        This is ported from the Java Producer, for details see:
+        https://kafka.apache.org/documentation/#producer_monitoring
+
+        Warning:
+            This is an unstable interface. It may change in future
+            releases without warning.
+        """
         if raw:
             return self._metrics.metrics
 
         metrics = {}
-        for k, v in self._metrics.metrics.items():
+        for k, v in six.iteritems(self._metrics.metrics):
             if k.group not in metrics:
                 metrics[k.group] = {}
             if k.name not in metrics[k.group]:
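Since the flattened producer snapshot is plain nested dicts, it serializes cleanly for periodic logging or monitoring; a small sketch (the JSON formatting is purely illustrative):

# Sketch only: serialize the producer metrics snapshot for logging.
import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')  # assumed address
producer.send('foobar', b'hello')
producer.flush()

snapshot = producer.metrics()   # {group: {name: value}}
print(json.dumps(snapshot, indent=2, sort_keys=True, default=str))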
