
Commit 034b79a

docs(kafka): refactor kafka documentation (#6854)
* Refactoring documentation
* Making Sonar Happy
1 parent 80f2958 commit 034b79a

23 files changed: 706 additions, 549 deletions

docs/utilities/kafka.md

Lines changed: 101 additions & 536 deletions
Large diffs are not rendered by default.
Lines changed: 21 additions & 0 deletions
{
  "eventSource":"aws:kafka",
  "eventSourceArn":"arn:aws:kafka:eu-west-3:123456789012:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3",
  "bootstrapServers":"boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098",
  "records":{
    "python-with-avro-doc-3":[
      {
        "topic":"python-with-avro-doc",
        "partition":3,
        "offset":0,
        "timestamp":1750547105187,
        "timestampType":"CREATE_TIME",
        "key":"MTIz",
        "value":"AwBXT2qalUhN6oaj2CwEeaEWFFBvd2VydG9vbHMK",
        "headers":[]
      }
    ]
  }
}
Lines changed: 21 additions & 0 deletions
{
  "eventSource":"aws:kafka",
  "eventSourceArn":"arn:aws:kafka:eu-west-3:123456789012:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3",
  "bootstrapServers":"boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098",
  "records":{
    "python-with-avro-doc-5":[
      {
        "topic":"python-with-avro-doc",
        "partition":5,
        "offset":0,
        "timestamp":1750547462087,
        "timestampType":"CREATE_TIME",
        "key":"MTIz",
        "value":"eyJuYW1lIjogIlBvd2VydG9vbHMiLCAiYWdlIjogNX0=",
        "headers":[]
      }
    ]
  }
}
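For reference, the `value` field in this sample is plain base64 over a JSON payload. A minimal decoding sketch using only the standard library, based solely on the sample above:

import base64
import json

# Decode the base64-encoded record value from the sample event above
raw = base64.b64decode("eyJuYW1lIjogIlBvd2VydG9vbHMiLCAiYWdlIjogNX0=")
payload = json.loads(raw)
print(payload)  # {'name': 'Powertools', 'age': 5}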
Lines changed: 21 additions & 0 deletions
{
  "eventSource":"aws:kafka",
  "eventSourceArn":"arn:aws:kafka:eu-west-3:992382490249:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3",
  "bootstrapServers":"boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098",
  "records":{
    "python-with-avro-doc-5":[
      {
        "topic":"python-with-avro-doc",
        "partition":5,
        "offset":1,
        "timestamp":1750624373324,
        "timestampType":"CREATE_TIME",
        "key":"MTIz",
        "value":"Cgpwb3dlcnRvb2xzEAU=",
        "headers":[]
      }
    ]
  }
}
Lines changed: 22 additions & 0 deletions
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
  KafkaConsumerFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: app.lambda_handler
      Runtime: python3.13
      Timeout: 30
      Events:
        MSKEvent:
          Type: MSK
          Properties:
            StartingPosition: LATEST
            Stream: "arn:aws:lambda:eu-west-3:123456789012:event-source-mapping:11a2c814-dda3-4df8-b46f-4eeafac869ac"
            Topics:
              - my-topic-1
            BatchSize: 100
            MaximumBatchingWindowInSeconds: 5
      Policies:
        - AWSLambdaMSKExecutionRole
Lines changed: 19 additions & 0 deletions
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
  KafkaConsumerFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: app.lambda_handler
      Runtime: python3.13
      Timeout: 30
      Events:
        MSKEvent:
          Type: MSK
          Properties:
            StartingPosition: LATEST
            Stream: "arn:aws:lambda:eu-west-3:123456789012:event-source-mapping:11a2c814-dda3-4df8-b46f-4eeafac869ac"
            Topics:
              - my-topic-1
      Policies:
        - AWSLambdaMSKExecutionRole
Lines changed: 9 additions & 0 deletions
{
  "type": "record",
  "name": "User",
  "namespace": "com.example",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
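This schema corresponds to the `SchemaConfig` used in the Avro consumer example further below. A minimal sketch of loading it from disk instead of inlining it as a string; the `user.avsc` filename is illustrative, not part of this commit:

from pathlib import Path

from aws_lambda_powertools.utilities.kafka import SchemaConfig

# Read the Avro schema shown above from a file (filename is an assumption)
avro_schema = Path("user.avsc").read_text()

schema_config = SchemaConfig(
    value_schema_type="AVRO",
    value_schema=avro_schema,
)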
Lines changed: 4 additions & 0 deletions
{
  "name": "...",
  "age": "..."
}
Lines changed: 8 additions & 0 deletions
syntax = "proto3";

package com.example;

message User {
  string name = 1;
  int32 age = 2;
}
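As a companion to this schema, the Protobuf-encoded `value` in the third sample event above can be decoded with a protoc-generated binding. A minimal sketch, assuming the module was produced with `protoc --python_out=. user.proto`; the `user_pb2` name follows that generator's convention and is not part of this commit:

import base64

from user_pb2 import User  # generated by protoc from the schema above (assumption)

# "Cgpwb3dlcnRvb2xzEAU=" is the record value from the third sample MSK event
raw = base64.b64decode("Cgpwb3dlcnRvb2xzEAU=")

user = User()
user.ParseFromString(raw)
print(user.name, user.age)  # powertools 5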
Lines changed: 44 additions & 0 deletions
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()

# Define Avro schema
avro_schema = """
{
    "type": "record",
    "name": "User",
    "namespace": "com.example",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"}
    ]
}
"""

schema_config = SchemaConfig(
    value_schema_type="AVRO",
    value_schema=avro_schema,
)


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    for record in event.records:
        # Log record coordinates for tracing
        logger.info(f"Processing message from topic '{record.topic}'")
        logger.info(f"Partition: {record.partition}, Offset: {record.offset}")
        logger.info(f"Produced at: {record.timestamp}")

        # Process message headers
        logger.info(f"Headers: {record.headers}")

        # Access the Avro-deserialized message content
        value = record.value
        logger.info(f"Deserialized value: {value['name']}")

        # For debugging, you can access the original raw data
        logger.info(f"Raw message: {record.original_value}")

    return {"statusCode": 200}
Lines changed: 9 additions & 0 deletions
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

schema_config = SchemaConfig(value_schema_type="JSON")


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    return {"statusCode": 200, "processed": len(list(event.records))}
Lines changed: 26 additions & 0 deletions
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


# Define custom serializer
def custom_serializer(data: dict):
    del data["age"]  # Remove age key just for example
    return data


# Configure with JSON schema type and the custom serializer function
schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=custom_serializer)


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    for record in event.records:
        # record.value now only contains the key "name"
        value = record.value

        logger.info(f"Name: '{value['name']}'")

    return {"statusCode": 200}
Lines changed: 30 additions & 0 deletions
from dataclasses import dataclass

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


# Define dataclass model
@dataclass
class User:
    name: str
    age: int


# Configure with JSON schema type and dataclass output
schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=User)


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    for record in event.records:
        # record.value is now a User instance
        value: User = record.value

        logger.info(f"Name: '{value.name}'")
        logger.info(f"Age: '{value.age}'")

    return {"statusCode": 200}
Lines changed: 29 additions & 0 deletions
from pydantic import BaseModel

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()


# Define Pydantic model for strong validation
class User(BaseModel):
    name: str
    age: int


# Configure with JSON schema type and Pydantic output
schema_config = SchemaConfig(value_schema_type="JSON", value_output_serializer=User)


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    for record in event.records:
        # record.value is now a User instance
        value: User = record.value

        logger.info(f"Name: '{value.name}'")
        logger.info(f"Age: '{value.age}'")

    return {"statusCode": 200}
Lines changed: 63 additions & 0 deletions
import base64
import json

from lambda_handler_test import lambda_handler


def test_process_json_message():
    """Test processing a simple JSON message"""
    # Create a test Kafka event with JSON data
    test_event = {
        "eventSource": "aws:kafka",
        "records": {
            "orders-topic": [
                {
                    "topic": "orders-topic",
                    "partition": 0,
                    "offset": 15,
                    "timestamp": 1545084650987,
                    "timestampType": "CREATE_TIME",
                    "key": None,
                    "value": base64.b64encode(json.dumps({"order_id": "12345", "amount": 99.95}).encode()).decode(),
                },
            ],
        },
    }

    # Invoke the Lambda handler
    response = lambda_handler(test_event, {})

    # Verify the response
    assert response["statusCode"] == 200
    assert response.get("processed") == 1


def test_process_multiple_records():
    """Test processing multiple records in a batch"""
    # Create a test event with multiple records
    test_event = {
        "eventSource": "aws:kafka",
        "records": {
            "customers-topic": [
                {
                    "topic": "customers-topic",
                    "partition": 0,
                    "offset": 10,
                    "value": base64.b64encode(json.dumps({"customer_id": "A1", "name": "Alice"}).encode()).decode(),
                },
                {
                    "topic": "customers-topic",
                    "partition": 0,
                    "offset": 11,
                    "value": base64.b64encode(json.dumps({"customer_id": "B2", "name": "Bob"}).encode()).decode(),
                },
            ],
        },
    }

    # Invoke the Lambda handler
    response = lambda_handler(test_event, {})

    # Verify the response
    assert response["statusCode"] == 200
    assert response.get("processed") == 2
Lines changed: 45 additions & 0 deletions
from datetime import datetime

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()

# Define schema that matches the Java producer
avro_schema = """
{
    "namespace": "com.example.orders",
    "type": "record",
    "name": "OrderEvent",
    "fields": [
        {"name": "orderId", "type": "string"},
        {"name": "customerId", "type": "string"},
        {"name": "totalAmount", "type": "double"},
        {"name": "orderDate", "type": "long", "logicalType": "timestamp-millis"}
    ]
}
"""


# Configure schema with field name normalization for Python style
def normalize_field_name(data: dict):
    data["order_id"] = data["orderId"]
    data["customer_id"] = data["customerId"]
    data["total_amount"] = data["totalAmount"]
    data["order_date"] = datetime.fromtimestamp(data["orderDate"] / 1000)
    return data


schema_config = SchemaConfig(
    value_schema_type="AVRO",
    value_schema=avro_schema,
    value_output_serializer=normalize_field_name,
)


@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    for record in event.records:
        order = record.value  # dict normalized by normalize_field_name
        logger.info(f"Processing order {order['order_id']}")
