diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 508e35a0b..ed0c50a5d 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -12,7 +12,7 @@
 import kafka.errors as Errors
 from kafka.future import Future
 from kafka.metrics.stats import Avg, Count, Max, Rate
-from kafka.protocol.fetch import FetchRequest
+from kafka.protocol.fetch import FetchRequest, AbortedTransaction
 from kafka.protocol.list_offsets import (
     ListOffsetsRequest, OffsetResetStrategy, UNKNOWN_OFFSET
 )
@@ -28,6 +28,11 @@
 READ_UNCOMMITTED = 0
 READ_COMMITTED = 1
 
+ISOLATION_LEVEL_CONFIG = {
+    'read_uncommitted': READ_UNCOMMITTED,
+    'read_committed': READ_COMMITTED,
+}
+
 ConsumerRecord = collections.namedtuple("ConsumerRecord",
     ["topic", "partition", "leader_epoch", "offset", "timestamp", "timestamp_type",
      "key", "value", "headers", "checksum", "serialized_key_size", "serialized_value_size", "serialized_header_size"])
@@ -60,6 +65,7 @@ class Fetcher(six.Iterator):
         'metric_group_prefix': 'consumer',
         'retry_backoff_ms': 100,
         'enable_incremental_fetch_sessions': True,
+        'isolation_level': 'read_uncommitted',
     }
 
     def __init__(self, client, subscriptions, **configs):
@@ -100,12 +106,18 @@ def __init__(self, client, subscriptions, **configs):
                 consumed. This ensures no on-the-wire or on-disk corruption to
                 the messages occurred. This check adds some overhead, so it may
                 be disabled in cases seeking extreme performance. Default: True
+            isolation_level (str): Configure KIP-98 transactional consumer by
+                setting to 'read_committed'. This will cause the consumer to
+                skip records from aborted transactions. Default: 'read_uncommitted'
         """
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
             if key in configs:
                 self.config[key] = configs[key]
 
+        if self.config['isolation_level'] not in ISOLATION_LEVEL_CONFIG:
+            raise Errors.KafkaConfigurationError('Unrecognized isolation_level')
+
         self._client = client
         self._subscriptions = subscriptions
         self._completed_fetches = collections.deque()  # Unparsed responses
@@ -116,7 +128,7 @@ def __init__(self, client, subscriptions, **configs):
             self._sensors = FetchManagerMetrics(self.config['metrics'], self.config['metric_group_prefix'])
         else:
             self._sensors = None
-        self._isolation_level = READ_UNCOMMITTED
+        self._isolation_level = ISOLATION_LEVEL_CONFIG[self.config['isolation_level']]
         self._session_handlers = {}
         self._nodes_with_pending_fetch_requests = set()
 
@@ -244,7 +256,7 @@ def _reset_offset(self, partition, timeout_ms=None):
         else:
             raise NoOffsetForPartitionError(partition)
 
-        log.debug("Resetting offset for partition %s to %s offset.",
+        log.debug("Resetting offset for partition %s to offset %s.",
                   partition, strategy)
         offsets = self._retrieve_offsets({partition: timestamp}, timeout_ms=timeout_ms)
 
@@ -765,14 +777,21 @@ def _parse_fetched_data(self, completed_fetch):
                 return None
 
             records = MemoryRecords(completed_fetch.partition_data[-1])
+            aborted_transactions = None
+            if completed_fetch.response_version >= 11:
+                aborted_transactions = completed_fetch.partition_data[-3]
+            elif completed_fetch.response_version >= 4:
+                aborted_transactions = completed_fetch.partition_data[-2]
             log.debug("Preparing to read %s bytes of data for partition %s with offset %d",
                       records.size_in_bytes(), tp, fetch_offset)
             parsed_records = self.PartitionRecords(fetch_offset, tp, records,
-                                                   self.config['key_deserializer'],
-                                                   self.config['value_deserializer'],
-                                                   self.config['check_crcs'],
-                                                   completed_fetch.metric_aggregator,
-                                                   self._on_partition_records_drain)
+                                                   key_deserializer=self.config['key_deserializer'],
+                                                   value_deserializer=self.config['value_deserializer'],
+                                                   check_crcs=self.config['check_crcs'],
+                                                   isolation_level=self._isolation_level,
+                                                   aborted_transactions=aborted_transactions,
+                                                   metric_aggregator=completed_fetch.metric_aggregator,
+                                                   on_drain=self._on_partition_records_drain)
             if not records.has_next() and records.size_in_bytes() > 0:
                 if completed_fetch.response_version < 3:
                     # Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
@@ -845,13 +864,23 @@ def close(self):
             self._next_partition_records.drain()
 
     class PartitionRecords(object):
-        def __init__(self, fetch_offset, tp, records, key_deserializer, value_deserializer, check_crcs, metric_aggregator, on_drain):
+        def __init__(self, fetch_offset, tp, records,
+                     key_deserializer=None, value_deserializer=None,
+                     check_crcs=True, isolation_level=READ_UNCOMMITTED,
+                     aborted_transactions=None,  # raw data from response / list of (producer_id, first_offset) tuples
+                     metric_aggregator=None, on_drain=lambda x: None):
             self.fetch_offset = fetch_offset
             self.topic_partition = tp
             self.leader_epoch = -1
             self.next_fetch_offset = fetch_offset
             self.bytes_read = 0
             self.records_read = 0
+            self.isolation_level = isolation_level
+            self.aborted_producer_ids = set()
+            self.aborted_transactions = collections.deque(
+                sorted([AbortedTransaction(*data) for data in aborted_transactions] if aborted_transactions else [],
+                       key=lambda txn: txn.first_offset)
+            )
             self.metric_aggregator = metric_aggregator
             self.check_crcs = check_crcs
             self.record_iterator = itertools.dropwhile(
@@ -900,18 +929,35 @@ def _unpack_records(self, tp, records, key_deserializer, value_deserializer):
                         "Record batch for partition %s at offset %s failed crc check" % (
                             self.topic_partition, batch.base_offset))
 
+                # Try DefaultsRecordBatch / message log format v2
-                # base_offset, last_offset_delta, and control batches
+                # base_offset, last_offset_delta, aborted transactions, and control batches
                 if batch.magic == 2:
                     self.leader_epoch = batch.leader_epoch
+                    if self.isolation_level == READ_COMMITTED and batch.has_producer_id():
+                        # remove from the aborted transaction queue all aborted transactions which have begun
+                        # before the current batch's last offset and add the associated producerIds to the
+                        # aborted producer set
+                        self._consume_aborted_transactions_up_to(batch.last_offset)
+
+                        producer_id = batch.producer_id
+                        if self._contains_abort_marker(batch):
+                            try:
+                                self.aborted_producer_ids.remove(producer_id)
+                            except KeyError:
+                                pass
+                        elif self._is_batch_aborted(batch):
+                            log.debug("Skipping aborted record batch from partition %s with producer_id %s and"
+                                      " offsets %s to %s",
+                                      self.topic_partition, producer_id, batch.base_offset, batch.last_offset)
+                            self.next_fetch_offset = batch.next_offset
+                            batch = records.next_batch()
+                            continue
+
                     # Control batches have a single record indicating whether a transaction
-                    # was aborted or committed.
-                    # When isolation_level is READ_COMMITTED (currently unsupported)
-                    # we should also skip all messages from aborted transactions
-                    # For now we only support READ_UNCOMMITTED and so we ignore the
-                    # abort/commit signal.
+                    # was aborted or committed. These are not returned to the consumer.
                     if batch.is_control_batch:
-                        self.next_fetch_offset = next(batch).offset + 1
+                        self.next_fetch_offset = batch.next_offset
                         batch = records.next_batch()
                         continue
 
@@ -944,7 +990,7 @@ def _unpack_records(self, tp, records, key_deserializer, value_deserializer):
             # unnecessary re-fetching of the same batch (in the worst case, the consumer could get stuck
             # fetching the same batch repeatedly).
             if last_batch and last_batch.magic == 2:
-                self.next_fetch_offset = last_batch.base_offset + last_batch.last_offset_delta + 1
+                self.next_fetch_offset = last_batch.next_offset
             self.drain()
 
         # If unpacking raises StopIteration, it is erroneously
@@ -961,6 +1007,24 @@ def _deserialize(self, f, topic, bytes_):
                 return f.deserialize(topic, bytes_)
             return f(bytes_)
 
+        def _consume_aborted_transactions_up_to(self, offset):
+            if not self.aborted_transactions:
+                return
+
+            while self.aborted_transactions and self.aborted_transactions[0].first_offset <= offset:
+                self.aborted_producer_ids.add(self.aborted_transactions.popleft().producer_id)
+
+        def _is_batch_aborted(self, batch):
+            return batch.is_transactional and batch.producer_id in self.aborted_producer_ids
+
+        def _contains_abort_marker(self, batch):
+            if not batch.is_control_batch:
+                return False
+            record = next(batch)
+            if not record:
+                return False
+            return record.abort
+
 
 class FetchSessionHandler(object):
     """
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 4a39dc135..7fff6e795 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -121,6 +121,9 @@ class KafkaConsumer(six.Iterator):
             consumed. This ensures no on-the-wire or on-disk corruption to
             the messages occurred. This check adds some overhead, so it may
             be disabled in cases seeking extreme performance. Default: True
+        isolation_level (str): Configure KIP-98 transactional consumer by
+            setting to 'read_committed'. This will cause the consumer to
+            skip records from aborted transactions. Default: 'read_uncommitted'
         allow_auto_create_topics (bool): Enable/disable auto topic creation
             on metadata request. Only available with api_version >= (0, 11).
             Default: True
@@ -290,6 +293,7 @@ class KafkaConsumer(six.Iterator):
         'auto_commit_interval_ms': 5000,
         'default_offset_commit_callback': lambda offsets, response: True,
         'check_crcs': True,
+        'isolation_level': 'read_uncommitted',
         'allow_auto_create_topics': True,
         'metadata_max_age_ms': 5 * 60 * 1000,
         'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 4cc21020e..0ff2ae91b 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -1,7 +1,7 @@
 from __future__ import absolute_import
 
 import abc
-from collections import defaultdict, OrderedDict
+from collections import OrderedDict
 try:
     from collections.abc import Sequence
 except ImportError:
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
index d193eafcf..036a37eb8 100644
--- a/kafka/protocol/fetch.py
+++ b/kafka/protocol/fetch.py
@@ -1,9 +1,15 @@
 from __future__ import absolute_import
 
+import collections
+
 from kafka.protocol.api import Request, Response
 from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String, Bytes
 
 
+AbortedTransaction = collections.namedtuple("AbortedTransaction",
+    ["producer_id", "first_offset"])
+
+
 class FetchResponse_v0(Response):
     API_KEY = 1
     API_VERSION = 0
diff --git a/kafka/record/abc.py b/kafka/record/abc.py
index df7178562..c78f0da69 100644
--- a/kafka/record/abc.py
+++ b/kafka/record/abc.py
@@ -110,6 +110,16 @@ def __iter__(self):
         if needed.
         """
 
+    @abc.abstractproperty
+    def base_offset(self):
+        """ Return base offset for batch
+        """
+
+    @abc.abstractproperty
+    def size_in_bytes(self):
+        """ Return size of batch in bytes (includes header overhead)
+        """
+
     @abc.abstractproperty
     def magic(self):
         """ Return magic value (0, 1, 2) for batch.
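
Reviewer sketch (not part of the patch): the read_committed filtering performed by the new PartitionRecords code is easier to follow outside the diff. The snippet below reimplements the idea with plain data structures; FakeBatch, the sample offsets, and the producer ids are invented for illustration and nothing here is kafka-python API.

# Illustration only: rough model of the read_committed batch filtering.
import collections

AbortedTxn = collections.namedtuple("AbortedTxn", ["producer_id", "first_offset"])
FakeBatch = collections.namedtuple(
    "FakeBatch",
    ["producer_id", "base_offset", "last_offset", "is_transactional", "is_control_batch", "is_abort_marker"])


def filter_committed(batches, aborted_transactions):
    # aborted_transactions: (producer_id, first_offset) pairs from the FetchResponse,
    # kept sorted by first_offset, mirroring the deque built in PartitionRecords.
    pending = collections.deque(sorted(aborted_transactions, key=lambda txn: txn.first_offset))
    aborted_producers = set()
    for batch in batches:
        # Activate every aborted transaction that starts at or before this batch's last offset.
        while pending and pending[0].first_offset <= batch.last_offset:
            aborted_producers.add(pending.popleft().producer_id)
        if batch.is_control_batch:
            if batch.is_abort_marker:
                aborted_producers.discard(batch.producer_id)
            continue  # control batches are never handed to the application
        if batch.is_transactional and batch.producer_id in aborted_producers:
            continue  # data written by an aborted transaction is skipped
        yield batch


batches = [
    FakeBatch(7, 0, 4, True, False, False),   # data from producer 7, later aborted
    FakeBatch(7, 5, 5, True, True, True),     # abort marker for producer 7
    FakeBatch(9, 6, 8, True, False, False),   # committed data from producer 9
]
print([b.base_offset for b in filter_committed(batches, [AbortedTxn(7, 0)])])  # [6]
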
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 855306bbd..2158c48cb 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -104,6 +104,9 @@ class DefaultRecordBase(object):
 
     LOG_APPEND_TIME = 1
     CREATE_TIME = 0
+    NO_PRODUCER_ID = -1
+    NO_SEQUENCE = -1
+    MAX_INT = 2147483647
 
     def _assert_has_codec(self, compression_type):
         if compression_type == self.CODEC_GZIP:
@@ -136,6 +139,10 @@ def __init__(self, buffer):
     def base_offset(self):
         return self._header_data[0]
 
+    @property
+    def size_in_bytes(self):
+        return self._header_data[1] + self.AFTER_LEN_OFFSET
+
     @property
     def leader_epoch(self):
         return self._header_data[2]
@@ -156,6 +163,14 @@ def attributes(self):
     def last_offset_delta(self):
         return self._header_data[6]
 
+    @property
+    def last_offset(self):
+        return self.base_offset + self.last_offset_delta
+
+    @property
+    def next_offset(self):
+        return self.last_offset + 1
+
     @property
     def compression_type(self):
         return self.attributes & self.CODEC_MASK
@@ -180,6 +195,36 @@ def first_timestamp(self):
     def max_timestamp(self):
         return self._header_data[8]
 
+    @property
+    def producer_id(self):
+        return self._header_data[9]
+
+    def has_producer_id(self):
+        return self.producer_id > self.NO_PRODUCER_ID
+
+    @property
+    def producer_epoch(self):
+        return self._header_data[10]
+
+    @property
+    def base_sequence(self):
+        return self._header_data[11]
+
+    @property
+    def last_sequence(self):
+        if self.base_sequence == self.NO_SEQUENCE:
+            return self.NO_SEQUENCE
+        return self._increment_sequence(self.base_sequence, self.last_offset_delta)
+
+    def _increment_sequence(self, base, increment):
+        if base > (self.MAX_INT - increment):
+            return increment - (self.MAX_INT - base) - 1
+        return base + increment
+
+    @property
+    def records_count(self):
+        return self._header_data[12]
+
     def _maybe_uncompress(self):
         if not self._decompressed:
             compression_type = self.compression_type
diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py
index 920b4fcc6..c126374b8 100644
--- a/kafka/record/legacy_records.py
+++ b/kafka/record/legacy_records.py
@@ -129,7 +129,7 @@ def _assert_has_codec(self, compression_type):
 
 
 class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):
-    __slots__ = ("_buffer", "_magic", "_offset", "_crc", "_timestamp",
+    __slots__ = ("_buffer", "_magic", "_offset", "_length", "_crc", "_timestamp",
                  "_attributes", "_decompressed")
 
     def __init__(self, buffer, magic):
@@ -141,11 +141,20 @@ def __init__(self, buffer, magic):
         assert magic == magic_
 
         self._offset = offset
+        self._length = length
         self._crc = crc
         self._timestamp = timestamp
         self._attributes = attrs
         self._decompressed = False
 
+    @property
+    def base_offset(self):
+        return self._offset
+
+    @property
+    def size_in_bytes(self):
+        return self._length + self.LOG_OVERHEAD
+
     @property
     def timestamp_type(self):
         """0 for CreateTime; 1 for LogAppendTime; None if unsupported.
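
Reviewer sketch (not part of the patch): the wrap-around arithmetic behind the new last_sequence property is easy to sanity-check in isolation. This mirrors _increment_sequence from the hunk above, assuming the same int32 MAX_INT bound the patch adds to DefaultRecordBase.

# Illustration only: the int32 sequence wrap-around used for last_sequence.
MAX_INT = 2147483647  # same bound the patch adds to DefaultRecordBase


def increment_sequence(base, increment):
    # Producer sequence numbers are 32-bit and wrap to 0 after MAX_INT.
    if base > (MAX_INT - increment):
        return increment - (MAX_INT - base) - 1
    return base + increment


assert increment_sequence(100, 5) == 105
assert increment_sequence(MAX_INT, 1) == 0        # wraps to 0
assert increment_sequence(MAX_INT - 1, 3) == 1    # wraps past the boundary
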
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 184acc9e1..3fc0c55ae 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -452,7 +452,7 @@ def test__unpack_records(mocker):
         (None, b"c", None),
     ]
     memory_records = MemoryRecords(_build_record_batch(messages))
-    part_records = Fetcher.PartitionRecords(0, tp, memory_records, None, None, False, mocker.MagicMock(), lambda x: None)
+    part_records = Fetcher.PartitionRecords(0, tp, memory_records)
     records = list(part_records.record_iterator)
     assert len(records) == 3
     assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
@@ -557,7 +557,7 @@ def test_partition_records_offset(mocker):
     tp = TopicPartition('foo', 0)
     messages = [(None, b'msg', None) for i in range(batch_start, batch_end)]
     memory_records = MemoryRecords(_build_record_batch(messages, offset=batch_start))
-    records = Fetcher.PartitionRecords(fetch_offset, tp, memory_records, None, None, False, mocker.MagicMock(), lambda x: None)
+    records = Fetcher.PartitionRecords(fetch_offset, tp, memory_records)
    assert records
     assert records.next_fetch_offset == fetch_offset
     msgs = records.take(1)
@@ -574,7 +574,7 @@ def test_partition_records_offset(mocker):
 def test_partition_records_empty(mocker):
     tp = TopicPartition('foo', 0)
     memory_records = MemoryRecords(_build_record_batch([]))
-    records = Fetcher.PartitionRecords(0, tp, memory_records, None, None, False, mocker.MagicMock(), lambda x: None)
+    records = Fetcher.PartitionRecords(0, tp, memory_records)
     msgs = records.take()
     assert len(msgs) == 0
     assert not records
@@ -587,7 +587,7 @@ def test_partition_records_no_fetch_offset(mocker):
     tp = TopicPartition('foo', 0)
     messages = [(None, b'msg', None) for i in range(batch_start, batch_end)]
     memory_records = MemoryRecords(_build_record_batch(messages, offset=batch_start))
-    records = Fetcher.PartitionRecords(fetch_offset, tp, memory_records, None, None, False, mocker.MagicMock(), lambda x: None)
+    records = Fetcher.PartitionRecords(fetch_offset, tp, memory_records)
     msgs = records.take()
     assert len(msgs) == 0
     assert not records
@@ -611,7 +611,7 @@ def test_partition_records_compacted_offset(mocker):
     builder.append(key=None, value=b'msg', timestamp=None, headers=[])
     builder.close()
     memory_records = MemoryRecords(builder.buffer())
-    records = Fetcher.PartitionRecords(fetch_offset, tp, memory_records, None, None, False, mocker.MagicMock(), lambda x: None)
+    records = Fetcher.PartitionRecords(fetch_offset, tp, memory_records)
     msgs = records.take()
     assert len(msgs) == batch_end - fetch_offset - 1
     assert msgs[0].offset == fetch_offset + 1
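
Usage note (assumption, not from the patch itself): once isolation_level is plumbed through KafkaConsumer as above, consuming only committed transactional records should look roughly like the sketch below. The topic name and bootstrap server are placeholders, and an unrecognized value is expected to surface as KafkaConfigurationError via the Fetcher check added in this patch.

# Hypothetical usage; 'example-topic' and the bootstrap server are placeholders.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'example-topic',
    bootstrap_servers='localhost:9092',
    isolation_level='read_committed',  # new option; default stays 'read_uncommitted'
)
for message in consumer:
    # Records from aborted transactions and transaction control markers
    # are filtered out before reaching this loop.
    print(message.offset, message.value)
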