@@ -32,13 +32,15 @@ check code (perhaps using zookeeper or consul). For older brokers, you can
achieve something similar by manually assigning different partitions to each
consumer instance with config management tools like chef, ansible, etc. This
approach will work fine, though it does not support rebalancing on failures.
- See <https://kafka-python.readthedocs.io/en/master/compatibility.html>
+ See https://kafka-python.readthedocs.io/en/master/compatibility.html
for more details.

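A minimal sketch of that static-assignment approach, assuming each instance reads its partition list from an environment variable rendered by the config management tool (the variable name, topic, and broker address are illustrative):

.. code-block:: python

    import os
    from kafka import KafkaConsumer, TopicPartition

    # e.g. chef/ansible renders MY_PARTITIONS="0,1" for this instance
    partitions = [int(p) for p in os.environ['MY_PARTITIONS'].split(',')]
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    consumer.assign([TopicPartition('my_favorite_topic', p) for p in partitions])
    for msg in consumer:
        print (msg)

Because the assignment is fixed per instance, a failed instance's partitions are not picked up by the others, which is the rebalancing limitation noted above.
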
Please note that the master branch may contain unreleased features. For release
documentation, please see readthedocs and/or python's inline help.

- >>> pip install kafka-python
+ .. code-block:: bash
+
+     $ pip install kafka-python


KafkaConsumer
@@ -48,89 +50,119 @@ KafkaConsumer is a high-level message consumer, intended to operate as similarly
as possible to the official java client. Full support for coordinated
consumer groups requires use of kafka brokers that support the Group APIs: kafka v0.9+.

- See <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html>
+ See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
for API and configuration details.

The consumer iterator returns ConsumerRecords, which are simple namedtuples
that expose basic message attributes: topic, partition, offset, key, and value:

- >>> from kafka import KafkaConsumer
- >>> consumer = KafkaConsumer('my_favorite_topic')
- >>> for msg in consumer:
- ...     print (msg)
+ .. code-block:: python
+
+     from kafka import KafkaConsumer
+     consumer = KafkaConsumer('my_favorite_topic')
+     for msg in consumer:
+         print (msg)
+
+ .. code-block:: python
+
+     # join a consumer group for dynamic partition assignment and offset commits
+     from kafka import KafkaConsumer
+     consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
+     for msg in consumer:
+         print (msg)
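
Since each ConsumerRecord is a namedtuple, the attributes listed above can also be read directly; a small sketch (topic name as in the examples above):

.. code-block:: python

    from kafka import KafkaConsumer

    consumer = KafkaConsumer('my_favorite_topic')
    for msg in consumer:
        print (msg.topic, msg.partition, msg.offset, msg.key, msg.value)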

- >>> # join a consumer group for dynamic partition assignment and offset commits
- >>> from kafka import KafkaConsumer
- >>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
- >>> for msg in consumer:
- ...     print (msg)
+ .. code-block:: python

- >>> # manually assign the partition list for the consumer
- >>> from kafka import TopicPartition
- >>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
- >>> consumer.assign([TopicPartition('foobar', 2)])
- >>> msg = next(consumer)
+     # manually assign the partition list for the consumer
+     from kafka import TopicPartition
+     consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
+     consumer.assign([TopicPartition('foobar', 2)])
+     msg = next(consumer)

- >>> # Deserialize msgpack-encoded values
- >>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
- >>> consumer.subscribe(['msgpackfoo'])
- >>> for msg in consumer:
- ...     assert isinstance(msg.value, dict)
+ .. code-block:: python

- >>> # Access record headers. The returned value is a list of tuples
- >>> # with str, bytes for key and value
- >>> for msg in consumer:
- ...     print (msg.headers)
+     # Deserialize msgpack-encoded values
+     consumer = KafkaConsumer(value_deserializer=msgpack.loads)
+     consumer.subscribe(['msgpackfoo'])
+     for msg in consumer:
+         assert isinstance(msg.value, dict)
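
The msgpack example assumes ``msgpack`` has already been imported (``pip install msgpack``). A self-contained sketch of the round trip, pairing the deserializer with a producer-side serializer (broker address and topic are illustrative):

.. code-block:: python

    import msgpack
    from kafka import KafkaConsumer, KafkaProducer

    # produce msgpack-encoded values...
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             value_serializer=msgpack.dumps)
    producer.send('msgpackfoo', {'foo': 'bar'})
    producer.flush()

    # ...and read them back as dicts
    consumer = KafkaConsumer('msgpackfoo',
                             bootstrap_servers='localhost:9092',
                             value_deserializer=msgpack.loads)
    for msg in consumer:
        assert isinstance(msg.value, dict)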

- >>> # Get consumer metrics
- >>> metrics = consumer.metrics()
+ .. code-block:: python
+
+     # Access record headers. The returned value is a list of tuples
+     # with str, bytes for key and value
+     for msg in consumer:
+         print (msg.headers)
+
+ .. code-block:: python
+
+     # Get consumer metrics
+     metrics = consumer.metrics()


KafkaProducer
*************

KafkaProducer is a high-level, asynchronous message producer. The class is
intended to operate as similarly as possible to the official java client.
- See <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html>
+ See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
for more details.

- >>> from kafka import KafkaProducer
- >>> producer = KafkaProducer(bootstrap_servers='localhost:1234')
- >>> for _ in range(100):
- ...     producer.send('foobar', b'some_message_bytes')
+ .. code-block:: python
+
+     from kafka import KafkaProducer
+     producer = KafkaProducer(bootstrap_servers='localhost:1234')
+     for _ in range(100):
+         producer.send('foobar', b'some_message_bytes')
+
+ .. code-block:: python
+
+     # Block until a single message is sent (or timeout)
+     future = producer.send('foobar', b'another_message')
+     result = future.get(timeout=60)
+
+ .. code-block:: python
+
+     # Block until all pending messages are at least put on the network
+     # NOTE: This does not guarantee delivery or success! It is really
+     # only useful if you configure internal batching using linger_ms
+     producer.flush()
+
+ .. code-block:: python
+
+     # Use a key for hashed-partitioning
+     producer.send('foobar', key=b'foo', value=b'bar')
+
+ .. code-block:: python
+
+     # Serialize json messages
+     import json
+     producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+     producer.send('fizzbuzz', {'foo': 'bar'})
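
Because sends are asynchronous, the returned future can also be handled with callbacks instead of blocking on ``get()``; a sketch (handler names are illustrative):

.. code-block:: python

    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:1234')

    def on_send_success(record_metadata):
        # topic, partition and offset of the message that was written
        print (record_metadata.topic, record_metadata.partition, record_metadata.offset)

    def on_send_error(excp):
        print ('send failed:', excp)

    producer.send('foobar', b'another_message').add_callback(on_send_success).add_errback(on_send_error)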

- >>> # Block until a single message is sent (or timeout)
- >>> future = producer.send('foobar', b'another_message')
- >>> result = future.get(timeout=60)
+ .. code-block:: python

- >>> # Block until all pending messages are at least put on the network
- >>> # NOTE: This does not guarantee delivery or success! It is really
- >>> # only useful if you configure internal batching using linger_ms
- >>> producer.flush()
+     # Serialize string keys
+     producer = KafkaProducer(key_serializer=str.encode)
+     producer.send('flipflap', key='ping', value=b'1234')
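
flush() mostly matters once batching is configured; a sketch of enabling batching with ``linger_ms`` and ``batch_size`` (values are arbitrary):

.. code-block:: python

    from kafka import KafkaProducer

    # wait up to 20ms (or until ~32KB is buffered) before sending a batch
    producer = KafkaProducer(bootstrap_servers='localhost:1234',
                             linger_ms=20,
                             batch_size=32 * 1024)
    for i in range(1000):
        producer.send('foobar', b'msg %d' % i)
    producer.flush()  # block until pending messages are at least put on the network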

- >>> # Use a key for hashed-partitioning
- >>> producer.send('foobar', key=b'foo', value=b'bar')
+ .. code-block:: python

- >>> # Serialize json messages
- >>> import json
- >>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
- >>> producer.send('fizzbuzz', {'foo': 'bar'})
+     # Compress messages
+     producer = KafkaProducer(compression_type='gzip')
+     for i in range(1000):
+         producer.send('foobar', b'msg %d' % i)

- >>> # Serialize string keys
- >>> producer = KafkaProducer(key_serializer=str.encode)
- >>> producer.send('flipflap', key='ping', value=b'1234')
+ .. code-block:: python

- >>> # Compress messages
- >>> producer = KafkaProducer(compression_type='gzip')
- >>> for i in range(1000):
- ...     producer.send('foobar', b'msg %d' % i)
+     # Include record headers. The format is list of tuples with string key
+     # and bytes value.
+     producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])

- >>> # Include record headers. The format is list of tuples with string key
- >>> # and bytes value.
- >>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])
+ .. code-block:: python

- >>> # Get producer performance metrics
- >>> metrics = producer.metrics()
+     # Get producer performance metrics
+     metrics = producer.metrics()


Thread safety
@@ -154,7 +186,7 @@ kafka-python supports the following compression formats:
- Zstandard (zstd)

gzip is supported natively, the others require installing additional libraries.
- See <https://kafka-python.readthedocs.io/en/master/install.html> for more information.
+ See https://kafka-python.readthedocs.io/en/master/install.html for more information.

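As a rough guide (see the install docs linked above for the authoritative steps), the optional codecs are typically enabled by installing the corresponding Python packages, with Snappy additionally needing the system snappy library:

.. code-block:: bash

    pip install python-snappy   # Snappy (needs the snappy C library installed first)
    pip install lz4             # LZ4
    pip install zstandard       # Zstandard (zstd)
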
Optimized CRC32 Validation
@@ -163,7 +195,7 @@ Optimized CRC32 Validation
Kafka uses CRC32 checksums to validate messages. kafka-python includes a pure
python implementation for compatibility. To improve performance for high-throughput
applications, kafka-python will use `crc32c` for optimized native code if installed.
- See <https://kafka-python.readthedocs.io/en/master/install.html> for installation instructions.
+ See https://kafka-python.readthedocs.io/en/master/install.html for installation instructions.
See https://pypi.org/project/crc32c/ for details on the underlying crc32c lib.
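
Opting in to the optimized checksum path is typically just a matter of installing the package referenced above:

.. code-block:: bash

    pip install crc32c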