
Fix timestamp not passed to RecordMetadata #1273


Merged
merged 3 commits into from Oct 22, 2017
4 changes: 3 additions & 1 deletion kafka/producer/future.py
@@ -44,7 +44,9 @@ def _produce_success(self, offset_and_timestamp):
(relative_offset, timestamp_ms, checksum,
serialized_key_size, serialized_value_size) = self.args

if produce_timestamp_ms is not None:
# None is when Broker does not support the API (<0.10) and
# -1 is when the broker is configured for CREATE_TIME timestamps
if produce_timestamp_ms is not None and produce_timestamp_ms != -1:
timestamp_ms = produce_timestamp_ms
if offset != -1 and relative_offset is not None:
offset += relative_offset
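Put differently, the broker-supplied timestamp only replaces the client-side one when the broker actually assigned a LOG_APPEND_TIME value. A minimal standalone sketch of that selection logic, with simplified names rather than the actual method signature:

def resolve_timestamp(produce_timestamp_ms, client_timestamp_ms):
    # None: broker does not return timestamps in the produce response (< 0.10)
    # -1:   broker/topic is configured for CREATE_TIME, keep the client timestamp
    if produce_timestamp_ms is not None and produce_timestamp_ms != -1:
        return produce_timestamp_ms
    return client_timestamp_ms

assert resolve_timestamp(None, 9999999) == 9999999   # broker < 0.10
assert resolve_timestamp(-1, 9999999) == 9999999     # CREATE_TIME
assert resolve_timestamp(12345, 9999999) == 12345    # LOG_APPEND_TIME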
11 changes: 5 additions & 6 deletions kafka/producer/record_accumulator.py
@@ -56,15 +56,14 @@ def record_count(self):
return self.records.next_offset()

def try_append(self, timestamp_ms, key, value):
offset = self.records.next_offset()
checksum, record_size = self.records.append(timestamp_ms, key, value)
if record_size == 0:
metadata = self.records.append(timestamp_ms, key, value)
if metadata is None:
return None

self.max_record_size = max(self.max_record_size, record_size)
self.max_record_size = max(self.max_record_size, metadata.size)
self.last_append = time.time()
future = FutureRecordMetadata(self.produce_future, offset,
timestamp_ms, checksum,
future = FutureRecordMetadata(self.produce_future, metadata.offset,
metadata.timestamp, metadata.crc,
len(key) if key is not None else -1,
len(value) if value is not None else -1)
return future
48 changes: 44 additions & 4 deletions kafka/record/legacy_records.py
@@ -110,6 +110,8 @@ class LegacyRecordBase(object):
LOG_APPEND_TIME = 1
CREATE_TIME = 0

NO_TIMESTAMP = -1


class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):

@@ -333,10 +335,14 @@ def append(self, offset, timestamp, key, value):
# Check types
if type(offset) != int:
raise TypeError(offset)
if timestamp is None:
if self._magic == 0:
timestamp = self.NO_TIMESTAMP
elif timestamp is None:
timestamp = int(time.time() * 1000)
elif type(timestamp) != int:
raise TypeError(timestamp)
raise TypeError(
"`timestamp` should be int, but {} provided".format(
type(timestamp)))
if not (key is None or
isinstance(key, (bytes, bytearray, memoryview))):
raise TypeError(
@@ -351,15 +357,15 @@
size = self.size_in_bytes(offset, timestamp, key, value)
# We always allow at least one record to be appended
if offset != 0 and pos + size >= self._batch_size:
return None, 0
return None

# Allocate proper buffer length
self._buffer.extend(bytearray(size))

# Encode message
crc = self._encode_msg(pos, offset, timestamp, key, value)

return crc, size
return LegacyRecordMetadata(offset, crc, size, timestamp)

def _encode_msg(self, start_pos, offset, timestamp, key, value,
attributes=0):
@@ -484,3 +490,37 @@ def estimate_size_in_bytes(cls, magic, compression_type, key, value):
cls.record_size(magic, key, value)
)
return cls.LOG_OVERHEAD + cls.record_size(magic, key, value)


class LegacyRecordMetadata(object):
Contributor:

I'm sorry, I am clearly missing something... What is the purpose of this class? It doesn't seem to do anything other than enforcing read-only on the attributes. Is there something else it does that I'm not realizing?

Collaborator (Author):

No, it's more or less as you said. It's just a class to store the results of a function. I find this a bit cleaner than a namedtuple when it comes to optimization; this is a hot path.

It's a more convenient way of returning several values from a function: returning a tuple requires the caller to unpack it every time, but returning either None or an instance only requires a None check. See https://github.com/dpkp/kafka-python/pull/1273/files#diff-261f56705197d1ab9e6fca9249856556 and https://github.com/dpkp/kafka-python/pull/1273/files#diff-d98d7e54a13e15511c44d90e78f105e9R368.


__slots__ = ("_crc", "_size", "_timestamp", "_offset")

def __init__(self, offset, crc, size, timestamp):
self._offset = offset
self._crc = crc
self._size = size
self._timestamp = timestamp

@property
def offset(self):
return self._offset

@property
def crc(self):
return self._crc

@property
def size(self):
return self._size

@property
def timestamp(self):
return self._timestamp

def __repr__(self):
return (
"LegacyRecordMetadata(offset={!r}, crc={!r}, size={!r},"
" timestamp={!r})".format(
self._offset, self._crc, self._size, self._timestamp)
)
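
To make the discussion above concrete, here is a minimal sketch of the calling pattern this change enables, assuming the module path and signatures shown in this diff and in the tests below:

from kafka.record.legacy_records import LegacyRecordBatchBuilder

builder = LegacyRecordBatchBuilder(magic=1, compression_type=0, batch_size=1024)

# One object (or None) comes back; no tuple unpacking on the hot path.
metadata = builder.append(0, timestamp=None, key=b"key", value=b"value")
if metadata is None:
    pass  # batch is full; the caller rolls over to a new batch
else:
    print(metadata.offset, metadata.crc, metadata.size, metadata.timestamp)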
9 changes: 4 additions & 5 deletions kafka/record/memory_records.py
@@ -131,14 +131,13 @@ def append(self, timestamp, key, value):
return None, 0

offset = self._next_offset
checksum, actual_size = self._builder.append(
offset, timestamp, key, value)
metadata = self._builder.append(offset, timestamp, key, value)
# Return of 0 size means there's no space to add a new message
if actual_size == 0:
return None, 0
if metadata is None:
return None

self._next_offset += 1
return checksum, actual_size
return metadata

def close(self):
# This method may be called multiple times on the same batch
93 changes: 87 additions & 6 deletions test/record/test_legacy_records.py
@@ -1,8 +1,8 @@
from __future__ import unicode_literals
import pytest
from kafka.record.legacy_records import (
LegacyRecordBatch, LegacyRecordBatchBuilder
)
from kafka.protocol.message import Message


@pytest.mark.parametrize("magic", [0, 1])
@@ -27,9 +27,9 @@ def test_read_write_serde_v0_v1_no_compression(magic):


@pytest.mark.parametrize("compression_type", [
Message.CODEC_GZIP,
Message.CODEC_SNAPPY,
Message.CODEC_LZ4
LegacyRecordBatch.CODEC_GZIP,
LegacyRecordBatch.CODEC_SNAPPY,
LegacyRecordBatch.CODEC_LZ4
])
@pytest.mark.parametrize("magic", [0, 1])
def test_read_write_serde_v0_v1_with_compression(compression_type, magic):
@@ -43,14 +43,14 @@ def test_read_write_serde_v0_v1_with_compression(compression_type, magic):
batch = LegacyRecordBatch(bytes(buffer), magic)
msgs = list(batch)

expected_checksum = (-2095076219 if magic else 278251978) & 0xffffffff
for offset, msg in enumerate(msgs):
assert msg.offset == offset
assert msg.timestamp == (9999999 if magic else None)
assert msg.timestamp_type == (0 if magic else None)
assert msg.key == b"test"
assert msg.value == b"Super"
assert msg.checksum == expected_checksum
assert msg.checksum == (-2095076219 if magic else 278251978) & \
0xffffffff


@pytest.mark.parametrize("magic", [0, 1])
@@ -83,3 +83,84 @@ def test_estimate_size_in_bytes_bigger_than_batch(magic):
buf = builder.build()
assert len(buf) <= estimate_size, \
"Estimate should always be upper bound"


@pytest.mark.parametrize("magic", [0, 1])
def test_legacy_batch_builder_validates_arguments(magic):
builder = LegacyRecordBatchBuilder(
magic=magic, compression_type=0, batch_size=1024 * 1024)

# Key should not be str
with pytest.raises(TypeError):
builder.append(
0, timestamp=9999999, key="some string", value=None)

# Value should not be str
with pytest.raises(TypeError):
builder.append(
0, timestamp=9999999, key=None, value="some string")

# Timestamp should be of proper type
if magic != 0:
with pytest.raises(TypeError):
builder.append(
0, timestamp="1243812793", key=None, value=b"some string")

# Offset of invalid type
with pytest.raises(TypeError):
builder.append(
"0", timestamp=9999999, key=None, value=b"some string")

# Ok to pass value as None
builder.append(
0, timestamp=9999999, key=b"123", value=None)

# Timestamp can be None
builder.append(
1, timestamp=None, key=None, value=b"some string")

    # Ok to pass offsets in non-incremental order. This should not happen though
builder.append(
5, timestamp=9999999, key=b"123", value=None)

# in case error handling code fails to fix inner buffer in builder
    assert len(builder.build()) == (119 if magic else 95)


@pytest.mark.parametrize("magic", [0, 1])
def test_legacy_correct_metadata_response(magic):
builder = LegacyRecordBatchBuilder(
magic=magic, compression_type=0, batch_size=1024 * 1024)
meta = builder.append(
0, timestamp=9999999, key=b"test", value=b"Super")

assert meta.offset == 0
assert meta.timestamp == (9999999 if magic else -1)
assert meta.crc == (-2095076219 if magic else 278251978) & 0xffffffff
assert repr(meta) == (
"LegacyRecordMetadata(offset=0, crc={}, size={}, "
"timestamp={})".format(meta.crc, meta.size, meta.timestamp)
)


@pytest.mark.parametrize("magic", [0, 1])
def test_legacy_batch_size_limit(magic):
# First message can be added even if it's too big
builder = LegacyRecordBatchBuilder(
magic=magic, compression_type=0, batch_size=1024)
meta = builder.append(0, timestamp=None, key=None, value=b"M" * 2000)
assert meta.size > 0
assert meta.crc is not None
assert meta.offset == 0
assert meta.timestamp is not None
assert len(builder.build()) > 2000

builder = LegacyRecordBatchBuilder(
magic=magic, compression_type=0, batch_size=1024)
meta = builder.append(0, timestamp=None, key=None, value=b"M" * 700)
assert meta is not None
meta = builder.append(1, timestamp=None, key=None, value=b"M" * 700)
assert meta is None
meta = builder.append(2, timestamp=None, key=None, value=b"M" * 700)
assert meta is None
assert len(builder.build()) < 1000
54 changes: 52 additions & 2 deletions test/test_producer.py
@@ -1,11 +1,11 @@
import gc
import platform
import sys
import time
import threading

import pytest

from kafka import KafkaConsumer, KafkaProducer
from kafka import KafkaConsumer, KafkaProducer, TopicPartition
from kafka.producer.buffer import SimpleBufferPool
from test.conftest import version
from test.testutil import random_string
@@ -78,3 +78,53 @@ def test_kafka_producer_gc_cleanup():
del(producer)
gc.collect()
assert threading.active_count() == threads


@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
producer = KafkaProducer(bootstrap_servers=connect_str,
retries=5,
max_block_ms=10000,
compression_type=compression)
if producer.config['api_version'] >= (0, 10):
magic = 1
else:
magic = 0

topic = random_string(5)
future = producer.send(
topic,
value=b"Simple value", key=b"Simple key", timestamp_ms=9999999,
partition=0)
record = future.get(timeout=5)
assert record is not None
assert record.topic == topic
assert record.partition == 0
assert record.topic_partition == TopicPartition(topic, 0)
assert record.offset == 0
if magic >= 1:
assert record.timestamp == 9999999
else:
assert record.timestamp == -1 # NO_TIMESTAMP

if magic == 1:
assert record.checksum == 1370034956
else:
assert record.checksum == 3296137851

assert record.serialized_key_size == 10
assert record.serialized_value_size == 12

# generated timestamp case is skipped for broker 0.9 and below
if magic == 0:
return

send_time = time.time() * 1000
future = producer.send(
topic,
value=b"Simple value", key=b"Simple key", timestamp_ms=None,
partition=0)
record = future.get(timeout=5)
assert abs(record.timestamp - send_time) <= 1000 # Allow 1s deviation
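
For completeness, the user-visible effect of the fix looks roughly like this; the broker address and topic name are assumptions for illustration, not part of this PR:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
future = producer.send('my-topic', value=b'payload', key=b'key', timestamp_ms=9999999)
record = future.get(timeout=10)

# On a CREATE_TIME topic with a >= 0.10 broker the client timestamp is echoed back;
# on a LOG_APPEND_TIME topic the broker-assigned timestamp appears instead.
print(record.timestamp, record.offset, record.partition)
producer.close()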