diff --git a/MANIFEST.in b/MANIFEST.in index e15da190c..c65577823 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,3 +1,3 @@ include README.md -include *.c *.h +include confluent_kafka/src/*.[ch] diff --git a/confluent_kafka/__init__.py b/confluent_kafka/__init__.py new file mode 100644 index 000000000..d998bec72 --- /dev/null +++ b/confluent_kafka/__init__.py @@ -0,0 +1,2 @@ +__all__ = ['cimpl','kafkatest'] +from .cimpl import * diff --git a/confluent_kafka/kafkatest/README b/confluent_kafka/kafkatest/README new file mode 100644 index 000000000..4c75a6fc7 --- /dev/null +++ b/confluent_kafka/kafkatest/README @@ -0,0 +1,8 @@ +FIXME: Instructions on how to use this. + + +Usage: + + python -m confluent_kafka.kafkatest.verifiable_consumer + + python -m confluent_kafka.kafkatest.verifiable_producer diff --git a/confluent_kafka/kafkatest/__init__.py b/confluent_kafka/kafkatest/__init__.py new file mode 100644 index 000000000..eea8ccb16 --- /dev/null +++ b/confluent_kafka/kafkatest/__init__.py @@ -0,0 +1 @@ +""" Python client implementations of the official Kafka tests/kafkatest clients. """ diff --git a/confluent_kafka/kafkatest/verifiable_client.py b/confluent_kafka/kafkatest/verifiable_client.py new file mode 100644 index 000000000..a0eb1e19a --- /dev/null +++ b/confluent_kafka/kafkatest/verifiable_client.py @@ -0,0 +1,80 @@ +# Copyright 2016 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import signal, socket, os, sys, time, json, re, datetime + + +class VerifiableClient(object): + """ + Generic base class for a kafkatest verifiable client. + Implements the common kafkatest protocol and semantics. + """ + def __init__ (self, conf): + """ + """ + super(VerifiableClient, self).__init__() + self.conf = conf + self.conf['client.id'] = 'python@' + socket.gethostname() + self.run = True + signal.signal(signal.SIGTERM, self.sig_term) + self.dbg('Pid is %d' % os.getpid()) + + def sig_term (self, sig, frame): + self.dbg('SIGTERM') + self.run = False + + @staticmethod + def _timestamp (): + return time.strftime('%H:%M:%S', time.localtime()) + + def dbg (self, s): + """ Debugging printout """ + sys.stderr.write('%% %s DEBUG: %s\n' % (self._timestamp(), s)) + + def err (self, s, term=False): + """ Error printout, if term=True the process will terminate immediately. """ + sys.stderr.write('%% %s ERROR: %s\n' % (self._timestamp(), s)) + if term: + sys.stderr.write('%% FATAL ERROR ^\n') + sys.exit(1) + + def send (self, d): + """ Send dict as JSON to stdout for consumtion by kafkatest handler """ + d['_time'] = str(datetime.datetime.now()) + self.dbg('SEND: %s' % json.dumps(d)) + sys.stdout.write('%s\n' % json.dumps(d)) + sys.stdout.flush() + + + @staticmethod + def set_config (conf, args): + """ Set client config properties using args dict. """ + for n,v in args.iteritems(): + if v is None: + continue + # Things to ignore + if '.' 
not in n:
+                # App config, skip
+                continue
+            if n.startswith('topic.'):
+                # Set "topic.<...>" properties on default topic conf dict
+                conf['default.topic.config'][n[6:]] = v
+            elif n == 'partition.assignment.strategy':
+                # Convert Java class name to config value.
+                # "org.apache.kafka.clients.consumer.RangeAssignor" -> "range"
+                conf[n] = re.sub(r'org.apache.kafka.clients.consumer.(\w+)Assignor',
+                                 lambda x: x.group(1).lower(), v)
+            else:
+                conf[n] = v
diff --git a/confluent_kafka/kafkatest/verifiable_consumer.py b/confluent_kafka/kafkatest/verifiable_consumer.py
new file mode 100755
index 000000000..3e01367db
--- /dev/null
+++ b/confluent_kafka/kafkatest/verifiable_consumer.py
@@ -0,0 +1,288 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse, sys
+from confluent_kafka import Consumer, KafkaError, KafkaException
+from verifiable_client import VerifiableClient
+
+class VerifiableConsumer(VerifiableClient):
+    """
+    confluent-kafka-python backed VerifiableConsumer class for use with
+    Kafka's kafkatest client tests.
+    """
+    def __init__ (self, conf):
+        """
+        \p conf is a config dict passed to confluent_kafka.Consumer()
+        """
+        super(VerifiableConsumer, self).__init__(conf)
+        self.conf['on_commit'] = self.on_commit
+        self.consumer = Consumer(**conf)
+        self.consumed_msgs = 0
+        self.consumed_msgs_last_reported = 0
+        self.consumed_msgs_at_last_commit = 0
+        self.use_auto_commit = False
+        self.use_async_commit = False
+        self.max_msgs = -1
+        self.assignment = []
+        self.assignment_dict = dict()
+
+
+    def find_assignment (self, topic, partition):
+        """ Find and return existing assignment based on \p topic and \p partition,
+        or None on miss. """
+        skey = '%s %d' % (topic, partition)
+        return self.assignment_dict.get(skey)
+
+
+    def send_records_consumed (self, immediate=False):
+        """ Send records_consumed, every 100 messages, on timeout,
+        or if immediate is set. """
+        if (self.consumed_msgs <= self.consumed_msgs_last_reported +
+            (0 if immediate else 100)):
+            return
+
+        if len(self.assignment) == 0:
+            return
+
+        d = {'name': 'records_consumed',
+             'count': self.consumed_msgs - self.consumed_msgs_last_reported,
+             'partitions': []}
+
+        for a in self.assignment:
+            if a.min_offset == -1:
+                # Skip partitions that haven't had any messages since last time.
+                # This is to circumvent some minOffset checks in kafkatest.
+ continue + d['partitions'].append(a.to_dict()) + a.min_offset = -1 + + self.send(d) + self.consumed_msgs_last_reported = self.consumed_msgs + + + def send_assignment (self, evtype, partitions): + """ Send assignment update, \p evtype is either 'assigned' or 'revoked' """ + d = { 'name': 'partitions_' + evtype, + 'partitions': [{'topic': x.topic, 'partition': x.partition} for x in partitions]} + self.send(d) + + + def on_assign (self, consumer, partitions): + """ Rebalance on_assign callback """ + old_assignment = self.assignment + self.assignment = [AssignedPartition(p.topic, p.partition) for p in partitions] + # Move over our last seen offsets so that we can report a proper + # minOffset even after a rebalance loop. + for a in old_assignment: + b = self.find_assignment(a.topic, a.partition) + b.min_offset = a.min_offset + + self.assignment_dict = {a.skey: a for a in self.assignment} + self.send_assignment('assigned', partitions) + + def on_revoke (self, consumer, partitions): + """ Rebalance on_revoke callback """ + # Send final consumed records prior to rebalancing to make sure + # latest consumed is in par with what is going to be committed. + self.send_records_consumed(immediate=True) + self.assignment = list() + self.assignment_dict = dict() + self.send_assignment('revoked', partitions) + self.do_commit(immediate=True) + + + def on_commit (self, err, partitions): + """ Offsets Committed callback """ + if err is not None and err.code() == KafkaError._NO_OFFSET: + self.dbg('on_commit(): no offsets to commit') + return + + # Report consumed messages to make sure consumed position >= committed position + self.send_records_consumed(immediate=True) + + d = {'name': 'offsets_committed', + 'offsets': []} + + if err is not None: + d['success'] = False + d['error'] = str(err) + else: + d['success'] = True + d['error'] = '' + + for p in partitions: + pd = {'topic': p.topic, 'partition': p.partition, + 'offset': p.offset, 'error': str(p.error)} + d['offsets'].append(pd) + + self.send(d) + + + def do_commit (self, immediate=False, async=None): + """ Commit every 1000 messages or whenever there is a consume timeout + or immediate. 
""" + if (self.use_auto_commit or + self.consumed_msgs_at_last_commit + (0 if immediate else 1000) > + self.consumed_msgs): + return + + # Make sure we report consumption before commit, + # otherwise tests may fail because of commit > consumed + if self.consumed_msgs_at_last_commit < self.consumed_msgs: + self.send_records_consumed(immediate=True) + + if async is None: + async_mode = self.use_async_commit + else: + async_mode = async + + self.dbg('Committing %d messages (Async=%s)' % + (self.consumed_msgs - self.consumed_msgs_at_last_commit, + async_mode)) + + try: + self.consumer.commit(async=async_mode) + except KafkaException as e: + if e.args[0].code() == KafkaError._WAIT_COORD: + self.dbg('Ignoring commit failure, still waiting for coordinator') + elif e.args[0].code() == KafkaError._NO_OFFSET: + self.dbg('No offsets to commit') + else: + raise + + self.consumed_msgs_at_last_commit = self.consumed_msgs + + + def msg_consume (self, msg): + """ Handle consumed message (or error event) """ + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + # ignore EOF + pass + else: + self.err('Consume failed: %s' % msg.error(), term=True) + return + + if False: + self.dbg('Read msg from %s [%d] @ %d' % \ + (msg.topic(), msg.partition(), msg.offset())) + + if self.max_msgs >= 0 and self.consumed_msgs >= self.max_msgs: + return # ignore extra messages + + # Find assignment. + a = self.find_assignment(msg.topic(), msg.partition()) + if a is None: + self.err('Received message on unassigned partition %s [%d] @ %d' % + (msg.topic(), msg.partition(), msg.offset()), term=True) + + a.consumed_msgs += 1 + if a.min_offset == -1: + a.min_offset = msg.offset() + if a.max_offset < msg.offset(): + a.max_offset = msg.offset() + + self.consumed_msgs += 1 + + self.send_records_consumed(immediate=False) + self.do_commit(immediate=False) + + +class AssignedPartition(object): + """ Local state container for assigned partition. 
""" + def __init__ (self, topic, partition): + super(AssignedPartition, self).__init__() + self.topic = topic + self.partition = partition + self.skey = '%s %d' % (self.topic, self.partition) + self.consumed_msgs = 0 + self.min_offset = -1 + self.max_offset = 0 + + def to_dict (self): + """ Return a dict of this partition's state """ + return {'topic': self.topic, 'partition': self.partition, + 'minOffset': self.min_offset, 'maxOffset': self.max_offset} + + + + + + + + + + + +if __name__ == '__main__': + + parser = argparse.ArgumentParser(description='Verifiable Python Consumer') + parser.add_argument('--topic', action='append', type=str, required=True) + parser.add_argument('--group-id', dest='group.id', required=True) + parser.add_argument('--broker-list', dest='bootstrap.servers', required=True) + parser.add_argument('--session-timeout', type=int, dest='session.timeout.ms', default=6000) + parser.add_argument('--enable-autocommit', action='store_true', dest='enable.auto.commit', default=False) + parser.add_argument('--max-messages', type=int, dest='max_messages', default=-1) + parser.add_argument('--assignment-strategy', dest='partition.assignment.strategy') + parser.add_argument('--reset-policy', dest='topic.auto.offset.reset', default='earliest') + parser.add_argument('--consumer.config', dest='consumer_config') + args = vars(parser.parse_args()) + + conf = {'broker.version.fallback': '0.9.0', + 'default.topic.config': dict()} + + VerifiableClient.set_config(conf, args) + + vc = VerifiableConsumer(conf) + vc.use_auto_commit = args['enable.auto.commit'] + vc.max_msgs = args['max_messages'] + + vc.dbg('Using config: %s' % conf) + + vc.dbg('Subscribing to %s' % args['topic']) + vc.consumer.subscribe(args['topic'], + on_assign=vc.on_assign, on_revoke=vc.on_revoke) + + + try: + while vc.run: + msg = vc.consumer.poll(timeout=1.0) + if msg is None: + # Timeout. + # Try reporting consumed messages + vc.send_records_consumed(immediate=True) + # Commit every poll() timeout instead of on every message. + # Also commit on every 1000 messages, whichever comes first. + vc.do_commit(immediate=True) + continue + + # Handle message (or error event) + vc.msg_consume(msg) + + except KeyboardInterrupt: + pass + + vc.dbg('Closing consumer') + vc.send_records_consumed(immediate=True) + if not vc.use_auto_commit: + vc.do_commit(immediate=True, async=False) + + vc.consumer.close() + + vc.send({'name': 'shutdown_complete'}) + + vc.dbg('All done') diff --git a/confluent_kafka/kafkatest/verifiable_producer.py b/confluent_kafka/kafkatest/verifiable_producer.py new file mode 100755 index 000000000..af188a25f --- /dev/null +++ b/confluent_kafka/kafkatest/verifiable_producer.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python +# +# Copyright 2016 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+import argparse, time
+from confluent_kafka import Producer, KafkaError, KafkaException
+from verifiable_client import VerifiableClient
+
+class VerifiableProducer(VerifiableClient):
+    """
+    confluent-kafka-python backed VerifiableProducer class for use with
+    Kafka's kafkatest client tests.
+    """
+    def __init__ (self, conf):
+        """
+        \p conf is a config dict passed to confluent_kafka.Producer()
+        """
+        super(VerifiableProducer, self).__init__(conf)
+        self.conf['on_delivery'] = self.dr_cb
+        self.conf['default.topic.config']['produce.offset.report'] = True
+        self.producer = Producer(**self.conf)
+        self.num_acked = 0
+        self.num_sent = 0
+        self.num_err = 0
+
+    def dr_cb (self, err, msg):
+        """ Per-message Delivery report callback. Called from poll() """
+        if err:
+            self.num_err += 1
+            self.send({'name': 'producer_send_error',
+                       'message': str(err),
+                       'topic': msg.topic(),
+                       'key': msg.key(),
+                       'value': msg.value()})
+        else:
+            self.num_acked += 1
+            self.send({'name': 'producer_send_success',
+                       'topic': msg.topic(),
+                       'partition': msg.partition(),
+                       'offset': msg.offset(),
+                       'key': msg.key(),
+                       'value': msg.value()})
+
+        pass
+
+
+
+
+
+
+if __name__ == '__main__':
+
+    parser = argparse.ArgumentParser(description='Verifiable Python Producer')
+    parser.add_argument('--topic', type=str, required=True)
+    parser.add_argument('--throughput', type=int, default=0)
+    parser.add_argument('--broker-list', dest='bootstrap.servers', required=True)
+    parser.add_argument('--max-messages', type=int, dest='max_msgs', default=1000000) # avoid infinite
+    parser.add_argument('--value-prefix', dest='value_prefix', type=str, default=None)
+    parser.add_argument('--acks', type=int, dest='topic.request.required.acks', default=-1)
+    parser.add_argument('--producer.config', dest='producer_config')
+    args = vars(parser.parse_args())
+
+    conf = {'broker.version.fallback': '0.9.0',
+            'default.topic.config': dict()}
+
+    VerifiableClient.set_config(conf, args)
+
+    vp = VerifiableProducer(conf)
+
+    vp.max_msgs = args['max_msgs']
+    throughput = args['throughput']
+    topic = args['topic']
+    if args['value_prefix'] is not None:
+        value_fmt = args['value_prefix'] + '.%d'
+    else:
+        value_fmt = '%d'
+
+    if throughput > 0:
+        delay = 1.0/throughput
+    else:
+        delay = 0
+
+    vp.dbg('Producing %d messages at a rate of %d/s' % (vp.max_msgs, throughput))
+
+    try:
+        for i in range(0, vp.max_msgs):
+            if not vp.run:
+                break
+
+            t_end = time.time() + delay
+            while vp.run:
+                try:
+                    vp.producer.produce(topic, value=(value_fmt % i))
+                    vp.num_sent += 1
+                except KafkaException as e:
+                    vp.err('produce() #%d/%d failed: %s' % \
+                           (i, vp.max_msgs, str(e)))
+                    vp.num_err += 1
+                except BufferError:
+                    vp.dbg('Local produce queue full (produced %d/%d msgs), waiting for deliveries..' % \
+                           (i, vp.max_msgs))
+                    vp.producer.poll(timeout=0.5)
+                    continue
+                break
+
+
+            # Delay to achieve desired throughput,
+            # but make sure poll is called at least once
+            # to serve DRs.
+            while True:
+                remaining = max(0, t_end - time.time())
+                vp.producer.poll(timeout=remaining)
+                if remaining <= 0.00000001:
+                    break
+
+    except KeyboardInterrupt:
+        pass
+
+    # Flush remaining messages to broker.
+    vp.dbg('Flushing')
+    try:
+        vp.producer.flush()
+    except KeyboardInterrupt:
+        pass
+
+    vp.send({'name': 'shutdown_complete'})
+
+    vp.dbg('All done')
diff --git a/Consumer.c b/confluent_kafka/src/Consumer.c
similarity index 91%
rename from Consumer.c
rename to confluent_kafka/src/Consumer.c
index 4bd58a848..80e136a9e 100644
--- a/Consumer.c
+++ b/confluent_kafka/src/Consumer.c
@@ -37,6 +37,10 @@ static int Consumer_clear (Consumer *self) {
 		Py_DECREF(self->on_revoke);
 		self->on_revoke = NULL;
 	}
+	if (self->on_commit) {
+		Py_DECREF(self->on_commit);
+		self->on_commit = NULL;
+	}
 	return 0;
 }
@@ -518,7 +522,7 @@ static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
 PyTypeObject ConsumerType = {
 	PyVarObject_HEAD_INIT(NULL, 0)
-	"confluent_kafka.Consumer", /*tp_name*/
+	"cimpl.Consumer", /*tp_name*/
 	sizeof(Consumer), /*tp_basicsize*/
 	0, /*tp_itemsize*/
 	(destructor)Consumer_dealloc, /*tp_dealloc*/
@@ -543,6 +547,18 @@ PyTypeObject ConsumerType = {
 	"\n"
 	"  Create new Consumer instance using provided configuration dict.\n"
 	"\n"
+	"  Special configuration properties:\n"
+	"    ``on_commit``: Optional callback that will be called when a commit "
+	"request has succeeded or failed.\n"
+	"\n"
+	"\n"
+	".. py:function:: on_commit(err, partitions)\n"
+	"\n"
+	"  :param Consumer consumer: Consumer instance.\n"
+	"  :param KafkaError err: Commit error object, or None on success.\n"
+	"  :param list(TopicPartition) partitions: List of partitions with "
+	"their committed offsets or per-partition errors.\n"
+	"\n"
 	"\n", /*tp_doc*/
 	(traverseproc)Consumer_traverse, /* tp_traverse */
 	(inquiry)Consumer_clear, /* tp_clear */
@@ -566,8 +582,8 @@ PyTypeObject ConsumerType = {
 static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
-				   rd_kafka_topic_partition_list_t *c_parts,
-				   void *opaque) {
+                                   rd_kafka_topic_partition_list_t *c_parts,
+                                   void *opaque) {
 	Consumer *self = opaque;
 	PyEval_RestoreThread(self->thread_state);
@@ -590,6 +606,7 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
 			cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
 					 "Unable to build callback args");
 			self->thread_state = PyEval_SaveThread();
+			self->callback_crashed++;
 			return;
 		}
@@ -622,6 +639,51 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
 }
+static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
+				       rd_kafka_topic_partition_list_t *c_parts,
+				       void *opaque) {
+	Consumer *self = opaque;
+	PyObject *parts, *k_err, *args, *result;
+
+	if (!self->on_commit)
+		return;
+
+	PyEval_RestoreThread(self->thread_state);
+
+	/* Instantiate error object */
+	k_err = KafkaError_new_or_None(err, NULL);
+
+	/* Construct list of TopicPartition based on 'c_parts' */
+	parts = c_parts_to_py(c_parts);
+
+	args = Py_BuildValue("(OO)", k_err, parts);
+
+	Py_DECREF(k_err);
+	Py_DECREF(parts);
+
+	if (!args) {
+		cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
+				 "Unable to build callback args");
+		self->thread_state = PyEval_SaveThread();
+		self->callback_crashed++;
+		return;
+	}
+
+	result = PyObject_CallObject(self->on_commit, args);
+
+	Py_DECREF(args);
+
+	if (result)
+		Py_DECREF(result);
+	else {
+		self->callback_crashed++;
+		rd_kafka_yield(rk);
+	}
+
+	self->thread_state = PyEval_SaveThread();
+}
+
+
 static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
 				    PyObject *kwargs) {
@@ -640,6 +702,7 @@ static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
 	}
 	rd_kafka_conf_set_rebalance_cb(conf, Consumer_rebalance_cb);
+	rd_kafka_conf_set_offset_commit_cb(conf, 
Consumer_offset_commit_cb); self->rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); diff --git a/Producer.c b/confluent_kafka/src/Producer.c similarity index 96% rename from Producer.c rename to confluent_kafka/src/Producer.c index f25c07d8e..438642051 100644 --- a/Producer.c +++ b/confluent_kafka/src/Producer.c @@ -250,7 +250,7 @@ static PyObject *Producer_produce (Producer *self, PyObject *args, "key", "partition", "callback", - "delivery_callback", /* Alias */ + "on_delivery", /* Alias */ "partitioner", NULL }; @@ -365,16 +365,16 @@ static PyMethodDef Producer_methods[] = { "\n" " Produce message to topic.\n" " This is an asynchronous operation, an application may use the " - "``ondelivery`` argument to pass a function (or lambda) that " - "will be called from :py:func:`poll()` when the message has been " - "succesfully delivered or permanently fails delivery.\n" + "``callback`` (alias ``on_delivery``) argument to pass a function " + "(or lambda) that will be called from :py:func:`poll()` when the " + "message has been succesfully delivered or permanently fails delivery.\n" "\n" " :param str topic: Topic to produce message to\n" " :param str value: Message payload\n" " :param str key: Message key\n" " :param int partition: Partition to produce to, elses uses the " "configured partitioner.\n" - " :param func ondelivery(err,msg): Delivery report callback to call " + " :param func on_delivery(err,msg): Delivery report callback to call " "(from :py:func:`poll()` or :py:func:`flush()`) on succesful or " "failed delivery\n" "\n" @@ -393,7 +393,7 @@ static PyMethodDef Producer_methods[] = { "\n" " Callbacks:\n" "\n" - " - ``ondelivery`` callbacks from :py:func:`produce()`\n" + " - ``on_delivery`` callbacks from :py:func:`produce()`\n" " - ...\n" "\n" " :param float timeout: Maximum time to block waiting for events.\n" @@ -430,7 +430,7 @@ static PyObject *Producer_new (PyTypeObject *type, PyObject *args, PyTypeObject ProducerType = { PyVarObject_HEAD_INIT(NULL, 0) - "confluent_kafka.Producer", /*tp_name*/ + "cimpl.Producer", /*tp_name*/ sizeof(Producer), /*tp_basicsize*/ 0, /*tp_itemsize*/ (destructor)Producer_dealloc, /*tp_dealloc*/ diff --git a/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c similarity index 94% rename from confluent_kafka.c rename to confluent_kafka/src/confluent_kafka.c index deffd83f3..a39a0c4b4 100644 --- a/confluent_kafka.c +++ b/confluent_kafka/src/confluent_kafka.c @@ -168,7 +168,7 @@ static PyObject* KafkaError_richcompare (KafkaError *self, PyObject *o2, static PyTypeObject KafkaErrorType = { PyVarObject_HEAD_INIT(NULL, 0) - "confluent_kafka.KafkaError", /*tp_name*/ + "cimpl.KafkaError", /*tp_name*/ sizeof(KafkaError), /*tp_basicsize*/ 0, /*tp_itemsize*/ (destructor)KafkaError_dealloc, /*tp_dealloc*/ @@ -248,7 +248,7 @@ PyObject *KafkaError_new0 (rd_kafka_resp_err_t err, const char *fmt, ...) { va_end(ap); } - KafkaError_init(self, err, fmt ? buf : NULL); + KafkaError_init(self, err, fmt ? buf : rd_kafka_err2str(err)); return (PyObject *)self; } @@ -257,11 +257,10 @@ PyObject *KafkaError_new0 (rd_kafka_resp_err_t err, const char *fmt, ...) { * @brief Internal factory to create KafkaError object. * @returns a new KafkaError object if \p err != 0, else a None object. 
*/ -static PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err, - const char *str) { + PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err, const char *str) { if (!err) Py_RETURN_NONE; - return KafkaError_new0(err, str); + return KafkaError_new0(err, "%s", str); } @@ -417,7 +416,7 @@ static PySequenceMethods Message_seq_methods = { PyTypeObject MessageType = { PyVarObject_HEAD_INIT(NULL, 0) - "confluent_kafka.Message", /*tp_name*/ + "cimpl.Message", /*tp_name*/ sizeof(Message), /*tp_basicsize*/ 0, /*tp_itemsize*/ (destructor)Message_dealloc, /*tp_dealloc*/ @@ -661,7 +660,7 @@ static long TopicPartition_hash (TopicPartition *self) { static PyTypeObject TopicPartitionType = { PyVarObject_HEAD_INIT(NULL, 0) - "confluent_kafka.TopicPartition", /*tp_name*/ + "cimpl.TopicPartition", /*tp_name*/ sizeof(TopicPartition), /*tp_basicsize*/ 0, /*tp_itemsize*/ (destructor)TopicPartition_dealloc, /*tp_dealloc*/ @@ -773,7 +772,7 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) { if (!PyList_Check(plist)) { PyErr_SetString(PyExc_TypeError, - "requires list of confluent_kafka.TopicPartition"); + "requires list of TopicPartition"); return NULL; } @@ -886,7 +885,7 @@ static int producer_conf_set_special (Producer *self, rd_kafka_conf_t *conf, PyObject *vs; const char *val; - if (!strcasecmp(name, "delivery_callback")) { + if (!strcasecmp(name, "on_delivery")) { if (!PyCallable_Check(valobj)) { cfl_PyErr_Format( RD_KAFKA_RESP_ERR__INVALID_ARG, @@ -966,6 +965,34 @@ static int producer_conf_set_special (Producer *self, rd_kafka_conf_t *conf, } +/** + * @brief Set single special consumer config value. + * + * @returns 1 if handled, 0 if unknown, or -1 on failure (exception raised). + */ +static int consumer_conf_set_special (Consumer *self, rd_kafka_conf_t *conf, + rd_kafka_topic_conf_t *tconf, + const char *name, PyObject *valobj) { + + if (!strcasecmp(name, "on_commit")) { + if (!PyCallable_Check(valobj)) { + cfl_PyErr_Format( + RD_KAFKA_RESP_ERR__INVALID_ARG, + "%s requires a callable " + "object", name); + return -1; + } + + self->on_commit = valobj; + Py_INCREF(self->on_commit); + + return 1; + } + + return 0; +} + + /** * Common config setup for Kafka client handles. * @@ -1004,6 +1031,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, const char *k; const char *v; char errstr[256]; + int r; if (!(ks = cfl_PyObject_Unistr(ko))) { PyErr_SetString(PyExc_TypeError, @@ -1028,24 +1056,22 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, } /* Special handling for certain config keys. 
*/ - if (ktype == RD_KAFKA_PRODUCER) { - int r; - + if (ktype == RD_KAFKA_PRODUCER) r = producer_conf_set_special((Producer *)self0, conf, tconf, k, vo); - if (r == -1) { - /* Error */ - Py_DECREF(ks); - rd_kafka_topic_conf_destroy(tconf); - rd_kafka_conf_destroy(conf); - return NULL; - - } else if (r == 1) { - /* Handled */ - continue; - } + else + r = consumer_conf_set_special((Consumer *)self0, + conf, tconf, k, vo); + if (r == -1) { + /* Error */ + Py_DECREF(ks); + rd_kafka_topic_conf_destroy(tconf); + rd_kafka_conf_destroy(conf); + return NULL; - /* FALLTHRU */ + } else if (r == 1) { + /* Handled */ + continue; } @@ -1111,7 +1137,7 @@ static PyObject *version (PyObject *self, PyObject *args) { return Py_BuildValue("si", "0.9.1", 0x00090100); } -static PyMethodDef confluent_kafka_methods[] = { +static PyMethodDef cimpl_methods[] = { {"libversion", libversion, METH_NOARGS, " Retrieve librdkafka version string and integer\n" "\n" @@ -1204,17 +1230,17 @@ static char *KafkaError_add_errs (PyObject *dict, const char *origdoc) { #ifdef PY3 -static struct PyModuleDef confluent_kafka_moduledef = { +static struct PyModuleDef cimpl_moduledef = { PyModuleDef_HEAD_INIT, - "confluent_kafka", /* m_name */ - "Confluent's Apache Kafka Python client", /* m_doc */ + "cimpl", /* m_name */ + "Confluent's Apache Kafka Python client (C implementation)", /* m_doc */ -1, /* m_size */ - confluent_kafka_methods, /* m_methods */ + cimpl_methods, /* m_methods */ }; #endif -static PyObject *_init_confluent_kafka (void) { +static PyObject *_init_cimpl (void) { PyObject *m; if (PyType_Ready(&KafkaErrorType) < 0) @@ -1229,10 +1255,10 @@ static PyObject *_init_confluent_kafka (void) { return NULL; #ifdef PY3 - m = PyModule_Create(&confluent_kafka_moduledef); + m = PyModule_Create(&cimpl_moduledef); #else - m = Py_InitModule3("confluent_kafka", confluent_kafka_methods, - "Confluent's Apache Kafka Python client"); + m = Py_InitModule3("cimpl", cimpl_methods, + "Confluent's Apache Kafka Python client (C implementation)"); #endif if (!m) return NULL; @@ -1257,7 +1283,7 @@ static PyObject *_init_confluent_kafka (void) { PyModule_AddObject(m, "Consumer", (PyObject *)&ConsumerType); KafkaException = PyErr_NewExceptionWithDoc( - "confluent_kafka.KafkaException", + "cimpl.KafkaException", "Kafka exception that wraps the :py:class:`KafkaError` " "class.\n" "\n" @@ -1273,11 +1299,11 @@ static PyObject *_init_confluent_kafka (void) { #ifdef PY3 -PyMODINIT_FUNC PyInit_confluent_kafka (void) { - return _init_confluent_kafka(); +PyMODINIT_FUNC PyInit_cimpl (void) { + return _init_cimpl(); } #else -PyMODINIT_FUNC initconfluent_kafka (void) { - _init_confluent_kafka(); +PyMODINIT_FUNC initcimpl (void) { + _init_cimpl(); } #endif diff --git a/confluent_kafka.h b/confluent_kafka/src/confluent_kafka.h similarity index 97% rename from confluent_kafka.h rename to confluent_kafka/src/confluent_kafka.h index 07799e762..e8b15c90a 100644 --- a/confluent_kafka.h +++ b/confluent_kafka/src/confluent_kafka.h @@ -84,6 +84,7 @@ extern PyObject *KafkaException; PyObject *KafkaError_new0 (rd_kafka_resp_err_t err, const char *fmt, ...); +PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err, const char *str); /** @@ -197,6 +198,7 @@ typedef struct { int rebalance_assigned; /* Rebalance: Callback performed assign() call.*/ PyObject *on_assign; /* Rebalance: on_assign callback */ PyObject *on_revoke; /* Rebalance: on_revoke callback */ + PyObject *on_commit; /* Commit callback */ int callback_crashed; PyThreadState *thread_state; } Consumer; diff 
--git a/docs/index.rst b/docs/index.rst index a9c0b681d..55871a128 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -13,6 +13,49 @@ Indices and tables :synopsis: Confluent's Apache Kafka Python client. :members: +******** +Consumer +******** + +.. autoclass:: confluent_kafka.Consumer + :members: + +******** +Producer +******** + +.. autoclass:: confluent_kafka.Producer + :members: + +******* +Message +******* + +.. autoclass:: confluent_kafka.Message + :members: + +************** +TopicPartition +************** + +.. autoclass:: confluent_kafka.TopicPartition + :members: + +********** +KafkaError +********** + +.. autoclass:: confluent_kafka.KafkaError + :members: + +************** +KafkaException +************** + +.. autoclass:: confluent_kafka.KafkaException + :members: + + Configuration ============= @@ -21,6 +64,7 @@ providing a dict of configuration properties to the instance constructor, e.g.:: conf = {'bootstrap.servers': 'mybroker.com', 'group.id': 'mygroup', 'session.timeout.ms': 6000, + 'on_commit': my_commit_callback, 'default.topic.config': {'auto.offset.reset': 'smallest'}} consumer = confluent_kafka.Consumer(**conf) @@ -34,11 +78,11 @@ The Python bindings also provide some additional configuration properties: * ``default.topic.config``: value is a dict of topic-level configuration properties that are applied to all used topics for the instance. -* ``delivery_callback`` (**Producer**): value is a Python function reference +* ``on_delivery`` (**Producer**): value is a Python function reference that is called once for each produced message to indicate the final delivery result (success or failure). - This property may also be set per-message by passing ``callback=somefunc`` - to the confluent_kafka.Producer.produce() function. - + This property may also be set per-message by passing ``callback=callable`` + (or ``on_delivery=callable``) to the confluent_kafka.Producer.produce() function. - +* ``on_commit`` (**Consumer**): Callback used to indicate success or failure + of commit requests. 
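For reference, the ``on_commit`` and ``on_delivery`` callbacks documented above can be wired up as in the following minimal sketch (broker address, group id and topic are placeholder values)::

    from confluent_kafka import Consumer, Producer

    def on_commit_cb(err, partitions):
        # err is a KafkaError or None on success; partitions is a list of
        # TopicPartition carrying the committed offset or a per-partition error.
        if err is not None:
            print('Commit failed: %s' % err)
        else:
            print('Committed offsets for: %s' % partitions)

    def on_delivery_cb(err, msg):
        # Called from poll()/flush() once per produced message.
        if err is not None:
            print('Delivery failed: %s' % err)
        else:
            print('Delivered to %s [%d] @ %d' %
                  (msg.topic(), msg.partition(), msg.offset()))

    consumer = Consumer(**{'bootstrap.servers': 'mybroker.com',   # placeholder
                           'group.id': 'mygroup',                 # placeholder
                           'on_commit': on_commit_cb})

    producer = Producer(**{'bootstrap.servers': 'mybroker.com'})  # placeholder
    producer.produce('mytopic', value='hello', on_delivery=on_delivery_cb)  # 'mytopic' is a placeholder
    producer.flush()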
diff --git a/integration_test.py b/examples/integration_test.py similarity index 97% rename from integration_test.py rename to examples/integration_test.py index 357e2c679..f2c5d43bf 100755 --- a/integration_test.py +++ b/examples/integration_test.py @@ -191,6 +191,13 @@ def verify_producer_performance(with_dr_cb=True): (t_delivery_spent - t_produce_spent)) +def print_commit_result (err, partitions): + if err is not None: + print('# Failed to commit offsets: %s: %s' % (err, partitions)) + else: + print('# Committed offsets for: %s' % partitions) + + def verify_consumer(): """ Verify basic Consumer functionality """ @@ -199,6 +206,7 @@ def verify_consumer(): 'group.id': 'test.py', 'session.timeout.ms': 6000, 'enable.auto.commit': False, + 'on_commit': print_commit_result, 'default.topic.config': { 'auto.offset.reset': 'earliest' }} diff --git a/setup.py b/setup.py index 40dc69267..798ea1ac6 100644 --- a/setup.py +++ b/setup.py @@ -1,19 +1,21 @@ #!/usr/bin/env python -from setuptools import setup +from setuptools import setup, find_packages from distutils.core import Extension -module = Extension('confluent_kafka', +module = Extension('confluent_kafka.cimpl', include_dirs = ['/usr/local/include'], libraries= ['rdkafka'], - sources=['confluent_kafka.c', 'Producer.c', 'Consumer.c']) - -setup (name='confluent-kafka', - version='0.9.1', - description='Confluent\'s Apache Kafka client for Python', - author='Confluent Inc', - author_email='support@confluent.io', - url='https://github.com/confluentinc/confluent-kafka-python', - ext_modules=[module]) + sources=['confluent_kafka/src/confluent_kafka.c', + 'confluent_kafka/src/Producer.c', + 'confluent_kafka/src/Consumer.c']) +setup(name='confluent-kafka', + version='0.9.1', + description='Confluent\'s Apache Kafka client for Python', + author='Confluent Inc', + author_email='support@confluent.io', + url='https://github.com/confluentinc/confluent-kafka-python', + ext_modules=[module], + packages=find_packages()) diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index 0af99d074..8d538123e 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -12,7 +12,12 @@ def test_basic_api(): except TypeError as e: assert str(e) == "expected configuration dict" - kc = Consumer({'group.id':'test', 'socket.timeout.ms':'100'}) + def dummy_commit_cb (err, partitions): + pass + + kc = Consumer({'group.id':'test', 'socket.timeout.ms':'100', + 'session.timeout.ms': 1000, # Avoid close() blocking too long + 'on_commit': dummy_commit_cb}) kc.subscribe(["test"]) kc.unsubscribe() @@ -41,7 +46,7 @@ def dummy_assign_revoke (consumer, partitions): try: kc.commit(async=False) except KafkaException as e: - assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._WAIT_COORD) + assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._NO_OFFSET) # Get current position, should all be invalid. 
kc.position(partitions) diff --git a/tests/test_docs.py b/tests/test_docs.py index c44912d2a..779e421c4 100644 --- a/tests/test_docs.py +++ b/tests/test_docs.py @@ -2,7 +2,7 @@ import confluent_kafka import re - +from types import ModuleType def test_verify_docs(): """ Make sure all exported functions, classes, etc, have proper docstrings @@ -10,7 +10,7 @@ def test_verify_docs(): fails = 0 for n in dir(confluent_kafka): - if n[0:2] == '__': + if n.startswith('__'): # Skip internals continue @@ -21,7 +21,8 @@ def test_verify_docs(): fails += 1 elif not re.search(r':', d): print('Missing Doxygen tag for: %s (type %s)' % (n, type(o))) - fails += 1 + if not isinstance(o, ModuleType): + fails += 1 assert fails == 0
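As a side note on the kafkatest argument handling above: the ``partition.assignment.strategy`` conversion in ``VerifiableClient.set_config`` maps the Java assignor class names passed by the test harness to librdkafka configuration values. A standalone sketch of that mapping (the helper name is illustrative only)::

    import re

    def assignor_to_conf_value(java_class):
        # "org.apache.kafka.clients.consumer.RangeAssignor"      -> "range"
        # "org.apache.kafka.clients.consumer.RoundRobinAssignor" -> "roundrobin"
        return re.sub(r'org.apache.kafka.clients.consumer.(\w+)Assignor',
                      lambda m: m.group(1).lower(), java_class)

    assert assignor_to_conf_value('org.apache.kafka.clients.consumer.RangeAssignor') == 'range'
    assert assignor_to_conf_value('org.apache.kafka.clients.consumer.RoundRobinAssignor') == 'roundrobin'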