# type specifics
#
# Feature-detect pydantic without importing it: the parser models are optional.
has_pydantic = "pydantic" in sys.modules

# Shape of sys.exc_info() when an exception is active:
# (exception type, exception value, traceback).
ExceptionInfo = Tuple[Type[BaseException], BaseException, TracebackType]
# sys.exc_info() returns (None, None, None) when no exception is being handled,
# so the optional form is a union of the two.
OptExcInfo = Union[ExceptionInfo, Tuple[None, None, None]]


# For IntelliSense and Mypy to work, we need to account for possible SQS, Kinesis and DynamoDB subclasses
# We need them as subclasses as we must access their message ID or sequence number metadata via dot notation
@@ -114,24 +114,38 @@ def __call__(self, records: List[dict], handler: Callable):
114
114
115
115
def success_handler(self, record, result: Any) -> "SuccessResponse":
    """
    Keeps track of batch records that were processed successfully

    Parameters
    ----------
    record: Any
        record that succeeded processing
    result: Any
        result from record handler

    Returns
    -------
    SuccessResponse
        "success", result, original record
    """
    # NOTE: annotation is a forward-reference string so the method is importable
    # standalone; SuccessResponse is defined earlier in the module.
    entry = ("success", result, record)
    self.success_messages.append(record)
    return entry
127
134
128
- def failure_handler (self , record , exception : _OptExcInfo ) -> FailureResponse :
135
+ def failure_handler (self , record , exception : OptExcInfo ) -> FailureResponse :
129
136
"""
130
- Failure callback
137
+ Keeps track of batch records that failed processing
138
+
139
+ Parameters
140
+ ----------
141
+ record: Any
142
+ record that failed processing
143
+ exception: OptExcInfo
144
+ Exception information containing type, value, and traceback (sys.exc_info())
131
145
132
146
Returns
133
147
-------
134
- tuple
148
+ FailureResponse
135
149
"fail", exceptions args, original record
136
150
"""
137
151
exception_string = f"{ exception [0 ]} :{ exception [1 ]} "
@@ -189,6 +203,114 @@ def batch_processor(
189
203
190
204
191
205
class BatchProcessor (BasePartialProcessor ):
206
+ """Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB.
207
+
208
+
209
+ Example
210
+ -------
211
+
212
+ ## Process batch triggered by SQS
213
+
214
+ ```python
215
+ import json
216
+
217
+ from aws_lambda_powertools import Logger, Tracer
218
+ from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
219
+ from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
220
+ from aws_lambda_powertools.utilities.typing import LambdaContext
221
+
222
+
223
+ processor = BatchProcessor(event_type=EventType.SQS)
224
+ tracer = Tracer()
225
+ logger = Logger()
226
+
227
+
228
+ @tracer.capture_method
229
+ def record_handler(record: SQSRecord):
230
+ payload: str = record.body
231
+ if payload:
232
+ item: dict = json.loads(payload)
233
+ ...
234
+
235
+ @logger.inject_lambda_context
236
+ @tracer.capture_lambda_handler
237
+ @batch_processor(record_handler=record_handler, processor=processor)
238
+ def lambda_handler(event, context: LambdaContext):
239
+ return processor.response()
240
+ ```
241
+
242
+ ## Process batch triggered by Kinesis Data Streams
243
+
244
+ ```python
245
+ import json
246
+
247
+ from aws_lambda_powertools import Logger, Tracer
248
+ from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
249
+ from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
250
+ from aws_lambda_powertools.utilities.typing import LambdaContext
251
+
252
+
253
+ processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
254
+ tracer = Tracer()
255
+ logger = Logger()
256
+
257
+
258
+ @tracer.capture_method
259
+ def record_handler(record: KinesisStreamRecord):
260
+ logger.info(record.kinesis.data_as_text)
261
+ payload: dict = record.kinesis.data_as_json()
262
+ ...
263
+
264
+ @logger.inject_lambda_context
265
+ @tracer.capture_lambda_handler
266
+ @batch_processor(record_handler=record_handler, processor=processor)
267
+ def lambda_handler(event, context: LambdaContext):
268
+ return processor.response()
269
+ ```
270
+
271
+
272
+ ## Process batch triggered by DynamoDB Data Streams
273
+
274
+ ```python
275
+ import json
276
+
277
+ from aws_lambda_powertools import Logger, Tracer
278
+ from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
279
+ from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
280
+ from aws_lambda_powertools.utilities.typing import LambdaContext
281
+
282
+
283
+ processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
284
+ tracer = Tracer()
285
+ logger = Logger()
286
+
287
+
288
+ @tracer.capture_method
289
+ def record_handler(record: DynamoDBRecord):
290
+ logger.info(record.dynamodb.new_image)
291
+ payload: dict = json.loads(record.dynamodb.new_image.get("item").s_value)
292
+ # alternatively:
293
+ # changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image # noqa: E800
294
+ # payload = change.get("Message").raw_event -> {"S": "<payload>"}
295
+ ...
296
+
297
+ @logger.inject_lambda_context
298
+ @tracer.capture_lambda_handler
299
+ def lambda_handler(event, context: LambdaContext):
300
+ batch = event["Records"]
301
+ with processor(records=batch, handler=record_handler):
302
+ processed_messages = processor.process() # kick off processing, return list[tuple]
303
+
304
+ return processor.response()
305
+ ```
306
+
307
+
308
+ Raises
309
+ ------
310
+ BatchProcessingError
311
+ When all batch records fail processing
312
+ """
313
+
192
314
DEFAULT_RESPONSE : Dict [str , List [Optional [dict ]]] = {"batchItemFailures" : []}
193
315
194
316
def __init__ (self , event_type : EventType , model : Optional ["BatchTypeModels" ] = None ):
@@ -232,7 +354,7 @@ def _prepare(self):
232
354
"""
233
355
self .success_messages .clear ()
234
356
self .fail_messages .clear ()
235
- self .batch_response = self .DEFAULT_RESPONSE
357
+ self .batch_response = copy . deepcopy ( self .DEFAULT_RESPONSE )
236
358
237
359
def _process_record (self , record : dict ) -> Union [SuccessResponse , FailureResponse ]:
238
360
"""
0 commit comments