Commit 580eeae (parent 9057791)

docs: add pydantic section

1 file changed: docs/utilities/batch.md (+141 -0)
## Advanced

### Pydantic integration

You can bring your own Pydantic models via the **`model`** parameter when inheriting from **`SqsRecordModel`**, **`KinesisDataStreamRecord`**, or **`DynamoDBStreamRecordModel`**.

Inheritance is important because we need to access message IDs and sequence numbers from these records in the event of failure. Mypy is fully integrated with this utility, so it should flag any incorrect model being passed.
=== "SQS"

    ```python hl_lines="5 10-11 13-20 22 28"
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
    from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
    from aws_lambda_powertools.utilities.typing import LambdaContext
    from pydantic import BaseModel, validator


    class Order(BaseModel):
        item: dict

    class OrderSqsRecord(SqsRecordModel):
        body: Order

        # auto transform json string
        # so Pydantic can auto-initialize nested Order model
        @validator("body", pre=True)
        def transform_body_to_dict(cls, value: str):
            return json.loads(value)

    processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqsRecord)
    tracer = Tracer()
    logger = Logger()


    @tracer.capture_method
    def record_handler(record: OrderSqsRecord):
        return record.body.item

    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    @batch_processor(record_handler=record_handler, processor=processor)
    def lambda_handler(event, context: LambdaContext):
        return processor.response()
    ```

=== "Kinesis Data Streams"

    ```python hl_lines="5 10-11 13-21 23-24 27 33"
    import json

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
    from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamRecord, KinesisDataStreamRecordPayload
    from aws_lambda_powertools.utilities.typing import LambdaContext
    from pydantic import BaseModel, validator


    class Order(BaseModel):
        item: dict

    class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload):
        data: Order

        # auto transform json string
        # so Pydantic can auto-initialize nested Order model
        @validator("data", pre=True)
        def transform_message_to_dict(cls, value: str):
            # Powertools KinesisDataStreamRecord already decodes b64 to str here
            return json.loads(value)

    class OrderKinesisRecord(KinesisDataStreamRecord):
        kinesis: OrderKinesisPayloadRecord


    processor = BatchProcessor(event_type=EventType.KinesisDataStreams, model=OrderKinesisRecord)
    tracer = Tracer()
    logger = Logger()


    @tracer.capture_method
    def record_handler(record: OrderKinesisRecord):
        return record.kinesis.data.item


    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    @batch_processor(record_handler=record_handler, processor=processor)
    def lambda_handler(event, context: LambdaContext):
        return processor.response()
    ```

=== "DynamoDB Streams"

    ```python hl_lines="7 12-13 15-22 24-26 28-29 32 38"
    import json

    from typing import Dict, Literal, Optional

    from aws_lambda_powertools import Logger, Tracer
    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
    from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamChangedRecordModel, DynamoDBStreamRecordModel
    from aws_lambda_powertools.utilities.typing import LambdaContext
    from pydantic import BaseModel, validator


    class Order(BaseModel):
        item: dict

    class OrderDynamoDB(BaseModel):
        Message: Order

        # auto transform json string
        # so Pydantic can auto-initialize nested Order model
        @validator("Message", pre=True)
        def transform_message_to_dict(cls, value: Dict[Literal["S"], str]):
            return json.loads(value["S"])

    class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel):
        NewImage: Optional[OrderDynamoDB]
        OldImage: Optional[OrderDynamoDB]

    class OrderDynamoDBRecord(DynamoDBStreamRecordModel):
        dynamodb: OrderDynamoDBChangeRecord


    processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord)
    tracer = Tracer()
    logger = Logger()


    @tracer.capture_method
    def record_handler(record: OrderDynamoDBRecord):
        return record.dynamodb.NewImage.Message.item


    @logger.inject_lambda_context
    @tracer.capture_lambda_handler
    @batch_processor(record_handler=record_handler, processor=processor)
    def lambda_handler(event, context: LambdaContext):
        return processor.response()
    ```

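All three `pre=True` validators above do the same job: turn the raw payload into a `dict` so Pydantic can initialize the nested `Order` model. What differs is only the shape each event source delivers. A minimal stdlib-only sketch of each transformation, using hypothetical sample payloads:

```python
import base64
import json

# SQS: the record body arrives as a plain JSON string
sqs_body = '{"item": {"id": 123}}'
order = json.loads(sqs_body)

# Kinesis: data arrives base64-encoded; Powertools decodes it to a str
# before the validator runs, so only the json.loads step remains
kinesis_data = base64.b64encode(b'{"item": {"id": 456}}').decode()
decoded = base64.b64decode(kinesis_data).decode()
order_from_kinesis = json.loads(decoded)

# DynamoDB: attributes use the AttributeValue shape, e.g. {"S": "..."},
# which is why the validator reads value["S"] before parsing
dynamodb_value = {"S": '{"item": {"id": 789}}'}
order_from_dynamodb = json.loads(dynamodb_value["S"])
```
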
### Accessing processed messages

Use the context manager to access a list of all returned values from your `record_handler` function.
