Skip to content

Commit 059089c

Browse files
committed
[issues/6508](-) key in KafkaRecordModel is made optional, added a new Kafka record without key into json files and added tests
1 parent 5ce119f commit 059089c

File tree

2 files changed

+28
-10
lines changed

2 files changed

+28
-10
lines changed

aws_lambda_powertools/utilities/data_classes/kafka_event.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,25 @@ def timestamp_type(self) -> str:
3737
return self["timestampType"]
3838

3939
@property
40-
def key(self) -> str:
41-
"""The raw (base64 encoded) Kafka record key."""
42-
return self["key"]
40+
def key(self) -> str | None:
41+
"""
42+
The raw (base64 encoded) Kafka record key.
43+
44+
This key is optional; if not provided,
45+
a round-robin algorithm will be used to determine
46+
the partition for the message.
47+
"""
48+
49+
return self.get("key")
4350

4451
@property
45-
def decoded_key(self) -> bytes:
46-
"""Decode the base64 encoded key as bytes."""
47-
return base64.b64decode(self.key)
52+
def decoded_key(self) -> bytes | None:
53+
"""
54+
Decode the base64 encoded key as bytes.
55+
56+
If the key is not provided, this will return None.
57+
"""
58+
return None if self.key is None else base64.b64decode(self.key)
4859

4960
@property
5061
def value(self) -> str:

tests/unit/data_classes/required_dependencies/test_kafka_event.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def test_kafka_msk_event():
2121
assert parsed_event.decoded_bootstrap_servers == bootstrap_servers_list
2222

2323
records = list(parsed_event.records)
24-
assert len(records) == 1
24+
assert len(records) == 2
2525
record = records[0]
2626
raw_record = raw_event["records"]["mytopic-0"][0]
2727
assert record.topic == raw_record["topic"]
@@ -37,6 +37,9 @@ def test_kafka_msk_event():
3737

3838
assert parsed_event.record == records[0]
3939

40+
record = records[1]
41+
assert record.key is None
42+
4043

4144
def test_kafka_self_managed_event():
4245
raw_event = load_event("kafkaEventSelfManaged.json")
@@ -52,7 +55,7 @@ def test_kafka_self_managed_event():
5255
assert parsed_event.decoded_bootstrap_servers == bootstrap_servers_list
5356

5457
records = list(parsed_event.records)
55-
assert len(records) == 1
58+
assert len(records) == 2
5659
record = records[0]
5760
raw_record = raw_event["records"]["mytopic-0"][0]
5861
assert record.topic == raw_record["topic"]
@@ -68,14 +71,18 @@ def test_kafka_self_managed_event():
6871

6972
assert parsed_event.record == records[0]
7073

74+
record = records[1]
75+
assert record.key is None
76+
7177

7278
def test_kafka_record_property_with_stopiteration_error():
7379
# GIVEN a kafka event with one record
7480
raw_event = load_event("kafkaEventMsk.json")
7581
parsed_event = KafkaEvent(raw_event)
7682

77-
# WHEN calling record property twice
83+
# WHEN calling record property thrice
7884
# THEN raise StopIteration
7985
with pytest.raises(StopIteration):
8086
assert parsed_event.record.topic is not None
81-
assert parsed_event.record.partition is not None
87+
assert parsed_event.record.topic is not None
88+
assert parsed_event.record.topic is not None

0 commit comments

Comments
 (0)