diff --git a/docs/utilities/idempotency.md b/docs/utilities/idempotency.md index 2e3f5f1e88d..89cd3003b77 100644 --- a/docs/utilities/idempotency.md +++ b/docs/utilities/idempotency.md @@ -132,75 +132,6 @@ When using `idempotent_function`, you must tell us which keyword parameter in yo DynamoDB Persistency layer uses a Resource client [which is not thread-safe](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html?highlight=multithreading#multithreading-or-multiprocessing-with-resources){target="_blank"}. -=== "batch_sample.py" - - This example also demonstrates how you can integrate with [Batch utility](batch.md), so you can process each record in an idempotent manner. - - ```python hl_lines="4-5 16 21 30" - from aws_lambda_powertools.utilities.batch import (BatchProcessor, EventType, - batch_processor) - from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord - from aws_lambda_powertools.utilities.idempotency import ( - DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function) - - - processor = BatchProcessor(event_type=EventType.SQS) - dynamodb = DynamoDBPersistenceLayer(table_name="idem") - config = IdempotencyConfig( - event_key_jmespath="messageId", # see Choosing a payload subset section - use_local_cache=True, - ) - - - @idempotent_function(data_keyword_argument="record", config=config, persistence_store=dynamodb) - def record_handler(record: SQSRecord): - return {"message": record["body"]} - - - @idempotent_function(data_keyword_argument="data", config=config, persistence_store=dynamodb) - def dummy(arg_one, arg_two, data: dict, **kwargs): - return {"data": data} - - - @batch_processor(record_handler=record_handler, processor=processor) - def lambda_handler(event, context): - config.register_lambda_context(context) # see Lambda timeouts section - # `data` parameter must be called as a keyword argument to work - dummy("hello", "universe", data="test") - return processor.response() - ``` - -=== "Batch event" - - ```json hl_lines="4" - { - "Records": [ - { - "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", - "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", - "body": "Test message.", - "attributes": { - "ApproximateReceiveCount": "1", - "SentTimestamp": "1545082649183", - "SenderId": "AIDAIENQZJOLO23YVJ4VO", - "ApproximateFirstReceiveTimestamp": "1545082649185" - }, - "messageAttributes": { - "testAttr": { - "stringValue": "100", - "binaryValue": "base64Str", - "dataType": "Number" - } - }, - "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", - "eventSource": "aws:sqs", - "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", - "awsRegion": "us-east-2" - } - ] - } - ``` - === "dataclass_sample.py" ```python hl_lines="3-4 23 33" @@ -276,6 +207,82 @@ When using `idempotent_function`, you must tell us which keyword parameter in yo process_order(order=order) ``` +#### Batch integration + +You can can easily integrate with [Batch utility](batch.md) via context manager. This ensures that you process each record in an idempotent manner, and guard against a [Lambda timeout](#lambda-timeouts) idempotent situation. + +???+ "Choosing an unique batch record attribute" + In this example, we choose `messageId` as our idempotency token since we know it'll be unique. + + Depending on your use case, it might be more accurate [to choose another field](#choosing-a-payload-subset-for-idempotency) your producer intentionally set to define uniqueness. + +=== "batch_sample.py" + + ```python hl_lines="3-4 10 15 21 25-26 29 31" + from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType + from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord + from aws_lambda_powertools.utilities.idempotency import ( + DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function) + + + processor = BatchProcessor(event_type=EventType.SQS) + dynamodb = DynamoDBPersistenceLayer(table_name="idem") + config = IdempotencyConfig( + event_key_jmespath="messageId", # see Choosing a payload subset section + use_local_cache=True, + ) + + + @idempotent_function(data_keyword_argument="record", config=config, persistence_store=dynamodb) + def record_handler(record: SQSRecord): + return {"message": record.body} + + + def lambda_handler(event, context): + config.register_lambda_context(context) # see Lambda timeouts section + + # with Lambda context registered for Idempotency + # we can now kick in the Bach processing logic + batch = event["Records"] + with processor(records=batch, handler=record_handler): + # in case you want to access each record processed by your record_handler + # otherwise ignore the result variable assignment + processed_messages = processor.process() + + return processor.response() + ``` + +=== "batch_event.json" + + ```json hl_lines="4" + { + "Records": [ + { + "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "Test message.", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": { + "testAttr": { + "stringValue": "100", + "binaryValue": "base64Str", + "dataType": "Number" + } + }, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + } + ] + } + ``` + ### Choosing a payload subset for idempotency ???+ tip "Tip: Dealing with always changing payloads" @@ -982,6 +989,10 @@ class DynamoDBPersistenceLayer(BasePersistenceLayer): ## Compatibility with other utilities +### Batch + +See [Batch integration](#batch-integration) above. + ### Validation utility The idempotency utility can be used with the `validator` decorator. Ensure that idempotency is the innermost decorator.