From 8929bc94fe0dd446e72931915b37c18e7819574a Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 6 Apr 2023 11:16:09 +0200 Subject: [PATCH 01/10] refactor: move decorators to its separate file --- .../utilities/batch/__init__.py | 6 +- aws_lambda_powertools/utilities/batch/base.py | 107 +----------------- .../utilities/batch/decorators.py | 99 ++++++++++++++++ 3 files changed, 104 insertions(+), 108 deletions(-) create mode 100644 aws_lambda_powertools/utilities/batch/decorators.py diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py index 0e2637cc358..24f760e092d 100644 --- a/aws_lambda_powertools/utilities/batch/__init__.py +++ b/aws_lambda_powertools/utilities/batch/__init__.py @@ -12,6 +12,8 @@ EventType, FailureResponse, SuccessResponse, +) +from aws_lambda_powertools.utilities.batch.decorators import ( async_batch_processor, batch_processor, ) @@ -22,6 +24,8 @@ from aws_lambda_powertools.utilities.batch.types import BatchTypeModels __all__ = ( + "async_batch_processor", + "batch_processor", "BatchProcessor", "AsyncBatchProcessor", "BasePartialProcessor", @@ -32,6 +36,4 @@ "FailureResponse", "SuccessResponse", "SqsFifoPartialProcessor", - "batch_processor", - "async_batch_processor", ) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 3aea2b70fa4..38930f7bb13 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -11,19 +11,8 @@ import sys from abc import ABC, abstractmethod from enum import Enum -from typing import ( - Any, - Awaitable, - Callable, - Dict, - List, - Optional, - Tuple, - Union, - overload, -) +from typing import Any, Callable, Dict, List, Optional, Tuple, Union, overload -from aws_lambda_powertools.middleware_factory import lambda_handler_decorator from aws_lambda_powertools.shared import constants from aws_lambda_powertools.utilities.batch.exceptions import ( BatchProcessingError, @@ -483,51 +472,6 @@ def _process_record(self, record: dict) -> Union[SuccessResponse, FailureRespons return self.failure_handler(record=data, exception=sys.exc_info()) -@lambda_handler_decorator -def batch_processor( - handler: Callable, event: Dict, context: LambdaContext, record_handler: Callable, processor: BatchProcessor -): - """ - Middleware to handle batch event processing - - Parameters - ---------- - handler: Callable - Lambda's handler - event: Dict - Lambda's Event - context: LambdaContext - Lambda's Context - record_handler: Callable - Callable or corutine to process each record from the batch - processor: BatchProcessor - Batch Processor to handle partial failure cases - - Examples - -------- - **Processes Lambda's event with a BasePartialProcessor** - - >>> from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor - >>> - >>> def record_handler(record): - >>> return record["body"] - >>> - >>> @batch_processor(record_handler=record_handler, processor=BatchProcessor()) - >>> def handler(event, context): - >>> return {"StatusCode": 200} - - Limitations - ----------- - * Async batch processors. Use `async_batch_processor` instead. - """ - records = event["Records"] - - with processor(records, record_handler, lambda_context=context): - processor.process() - - return handler(event, context) - - class AsyncBatchProcessor(BasePartialBatchProcessor): """Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB asynchronously. 
@@ -661,52 +605,3 @@ async def _async_process_record(self, record: dict) -> Union[SuccessResponse, Fa return self.success_handler(record=record, result=result) except Exception: return self.failure_handler(record=data, exception=sys.exc_info()) - - -@lambda_handler_decorator -def async_batch_processor( - handler: Callable, - event: Dict, - context: LambdaContext, - record_handler: Callable[..., Awaitable[Any]], - processor: AsyncBatchProcessor, -): - """ - Middleware to handle batch event processing - Parameters - ---------- - handler: Callable - Lambda's handler - event: Dict - Lambda's Event - context: LambdaContext - Lambda's Context - record_handler: Callable[..., Awaitable[Any]] - Callable to process each record from the batch - processor: AsyncBatchProcessor - Batch Processor to handle partial failure cases - Examples - -------- - **Processes Lambda's event with a BasePartialProcessor** - >>> from aws_lambda_powertools.utilities.batch import async_batch_processor, AsyncBatchProcessor - >>> - >>> async def async_record_handler(record): - >>> payload: str = record.body - >>> return payload - >>> - >>> processor = AsyncBatchProcessor(event_type=EventType.SQS) - >>> - >>> @async_batch_processor(record_handler=async_record_handler, processor=processor) - >>> async def lambda_handler(event, context: LambdaContext): - >>> return processor.response() - - Limitations - ----------- - * Sync batch processors. Use `batch_processor` instead. - """ - records = event["Records"] - - with processor(records, record_handler, lambda_context=context): - processor.async_process() - - return handler(event, context) diff --git a/aws_lambda_powertools/utilities/batch/decorators.py b/aws_lambda_powertools/utilities/batch/decorators.py new file mode 100644 index 00000000000..17990ca8729 --- /dev/null +++ b/aws_lambda_powertools/utilities/batch/decorators.py @@ -0,0 +1,99 @@ +from typing import Any, Awaitable, Callable, Dict + +from aws_lambda_powertools.middleware_factory import lambda_handler_decorator +from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, BatchProcessor +from aws_lambda_powertools.utilities.typing import LambdaContext + + +@lambda_handler_decorator +def async_batch_processor( + handler: Callable, + event: Dict, + context: LambdaContext, + record_handler: Callable[..., Awaitable[Any]], + processor: AsyncBatchProcessor, +): + """ + Middleware to handle batch event processing + Parameters + ---------- + handler: Callable + Lambda's handler + event: Dict + Lambda's Event + context: LambdaContext + Lambda's Context + record_handler: Callable[..., Awaitable[Any]] + Callable to process each record from the batch + processor: AsyncBatchProcessor + Batch Processor to handle partial failure cases + Examples + -------- + **Processes Lambda's event with a BasePartialProcessor** + >>> from aws_lambda_powertools.utilities.batch import async_batch_processor, AsyncBatchProcessor + >>> + >>> async def async_record_handler(record): + >>> payload: str = record.body + >>> return payload + >>> + >>> processor = AsyncBatchProcessor(event_type=EventType.SQS) + >>> + >>> @async_batch_processor(record_handler=async_record_handler, processor=processor) + >>> async def lambda_handler(event, context: LambdaContext): + >>> return processor.response() + + Limitations + ----------- + * Sync batch processors. Use `batch_processor` instead. 
+    """
+    records = event["Records"]
+
+    with processor(records, record_handler, lambda_context=context):
+        processor.async_process()
+
+    return handler(event, context)
+
+
+@lambda_handler_decorator
+def batch_processor(
+    handler: Callable, event: Dict, context: LambdaContext, record_handler: Callable, processor: BatchProcessor
+):
+    """
+    Middleware to handle batch event processing
+
+    Parameters
+    ----------
+    handler: Callable
+        Lambda's handler
+    event: Dict
+        Lambda's Event
+    context: LambdaContext
+        Lambda's Context
+    record_handler: Callable
+        Callable or coroutine to process each record from the batch
+    processor: BatchProcessor
+        Batch Processor to handle partial failure cases
+
+    Examples
+    --------
+    **Processes Lambda's event with a BasePartialProcessor**
+
+    >>> from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor
+    >>>
+    >>> def record_handler(record):
+    >>>     return record["body"]
+    >>>
+    >>> @batch_processor(record_handler=record_handler, processor=BatchProcessor())
+    >>> def handler(event, context):
+    >>>     return {"StatusCode": 200}
+
+    Limitations
+    -----------
+    * Async batch processors. Use `async_batch_processor` instead.
+    """
+    records = event["Records"]
+
+    with processor(records, record_handler, lambda_context=context):
+        processor.process()
+
+    return handler(event, context)

From a565a6ad4d9bf141d527517338914c0cd9493ff2 Mon Sep 17 00:00:00 2001
From: heitorlessa
Date: Thu, 6 Apr 2023 14:13:54 +0200
Subject: [PATCH 02/10] feat: add process_partial_response

---
 .../utilities/batch/__init__.py               |  2 ++
 .../utilities/batch/decorators.py             | 35 +++++++++++++++++--
 .../utilities/batch/types.py                  | 12 ++++++-
 tests/functional/test_utilities_batch.py      | 33 +++++++++++++++++
 4 files changed, 79 insertions(+), 3 deletions(-)

diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py
index 24f760e092d..8143e961af5 100644
--- a/aws_lambda_powertools/utilities/batch/__init__.py
+++ b/aws_lambda_powertools/utilities/batch/__init__.py
@@ -16,6 +16,7 @@
 from aws_lambda_powertools.utilities.batch.decorators import (
     async_batch_processor,
     batch_processor,
+    process_partial_response,
 )
 from aws_lambda_powertools.utilities.batch.exceptions import ExceptionInfo
 from aws_lambda_powertools.utilities.batch.sqs_fifo_partial_processor import (
@@ -26,6 +27,7 @@
 __all__ = (
     "async_batch_processor",
     "batch_processor",
+    "process_partial_response",
     "BatchProcessor",
     "AsyncBatchProcessor",
     "BasePartialProcessor",
diff --git a/aws_lambda_powertools/utilities/batch/decorators.py b/aws_lambda_powertools/utilities/batch/decorators.py
index 17990ca8729..bbc62d42f3a 100644
--- a/aws_lambda_powertools/utilities/batch/decorators.py
+++ b/aws_lambda_powertools/utilities/batch/decorators.py
@@ -1,7 +1,16 @@
-from typing import Any, Awaitable, Callable, Dict
+from __future__ import annotations
+
+from typing import Any, Awaitable, Callable, Dict, List
 
 from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
-from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, BatchProcessor
+from aws_lambda_powertools.utilities.batch import (
+    AsyncBatchProcessor,
+    BasePartialBatchProcessor,
+    BatchProcessor,
+    EventType,
+)
+from aws_lambda_powertools.utilities.batch.types import PartialItemFailureResponse
+from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
 from aws_lambda_powertools.utilities.typing import LambdaContext
 
 
 @lambda_handler_decorator
@@ -97,3 +106,25 @@ def batch_processor(
     processor.process()
return handler(event, context) + + +def process_partial_response( + event: Dict | DictWrapper, + record_handler: Callable, + processor: BasePartialBatchProcessor, + context: LambdaContext | None = None, +) -> PartialItemFailureResponse: + try: + records: List[Dict] = event.get("Records", []) + except AttributeError: + event_types = ", ".join(list(EventType.__members__)) + docs = "https://awslabs.github.io/aws-lambda-powertools-python/latest/utilities/batch/#processing-messages-from-sqs" # noqa: E501 # long-line + raise ValueError( + f"Invalid event format. Please ensure batch event is a valid {processor.event_type.value} event. \n" + f"See sample events in our documentation for either {event_types}: \n {docs}" + ) + + with processor(records, record_handler, context): + processor.process() + + return processor.response() diff --git a/aws_lambda_powertools/utilities/batch/types.py b/aws_lambda_powertools/utilities/batch/types.py index 1fc5aba4fc4..89a5e4e3486 100644 --- a/aws_lambda_powertools/utilities/batch/types.py +++ b/aws_lambda_powertools/utilities/batch/types.py @@ -2,7 +2,9 @@ # type specifics # import sys -from typing import Optional, Type, Union +from typing import List, Optional, Type, Union + +from typing_extensions import TypedDict has_pydantic = "pydantic" in sys.modules @@ -22,3 +24,11 @@ else: BatchTypeModels = "BatchTypeModels" # type: ignore BatchSqsTypeModel = "BatchSqsTypeModel" # type: ignore + + +class PartialItemFailures(TypedDict): + itemIdentifier: str + + +class PartialItemFailureResponse(TypedDict): + batchItemFailures: List[PartialItemFailures] diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index c98d59a7042..ea9b80650e3 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -13,6 +13,7 @@ SqsFifoPartialProcessor, async_batch_processor, batch_processor, + process_partial_response, ) from aws_lambda_powertools.utilities.batch.exceptions import BatchProcessingError from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import ( @@ -775,3 +776,35 @@ def test_async_batch_processor_context_with_failure(sqs_event_factory, async_rec assert batch.response() == { "batchItemFailures": [{"itemIdentifier": first_record.message_id}, {"itemIdentifier": third_record.message_id}] } + + +def test_process_partial_response(sqs_event_factory, record_handler): + # GIVEN + records = [sqs_event_factory("success"), sqs_event_factory("success")] + batch = {"Records": records} + processor = BatchProcessor(event_type=EventType.SQS) + + # WHEN + ret = process_partial_response(batch, record_handler, processor) + + # THEN + assert ret == {"batchItemFailures": []} + + +@pytest.mark.parametrize( + "batch", + [ + pytest.param(123456789, id="num"), + pytest.param([], id="list"), + pytest.param(False, id="bool"), + pytest.param(object, id="object"), + pytest.param(lambda x: x, id="callable"), + ], +) +def test_process_partial_response_invalid_input(record_handler: Callable, batch: Any): + # GIVEN + processor = BatchProcessor(event_type=EventType.SQS) + + # WHEN/THEN + with pytest.raises(ValueError): + process_partial_response(batch, record_handler, processor) From db04b42ba9902ac1fa405c0697d229491adf13f0 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 6 Apr 2023 15:19:58 +0200 Subject: [PATCH 03/10] docs: add docstring --- .../utilities/batch/decorators.py | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/aws_lambda_powertools/utilities/batch/decorators.py 
b/aws_lambda_powertools/utilities/batch/decorators.py index bbc62d42f3a..8e7b2db5f91 100644 --- a/aws_lambda_powertools/utilities/batch/decorators.py +++ b/aws_lambda_powertools/utilities/batch/decorators.py @@ -70,6 +70,10 @@ def batch_processor( """ Middleware to handle batch event processing + NOTE + ---- + Consider using process_partial_response function for an easier experience. + Parameters ---------- handler: Callable @@ -114,6 +118,48 @@ def process_partial_response( processor: BasePartialBatchProcessor, context: LambdaContext | None = None, ) -> PartialItemFailureResponse: + """ + Higher level function to handle batch event processing. + + Parameters + ---------- + event: Dict + Lambda's original event + record_handler: Callable + Callable to process each record from the batch + processor: BasePartialBatchProcessor + Batch Processor to handle partial failure cases + context: LambdaContext + Lambda's context, used to optionally inject in record handler + + Returns + ------- + result: PartialItemFailureResponse + Lambda Partial Batch Response + + Examples + -------- + **Processes Lambda's SQS event** + + ```python + from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response + from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord + + processor = BatchProcessor(EventType.SQS) + + def record_handler(record: SQSRecord): + return record.body + + def handler(event, context): + return process_partial_response( + event=event, record_handler=record_handler, processor=processor, context=context + ) + ``` + + Limitations + ----------- + * Async batch processors. Use `async_process_partial_response` instead. + """ try: records: List[Dict] = event.get("Records", []) except AttributeError: From ee0537314f345ecbc05dd8ca0e03f85a52c12f4d Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 6 Apr 2023 17:07:44 +0200 Subject: [PATCH 04/10] chore: fix docstrings --- .../utilities/batch/decorators.py | 29 ++++++++++++------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/decorators.py b/aws_lambda_powertools/utilities/batch/decorators.py index 8e7b2db5f91..d6fec28dcb8 100644 --- a/aws_lambda_powertools/utilities/batch/decorators.py +++ b/aws_lambda_powertools/utilities/batch/decorators.py @@ -10,7 +10,6 @@ EventType, ) from aws_lambda_powertools.utilities.batch.types import PartialItemFailureResponse -from aws_lambda_powertools.utilities.data_classes.common import DictWrapper from aws_lambda_powertools.utilities.typing import LambdaContext @@ -24,6 +23,11 @@ def async_batch_processor( ): """ Middleware to handle batch event processing + + Notes + ----- + Consider using async_process_partial_response function for an easier experience. 
+ Parameters ---------- handler: Callable @@ -36,17 +40,19 @@ def async_batch_processor( Callable to process each record from the batch processor: AsyncBatchProcessor Batch Processor to handle partial failure cases + Examples -------- **Processes Lambda's event with a BasePartialProcessor** >>> from aws_lambda_powertools.utilities.batch import async_batch_processor, AsyncBatchProcessor + >>> from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord >>> - >>> async def async_record_handler(record): + >>> processor = AsyncBatchProcessor(event_type=EventType.SQS) + >>> + >>> async def async_record_handler(record: SQSRecord): >>> payload: str = record.body >>> return payload >>> - >>> processor = AsyncBatchProcessor(event_type=EventType.SQS) - >>> >>> @async_batch_processor(record_handler=async_record_handler, processor=processor) >>> async def lambda_handler(event, context: LambdaContext): >>> return processor.response() @@ -70,8 +76,8 @@ def batch_processor( """ Middleware to handle batch event processing - NOTE - ---- + Notes + ----- Consider using process_partial_response function for an easier experience. Parameters @@ -89,16 +95,19 @@ def batch_processor( Examples -------- - **Processes Lambda's event with a BasePartialProcessor** + **Processes Lambda's event with a BatchProcessor** - >>> from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor + >>> from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor, EventType + >>> from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord + >>> + >>> processor = BatchProcessor(EventType.SQS) >>> >>> def record_handler(record): >>> return record["body"] >>> >>> @batch_processor(record_handler=record_handler, processor=BatchProcessor()) >>> def handler(event, context): - >>> return {"StatusCode": 200} + >>> return processor.response() Limitations ----------- @@ -113,7 +122,7 @@ def batch_processor( def process_partial_response( - event: Dict | DictWrapper, + event: Dict, record_handler: Callable, processor: BasePartialBatchProcessor, context: LambdaContext | None = None, From 42cda1f8afc7dea42e13beea3ffb27c51a58ec0a Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 6 Apr 2023 17:18:15 +0200 Subject: [PATCH 05/10] feat: add async_process_partial_response --- .../utilities/batch/__init__.py | 2 + .../utilities/batch/decorators.py | 66 ++++++++++++++++++- tests/functional/test_utilities_batch.py | 33 ++++++++++ 3 files changed, 100 insertions(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py index 8143e961af5..e135655ef61 100644 --- a/aws_lambda_powertools/utilities/batch/__init__.py +++ b/aws_lambda_powertools/utilities/batch/__init__.py @@ -15,6 +15,7 @@ ) from aws_lambda_powertools.utilities.batch.decorators import ( async_batch_processor, + async_process_partial_response, batch_processor, process_partial_response, ) @@ -26,6 +27,7 @@ __all__ = ( "async_batch_processor", + "async_process_partial_response", "batch_processor", "process_partial_response", "BatchProcessor", diff --git a/aws_lambda_powertools/utilities/batch/decorators.py b/aws_lambda_powertools/utilities/batch/decorators.py index d6fec28dcb8..8f594c1479e 100644 --- a/aws_lambda_powertools/utilities/batch/decorators.py +++ b/aws_lambda_powertools/utilities/batch/decorators.py @@ -54,7 +54,7 @@ def async_batch_processor( >>> return payload >>> >>> @async_batch_processor(record_handler=async_record_handler, 
processor=processor)
-    >>> async def lambda_handler(event, context: LambdaContext):
+    >>> def lambda_handler(event, context):
     >>>     return processor.response()
 
     Limitations
@@ -183,3 +183,67 @@ def handler(event, context):
     processor.process()
 
     return processor.response()
+
+
+def async_process_partial_response(
+    event: Dict,
+    record_handler: Callable,
+    processor: AsyncBatchProcessor,
+    context: LambdaContext | None = None,
+) -> PartialItemFailureResponse:
+    """
+    Higher level function to handle batch event processing asynchronously.
+
+    Parameters
+    ----------
+    event: Dict
+        Lambda's original event
+    record_handler: Callable
+        Callable to process each record from the batch
+    processor: AsyncBatchProcessor
+        Batch Processor to handle partial failure cases
+    context: LambdaContext
+        Lambda's context, used to optionally inject in record handler
+
+    Returns
+    -------
+    result: PartialItemFailureResponse
+        Lambda Partial Batch Response
+
+    Examples
+    --------
+    **Processes Lambda's SQS event**
+
+    ```python
+    from aws_lambda_powertools.utilities.batch import AsyncBatchProcessor, EventType, async_process_partial_response
+    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
+
+    processor = AsyncBatchProcessor(event_type=EventType.SQS)
+
+    async def record_handler(record: SQSRecord):
+        return record.body
+
+    def handler(event, context):
+        return async_process_partial_response(
+            event=event, record_handler=record_handler, processor=processor, context=context
+        )
+    ```
+
+    Limitations
+    -----------
+    * Sync batch processors. Use `process_partial_response` instead.
+    """
+    try:
+        records: List[Dict] = event.get("Records", [])
+    except AttributeError:
+        event_types = ", ".join(list(EventType.__members__))
+        docs = "https://awslabs.github.io/aws-lambda-powertools-python/latest/utilities/batch/#processing-messages-from-sqs"  # noqa: E501 # long-line
+        raise ValueError(
+            f"Invalid event format. Please ensure batch event is a valid {processor.event_type.value} event. 
\n" + f"See sample events in our documentation for either {event_types}: \n {docs}" + ) + + with processor(records, record_handler, context): + processor.async_process() + + return processor.response() diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index ea9b80650e3..a9857c413d0 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -12,6 +12,7 @@ EventType, SqsFifoPartialProcessor, async_batch_processor, + async_process_partial_response, batch_processor, process_partial_response, ) @@ -808,3 +809,35 @@ def test_process_partial_response_invalid_input(record_handler: Callable, batch: # WHEN/THEN with pytest.raises(ValueError): process_partial_response(batch, record_handler, processor) + + +def test_async_process_partial_response(sqs_event_factory, async_record_handler): + # GIVEN + records = [sqs_event_factory("success"), sqs_event_factory("success")] + batch = {"Records": records} + processor = AsyncBatchProcessor(event_type=EventType.SQS) + + # WHEN + ret = async_process_partial_response(batch, async_record_handler, processor) + + # THEN + assert ret == {"batchItemFailures": []} + + +@pytest.mark.parametrize( + "batch", + [ + pytest.param(123456789, id="num"), + pytest.param([], id="list"), + pytest.param(False, id="bool"), + pytest.param(object, id="object"), + pytest.param(lambda x: x, id="callable"), + ], +) +def test_async_process_partial_response_invalid_input(async_record_handler: Callable, batch: Any): + # GIVEN + processor = AsyncBatchProcessor(event_type=EventType.SQS) + + # WHEN/THEN + with pytest.raises(ValueError): + async_process_partial_response(batch, record_handler, processor) From df9e8a390b5f70d19fbcff580c0a9431098f1509 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 6 Apr 2023 20:07:24 +0200 Subject: [PATCH 06/10] docs: recommend process_partial_response and necessary fixes replace all examples with batch_processor with process_partial_response and its async equivalent. renamed 'As a decorator' code sections to make it explicitly legacy. add new section 'Recommended' using the new behaviour fixed all incorrect Pydantic examples while replacing batch_processor --- docs/utilities/batch.md | 161 ++++++++++-------- .../src/advanced_accessing_lambda_context.py | 30 ++++ ..._processor.py => getting_started_async.py} | 7 +- .../src/getting_started_dynamodb.py | 30 ++++ .../src/getting_started_kinesis.py | 28 +++ .../src/getting_started_sqs.py | 29 ++++ .../src/getting_started_sqs_fifo.py | 22 +++ ...tting_started_sqs_fifo_context_manager.py} | 0 ... 
=> getting_started_sqs_fifo_decorator.py} | 0 9 files changed, 236 insertions(+), 71 deletions(-) create mode 100644 examples/batch_processing/src/advanced_accessing_lambda_context.py rename examples/batch_processing/src/{getting_started_async_batch_processor.py => getting_started_async.py} (79%) create mode 100644 examples/batch_processing/src/getting_started_dynamodb.py create mode 100644 examples/batch_processing/src/getting_started_kinesis.py create mode 100644 examples/batch_processing/src/getting_started_sqs.py create mode 100644 examples/batch_processing/src/getting_started_sqs_fifo.py rename examples/batch_processing/src/{sqs_fifo_batch_processor_context_manager.py => getting_started_sqs_fifo_context_manager.py} (100%) rename examples/batch_processing/src/{sqs_fifo_batch_processor.py => getting_started_sqs_fifo_decorator.py} (100%) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 0f899673c2e..780baa4b081 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -229,9 +229,15 @@ Processing batches from SQS works in four stages: ???+ info This code example optionally uses Tracer and Logger for completion. -=== "As a decorator" +=== "Recommended" - ```python hl_lines="4-5 9 15 23 25" + ```python hl_lines="4 9 12 18 29" + --8<-- "examples/batch_processing/src/getting_started_sqs.py" + ``` + +=== "As a context manager" + + ```python hl_lines="4-5 9 15 24-26 28" import json from aws_lambda_powertools import Logger, Tracer @@ -254,14 +260,17 @@ Processing batches from SQS works in four stages: @logger.inject_lambda_context @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): + batch = event["Records"] + with processor(records=batch, handler=record_handler): + processed_messages = processor.process() # kick off processing, return list[tuple] + return processor.response() ``` -=== "As a context manager" +=== "As a decorator (legacy)" - ```python hl_lines="4-5 9 15 24-26 28" + ```python hl_lines="4-5 9 15 23 25" import json from aws_lambda_powertools import Logger, Tracer @@ -284,11 +293,8 @@ Processing batches from SQS works in four stages: @logger.inject_lambda_context @tracer.capture_lambda_handler + @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - batch = event["Records"] - with processor(records=batch, handler=record_handler): - processed_messages = processor.process() # kick off processing, return list[tuple] - return processor.response() ``` @@ -352,16 +358,22 @@ Processing batches from SQS works in four stages: When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`. This helps preserve the ordering of messages in your queue. 
-=== "As a decorator" +=== "Recommended" - ```python hl_lines="5 11" - --8<-- "examples/batch_processing/src/sqs_fifo_batch_processor.py" + ```python hl_lines="3 9" + --8<-- "examples/batch_processing/src/getting_started_sqs_fifo.py" ``` === "As a context manager" - ```python hl_lines="4 8" - --8<-- "examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py" + ```python hl_lines="2 6" + --8<-- "examples/batch_processing/src/getting_started_sqs_fifo_context_manager.py" + ``` + +=== "As a decorator (legacy)" + + ```python hl_lines="3 9" + --8<-- "examples/batch_processing/src/getting_started_sqs_fifo_decorator.py" ``` ### Processing messages from Kinesis @@ -376,9 +388,15 @@ Processing batches from Kinesis works in four stages: ???+ info This code example optionally uses Tracer and Logger for completion. -=== "As a decorator" +=== "Recommended" - ```python hl_lines="4-5 9 15 22 24" + ```python hl_lines="2 7 12 18 28" + --8<-- "examples/batch_processing/src/getting_started_kinesis.py" + ``` + +=== "As a context manager" + + ```python hl_lines="4-5 9 15 23-25 27" import json from aws_lambda_powertools import Logger, Tracer @@ -400,16 +418,17 @@ Processing batches from Kinesis works in four stages: @logger.inject_lambda_context @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): + batch = event["Records"] + with processor(records=batch, handler=record_handler): + processed_messages = processor.process() # kick off processing, return list[tuple] + return processor.response() ``` -=== "As a context manager" - - ```python hl_lines="4-5 9 15 23-25 27" - import json +=== "As a decorator (legacy)" + ```python hl_lines="2-3 7 20 22" from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord @@ -429,11 +448,8 @@ Processing batches from Kinesis works in four stages: @logger.inject_lambda_context @tracer.capture_lambda_handler + @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - batch = event["Records"] - with processor(records=batch, handler=record_handler): - processed_messages = processor.process() # kick off processing, return list[tuple] - return processor.response() ``` @@ -504,9 +520,15 @@ Processing batches from Kinesis works in four stages: ???+ info This code example optionally uses Tracer and Logger for completion. -=== "As a decorator" +=== "Recommended" + + ```python hl_lines="4 9 14 20 30" + --8<-- "examples/batch_processing/src/getting_started_dynamodb.py" + ``` + +=== "As a context manager" - ```python hl_lines="4-5 9 15 25 27" + ```python hl_lines="4-5 9 15 23-27" import json from aws_lambda_powertools import Logger, Tracer @@ -524,21 +546,21 @@ Processing batches from Kinesis works in four stages: def record_handler(record: DynamoDBRecord): logger.info(record.dynamodb.new_image) payload: dict = json.loads(record.dynamodb.new_image.get("Message")) - # alternatively: - # changes: Dict[str, Any] = record.dynamodb.new_image - # payload = change.get("Message").raw_event -> {"S": ""} ... 
@logger.inject_lambda_context @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): + batch = event["Records"] + with processor(records=batch, handler=record_handler): + processed_messages = processor.process() # kick off processing, return list[tuple] + return processor.response() ``` -=== "As a context manager" +=== "As a decorator (legacy)" - ```python hl_lines="4-5 9 15 26-28 30" + ```python hl_lines="4-5 9 15 22 24" import json from aws_lambda_powertools import Logger, Tracer @@ -555,19 +577,13 @@ Processing batches from Kinesis works in four stages: @tracer.capture_method def record_handler(record: DynamoDBRecord): logger.info(record.dynamodb.new_image) - payload: dict = json.loads(record.dynamodb.new_image.get("item")) - # alternatively: - # changes: Dict[str, Any] = record.dynamodb.new_image - # payload = change.get("Message") -> "" + payload: dict = json.loads(record.dynamodb.new_image.get("Message")) ... @logger.inject_lambda_context @tracer.capture_lambda_handler + @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - batch = event["Records"] - with processor(records=batch, handler=record_handler): - processed_messages = processor.process() # kick off processing, return list[tuple] - return processor.response() ``` @@ -666,8 +682,8 @@ You can use `AsyncBatchProcessor` class and `async_batch_processor` decorator to The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order). -```python hl_lines="4 6 11 14 23" title="High-concurrency with AsyncBatchProcessor" ---8<-- "examples/batch_processing/src/getting_started_async_batch_processor.py" +```python hl_lines="3 11 14 24" title="High-concurrency with AsyncBatchProcessor" +--8<-- "examples/batch_processing/src/getting_started_async.py" ``` ???+ warning "Using tracer?" 
@@ -685,13 +701,14 @@ Inheritance is importance because we need to access message IDs and sequence num === "SQS" - ```python hl_lines="5 9-10 12-19 21 27" + ```python hl_lines="5 13 22 28" import json from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor + from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response from aws_lambda_powertools.utilities.parser.models import SqsRecordModel from aws_lambda_powertools.utilities.typing import LambdaContext + from aws_lambda_powertools.utilities.parser import validator, BaseModel class Order(BaseModel): @@ -717,25 +734,26 @@ Inheritance is importance because we need to access message IDs and sequence num @logger.inject_lambda_context @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - return processor.response() + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) ``` === "Kinesis Data Streams" - ```python hl_lines="5 9-10 12-20 22-23 26 32" + ```python hl_lines="5 14 25 29 35" import json from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor - from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamRecord + from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response + from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamRecordPayload, KinesisDataStreamRecord + from aws_lambda_powertools.utilities.parser import BaseModel, validator from aws_lambda_powertools.utilities.typing import LambdaContext class Order(BaseModel): item: dict + class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload): data: Order @@ -743,10 +761,11 @@ Inheritance is importance because we need to access message IDs and sequence num # so Pydantic can auto-initialize nested Order model @validator("data", pre=True) def transform_message_to_dict(cls, value: str): - # Powertools KinesisDataStreamRecordModel already decodes b64 to str here + # Powertools KinesisDataStreamRecord already decodes b64 to str here return json.loads(value) - class OrderKinesisRecord(KinesisDataStreamRecordModel): + + class OrderKinesisRecord(KinesisDataStreamRecord): kinesis: OrderKinesisPayloadRecord @@ -762,27 +781,28 @@ Inheritance is importance because we need to access message IDs and sequence num @logger.inject_lambda_context @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - return processor.response() + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) ``` === "DynamoDB Streams" - ```python hl_lines="7 11-12 14-21 23-25 27-28 31 37" + ```python hl_lines="7 16 26 31 35 41" import json - from typing import Dict, Literal + from typing import Dict, Literal, Optional from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor - from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamRecordModel + from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response + from aws_lambda_powertools.utilities.parser.models import 
DynamoDBStreamChangedRecordModel, DynamoDBStreamRecordModel from aws_lambda_powertools.utilities.typing import LambdaContext + from aws_lambda_powertools.utilities.parser import BaseModel, validator class Order(BaseModel): item: dict + class OrderDynamoDB(BaseModel): Message: Order @@ -792,15 +812,17 @@ Inheritance is importance because we need to access message IDs and sequence num def transform_message_to_dict(cls, value: Dict[Literal["S"], str]): return json.loads(value["S"]) + class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel): NewImage: Optional[OrderDynamoDB] OldImage: Optional[OrderDynamoDB] + class OrderDynamoDBRecord(DynamoDBStreamRecordModel): dynamodb: OrderDynamoDBChangeRecord - processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderKinesisRecord) + processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord) tracer = Tracer() logger = Logger() @@ -812,9 +834,8 @@ Inheritance is importance because we need to access message IDs and sequence num @logger.inject_lambda_context @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - return processor.response() + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) ``` ### Accessing processed messages @@ -873,7 +894,13 @@ Within your `record_handler` function, you might need access to the Lambda conte We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/python-context.html){target="_blank"} into your `record_handler` if your function signature has a parameter named `lambda_context`. When using a context manager, you also need to pass the Lambda context object like in the example below. -=== "As a decorator" +=== "Recommended" + + ```python hl_lines="19" + --8<-- "examples/batch_processing/src/advanced_accessing_lambda_context.py" + ``` + +=== "As a decorator (legacy)" ```python hl_lines="15" from typing import Optional @@ -952,7 +979,7 @@ from typing import Tuple from aws_lambda_powertools import Metrics from aws_lambda_powertools.metrics import MetricUnit -from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor, ExceptionInfo, EventType, FailureResponse +from aws_lambda_powertools.utilities.batch import BatchProcessor, ExceptionInfo, EventType, FailureResponse, process_partial_response from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord @@ -973,9 +1000,8 @@ def record_handler(record: SQSRecord): ... 
@metrics.log_metrics(capture_cold_start_metric=True) -@batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - return processor.response() + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) ``` ### Create your own partial processor @@ -1157,7 +1183,7 @@ Given a SQS batch where the first batch record succeeds and the second fails pro import json from aws_lambda_powertools import Logger, Tracer - from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor + from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord from aws_lambda_powertools.utilities.typing import LambdaContext @@ -1176,9 +1202,8 @@ Given a SQS batch where the first batch record succeeds and the second fails pro @logger.inject_lambda_context @tracer.capture_lambda_handler - @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - return processor.response() + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) ``` === "Sample SQS event" diff --git a/examples/batch_processing/src/advanced_accessing_lambda_context.py b/examples/batch_processing/src/advanced_accessing_lambda_context.py new file mode 100644 index 00000000000..96d95ca5445 --- /dev/null +++ b/examples/batch_processing/src/advanced_accessing_lambda_context.py @@ -0,0 +1,30 @@ +import json +from typing import Optional + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import ( + BatchProcessor, + EventType, + process_partial_response, +) +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.SQS) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: SQSRecord, lambda_context: Optional[LambdaContext] = None): + payload: str = record.body + if payload: + item: dict = json.loads(payload) + logger.info(item) + ... 
+ + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) diff --git a/examples/batch_processing/src/getting_started_async_batch_processor.py b/examples/batch_processing/src/getting_started_async.py similarity index 79% rename from examples/batch_processing/src/getting_started_async_batch_processor.py rename to examples/batch_processing/src/getting_started_async.py index 594be0540f3..304c01795bb 100644 --- a/examples/batch_processing/src/getting_started_async_batch_processor.py +++ b/examples/batch_processing/src/getting_started_async.py @@ -3,7 +3,7 @@ from aws_lambda_powertools.utilities.batch import ( AsyncBatchProcessor, EventType, - async_batch_processor, + async_process_partial_response, ) from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord from aws_lambda_powertools.utilities.typing import LambdaContext @@ -20,6 +20,7 @@ async def async_record_handler(record: SQSRecord): return ret.status_code -@async_batch_processor(record_handler=async_record_handler, processor=processor) def lambda_handler(event, context: LambdaContext): - return processor.response() + return async_process_partial_response( + event=event, record_handler=async_record_handler, processor=processor, context=context + ) diff --git a/examples/batch_processing/src/getting_started_dynamodb.py b/examples/batch_processing/src/getting_started_dynamodb.py new file mode 100644 index 00000000000..cb877410fef --- /dev/null +++ b/examples/batch_processing/src/getting_started_dynamodb.py @@ -0,0 +1,30 @@ +import json + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import ( + BatchProcessor, + EventType, + process_partial_response, +) +from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import ( + DynamoDBRecord, +) +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.DynamoDBStreams) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: DynamoDBRecord): + logger.info(record.dynamodb.new_image) + payload: dict = json.loads(record.dynamodb.new_image.get("Message")) + logger.info(payload) + ... + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) diff --git a/examples/batch_processing/src/getting_started_kinesis.py b/examples/batch_processing/src/getting_started_kinesis.py new file mode 100644 index 00000000000..e58222733e1 --- /dev/null +++ b/examples/batch_processing/src/getting_started_kinesis.py @@ -0,0 +1,28 @@ +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import ( + BatchProcessor, + EventType, + process_partial_response, +) +from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import ( + KinesisStreamRecord, +) +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.KinesisDataStreams) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: KinesisStreamRecord): + logger.info(record.kinesis.data_as_text) + payload: dict = record.kinesis.data_as_json() + logger.info(payload) + ... 
+ + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) diff --git a/examples/batch_processing/src/getting_started_sqs.py b/examples/batch_processing/src/getting_started_sqs.py new file mode 100644 index 00000000000..15f8701f297 --- /dev/null +++ b/examples/batch_processing/src/getting_started_sqs.py @@ -0,0 +1,29 @@ +import json + +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import ( + BatchProcessor, + EventType, + process_partial_response, +) +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = BatchProcessor(event_type=EventType.SQS) +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: SQSRecord): + payload: str = record.body + if payload: + item: dict = json.loads(payload) + logger.info(item) + ... + + +@logger.inject_lambda_context +@tracer.capture_lambda_handler +def lambda_handler(event, context: LambdaContext): + return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context) diff --git a/examples/batch_processing/src/getting_started_sqs_fifo.py b/examples/batch_processing/src/getting_started_sqs_fifo.py new file mode 100644 index 00000000000..d39f8ba63f1 --- /dev/null +++ b/examples/batch_processing/src/getting_started_sqs_fifo.py @@ -0,0 +1,22 @@ +from aws_lambda_powertools import Logger, Tracer +from aws_lambda_powertools.utilities.batch import ( + SqsFifoPartialProcessor, + process_partial_response, +) +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.typing import LambdaContext + +processor = SqsFifoPartialProcessor() +tracer = Tracer() +logger = Logger() + + +@tracer.capture_method +def record_handler(record: SQSRecord): + ... 
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)
diff --git a/examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py b/examples/batch_processing/src/getting_started_sqs_fifo_context_manager.py
similarity index 100%
rename from examples/batch_processing/src/sqs_fifo_batch_processor_context_manager.py
rename to examples/batch_processing/src/getting_started_sqs_fifo_context_manager.py
diff --git a/examples/batch_processing/src/sqs_fifo_batch_processor.py b/examples/batch_processing/src/getting_started_sqs_fifo_decorator.py
similarity index 100%
rename from examples/batch_processing/src/sqs_fifo_batch_processor.py
rename to examples/batch_processing/src/getting_started_sqs_fifo_decorator.py

From 45fa58cb8f3fb0f645cc03beb891d01105877b51 Mon Sep 17 00:00:00 2001
From: heitorlessa
Date: Thu, 6 Apr 2023 20:11:43 +0200
Subject: [PATCH 07/10] docs: mypy ignore for fictitious sample

---
 examples/batch_processing/src/getting_started_dynamodb.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/examples/batch_processing/src/getting_started_dynamodb.py b/examples/batch_processing/src/getting_started_dynamodb.py
index cb877410fef..60d8ed89f0e 100644
--- a/examples/batch_processing/src/getting_started_dynamodb.py
+++ b/examples/batch_processing/src/getting_started_dynamodb.py
@@ -18,8 +18,8 @@
 @tracer.capture_method
 def record_handler(record: DynamoDBRecord):
-    logger.info(record.dynamodb.new_image)
-    payload: dict = json.loads(record.dynamodb.new_image.get("Message"))
+    logger.info(record.dynamodb.new_image)  # type: ignore[union-attr]
+    payload: dict = json.loads(record.dynamodb.new_image.get("Message"))  # type: ignore[union-attr,arg-type]
     logger.info(payload)
     ...

From 7ee6ff2f2cf6b9bb8d61396e73a2687f309d57a0 Mon Sep 17 00:00:00 2001
From: heitorlessa
Date: Fri, 7 Apr 2023 14:26:32 +0200
Subject: [PATCH 08/10] docs: update callouts to batch_processor and async_batch_processor

---
 docs/utilities/batch.md | 28 ++++++++++++----------------
 1 file changed, 12 insertions(+), 16 deletions(-)

diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index 780baa4b081..a6afe1b004a 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -219,12 +219,11 @@ The remaining sections of the documentation will rely on these samples. For comp
 ### Processing messages from SQS
 
-Processing batches from SQS works in four stages:
+Processing batches from SQS works in three stages:
 
 1. Instantiate **`BatchProcessor`** and choose **`EventType.SQS`** for the event type
 2. Define your function to handle each batch record, and use [`SQSRecord`](data_classes.md#sqs){target="_blank"} type annotation for autocompletion
-3. Use either **`batch_processor`** decorator or your instantiated processor as a context manager to kick off processing
-4. Return the appropriate response contract to Lambda via **`.response()`** processor method
+3. Use **`process_partial_response`** to kick off processing
 
 ???+ info
     This code example optionally uses Tracer and Logger for completion.
@@ -241,7 +240,7 @@ Processing batches from SQS works in three stages:
     import json
 
     from aws_lambda_powertools import Logger, Tracer
-    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
+    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
     from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
     from aws_lambda_powertools.utilities.typing import LambdaContext
@@ -378,12 +377,11 @@ This helps preserve the ordering of messages in your queue.
 
 ### Processing messages from Kinesis
 
-Processing batches from Kinesis works in four stages:
+Processing batches from Kinesis works in three stages:
 
 1. Instantiate **`BatchProcessor`** and choose **`EventType.KinesisDataStreams`** for the event type
 2. Define your function to handle each batch record, and use [`KinesisStreamRecord`](data_classes.md#kinesis-streams){target="_blank"} type annotation for autocompletion
-3. Use either **`batch_processor`** decorator or your instantiated processor as a context manager to kick off processing
-4. Return the appropriate response contract to Lambda via **`.response()`** processor method
+3. Use **`process_partial_response`** to kick off processing
 
 ???+ info
     This code example optionally uses Tracer and Logger for completion.
@@ -398,7 +396,7 @@ Processing batches from Kinesis works in three stages:
     import json
 
     from aws_lambda_powertools import Logger, Tracer
-    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
+    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
     from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
     from aws_lambda_powertools.utilities.typing import LambdaContext
@@ -510,12 +508,11 @@ Processing batches from Kinesis works in four stages:
 
 ### Processing messages from DynamoDB
 
-Processing batches from Kinesis works in four stages:
+Processing batches from DynamoDB works in three stages:
 
 1. Instantiate **`BatchProcessor`** and choose **`EventType.DynamoDBStreams`** for the event type
 2. Define your function to handle each batch record, and use [`DynamoDBRecord`](data_classes.md#dynamodb-streams){target="_blank"} type annotation for autocompletion
-3. Use either **`batch_processor`** decorator or your instantiated processor as a context manager to kick off processing
-4. Return the appropriate response contract to Lambda via **`.response()`** processor method
+3. Use **`process_partial_response`** to kick off processing
 
 ???+ info
     This code example optionally uses Tracer and Logger for completion.
@@ -532,7 +529,7 @@ Processing batches from Kinesis works in four stages:
     import json
 
     from aws_lambda_powertools import Logger, Tracer
-    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
+    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
    from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
    from aws_lambda_powertools.utilities.typing import LambdaContext
@@ -673,7 +670,7 @@ Processing batches from Kinesis works in four stages:
 ### Processing messages asynchronously
 
 !!! tip "New to AsyncIO? Read this [comprehensive guide first](https://realpython.com/async-io-python/){target="_blank"}."
 
-You can use `AsyncBatchProcessor` class and `async_batch_processor` decorator to process messages concurrently.
+You can use `AsyncBatchProcessor` class and `async_process_partial_response` function to process messages concurrently.
???+ question "When is this useful?"
    Your use case might be able to process multiple records at the same time without conflicting with one another.
@@ -845,7 +842,7 @@ Use the context manager to access a list of all returned values from your `recor
 * **When successful**. We will include a tuple with `success`, the result of `record_handler`, and the batch record
 * **When failed**. We will include a tuple with `fail`, exception as a string, and the batch record
 
-```python hl_lines="31-38" title="Accessing processed messages via context manager"
+```python hl_lines="30-36" title="Accessing processed messages via context manager"
 import json
 from typing import Any, List, Literal, Union
 
 from aws_lambda_powertools import Logger, Tracer
 from aws_lambda_powertools.utilities.batch import (BatchProcessor,
                                                    EventType,
                                                    FailureResponse,
-                                                   SuccessResponse,
-                                                   batch_processor)
+                                                   SuccessResponse)
 from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
 from aws_lambda_powertools.utilities.typing import LambdaContext

From efd7665f2846290add87322cea9b1a36506c20aa Mon Sep 17 00:00:00 2001
From: heitorlessa
Date: Fri, 7 Apr 2023 14:29:14 +0200
Subject: [PATCH 09/10] docs: remove duplicate banner on processed messages

---
 docs/utilities/batch.md | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index a6afe1b004a..90a2bc7edc7 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -661,11 +661,6 @@ All records in the batch will be passed to this handler for processing, even if
 * **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing
 * **All records failed to be processed**. We will raise `BatchProcessingError` exception with a list of all exceptions raised when processing
 
-???+ warning
-    You will not have access to the **processed messages** within the Lambda Handler; use context manager for that.
-
-    All processing logic will and should be performed by the `record_handler` function.
-
 ### Processing messages asynchronously

From 120aab921546f609d45f3af3badae5725ee4502f Mon Sep 17 00:00:00 2001
From: heitorlessa
Date: Fri, 7 Apr 2023 14:33:43 +0200
Subject: [PATCH 10/10] docs: add FAQ for batch_processor and async_batch_processor decorators

---
 docs/utilities/batch.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index 90a2bc7edc7..47cc2147978 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -1244,6 +1244,12 @@ Given a SQS batch where the first batch record succeeds and the second fails pro
 
     Use context manager when you want access to the processed messages or handle `BatchProcessingError` exception when all records within the batch fail to be processed.
 
+### What's the difference between the decorator and process_partial_response functions?
+
+`batch_processor` and `async_batch_processor` decorators are now considered legacy. Historically, they were kept for backwards compatibility and to minimize code changes between V1 and V2.
+
+As of 2.12.0, `process_partial_response` and `async_process_partial_response` are recommended instead. They reduce boilerplate, use fewer memory/CPU cycles, and are less error prone - e.g., the decorators required an additional explicit return.
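+
+For illustration, a minimal before/after sketch (the trivial `record_handler` and the handler names are hypothetical, used only to contrast the two styles) could look like this:
+
+```python
+from aws_lambda_powertools.utilities.batch import (
+    BatchProcessor,
+    EventType,
+    batch_processor,
+    process_partial_response,
+)
+
+processor = BatchProcessor(event_type=EventType.SQS)
+
+
+def record_handler(record):
+    return record["body"]
+
+
+# Legacy decorator: returning the partial batch response is on you
+@batch_processor(record_handler=record_handler, processor=processor)
+def legacy_handler(event, context):
+    return processor.response()
+
+
+# Recommended: process_partial_response builds and returns the response for you
+def lambda_handler(event, context):
+    return process_partial_response(
+        event=event, record_handler=record_handler, processor=processor, context=context
+    )
+```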
+ ### Integrating exception handling with Sentry.io When using Sentry.io for error monitoring, you can override `failure_handler` to capture each processing exception with Sentry SDK:
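A minimal sketch of such an override (the `SentryProcessor` name is illustrative, and it assumes the `sentry_sdk` package with its `capture_exception` helper is installed) could look like this:

```python
from sentry_sdk import capture_exception

from aws_lambda_powertools.utilities.batch import BatchProcessor, ExceptionInfo, FailureResponse


class SentryProcessor(BatchProcessor):
    def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse:
        capture_exception()  # report the exception currently being handled to Sentry
        return super().failure_handler(record, exception)
```

This processor can then be used anywhere a `BatchProcessor` is expected, for example as the `processor` argument to `process_partial_response`.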