diff --git a/kafka/record/abc.py b/kafka/record/abc.py
index 83121c6f6..d5c172aaa 100644
--- a/kafka/record/abc.py
+++ b/kafka/record/abc.py
@@ -4,6 +4,7 @@
 
 class ABCRecord(object):
     __metaclass__ = abc.ABCMeta
+    __slots__ = ()
 
     @abc.abstractproperty
     def offset(self):
@@ -45,6 +46,7 @@ def headers(self):
 
 class ABCRecordBatchBuilder(object):
     __metaclass__ = abc.ABCMeta
+    __slots__ = ()
 
     @abc.abstractmethod
     def append(self, offset, timestamp, key, value, headers=None):
@@ -87,6 +89,7 @@ class ABCRecordBatch(object):
     compressed) message.
     """
     __metaclass__ = abc.ABCMeta
+    __slots__ = ()
 
     @abc.abstractmethod
     def __iter__(self):
@@ -97,6 +100,7 @@ def __iter__(self):
 
 class ABCRecords(object):
     __metaclass__ = abc.ABCMeta
+    __slots__ = ()
 
     @abc.abstractmethod
     def __init__(self, buffer):
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 7f0e2b331..07368bba9 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -70,6 +70,8 @@
 
 class DefaultRecordBase(object):
 
+    __slots__ = ()
+
     HEADER_STRUCT = struct.Struct(
         ">q"  # BaseOffset => Int64
         "i"  # Length => Int32
@@ -116,6 +118,9 @@ def _assert_has_codec(self, compression_type):
 
 class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):
 
+    __slots__ = ("_buffer", "_header_data", "_pos", "_num_records",
+                 "_next_record_index", "_decompressed")
+
     def __init__(self, buffer):
         self._buffer = bytearray(buffer)
         self._header_data = self.HEADER_STRUCT.unpack_from(self._buffer)
@@ -358,6 +363,11 @@ class DefaultRecordBatchBuilder(DefaultRecordBase, ABCRecordBatchBuilder):
     # 5 bytes length + 10 bytes timestamp + 5 bytes offset + 1 byte attributes
     MAX_RECORD_OVERHEAD = 21
 
+    __slots__ = ("_magic", "_compression_type", "_batch_size", "_is_transactional",
+                 "_producer_id", "_producer_epoch", "_base_sequence",
+                 "_first_timestamp", "_max_timestamp", "_last_offset", "_num_records",
+                 "_buffer")
+
     def __init__(
             self, magic, compression_type, is_transactional, producer_id,
             producer_epoch, base_sequence, batch_size):
diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py
index bb6c21c2d..e2ee5490c 100644
--- a/kafka/record/legacy_records.py
+++ b/kafka/record/legacy_records.py
@@ -57,6 +57,8 @@
 
 class LegacyRecordBase(object):
 
+    __slots__ = ()
+
     HEADER_STRUCT_V0 = struct.Struct(
         ">q"  # BaseOffset => Int64
         "i"  # Length => Int32
@@ -127,6 +129,9 @@ def _assert_has_codec(self, compression_type):
 
 class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):
 
+    __slots__ = ("_buffer", "_magic", "_offset", "_crc", "_timestamp",
+                 "_attributes", "_decompressed")
+
     def __init__(self, buffer, magic):
         self._buffer = memoryview(buffer)
         self._magic = magic
@@ -336,6 +341,8 @@ def __repr__(self):
 
 class LegacyRecordBatchBuilder(ABCRecordBatchBuilder, LegacyRecordBase):
 
+    __slots__ = ("_magic", "_compression_type", "_batch_size", "_buffer")
+
     def __init__(self, magic, compression_type, batch_size):
         self._magic = magic
         self._compression_type = compression_type
diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py
index f67c4fe3a..a6c4b51f7 100644
--- a/kafka/record/memory_records.py
+++ b/kafka/record/memory_records.py
@@ -37,6 +37,8 @@ class MemoryRecords(ABCRecords):
     # Minimum space requirements for Record V0
     MIN_SLICE = LOG_OVERHEAD + LegacyRecordBatch.RECORD_OVERHEAD_V0
 
+    __slots__ = ("_buffer", "_pos", "_next_slice", "_remaining_bytes")
+
    def __init__(self, bytes_data):
         self._buffer = bytes_data
         self._pos = 0
@@ -110,6 +112,9 @@ def next_batch(self, _min_slice=MIN_SLICE,
 
 class MemoryRecordsBuilder(object):
 
+    __slots__ = ("_builder", "_batch_size", "_buffer", "_next_offset", "_closed",
+                 "_bytes_written")
+
     def __init__(self, magic, compression_type, batch_size):
         assert magic in [0, 1, 2], "Not supported magic"
         assert compression_type in [0, 1, 2, 3], "Not valid compression type"
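
The common thread above is that every base class, including the abstract ones in abc.py, gets an empty __slots__ = () even though it stores no attributes of its own. That is deliberate: a single class anywhere in the MRO without __slots__ silently reinstates the per-instance __dict__, which would cancel the memory savings on the concrete record classes. The sketch below illustrates this mechanic; it is not code from this patch, and the names DictBase, SlottedBase, DictRecord, and SlottedRecord are hypothetical stand-ins.

    import sys

    class DictBase(object):
        pass                # no __slots__: subclass instances get a __dict__

    class SlottedBase(object):
        __slots__ = ()      # adds no storage, but keeps __dict__ away

    class DictRecord(DictBase):
        # These slots are defeated: DictBase reintroduces __dict__.
        __slots__ = ("_offset", "_timestamp")

    class SlottedRecord(SlottedBase):
        __slots__ = ("_offset", "_timestamp")

    d, s = DictRecord(), SlottedRecord()
    print(hasattr(d, "__dict__"))   # True  -- per-instance dict still allocated
    print(hasattr(s, "__dict__"))   # False -- fixed slot layout only

    # Rough per-instance footprint: the slotted object is self-contained,
    # the dict-backed one also pays for its attribute dictionary.
    print(sys.getsizeof(s))
    print(sys.getsizeof(d) + sys.getsizeof(d.__dict__))

    try:
        s.unexpected = 1            # slots also reject unknown attributes
    except AttributeError:
        print("no such slot")

These per-instance savings are small in isolation, but record objects are created once per message on the fetch and produce paths, so they compound quickly under load.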