From f0bb25cbd93c4943dd8c9f8c8955b6403a0b9261 Mon Sep 17 00:00:00 2001 From: Ilya Nepsha Date: Fri, 7 Apr 2023 08:52:32 +0000 Subject: [PATCH 01/12] Fix batch processing with pydantic models --- aws_lambda_powertools/utilities/batch/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 3aea2b70fa4..81a0fd6008f 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -471,8 +471,8 @@ def _process_record(self, record: dict) -> Union[SuccessResponse, FailureRespons record: dict A batch record to be processed. """ - data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model) try: + data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model) if self._handler_accepts_lambda_context: result = self.handler(record=data, lambda_context=self.lambda_context) else: @@ -651,8 +651,8 @@ async def _async_process_record(self, record: dict) -> Union[SuccessResponse, Fa record: dict A batch record to be processed. """ - data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model) try: + data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model) if self._handler_accepts_lambda_context: result = await self.handler(record=data, lambda_context=self.lambda_context) else: From 268d32463db1dfd4d6d267007cb64d53b52a5d18 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Fri, 7 Apr 2023 11:57:53 +0200 Subject: [PATCH 02/12] fix: local unbound, convert and check failed models --- aws_lambda_powertools/utilities/batch/base.py | 20 +++++++++- tests/functional/test_utilities_batch.py | 40 +++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 81a0fd6008f..6d90cfbabdb 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -37,6 +37,7 @@ KinesisStreamRecord, ) from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.parser import ValidationError from aws_lambda_powertools.utilities.typing import LambdaContext logger = logging.getLogger(__name__) @@ -316,7 +317,14 @@ def _get_messages_to_report(self) -> List[Dict[str, str]]: def _collect_sqs_failures(self): failures = [] for msg in self.fail_messages: - msg_id = msg.messageId if self.model else msg.message_id + # If a message failed due to model validation (e.g., poison pill) + # we convert to an event source data class...but self.model is still true + # therefore, we do an additional check on whether the failed message is still a model + # see https://github.com/awslabs/aws-lambda-powertools-python/issues/2091 + if self.model and getattr(msg, "parse_obj", None): + msg_id = msg.messageId + else: + msg_id = msg.message_id failures.append({"itemIdentifier": msg_id}) return failures @@ -471,6 +479,7 @@ def _process_record(self, record: dict) -> Union[SuccessResponse, FailureRespons record: dict A batch record to be processed. 
""" + data: Optional["BatchTypeModels"] = None try: data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model) if self._handler_accepts_lambda_context: @@ -479,6 +488,15 @@ def _process_record(self, record: dict) -> Union[SuccessResponse, FailureRespons result = self.handler(record=data) return self.success_handler(record=record, result=result) + # Parser will fail validation if record is a poison pill (malformed input) + # this means we can't collect the message id if we try transforming again + # so we convert into to the equivalent batch type model (e.g., SQS, Kinesis, DynamoDB Stream) + # and downstream we can correctly collect the correct message id identifier and make the failed record available + # see https://github.com/awslabs/aws-lambda-powertools-python/issues/2091 + except ValidationError: + logger.debug("Record cannot be converted to customer's model; converting without model") + failed_record: "EventSourceDataClassTypes" = self._to_batch_type(record=record, event_type=self.event_type) + return self.failure_handler(record=failed_record, exception=sys.exc_info()) except Exception: return self.failure_handler(record=data, exception=sys.exc_info()) diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index c98d59a7042..6f728e3ef08 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -775,3 +775,43 @@ def test_async_batch_processor_context_with_failure(sqs_event_factory, async_rec assert batch.response() == { "batchItemFailures": [{"itemIdentifier": first_record.message_id}, {"itemIdentifier": third_record.message_id}] } + + +def test_batch_processor_model_with_partial_validation_error(sqs_event_factory, order_event_factory): + # GIVEN + class Order(BaseModel): + item: dict + + # NOTE: export JSON type and fix Model + class OrderSqs(SqsRecordModel): + body: Order + + # auto transform json string + # so Pydantic can auto-initialize nested Order model + @validator("body", pre=True) + def transform_body_to_dict(cls, value: str): + return json.loads(value) + + def record_handler(record: OrderSqs): + if "fail" in record.body.item["type"]: + raise Exception("Failed to process record.") + return record.body.item + + order_event = order_event_factory({"type": "success"}) + first_record = sqs_event_factory(order_event) + second_record = sqs_event_factory(order_event) + malformed_record = sqs_event_factory({"poison": "pill"}) + records = [first_record, malformed_record, second_record] + + # WHEN + processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqs) + with processor(records, record_handler) as batch: + batch.process() + + # THEN + assert len(batch.fail_messages) == 1 + assert batch.response() == { + "batchItemFailures": [ + {"itemIdentifier": malformed_record["messageId"]}, + ] + } From 5aafc3eb0d5c352975926f1eb3baf76d2a106189 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Fri, 7 Apr 2023 12:01:14 +0200 Subject: [PATCH 03/12] fix: extend fix to dynamodb and kinesis --- aws_lambda_powertools/utilities/batch/base.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 6d90cfbabdb..fcd13e7aa82 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -331,14 +331,20 @@ def _collect_sqs_failures(self): def _collect_kinesis_failures(self): failures = [] for msg in 
self.fail_messages: - msg_id = msg.kinesis.sequenceNumber if self.model else msg.kinesis.sequence_number + if self.model and getattr(msg, "parse_obj", None): + msg_id = msg.kinesis.sequenceNumber + else: + msg_id = msg.kinesis.sequence_number failures.append({"itemIdentifier": msg_id}) return failures def _collect_dynamodb_failures(self): failures = [] for msg in self.fail_messages: - msg_id = msg.dynamodb.SequenceNumber if self.model else msg.dynamodb.sequence_number + if self.model and getattr(msg, "parse_obj", None): + msg_id = msg.dynamodb.SequenceNumber + else: + msg_id = msg.dynamodb.sequence_number failures.append({"itemIdentifier": msg_id}) return failures From 725732c67a358d0cf98dea041b0e3bc3036ff59d Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Fri, 7 Apr 2023 12:09:05 +0200 Subject: [PATCH 04/12] chore: isolate failed model conversion to keep it clean and documented --- aws_lambda_powertools/utilities/batch/base.py | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index fcd13e7aa82..83905bf29cd 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -361,6 +361,17 @@ def _to_batch_type(self, record: dict, event_type: EventType, model: Optional["B return model.parse_obj(record) return self._DATA_CLASS_MAPPING[event_type](record) + def _register_model_validation_error_record(self, record: dict): + """Convert and register failure due to poison pills where model failed validation early""" + # Parser will fail validation if record is a poison pill (malformed input) + # this means we can't collect the message id if we try transforming again + # so we convert into to the equivalent batch type model (e.g., SQS, Kinesis, DynamoDB Stream) + # and downstream we can correctly collect the correct message id identifier and make the failed record available + # see https://github.com/awslabs/aws-lambda-powertools-python/issues/2091 + logger.debug("Record cannot be converted to customer's model; converting without model") + failed_record: "EventSourceDataClassTypes" = self._to_batch_type(record=record, event_type=self.event_type) + return self.failure_handler(record=failed_record, exception=sys.exc_info()) + class BatchProcessor(BasePartialBatchProcessor): # Keep old name for compatibility """Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB. 
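The _register_model_validation_error_record helper introduced above centralizes the poison-pill fallback: when Pydantic rejects a record, the raw dict is re-parsed with the event source data class alone, so the message id stays reachable for the batchItemFailures report. A minimal sketch of the control flow this enables, using illustrative stand-in names (to_data_class stands in for the internal _to_batch_type; none of these names are the library's public API):

    import sys

    from pydantic import ValidationError

    def process_record(record: dict, model, to_data_class, handler,
                       success_handler, failure_handler):
        data = None
        try:
            # May raise ValidationError on a malformed ("poison pill") record
            data = model.parse_obj(record)
            return success_handler(record=record, result=handler(record=data))
        except ValidationError:
            # Fall back to the plain event source data class so the
            # message id survives for the partial failure report
            failed = to_data_class(record)
            return failure_handler(record=failed, exception=sys.exc_info())
        except Exception:
            # Handler raised after parsing: the model instance is available
            return failure_handler(record=data, exception=sys.exc_info())

Initializing data to None before the try block is what resolves the UnboundLocalError from patch 02's subject line: the except-Exception path can now reference data even when parsing itself failed.
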
@@ -494,15 +505,8 @@ def _process_record(self, record: dict) -> Union[SuccessResponse, FailureRespons result = self.handler(record=data) return self.success_handler(record=record, result=result) - # Parser will fail validation if record is a poison pill (malformed input) - # this means we can't collect the message id if we try transforming again - # so we convert into to the equivalent batch type model (e.g., SQS, Kinesis, DynamoDB Stream) - # and downstream we can correctly collect the correct message id identifier and make the failed record available - # see https://github.com/awslabs/aws-lambda-powertools-python/issues/2091 except ValidationError: - logger.debug("Record cannot be converted to customer's model; converting without model") - failed_record: "EventSourceDataClassTypes" = self._to_batch_type(record=record, event_type=self.event_type) - return self.failure_handler(record=failed_record, exception=sys.exc_info()) + return self._register_model_validation_error_record(record) except Exception: return self.failure_handler(record=data, exception=sys.exc_info()) @@ -675,6 +679,7 @@ async def _async_process_record(self, record: dict) -> Union[SuccessResponse, Fa record: dict A batch record to be processed. """ + data: Optional["BatchTypeModels"] = None try: data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model) if self._handler_accepts_lambda_context: @@ -683,6 +688,8 @@ async def _async_process_record(self, record: dict) -> Union[SuccessResponse, Fa result = await self.handler(record=data) return self.success_handler(record=record, result=result) + except ValidationError: + return self._register_model_validation_error_record(record) except Exception: return self.failure_handler(record=data, exception=sys.exc_info()) From 8ce83a8fff28608daad8a935c5828f372a8b280a Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Fri, 7 Apr 2023 12:10:28 +0200 Subject: [PATCH 05/12] chore: reference issue in ddb and kinesis --- aws_lambda_powertools/utilities/batch/base.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 83905bf29cd..4cdea6d28f5 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -331,6 +331,7 @@ def _collect_sqs_failures(self): def _collect_kinesis_failures(self): failures = [] for msg in self.fail_messages: + # # see https://github.com/awslabs/aws-lambda-powertools-python/issues/2091 if self.model and getattr(msg, "parse_obj", None): msg_id = msg.kinesis.sequenceNumber else: @@ -341,6 +342,7 @@ def _collect_kinesis_failures(self): def _collect_dynamodb_failures(self): failures = [] for msg in self.fail_messages: + # see https://github.com/awslabs/aws-lambda-powertools-python/issues/2091 if self.model and getattr(msg, "parse_obj", None): msg_id = msg.dynamodb.SequenceNumber else: From 856f66068005b2ad0a4713e26d7a1fc07f8c0f61 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Fri, 7 Apr 2023 12:14:25 +0200 Subject: [PATCH 06/12] chore: tech debt to make json deserialization cleaner --- aws_lambda_powertools/utilities/parser/models/sqs.py | 2 +- aws_lambda_powertools/utilities/parser/types.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/aws_lambda_powertools/utilities/parser/models/sqs.py b/aws_lambda_powertools/utilities/parser/models/sqs.py index 1d56c4f8e34..c92a8361b7c 100644 --- a/aws_lambda_powertools/utilities/parser/models/sqs.py +++ 
b/aws_lambda_powertools/utilities/parser/models/sqs.py @@ -52,7 +52,7 @@ class SqsMsgAttributeModel(BaseModel): class SqsRecordModel(BaseModel): messageId: str receiptHandle: str - body: Union[str, Type[BaseModel]] + body: Union[str, Type[BaseModel], BaseModel] attributes: SqsAttributesModel messageAttributes: Dict[str, SqsMsgAttributeModel] md5OfBody: str diff --git a/aws_lambda_powertools/utilities/parser/types.py b/aws_lambda_powertools/utilities/parser/types.py index e9acceb8963..d3f00646d52 100644 --- a/aws_lambda_powertools/utilities/parser/types.py +++ b/aws_lambda_powertools/utilities/parser/types.py @@ -3,16 +3,18 @@ import sys from typing import Any, Dict, Type, TypeVar, Union -from pydantic import BaseModel +from pydantic import BaseModel, Json # We only need typing_extensions for python versions <3.8 if sys.version_info >= (3, 8): - from typing import Literal # noqa: F401 + from typing import Literal else: - from typing_extensions import Literal # noqa: F401 + from typing_extensions import Literal Model = TypeVar("Model", bound=BaseModel) EnvelopeModel = TypeVar("EnvelopeModel") EventParserReturnType = TypeVar("EventParserReturnType") AnyInheritedModel = Union[Type[BaseModel], BaseModel] RawDictOrModel = Union[Dict[str, Any], AnyInheritedModel] + +__all__ = ["Json", "Literal"] From 704fa1b437a437ce3af4c5bd3923d7f5ab21fb9d Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Fri, 7 Apr 2023 12:18:47 +0200 Subject: [PATCH 07/12] chore: test dynamodb partial model poison pill --- tests/functional/test_utilities_batch.py | 59 ++++++++++++++++++++---- 1 file changed, 50 insertions(+), 9 deletions(-) diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index 6f728e3ef08..2e3121a0a79 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -34,7 +34,7 @@ KinesisDataStreamRecordPayload, SqsRecordModel, ) -from aws_lambda_powertools.utilities.parser.types import Literal +from aws_lambda_powertools.utilities.parser.types import Json, Literal from tests.functional.utils import b64_to_str, str_to_b64 @@ -782,15 +782,8 @@ def test_batch_processor_model_with_partial_validation_error(sqs_event_factory, class Order(BaseModel): item: dict - # NOTE: export JSON type and fix Model class OrderSqs(SqsRecordModel): - body: Order - - # auto transform json string - # so Pydantic can auto-initialize nested Order model - @validator("body", pre=True) - def transform_body_to_dict(cls, value: str): - return json.loads(value) + body: Json[Order] def record_handler(record: OrderSqs): if "fail" in record.body.item["type"]: @@ -815,3 +808,51 @@ def record_handler(record: OrderSqs): {"itemIdentifier": malformed_record["messageId"]}, ] } + + +def test_batch_processor_dynamodb_context_model_with_partial_validation_error( + dynamodb_event_factory, order_event_factory +): + # GIVEN + class Order(BaseModel): + item: dict + + class OrderDynamoDB(BaseModel): + Message: Order + + # auto transform json string + # so Pydantic can auto-initialize nested Order model + @validator("Message", pre=True) + def transform_message_to_dict(cls, value: Dict[Literal["S"], str]): + return json.loads(value["S"]) + + class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel): + NewImage: Optional[OrderDynamoDB] + OldImage: Optional[OrderDynamoDB] + + class OrderDynamoDBRecord(DynamoDBStreamRecordModel): + dynamodb: OrderDynamoDBChangeRecord + + def record_handler(record: OrderDynamoDBRecord): + if "fail" in 
record.dynamodb.NewImage.Message.item["type"]: + raise Exception("Failed to process record.") + return record.dynamodb.NewImage.Message.item + + order_event = order_event_factory({"type": "success"}) + first_record = dynamodb_event_factory(order_event) + second_record = dynamodb_event_factory(order_event) + malformed_record = dynamodb_event_factory({"poison": "pill"}) + records = [first_record, malformed_record, second_record] + + # WHEN + processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord) + with processor(records, record_handler) as batch: + batch.process() + + # THEN + assert len(batch.fail_messages) == 1 + assert batch.response() == { + "batchItemFailures": [ + {"itemIdentifier": malformed_record["dynamodb"]["SequenceNumber"]}, + ] + } From 94a2b9551ff277794f97aa0681aa6f7d0d577eb0 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Fri, 7 Apr 2023 12:22:04 +0200 Subject: [PATCH 08/12] chore: test kinesis partial model poison pill --- tests/functional/test_utilities_batch.py | 38 ++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index 2e3121a0a79..838ede7a668 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -856,3 +856,41 @@ def record_handler(record: OrderDynamoDBRecord): {"itemIdentifier": malformed_record["dynamodb"]["SequenceNumber"]}, ] } + + +def test_batch_processor_kinesis_context_parser_model_with_partial_validation_error( + kinesis_event_factory, order_event_factory +): + # GIVEN + class Order(BaseModel): + item: dict + + class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload): + data: Json[Order] + + class OrderKinesisRecord(KinesisDataStreamRecordModel): + kinesis: OrderKinesisPayloadRecord + + def record_handler(record: OrderKinesisRecord): + if "fail" in record.kinesis.data.item["type"]: + raise Exception("Failed to process record.") + return record.kinesis.data.item + + order_event = order_event_factory({"type": "success"}) + first_record = kinesis_event_factory(order_event) + second_record = kinesis_event_factory(order_event) + malformed_record = kinesis_event_factory({"poison": "pill"}) + records = [first_record, malformed_record, second_record] + + # WHEN + processor = BatchProcessor(event_type=EventType.KinesisDataStreams, model=OrderKinesisRecord) + with processor(records, record_handler) as batch: + batch.process() + + # THEN + assert len(batch.fail_messages) == 1 + assert batch.response() == { + "batchItemFailures": [ + {"itemIdentifier": malformed_record["kinesis"]["SequenceNumber"]}, + ] + } From 6f1f1986d76ab1374de9933a98e5d526b41bf9f4 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Fri, 7 Apr 2023 13:23:15 +0200 Subject: [PATCH 09/12] chore: cover async tests --- tests/functional/test_utilities_batch.py | 123 ++++++++++++++++++++++- 1 file changed, 121 insertions(+), 2 deletions(-) diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index 838ede7a668..d7daece8cac 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -873,13 +873,13 @@ class OrderKinesisRecord(KinesisDataStreamRecordModel): def record_handler(record: OrderKinesisRecord): if "fail" in record.kinesis.data.item["type"]: - raise Exception("Failed to process record.") + raise RuntimeError("Failed to process record.") return record.kinesis.data.item order_event = order_event_factory({"type": "success"}) 
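An aside on the poison-pill shape these tests rely on: the malformed record carries data that Pydantic can read but that does not match the nested Order model, so validation fails before the record handler ever runs. A tiny standalone illustration of that failure mode (pydantic v1 semantics; Body is a made-up model name, not from the test suite):

    from pydantic import BaseModel, Json, ValidationError

    class Order(BaseModel):
        item: dict

    class Body(BaseModel):
        body: Json[Order]

    Body(body='{"item": {"type": "success"}}')  # parses and validates
    try:
        Body(body='{"poison": "pill"}')  # valid JSON, wrong shape
    except ValidationError as exc:
        print(exc.errors()[0]["loc"])  # ('body', 'item') -> required field missing
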
first_record = kinesis_event_factory(order_event) second_record = kinesis_event_factory(order_event) - malformed_record = kinesis_event_factory({"poison": "pill"}) + malformed_record = kinesis_event_factory('{"poison": "pill"}') records = [first_record, malformed_record, second_record] # WHEN @@ -887,6 +887,125 @@ def record_handler(record: OrderKinesisRecord): with processor(records, record_handler) as batch: batch.process() + # THEN + assert len(batch.fail_messages) == 1 + assert batch.response() == { + "batchItemFailures": [ + {"itemIdentifier": malformed_record["kinesis"]["sequenceNumber"]}, + ] + } + + +def test_async_batch_processor_model_with_partial_validation_error(sqs_event_factory, order_event_factory): + # GIVEN + class Order(BaseModel): + item: dict + + class OrderSqs(SqsRecordModel): + body: Json[Order] + + async def async_record_handler(record: OrderSqs): + if "fail" in record.body.item["type"]: + raise Exception("Failed to process record.") + return record.body.item + + order_event = order_event_factory({"type": "success"}) + first_record = sqs_event_factory(order_event) + second_record = sqs_event_factory(order_event) + malformed_record = sqs_event_factory({"poison": "pill"}) + records = [first_record, malformed_record, second_record] + + # WHEN + processor = AsyncBatchProcessor(event_type=EventType.SQS, model=OrderSqs) + with processor(records, async_record_handler) as batch: + batch.async_process() + + # THEN + assert len(batch.fail_messages) == 1 + assert batch.response() == { + "batchItemFailures": [ + {"itemIdentifier": malformed_record["messageId"]}, + ] + } + + +def test_async_batch_processor_dynamodb_context_model_with_partial_validation_error( + dynamodb_event_factory, order_event_factory +): + # GIVEN + class Order(BaseModel): + item: dict + + class OrderDynamoDB(BaseModel): + Message: Order + + # auto transform json string + # so Pydantic can auto-initialize nested Order model + @validator("Message", pre=True) + def transform_message_to_dict(cls, value: Dict[Literal["S"], str]): + return json.loads(value["S"]) + + class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel): + NewImage: Optional[OrderDynamoDB] + OldImage: Optional[OrderDynamoDB] + + class OrderDynamoDBRecord(DynamoDBStreamRecordModel): + dynamodb: OrderDynamoDBChangeRecord + + async def record_handler(record: OrderDynamoDBRecord): + if "fail" in record.dynamodb.NewImage.Message.item["type"]: + raise Exception("Failed to process record.") + return record.dynamodb.NewImage.Message.item + + order_event = order_event_factory({"type": "success"}) + first_record = dynamodb_event_factory(order_event) + second_record = dynamodb_event_factory(order_event) + malformed_record = dynamodb_event_factory({"poison": "pill"}) + records = [first_record, malformed_record, second_record] + + # WHEN + processor = AsyncBatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord) + with processor(records, record_handler) as batch: + batch.async_process() + + # THEN + assert len(batch.fail_messages) == 1 + assert batch.response() == { + "batchItemFailures": [ + {"itemIdentifier": malformed_record["dynamodb"]["SequenceNumber"]}, + ] + } + + +def test_async_batch_processor_kinesis_context_parser_model_with_partial_validation_error( + kinesis_event_factory, order_event_factory +): + # GIVEN + class Order(BaseModel): + item: dict + + class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload): + data: Json[Order] + + class OrderKinesisRecord(KinesisDataStreamRecordModel): + kinesis: 
OrderKinesisPayloadRecord + + async def record_handler(record: OrderKinesisRecord): + if "fail" in record.kinesis.data.item["type"]: + raise Exception("Failed to process record.") + return record.kinesis.data.item + + order_event = order_event_factory({"type": "success"}) + first_record = kinesis_event_factory(order_event) + second_record = kinesis_event_factory(order_event) + malformed_record = kinesis_event_factory({"poison": "pill"}) + records = [first_record, malformed_record, second_record] + + # WHEN + processor = AsyncBatchProcessor(event_type=EventType.KinesisDataStreams, model=OrderKinesisRecord) + with processor(records, record_handler) as batch: + batch.async_process() + # THEN assert len(batch.fail_messages) == 1 assert batch.response() == { From eca5dba6c23580017ac792029eeb7f119821d25a Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Fri, 7 Apr 2023 13:44:11 +0200 Subject: [PATCH 10/12] chore: define test models once to ease reading & authoring tests --- tests/functional/batch/__init__.py | 0 tests/functional/batch/sample_models.py | 47 +++++++++++ tests/functional/test_utilities_batch.py | 100 +---------------------- 3 files changed, 51 insertions(+), 96 deletions(-) create mode 100644 tests/functional/batch/__init__.py create mode 100644 tests/functional/batch/sample_models.py diff --git a/tests/functional/batch/__init__.py b/tests/functional/batch/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/functional/batch/sample_models.py b/tests/functional/batch/sample_models.py new file mode 100644 index 00000000000..556ff0ebf8a --- /dev/null +++ b/tests/functional/batch/sample_models.py @@ -0,0 +1,47 @@ +import json +from typing import Dict, Optional + +from aws_lambda_powertools.utilities.parser import BaseModel, validator +from aws_lambda_powertools.utilities.parser.models import ( + DynamoDBStreamChangedRecordModel, + DynamoDBStreamRecordModel, + KinesisDataStreamRecord, + KinesisDataStreamRecordPayload, + SqsRecordModel, +) +from aws_lambda_powertools.utilities.parser.types import Json, Literal + + +class Order(BaseModel): + item: dict + + +class OrderSqs(SqsRecordModel): + body: Json[Order] + + +class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload): + data: Json[Order] + + +class OrderKinesisRecord(KinesisDataStreamRecord): + kinesis: OrderKinesisPayloadRecord + + +class OrderDynamoDB(BaseModel): + Message: Order + + # auto transform json string + # so Pydantic can auto-initialize nested Order model + @validator("Message", pre=True) + def transform_message_to_dict(cls, value: Dict[Literal["S"], str]): + return json.loads(value["S"]) + + +class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel): + NewImage: Optional[OrderDynamoDB] + OldImage: Optional[OrderDynamoDB] + + +class OrderDynamoDBRecord(DynamoDBStreamRecordModel): + dynamodb: OrderDynamoDBChangeRecord diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index d7daece8cac..c29c47987de 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -27,14 +27,8 @@ DynamoDBStreamChangedRecordModel, DynamoDBStreamRecordModel, ) -from aws_lambda_powertools.utilities.parser.models import ( - KinesisDataStreamRecord as KinesisDataStreamRecordModel, -) -from aws_lambda_powertools.utilities.parser.models import ( - KinesisDataStreamRecordPayload, - SqsRecordModel, -) -from aws_lambda_powertools.utilities.parser.types import Json, Literal +from aws_lambda_powertools.utilities.parser.types 
import Literal +from tests.functional.batch.sample_models import OrderKinesisRecord, OrderSqs from tests.functional.utils import b64_to_str, str_to_b64 @@ -374,18 +368,6 @@ def lambda_handler(event, context): def test_batch_processor_context_model(sqs_event_factory, order_event_factory): # GIVEN - class Order(BaseModel): - item: dict - - class OrderSqs(SqsRecordModel): - body: Order - - # auto transform json string - # so Pydantic can auto-initialize nested Order model - @validator("body", pre=True) - def transform_body_to_dict(cls, value: str): - return json.loads(value) - def record_handler(record: OrderSqs): return record.body.item @@ -411,18 +393,6 @@ def record_handler(record: OrderSqs): def test_batch_processor_context_model_with_failure(sqs_event_factory, order_event_factory): # GIVEN - class Order(BaseModel): - item: dict - - class OrderSqs(SqsRecordModel): - body: Order - - # auto transform json string - # so Pydantic can auto-initialize nested Order model - @validator("body", pre=True) - def transform_body_to_dict(cls, value: str): - return json.loads(value) - def record_handler(record: OrderSqs): if "fail" in record.body.item["type"]: raise Exception("Failed to process record.") @@ -544,22 +514,6 @@ def record_handler(record: OrderDynamoDBRecord): def test_batch_processor_kinesis_context_parser_model(kinesis_event_factory, order_event_factory): # GIVEN - class Order(BaseModel): - item: dict - - class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload): - data: Order - - # auto transform json string - # so Pydantic can auto-initialize nested Order model - @validator("data", pre=True) - def transform_message_to_dict(cls, value: str): - # Powertools KinesisDataStreamRecordModel already decodes b64 to str here - return json.loads(value) - - class OrderKinesisRecord(KinesisDataStreamRecordModel): - kinesis: OrderKinesisPayloadRecord - def record_handler(record: OrderKinesisRecord): return record.kinesis.data.item @@ -585,22 +539,6 @@ def record_handler(record: OrderKinesisRecord): def test_batch_processor_kinesis_context_parser_model_with_failure(kinesis_event_factory, order_event_factory): # GIVEN - class Order(BaseModel): - item: dict - - class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload): - data: Order - - # auto transform json string - # so Pydantic can auto-initialize nested Order model - @validator("data", pre=True) - def transform_message_to_dict(cls, value: str): - # Powertools KinesisDataStreamRecordModel - return json.loads(value) - - class OrderKinesisRecord(KinesisDataStreamRecordModel): - kinesis: OrderKinesisPayloadRecord - def record_handler(record: OrderKinesisRecord): if "fail" in record.kinesis.data.item["type"]: raise Exception("Failed to process record.") @@ -779,12 +717,6 @@ def test_async_batch_processor_context_with_failure(sqs_event_factory, async_rec def test_batch_processor_model_with_partial_validation_error(sqs_event_factory, order_event_factory): # GIVEN - class Order(BaseModel): - item: dict - - class OrderSqs(SqsRecordModel): - body: Json[Order] - def record_handler(record: OrderSqs): if "fail" in record.body.item["type"]: raise Exception("Failed to process record.") @@ -862,15 +794,6 @@ def test_batch_processor_kinesis_context_parser_model_with_partial_validation_er kinesis_event_factory, order_event_factory ): # GIVEN - class Order(BaseModel): - item: dict - - class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload): - data: Json[Order] - - class OrderKinesisRecord(KinesisDataStreamRecordModel): - kinesis: 
OrderKinesisPayloadRecord - def record_handler(record: OrderKinesisRecord): if "fail" in record.kinesis.data.item["type"]: raise RuntimeError("Failed to process record.") @@ -898,12 +821,6 @@ def record_handler(record: OrderKinesisRecord): def test_async_batch_processor_model_with_partial_validation_error(sqs_event_factory, order_event_factory): # GIVEN - class Order(BaseModel): - item: dict - - class OrderSqs(SqsRecordModel): - body: Json[Order] - async def async_record_handler(record: OrderSqs): if "fail" in record.body.item["type"]: raise Exception("Failed to process record.") @@ -981,15 +898,6 @@ def test_async_batch_processor_kinesis_context_parser_model_with_partial_validat kinesis_event_factory, order_event_factory ): # GIVEN - class Order(BaseModel): - item: dict - - class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload): - data: Json[Order] - - class OrderKinesisRecord(KinesisDataStreamRecordModel): - kinesis: OrderKinesisPayloadRecord - async def record_handler(record: OrderKinesisRecord): if "fail" in record.kinesis.data.item["type"]: raise Exception("Failed to process record.") @@ -998,7 +906,7 @@ async def record_handler(record: OrderKinesisRecord): order_event = order_event_factory({"type": "success"}) first_record = kinesis_event_factory(order_event) second_record = kinesis_event_factory(order_event) - malformed_record = kinesis_event_factory({"poison": "pill"}) + malformed_record = kinesis_event_factory('{"poison": "pill"}') records = [first_record, malformed_record, second_record] # WHEN @@ -1010,6 +918,6 @@ async def record_handler(record: OrderKinesisRecord): assert len(batch.fail_messages) == 1 assert batch.response() == { "batchItemFailures": [ - {"itemIdentifier": malformed_record["kinesis"]["SequenceNumber"]}, + {"itemIdentifier": malformed_record["kinesis"]["sequenceNumber"]}, ] } From ee6837b2027fca12f472ee5e3ba647aa7405c8ba Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Fri, 7 Apr 2023 13:45:26 +0200 Subject: [PATCH 11/12] chore: pay tech debt to ease authoring Json models --- aws_lambda_powertools/utilities/parser/models/dynamodb.py | 4 ++-- aws_lambda_powertools/utilities/parser/models/kinesis.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/aws_lambda_powertools/utilities/parser/models/dynamodb.py b/aws_lambda_powertools/utilities/parser/models/dynamodb.py index 772b8fb580f..4c85c72d438 100644 --- a/aws_lambda_powertools/utilities/parser/models/dynamodb.py +++ b/aws_lambda_powertools/utilities/parser/models/dynamodb.py @@ -9,8 +9,8 @@ class DynamoDBStreamChangedRecordModel(BaseModel): ApproximateCreationDateTime: Optional[date] Keys: Dict[str, Dict[str, Any]] - NewImage: Optional[Union[Dict[str, Any], Type[BaseModel]]] - OldImage: Optional[Union[Dict[str, Any], Type[BaseModel]]] + NewImage: Optional[Union[Dict[str, Any], Type[BaseModel], BaseModel]] + OldImage: Optional[Union[Dict[str, Any], Type[BaseModel], BaseModel]] SequenceNumber: str SizeBytes: int StreamViewType: Literal["NEW_AND_OLD_IMAGES", "KEYS_ONLY", "NEW_IMAGE", "OLD_IMAGE"] diff --git a/aws_lambda_powertools/utilities/parser/models/kinesis.py b/aws_lambda_powertools/utilities/parser/models/kinesis.py index 6fb9a7076b5..bb6d6b5318f 100644 --- a/aws_lambda_powertools/utilities/parser/models/kinesis.py +++ b/aws_lambda_powertools/utilities/parser/models/kinesis.py @@ -15,7 +15,7 @@ class KinesisDataStreamRecordPayload(BaseModel): kinesisSchemaVersion: str partitionKey: str sequenceNumber: str - data: Union[bytes, Type[BaseModel]] # base64 encoded str is parsed into 
bytes + data: Union[bytes, Type[BaseModel], BaseModel] # base64 encoded str is parsed into bytes approximateArrivalTimestamp: float @validator("data", pre=True, allow_reuse=True) From b4cca25886a981ffbdfc9854e2461ff8ffd1d070 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Fri, 7 Apr 2023 13:58:41 +0200 Subject: [PATCH 12/12] chore: pay tech debt to reuse record handlers --- tests/functional/test_utilities_batch.py | 184 +++++++++++------------ 1 file changed, 90 insertions(+), 94 deletions(-) diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index c29c47987de..2205d47660c 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -28,7 +28,11 @@ DynamoDBStreamRecordModel, ) from aws_lambda_powertools.utilities.parser.types import Literal -from tests.functional.batch.sample_models import OrderKinesisRecord, OrderSqs +from tests.functional.batch.sample_models import ( + OrderDynamoDBRecord, + OrderKinesisRecord, + OrderSqs, +) from tests.functional.utils import b64_to_str, str_to_b64 @@ -113,6 +117,16 @@ def handler(record): return handler +@pytest.fixture(scope="module") +def record_handler_model() -> Callable: + def record_handler(record: OrderSqs): + if "fail" in record.body.item["type"]: + raise Exception("Failed to process record.") + return record.body.item + + return record_handler + + @pytest.fixture(scope="module") def async_record_handler() -> Callable[..., Awaitable[Any]]: async def handler(record): @@ -124,6 +138,16 @@ async def handler(record): return handler +@pytest.fixture(scope="module") +def async_record_handler_model() -> Callable[..., Awaitable[Any]]: + async def async_record_handler(record: OrderSqs): + if "fail" in record.body.item["type"]: + raise ValueError("Failed to process record.") + return record.body.item + + return async_record_handler + + @pytest.fixture(scope="module") def kinesis_record_handler() -> Callable: def handler(record: KinesisStreamRecord): @@ -135,17 +159,57 @@ def handler(record: KinesisStreamRecord): return handler +@pytest.fixture(scope="module") +def kinesis_record_handler_model() -> Callable: + def record_handler(record: OrderKinesisRecord): + if "fail" in record.kinesis.data.item["type"]: + raise ValueError("Failed to process record.") + return record.kinesis.data.item + + return record_handler + + +@pytest.fixture(scope="module") +def async_kinesis_record_handler_model() -> Callable[..., Awaitable[Any]]: + async def record_handler(record: OrderKinesisRecord): + if "fail" in record.kinesis.data.item["type"]: + raise Exception("Failed to process record.") + return record.kinesis.data.item + + return record_handler + + @pytest.fixture(scope="module") def dynamodb_record_handler() -> Callable: def handler(record: DynamoDBRecord): body = record.dynamodb.new_image.get("Message") if "fail" in body: - raise Exception("Failed to process record.") + raise ValueError("Failed to process record.") return body return handler +@pytest.fixture(scope="module") +def dynamodb_record_handler_model() -> Callable: + def record_handler(record: OrderDynamoDBRecord): + if "fail" in record.dynamodb.NewImage.Message.item["type"]: + raise ValueError("Failed to process record.") + return record.dynamodb.NewImage.Message.item + + return record_handler + + +@pytest.fixture(scope="module") +def async_dynamodb_record_handler() -> Callable[..., Awaitable[Any]]: + async def record_handler(record: OrderDynamoDBRecord): + if "fail" in record.dynamodb.NewImage.Message.item["type"]: + 
raise ValueError("Failed to process record.") + return record.dynamodb.NewImage.Message.item + + return record_handler + + @pytest.fixture(scope="module") def config() -> Config: return Config(region_name="us-east-1") @@ -512,11 +576,10 @@ def record_handler(record: OrderDynamoDBRecord): } -def test_batch_processor_kinesis_context_parser_model(kinesis_event_factory, order_event_factory): +def test_batch_processor_kinesis_context_parser_model( + kinesis_record_handler_model: Callable, kinesis_event_factory, order_event_factory +): # GIVEN - def record_handler(record: OrderKinesisRecord): - return record.kinesis.data.item - order_event = order_event_factory({"type": "success"}) first_record = kinesis_event_factory(order_event) second_record = kinesis_event_factory(order_event) @@ -524,7 +587,7 @@ def record_handler(record: OrderKinesisRecord): # WHEN processor = BatchProcessor(event_type=EventType.KinesisDataStreams, model=OrderKinesisRecord) - with processor(records, record_handler) as batch: + with processor(records, kinesis_record_handler_model) as batch: processed_messages = batch.process() # THEN @@ -537,13 +600,10 @@ def record_handler(record: OrderKinesisRecord): assert batch.response() == {"batchItemFailures": []} -def test_batch_processor_kinesis_context_parser_model_with_failure(kinesis_event_factory, order_event_factory): +def test_batch_processor_kinesis_context_parser_model_with_failure( + kinesis_record_handler_model: Callable, kinesis_event_factory, order_event_factory +): # GIVEN - def record_handler(record: OrderKinesisRecord): - if "fail" in record.kinesis.data.item["type"]: - raise Exception("Failed to process record.") - return record.kinesis.data.item - order_event = order_event_factory({"type": "success"}) order_event_fail = order_event_factory({"type": "fail"}) @@ -554,7 +614,7 @@ def record_handler(record: OrderKinesisRecord): # WHEN processor = BatchProcessor(event_type=EventType.KinesisDataStreams, model=OrderKinesisRecord) - with processor(records, record_handler) as batch: + with processor(records, kinesis_record_handler_model) as batch: batch.process() # THEN @@ -715,13 +775,10 @@ def test_async_batch_processor_context_with_failure(sqs_event_factory, async_rec } -def test_batch_processor_model_with_partial_validation_error(sqs_event_factory, order_event_factory): +def test_batch_processor_model_with_partial_validation_error( + record_handler_model: Callable, sqs_event_factory, order_event_factory +): # GIVEN - def record_handler(record: OrderSqs): - if "fail" in record.body.item["type"]: - raise Exception("Failed to process record.") - return record.body.item - order_event = order_event_factory({"type": "success"}) first_record = sqs_event_factory(order_event) second_record = sqs_event_factory(order_event) @@ -730,7 +787,7 @@ def record_handler(record: OrderSqs): # WHEN processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqs) - with processor(records, record_handler) as batch: + with processor(records, record_handler_model) as batch: batch.process() # THEN @@ -743,33 +800,9 @@ def record_handler(record: OrderSqs): def test_batch_processor_dynamodb_context_model_with_partial_validation_error( - dynamodb_event_factory, order_event_factory + dynamodb_record_handler_model: Callable, dynamodb_event_factory, order_event_factory ): # GIVEN - class Order(BaseModel): - item: dict - - class OrderDynamoDB(BaseModel): - Message: Order - - # auto transform json string - # so Pydantic can auto-initialize nested Order model - @validator("Message", pre=True) - def 
transform_message_to_dict(cls, value: Dict[Literal["S"], str]): - return json.loads(value["S"]) - - class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel): - NewImage: Optional[OrderDynamoDB] - OldImage: Optional[OrderDynamoDB] - - class OrderDynamoDBRecord(DynamoDBStreamRecordModel): - dynamodb: OrderDynamoDBChangeRecord - - def record_handler(record: OrderDynamoDBRecord): - if "fail" in record.dynamodb.NewImage.Message.item["type"]: - raise Exception("Failed to process record.") - return record.dynamodb.NewImage.Message.item - order_event = order_event_factory({"type": "success"}) first_record = dynamodb_event_factory(order_event) second_record = dynamodb_event_factory(order_event) @@ -778,7 +811,7 @@ def record_handler(record: OrderDynamoDBRecord): # WHEN processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord) - with processor(records, record_handler) as batch: + with processor(records, dynamodb_record_handler_model) as batch: batch.process() # THEN @@ -791,14 +824,9 @@ def record_handler(record: OrderDynamoDBRecord): def test_batch_processor_kinesis_context_parser_model_with_partial_validation_error( - kinesis_event_factory, order_event_factory + kinesis_record_handler_model: Callable, kinesis_event_factory, order_event_factory ): # GIVEN - def record_handler(record: OrderKinesisRecord): - if "fail" in record.kinesis.data.item["type"]: - raise RuntimeError("Failed to process record.") - return record.kinesis.data.item - order_event = order_event_factory({"type": "success"}) first_record = kinesis_event_factory(order_event) second_record = kinesis_event_factory(order_event) @@ -807,7 +835,7 @@ def record_handler(record: OrderKinesisRecord): # WHEN processor = BatchProcessor(event_type=EventType.KinesisDataStreams, model=OrderKinesisRecord) - with processor(records, record_handler) as batch: + with processor(records, kinesis_record_handler_model) as batch: batch.process() # THEN @@ -819,13 +847,10 @@ def record_handler(record: OrderKinesisRecord): } -def test_async_batch_processor_model_with_partial_validation_error(sqs_event_factory, order_event_factory): +def test_async_batch_processor_model_with_partial_validation_error( + async_record_handler_model: Callable, sqs_event_factory, order_event_factory +): # GIVEN - async def async_record_handler(record: OrderSqs): - if "fail" in record.body.item["type"]: - raise Exception("Failed to process record.") - return record.body.item - order_event = order_event_factory({"type": "success"}) first_record = sqs_event_factory(order_event) second_record = sqs_event_factory(order_event) @@ -834,7 +859,7 @@ async def async_record_handler(record: OrderSqs): # WHEN processor = AsyncBatchProcessor(event_type=EventType.SQS, model=OrderSqs) - with processor(records, async_record_handler) as batch: + with processor(records, async_record_handler_model) as batch: batch.async_process() # THEN @@ -847,33 +872,9 @@ async def async_record_handler(record: OrderSqs): def test_async_batch_processor_dynamodb_context_model_with_partial_validation_error( - dynamodb_event_factory, order_event_factory + async_dynamodb_record_handler: Callable, dynamodb_event_factory, order_event_factory ): # GIVEN - class Order(BaseModel): - item: dict - - class OrderDynamoDB(BaseModel): - Message: Order - - # auto transform json string - # so Pydantic can auto-initialize nested Order model - @validator("Message", pre=True) - def transform_message_to_dict(cls, value: Dict[Literal["S"], str]): - return json.loads(value["S"]) - - class 
OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel): - NewImage: Optional[OrderDynamoDB] - OldImage: Optional[OrderDynamoDB] - - class OrderDynamoDBRecord(DynamoDBStreamRecordModel): - dynamodb: OrderDynamoDBChangeRecord - - async def record_handler(record: OrderDynamoDBRecord): - if "fail" in record.dynamodb.NewImage.Message.item["type"]: - raise Exception("Failed to process record.") - return record.dynamodb.NewImage.Message.item - order_event = order_event_factory({"type": "success"}) first_record = dynamodb_event_factory(order_event) second_record = dynamodb_event_factory(order_event) @@ -882,7 +883,7 @@ async def record_handler(record: OrderDynamoDBRecord): # WHEN processor = AsyncBatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord) - with processor(records, record_handler) as batch: + with processor(records, async_dynamodb_record_handler) as batch: batch.async_process() # THEN @@ -895,14 +896,9 @@ async def record_handler(record: OrderDynamoDBRecord): def test_async_batch_processor_kinesis_context_parser_model_with_partial_validation_error( - kinesis_event_factory, order_event_factory + async_kinesis_record_handler_model: Callable, kinesis_event_factory, order_event_factory ): # GIVEN - async def record_handler(record: OrderKinesisRecord): - if "fail" in record.kinesis.data.item["type"]: - raise Exception("Failed to process record.") - return record.kinesis.data.item - order_event = order_event_factory({"type": "success"}) first_record = kinesis_event_factory(order_event) second_record = kinesis_event_factory(order_event) @@ -911,7 +907,7 @@ async def record_handler(record: OrderKinesisRecord): # WHEN processor = AsyncBatchProcessor(event_type=EventType.KinesisDataStreams, model=OrderKinesisRecord) - with processor(records, record_handler) as batch: + with processor(records, async_kinesis_record_handler_model) as batch: batch.async_process() # THEN