
kafkatest, refactoring, and bug fixes #1


Merged · 13 commits · May 10, 2016
2 changes: 1 addition & 1 deletion MANIFEST.in
@@ -1,3 +1,3 @@
include README.md
include *.c *.h
include confluent_kafka/src/*.[ch]

2 changes: 2 additions & 0 deletions confluent_kafka/__init__.py
@@ -0,0 +1,2 @@
__all__ = ['cimpl','kafkatest']
from .cimpl import *
8 changes: 8 additions & 0 deletions confluent_kafka/kafkatest/README
@@ -0,0 +1,8 @@
FIXME: Instructions on how to use this.


Usage:

python -m confluent_kafka.kafkatest.verifiable_consumer <options>

python -m confluent_kafka.kafkatest.verifiable_producer <options>
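
As an illustration (not part of this file), a minimal local invocation could look like the following, with placeholder broker address, topic, and group id, and flags taken from verifiable_consumer.py:

python -m confluent_kafka.kafkatest.verifiable_consumer --broker-list localhost:9092 --topic test1 --group-id grp1
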
1 change: 1 addition & 0 deletions confluent_kafka/kafkatest/__init__.py
@@ -0,0 +1 @@
""" Python client implementations of the official Kafka tests/kafkatest clients. """
80 changes: 80 additions & 0 deletions confluent_kafka/kafkatest/verifiable_client.py
@@ -0,0 +1,80 @@
# Copyright 2016 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import signal, socket, os, sys, time, json, re, datetime


class VerifiableClient(object):
"""
Generic base class for a kafkatest verifiable client.
Implements the common kafkatest protocol and semantics.
"""
def __init__ (self, conf):
"""
"""
super(VerifiableClient, self).__init__()
self.conf = conf
self.conf['client.id'] = 'python@' + socket.gethostname()
self.run = True
signal.signal(signal.SIGTERM, self.sig_term)
self.dbg('Pid is %d' % os.getpid())

def sig_term (self, sig, frame):
self.dbg('SIGTERM')
self.run = False

@staticmethod
def _timestamp ():
return time.strftime('%H:%M:%S', time.localtime())

def dbg (self, s):
""" Debugging printout """
sys.stderr.write('%% %s DEBUG: %s\n' % (self._timestamp(), s))
Contributor Author:

Are these structured like this to match the Java format?

A couple of points. First, you might prefer print and not having to manually add newline characters. You can print to specific files with print("msg", file=sys.stderr) (although I guess you'll have py2/3 compatibility stuff to sort out since print changed to a function; you'll need a __future__ import).

Contributor Author:

Second, should we be changing expectations in the kafkatest logging format so other clients don't have to jump through these formatting hoops?

Contributor:

Client logs are not processed by kafkatest; it all just ends up in stderr.log, so a client is free to format its logs in any way it wishes. I think this is fine.

re sys.stderr.write(): It seemed like the most portable py2/3 solution at the time, but if future.print is preferred I'll go with that.

Contributor Author:

This is fine. tbh I haven't done enough 2/3-compatible Python code to remember the preferred approaches for this stuff.
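
As an aside (not part of this diff), a minimal sketch of the print-based, py2/3-compatible logging the thread discusses could look like:

from __future__ import print_function
import sys, time

def dbg(s):
    # Same output as the sys.stderr.write() version above: print() appends
    # the newline, file= selects the stream, and the __future__ import makes
    # the py3-style print() function available on py2.
    print('%% %s DEBUG: %s' % (time.strftime('%H:%M:%S'), s), file=sys.stderr)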


def err (self, s, term=False):
""" Error printout, if term=True the process will terminate immediately. """
sys.stderr.write('%% %s ERROR: %s\n' % (self._timestamp(), s))
if term:
sys.stderr.write('%% FATAL ERROR ^\n')
sys.exit(1)

def send (self, d):
""" Send dict as JSON to stdout for consumtion by kafkatest handler """
d['_time'] = str(datetime.datetime.now())
self.dbg('SEND: %s' % json.dumps(d))
sys.stdout.write('%s\n' % json.dumps(d))
sys.stdout.flush()


@staticmethod
def set_config (conf, args):
""" Set client config properties using args dict. """
for n,v in args.iteritems():
if v is None:
continue
# Things to ignore
if '.' not in n:
# App config, skip
continue
if n.startswith('topic.'):
# Set "topic.<...>" properties on default topic conf dict
conf['default.topic.config'][n[6:]] = v
elif n == 'partition.assignment.strategy':
Contributor Author:

These seem like things we should try to make a bit more generic in kafkatest and make the Java client also handle the necessary mapping. I'd like to avoid java-specific details like this leaking out into a bunch of other clients.

Contributor:

Yeah, that'd be nice, will postpone until kafkatest alt client integration.

# Convert Java class name to config value.
# "org.apache.kafka.clients.consumer.RangeAssignor" -> "range"
conf[n] = re.sub(r'org.apache.kafka.clients.consumer.(\w+)Assignor',
lambda x: x.group(1).lower(), v)
else:
conf[n] = v
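
As a reference (not part of the diff), the assignor-name mapping above can be exercised on its own; the input is the example string from the code comment:

import re

# "org.apache.kafka.clients.consumer.RangeAssignor" -> "range"
java_name = 'org.apache.kafka.clients.consumer.RangeAssignor'
strategy = re.sub(r'org.apache.kafka.clients.consumer.(\w+)Assignor',
                  lambda x: x.group(1).lower(), java_name)
print(strategy)  # prints: range
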
288 changes: 288 additions & 0 deletions confluent_kafka/kafkatest/verifiable_consumer.py
@@ -0,0 +1,288 @@
#!/usr/bin/env python
#
# Copyright 2016 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse, sys
from confluent_kafka import Consumer, KafkaError, KafkaException
from verifiable_client import VerifiableClient

class VerifiableConsumer(VerifiableClient):
"""
confluent-kafka-python backed VerifiableConsumer class for use with
Kafka's kafkatest client tests.
"""
def __init__ (self, conf):
"""
\p conf is a config dict passed to confluent_kafka.Consumer()
"""
super(VerifiableConsumer, self).__init__(conf)
self.conf['on_commit'] = self.on_commit
self.consumer = Consumer(**conf)
self.consumed_msgs = 0
self.consumed_msgs_last_reported = 0
self.consumed_msgs_at_last_commit = 0
self.use_auto_commit = False
self.use_async_commit = False
self.max_msgs = -1
self.assignment = []
self.assignment_dict = dict()


def find_assignment (self, topic, partition):
""" Find and return existing assignment based on \p topic and \p partition,
or None on miss. """
skey = '%s %d' % (topic, partition)
return self.assignment_dict.get(skey)


def send_records_consumed (self, immediate=False):
""" Send records_consumed, every 100 messages, on timeout,
or if immediate is set. """
if (self.consumed_msgs <= self.consumed_msgs_last_reported +
(0 if immediate else 100)):
return

if len(self.assignment) == 0:
return

d = {'name': 'records_consumed',
'count': self.consumed_msgs - self.consumed_msgs_last_reported,
'partitions': []}

for a in self.assignment:
if a.min_offset == -1:
# Skip partitions that haven't had any messages since last time.
# This is to circumvent some minOffset checks in kafkatest.
continue
d['partitions'].append(a.to_dict())
a.min_offset = -1

self.send(d)
self.consumed_msgs_last_reported = self.consumed_msgs


def send_assignment (self, evtype, partitions):
""" Send assignment update, \p evtype is either 'assigned' or 'revoked' """
d = { 'name': 'partitions_' + evtype,
'partitions': [{'topic': x.topic, 'partition': x.partition} for x in partitions]}
self.send(d)


def on_assign (self, consumer, partitions):
""" Rebalance on_assign callback """
old_assignment = self.assignment
self.assignment = [AssignedPartition(p.topic, p.partition) for p in partitions]
# Move over our last seen offsets so that we can report a proper
# minOffset even after a rebalance loop.
for a in old_assignment:
b = self.find_assignment(a.topic, a.partition)
if b is not None:
    # Only carry the offset over if the partition is still assigned to us.
    b.min_offset = a.min_offset

self.assignment_dict = {a.skey: a for a in self.assignment}
self.send_assignment('assigned', partitions)

def on_revoke (self, consumer, partitions):
""" Rebalance on_revoke callback """
# Send final consumed records prior to rebalancing to make sure
# latest consumed is on par with what is going to be committed.
self.send_records_consumed(immediate=True)
self.assignment = list()
self.assignment_dict = dict()
self.send_assignment('revoked', partitions)
self.do_commit(immediate=True)


def on_commit (self, err, partitions):
""" Offsets Committed callback """
if err is not None and err.code() == KafkaError._NO_OFFSET:
self.dbg('on_commit(): no offsets to commit')
return

# Report consumed messages to make sure consumed position >= committed position
self.send_records_consumed(immediate=True)

d = {'name': 'offsets_committed',
'offsets': []}

if err is not None:
d['success'] = False
d['error'] = str(err)
else:
d['success'] = True
d['error'] = ''

for p in partitions:
pd = {'topic': p.topic, 'partition': p.partition,
'offset': p.offset, 'error': str(p.error)}
d['offsets'].append(pd)

self.send(d)


def do_commit (self, immediate=False, async=None):
""" Commit every 1000 messages or whenever there is a consume timeout
or immediate. """
if (self.use_auto_commit or
self.consumed_msgs_at_last_commit + (0 if immediate else 1000) >
self.consumed_msgs):
return

# Make sure we report consumption before commit,
# otherwise tests may fail because of commit > consumed
if self.consumed_msgs_at_last_commit < self.consumed_msgs:
self.send_records_consumed(immediate=True)

if async is None:
async_mode = self.use_async_commit
else:
async_mode = async

self.dbg('Committing %d messages (Async=%s)' %
(self.consumed_msgs - self.consumed_msgs_at_last_commit,
async_mode))

try:
self.consumer.commit(async=async_mode)
except KafkaException as e:
if e.args[0].code() == KafkaError._WAIT_COORD:
self.dbg('Ignoring commit failure, still waiting for coordinator')
elif e.args[0].code() == KafkaError._NO_OFFSET:
self.dbg('No offsets to commit')
else:
raise

self.consumed_msgs_at_last_commit = self.consumed_msgs


def msg_consume (self, msg):
""" Handle consumed message (or error event) """
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# ignore EOF
pass
else:
self.err('Consume failed: %s' % msg.error(), term=True)
return

if False:  # toggle for verbose per-message debug output
self.dbg('Read msg from %s [%d] @ %d' % \
(msg.topic(), msg.partition(), msg.offset()))

if self.max_msgs >= 0 and self.consumed_msgs >= self.max_msgs:
return # ignore extra messages

# Find assignment.
a = self.find_assignment(msg.topic(), msg.partition())
if a is None:
self.err('Received message on unassigned partition %s [%d] @ %d' %
(msg.topic(), msg.partition(), msg.offset()), term=True)

a.consumed_msgs += 1
if a.min_offset == -1:
a.min_offset = msg.offset()
if a.max_offset < msg.offset():
a.max_offset = msg.offset()

self.consumed_msgs += 1

self.send_records_consumed(immediate=False)
self.do_commit(immediate=False)


class AssignedPartition(object):
""" Local state container for assigned partition. """
def __init__ (self, topic, partition):
super(AssignedPartition, self).__init__()
self.topic = topic
self.partition = partition
self.skey = '%s %d' % (self.topic, self.partition)
self.consumed_msgs = 0
self.min_offset = -1
self.max_offset = 0

def to_dict (self):
""" Return a dict of this partition's state """
return {'topic': self.topic, 'partition': self.partition,
'minOffset': self.min_offset, 'maxOffset': self.max_offset}


if __name__ == '__main__':

parser = argparse.ArgumentParser(description='Verifiable Python Consumer')
parser.add_argument('--topic', action='append', type=str, required=True)
parser.add_argument('--group-id', dest='group.id', required=True)
parser.add_argument('--broker-list', dest='bootstrap.servers', required=True)
parser.add_argument('--session-timeout', type=int, dest='session.timeout.ms', default=6000)
parser.add_argument('--enable-autocommit', action='store_true', dest='enable.auto.commit', default=False)
parser.add_argument('--max-messages', type=int, dest='max_messages', default=-1)
parser.add_argument('--assignment-strategy', dest='partition.assignment.strategy')
parser.add_argument('--reset-policy', dest='topic.auto.offset.reset', default='earliest')
parser.add_argument('--consumer.config', dest='consumer_config')
args = vars(parser.parse_args())

conf = {'broker.version.fallback': '0.9.0',
'default.topic.config': dict()}

VerifiableClient.set_config(conf, args)

vc = VerifiableConsumer(conf)
vc.use_auto_commit = args['enable.auto.commit']
vc.max_msgs = args['max_messages']

vc.dbg('Using config: %s' % conf)

vc.dbg('Subscribing to %s' % args['topic'])
vc.consumer.subscribe(args['topic'],
on_assign=vc.on_assign, on_revoke=vc.on_revoke)


try:
while vc.run:
msg = vc.consumer.poll(timeout=1.0)
if msg is None:
# Timeout.
# Try reporting consumed messages
vc.send_records_consumed(immediate=True)
# Commit every poll() timeout instead of on every message.
# Also commit on every 1000 messages, whichever comes first.
vc.do_commit(immediate=True)
continue

# Handle message (or error event)
vc.msg_consume(msg)

except KeyboardInterrupt:
pass

vc.dbg('Closing consumer')
vc.send_records_consumed(immediate=True)
if not vc.use_auto_commit:
vc.do_commit(immediate=True, async=False)

vc.consumer.close()

vc.send({'name': 'shutdown_complete'})

vc.dbg('All done')
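
For context, the JSON events this script writes to stdout for the kafkatest harness look roughly like the lines below (one object per line; the field names follow the dicts built above and in VerifiableClient.send(), while the values are purely illustrative):

{"name": "records_consumed", "count": 100, "partitions": [{"topic": "test1", "partition": 0, "minOffset": 0, "maxOffset": 99}], "_time": "2016-05-10 12:34:56.789012"}
{"name": "offsets_committed", "success": true, "error": "", "offsets": [{"topic": "test1", "partition": 0, "offset": 100, "error": "None"}], "_time": "2016-05-10 12:34:57.000001"}
{"name": "shutdown_complete", "_time": "2016-05-10 12:35:00.000002"}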