-
Notifications
You must be signed in to change notification settings - Fork 436
feat(event_source): add Kinesis Firehose Data Transformation data class #3029
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
leandrodamascena
merged 52 commits into
aws-powertools:develop
from
roger-zhangg:kinesis
Sep 14, 2023
Merged
Changes from 2 commits
Commits
Show all changes
52 commits
Select commit
Hold shift + click to select a range
040358d
support kinesis response
roger-zhangg 6eb8530
Merge branch 'develop' into kinesis
leandrodamascena 7ca016e
fix lint, address Leandro suggestions
roger-zhangg 785a144
Merge branch 'develop' of https://github.com/aws-powertools/powertool…
roger-zhangg 708686f
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg 8c9db32
remove deleted const
roger-zhangg f02319f
fix Literal import in 3.7
roger-zhangg 5f55aa7
change to use data-classes
roger-zhangg 2566f62
fix mypy
roger-zhangg 636e9d1
fix typo, make asdict a function
roger-zhangg d3114c4
Merge branch 'develop' into kinesis
leandrodamascena 3fa7b2d
Merge branch 'develop' into kinesis
leandrodamascena 370e156
address Troy/Leandro suggestions
roger-zhangg 99837fc
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg 029a55c
Merge branch 'develop' into kinesis
leandrodamascena af1abfe
remove 6MB comment
roger-zhangg 2032146
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg 4168e21
Merge branch 'develop' into kinesis
leandrodamascena 312830b
fix comments
roger-zhangg a6c05a5
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg a3ed9f9
address Heitor's suggestion
roger-zhangg bfbee60
data class default optimization
roger-zhangg 4016446
remove slot for static check
roger-zhangg 5dbb3ff
fix doc, example
roger-zhangg 5b8a9c6
Merge branch 'develop' into kinesis
leandrodamascena 95b3958
Merge branch 'develop' into kinesis
leandrodamascena e4d75d7
rename r->record
roger-zhangg f5ca27d
Merge branch 'kinesis' of github.com:roger-zhangg/aws-lambda-powertoo…
roger-zhangg 46fbe98
Merge branch 'develop' into kinesis
leandrodamascena ddd49d2
Merge branch 'develop' into kinesis
leandrodamascena ce6ed61
Addressing Heitor's feedback
leandrodamascena d8be53e
Addressing Heitor's feedback
leandrodamascena 3a11563
Addressing Heitor's feedback
leandrodamascena 17c6763
add result warning, add asdict test, metadata test
roger-zhangg 00051a8
Merge branch 'develop' into kinesis
roger-zhangg 455402a
Merge branch 'develop' into kinesis
leandrodamascena cad6c09
refactor: initial refactoring
heitorlessa 6809e0e
chore: branding
heitorlessa 42a0de9
refactor: use classvar and tuple for perf
heitorlessa d05ca07
chore: fix rebase issue
heitorlessa b20137a
chore: fix mypy tuple exactness type
heitorlessa ef12496
remove Ok in example response,add failure example
roger-zhangg f2d5e63
Merge branch 'kinesis' of github.com:roger-zhangg/aws-lambda-powertoo…
roger-zhangg 3e43a25
chore: clean up docs example
heitorlessa 4ed0e66
chore: lower cognitive overhead; add example docstring
heitorlessa d1fc1c5
add drop example
roger-zhangg eb39c03
Merge branch 'kinesis' of github.com:roger-zhangg/aws-lambda-powertoo…
roger-zhangg c039480
docs: give info upfront, name examples
heitorlessa e11c718
docs: improve transforming records example
heitorlessa 745492c
docs: improve dropping records example
heitorlessa 61ddee9
docs: improve exception example
heitorlessa 6f95e8f
Merge branch 'develop' into kinesis
leandrodamascena File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
125 changes: 125 additions & 0 deletions
125
aws_lambda_powertools/utilities/data_classes/kinesis_firehose_response.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
from __future__ import annotations | ||
|
||
import base64 | ||
from typing import Callable, Iterator, List, Optional, Union | ||
roger-zhangg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper | ||
|
||
FirehoseStateOk = "Ok" | ||
FirehoseStateDropped = "Dropped" | ||
FirehoseStateFailed = "ProcessingFailed" | ||
roger-zhangg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
class KinesisFirehoseResponseRecordMetadata(DictWrapper): | ||
""" | ||
Documentation: | ||
-------------- | ||
- https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html | ||
""" | ||
|
||
@property | ||
def _metadata(self) -> Optional[dict]: | ||
roger-zhangg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Optional: metadata associated with this record; present only when Kinesis Stream is source""" | ||
return self["metadata"] # could raise KeyError | ||
|
||
@property | ||
def partition_keys(self) -> Optional[dict[str, str]]: | ||
roger-zhangg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Kinesis stream partition key; present only when Kinesis Stream is source""" | ||
return self._metadata["partitionKeys"] | ||
|
||
|
||
def KinesisFirehoseResponseRecordMetadataFactory( | ||
partition_keys: dict[str, str], | ||
roger-zhangg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
json_deserializer: Optional[Callable] = None, | ||
) -> KinesisFirehoseResponseRecordMetadata: | ||
data = { | ||
"metadata": { | ||
"partitionKeys": partition_keys, | ||
}, | ||
} | ||
return KinesisFirehoseResponseRecordMetadata(data=data, json_deserializer=json_deserializer) | ||
|
||
|
||
class KinesisFirehoseResponceRecord(DictWrapper): | ||
"""Record in Kinesis Data Firehose event | ||
|
||
Documentation: | ||
-------------- | ||
- https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html | ||
""" | ||
|
||
@property | ||
def record_id(self) -> str: | ||
"""Record ID; uniquely identifies this record within the current batch""" | ||
return self["recordId"] | ||
|
||
@property | ||
def result(self) -> Union[FirehoseStateOk, FirehoseStateDropped, FirehoseStateFailed]: | ||
roger-zhangg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""processing result, supported value: Ok, Dropped, ProcessingFailed""" | ||
return self["result"] | ||
|
||
@property | ||
def data(self) -> str: | ||
"""The data blob, base64-encoded""" | ||
return self["data"] | ||
|
||
@property | ||
def metadata(self) -> Optional[KinesisFirehoseResponseRecordMetadata]: | ||
"""Optional: metadata associated with this record; present only when Kinesis Stream is source""" | ||
return KinesisFirehoseResponseRecordMetadata(self._data) if self.get("metadata") else None | ||
|
||
@property | ||
def data_as_bytes(self) -> bytes: | ||
"""Decoded base64-encoded data as bytes""" | ||
return base64.b64decode(self.data) | ||
|
||
@property | ||
def data_as_text(self) -> str: | ||
"""Decoded base64-encoded data as text""" | ||
return self.data_as_bytes.decode("utf-8") | ||
|
||
@property | ||
def data_as_json(self) -> dict: | ||
roger-zhangg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Decoded base64-encoded data loaded to json""" | ||
if self._json_data is None: | ||
self._json_data = self._json_deserializer(self.data_as_text) | ||
return self._json_data | ||
|
||
|
||
def KinesisFirehoseResponceRecordFactory( | ||
record_id: str, | ||
result: Union[FirehoseStateOk, FirehoseStateDropped, FirehoseStateFailed], | ||
roger-zhangg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
data: str, | ||
metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None, | ||
json_deserializer: Optional[Callable] = None, | ||
) -> KinesisFirehoseResponceRecord: | ||
pass_data = { | ||
"recordId": record_id, | ||
"result": result, | ||
"data": base64.b64encode(data.encode("utf-8")).decode("utf-8"), | ||
} | ||
if metadata: | ||
data["metadata"] = metadata | ||
return KinesisFirehoseResponceRecord(data=pass_data, json_deserializer=json_deserializer) | ||
|
||
|
||
class KinesisFirehoseResponce(DictWrapper): | ||
"""Kinesis Data Firehose event | ||
|
||
Documentation: | ||
-------------- | ||
- https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html | ||
""" | ||
|
||
@property | ||
def records(self) -> Iterator[KinesisFirehoseResponceRecord]: | ||
for record in self["records"]: | ||
yield KinesisFirehoseResponceRecord(data=record, json_deserializer=self._json_deserializer) | ||
|
||
|
||
def KinesisFirehoseResponceFactory( | ||
records: List[KinesisFirehoseResponceRecord], | ||
json_deserializer: Optional[Callable] = None, | ||
) -> KinesisFirehoseResponce: | ||
pass_data = {"records": records} | ||
return KinesisFirehoseResponce(data=pass_data, json_deserializer=json_deserializer) |
20 changes: 11 additions & 9 deletions
20
examples/event_sources/src/kinesis_firehose_delivery_stream.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,28 +1,30 @@ | ||
import base64 | ||
import json | ||
|
||
from aws_lambda_powertools.utilities.data_classes import ( | ||
FirehoseStateOk, | ||
KinesisFirehoseEvent, | ||
KinesisFirehoseResponce, | ||
KinesisFirehoseResponceRecordFactory, | ||
event_source, | ||
) | ||
from aws_lambda_powertools.utilities.typing import LambdaContext | ||
|
||
|
||
@event_source(data_class=KinesisFirehoseEvent) | ||
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): | ||
result = [] | ||
result = KinesisFirehoseResponce({}) | ||
|
||
for record in event.records: | ||
# if data was delivered as json; caches loaded value | ||
data = record.data_as_json | ||
|
||
processed_record = { | ||
"recordId": record.record_id, | ||
"data": base64.b64encode(json.dumps(data).encode("utf-8")), | ||
"result": "Ok", | ||
} | ||
processed_record = KinesisFirehoseResponceRecordFactory( | ||
record_id=record.record_id, | ||
result=FirehoseStateOk, | ||
data=(json.dumps(data)), | ||
) | ||
|
||
result.append(processed_record) | ||
result.add_record(processed_record) | ||
|
||
# return transformed records | ||
return {"records": result} | ||
return result |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
from aws_lambda_powertools.utilities.data_classes import ( | ||
FirehoseStateOk, | ||
KinesisFirehoseEvent, | ||
KinesisFirehoseResponceFactory, | ||
KinesisFirehoseResponceRecordFactory, | ||
) | ||
from tests.functional.utils import load_event | ||
|
||
|
||
def test_kinesis_firehose_response(): | ||
leandrodamascena marked this conversation as resolved.
Show resolved
Hide resolved
|
||
raw_event = load_event("kinesisFirehoseKinesisEvent.json") | ||
parsed_event = KinesisFirehoseEvent(raw_event) | ||
|
||
result = [] | ||
for record in parsed_event.records: | ||
# if data was delivered as json; caches loaded value | ||
data = record.data_as_text | ||
|
||
processed_record = KinesisFirehoseResponceRecordFactory( | ||
record_id=record.record_id, | ||
result=FirehoseStateOk, | ||
data=(data), | ||
) | ||
|
||
result.append(processed_record) | ||
response = KinesisFirehoseResponceFactory(result) | ||
|
||
res_records = list(response.records) | ||
assert len(res_records) == 2 | ||
record_01, record_02 = res_records[:] | ||
record01_raw = raw_event["records"][0] | ||
assert record_01.result == FirehoseStateOk | ||
assert record_01.record_id == record01_raw["recordId"] | ||
assert record_01.data_as_bytes == b"Hello World" | ||
assert record_01.data_as_text == "Hello World" | ||
assert record_01.data == record01_raw["data"] |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.