Skip to content

feat: lambda support for DSM #622

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 43 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
18bcf3c
removed current dsm implementation
michael-zhao459 Jun 19, 2025
af174d4
new dsm lambda implementation
michael-zhao459 Jun 19, 2025
eeea762
check env variable in proper way
michael-zhao459 Jun 19, 2025
1c78072
add tests
michael-zhao459 Jun 19, 2025
7a39290
add more detailed comment to function
michael-zhao459 Jun 20, 2025
e7d8a72
fixed lint for tests
michael-zhao459 Jun 20, 2025
6e7e5eb
remove not needed tests
michael-zhao459 Jun 20, 2025
4733bb0
fix
michael-zhao459 Jun 20, 2025
fd395e3
error handling
michael-zhao459 Jun 20, 2025
9835573
fix
michael-zhao459 Jun 20, 2025
3bbe48d
some fixes
michael-zhao459 Jun 20, 2025
f533c09
renamed to extract_context_with_datastreams
michael-zhao459 Jun 20, 2025
86cf5ff
changed to explicit check of dsm context
michael-zhao459 Jun 20, 2025
3954714
move dsm tests to test_tracing.py
michael-zhao459 Jun 20, 2025
6f9d42c
add wanted tests
michael-zhao459 Jun 20, 2025
33cdc6c
caught sns -> sqs bug
michael-zhao459 Jun 20, 2025
3672f39
revert back to original tracing.py implementation
michael-zhao459 Jun 30, 2025
12deafc
fix lint
michael-zhao459 Jun 30, 2025
d8a4379
revert spacing stuff to original
michael-zhao459 Jun 30, 2025
b5af84f
remove unnecessary checks, still set checkpoints even when dsm conte…
michael-zhao459 Jun 30, 2025
3ffd16f
remove not needed comment
michael-zhao459 Jun 30, 2025
c84ea14
fixes
michael-zhao459 Jun 30, 2025
4ad2bab
lambda functions not allowed by lint
michael-zhao459 Jun 30, 2025
9996fbb
use lambda function, add checks before checkpoint
michael-zhao459 Jul 1, 2025
8b0313b
remove unnecessary import
michael-zhao459 Jul 1, 2025
cfbb1f8
move if statement with least work first
michael-zhao459 Jul 1, 2025
0b01978
changed function name to original, arn exception handle w test, retur…
michael-zhao459 Jul 1, 2025
9dca396
some fixes
michael-zhao459 Jul 2, 2025
d849b75
remove comments that are not needed
michael-zhao459 Jul 2, 2025
a375d81
fix
michael-zhao459 Jul 2, 2025
f465407
fix
michael-zhao459 Jul 2, 2025
ec5dfe5
extra check
michael-zhao459 Jul 2, 2025
8dfab67
Merge branch 'main' into michael.zhao/dsm-lambda
michael-zhao459 Jul 2, 2025
123e5a3
remove unnecessary work associated with event_source
michael-zhao459 Jul 3, 2025
e0497d0
fix lint
michael-zhao459 Jul 3, 2025
81237b3
add tests for empty arn logic
michael-zhao459 Jul 3, 2025
0463862
more descriptive name
michael-zhao459 Jul 3, 2025
1a9df0a
fix lint
michael-zhao459 Jul 3, 2025
b8ff3fb
fix
michael-zhao459 Jul 3, 2025
021c8f8
moved arn check to checkpoint, remove comments, add variable dec
michael-zhao459 Jul 3, 2025
ef98427
kinesis fix, tests fix
michael-zhao459 Jul 3, 2025
0f71f84
remove not needed test
michael-zhao459 Jul 3, 2025
c4fa49b
formatting fix
michael-zhao459 Jul 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 0 additions & 38 deletions datadog_lambda/dsm.py

This file was deleted.

50 changes: 44 additions & 6 deletions datadog_lambda/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,24 @@
LOWER_64_BITS = "LOWER_64_BITS"


def _dsm_set_checkpoint(context_json, event_type, arn):
    """
    Record a Data Streams Monitoring consume checkpoint for an incoming event.

    Does nothing when `config.data_streams_enabled` is falsy or when `arn` is
    empty. `context_json` may be None (or otherwise falsy); carrier lookups
    then return that falsy value instead of calling `.get`.
    Errors raised while importing or calling ddtrace's
    `set_consume_checkpoint` are logged at debug level and swallowed.
    """
    if not config.data_streams_enabled or not arn:
        return

    def carrier_get(key):
        # Short-circuits to the falsy context itself when there is no
        # propagated context to read from.
        return context_json and context_json.get(key)

    try:
        from ddtrace.data_streams import set_consume_checkpoint

        set_consume_checkpoint(event_type, arn, carrier_get, manual_checkpoint=False)
    except Exception as e:
        logger.debug(
            f"DSM:Failed to set consume checkpoint for {event_type} {arn}: {e}"
        )


def _convert_xray_trace_id(xray_trace_id):
"""
Convert X-Ray trace id (hex)'s last 63 bits to a Datadog trace id (int).
Expand Down Expand Up @@ -202,7 +220,9 @@ def create_sns_event(message):
}


def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
def extract_context_from_sqs_or_sns_event_or_context(
event, lambda_context, event_source
):
"""
Extract Datadog trace context from an SQS event.

Expand All @@ -214,7 +234,10 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
Lambda Context.

Falls back to lambda context if no trace data is found in the SQS message attributes.
Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
"""
source_arn = ""
event_type = "sqs" if event_source.equals(EventTypes.SQS) else "sns"

# EventBridge => SQS
try:
Expand All @@ -226,6 +249,7 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):

try:
first_record = event.get("Records")[0]
source_arn = first_record.get("eventSourceARN", "")

# logic to deal with SNS => SQS event
if "body" in first_record:
Expand All @@ -241,6 +265,9 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
msg_attributes = first_record.get("messageAttributes")
if msg_attributes is None:
sns_record = first_record.get("Sns") or {}
# SNS->SQS event would extract SNS arn without this check
if event_source.equals(EventTypes.SNS):
source_arn = sns_record.get("TopicArn", "")
msg_attributes = sns_record.get("MessageAttributes") or {}
dd_payload = msg_attributes.get("_datadog")
if dd_payload:
Expand Down Expand Up @@ -272,8 +299,9 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
logger.debug(
"Failed to extract Step Functions context from SQS/SNS event."
)

return propagator.extract(dd_data)
context = propagator.extract(dd_data)
_dsm_set_checkpoint(dd_data, event_type, source_arn)
return context
else:
# Handle case where trace context is injected into attributes.AWSTraceHeader
# example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
Expand All @@ -296,9 +324,13 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
span_id=int(x_ray_context["parent_id"], 16),
sampling_priority=float(x_ray_context["sampled"]),
)
# Still want to set a DSM checkpoint even if DSM context not propagated
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to set a checkpoint even when context is not propagated because we can still track the consume call, even if the produce call wasn't tracked.
So you would see:
{ queue } --> { service }
instead of seeing nothing, so it's still useful.

_dsm_set_checkpoint(None, event_type, source_arn)
return extract_context_from_lambda_context(lambda_context)
except Exception as e:
logger.debug("The trace extractor returned with error %s", e)
# Still want to set a DSM checkpoint even if DSM context not propagated
_dsm_set_checkpoint(None, event_type, source_arn)
return extract_context_from_lambda_context(lambda_context)


Expand Down Expand Up @@ -357,9 +389,12 @@ def extract_context_from_eventbridge_event(event, lambda_context):
def extract_context_from_kinesis_event(event, lambda_context):
"""
Extract datadog trace context from a Kinesis Stream's base64 encoded data string
Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
"""
source_arn = ""
try:
record = get_first_record(event)
source_arn = record.get("eventSourceARN", "")
kinesis = record.get("kinesis")
if not kinesis:
return extract_context_from_lambda_context(lambda_context)
Expand All @@ -373,10 +408,13 @@ def extract_context_from_kinesis_event(event, lambda_context):
data_obj = json.loads(data_str)
dd_ctx = data_obj.get("_datadog")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when would that be set for Kinesis?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if dd_ctx:
return propagator.extract(dd_ctx)
context = propagator.extract(dd_ctx)
_dsm_set_checkpoint(dd_ctx, "kinesis", source_arn)
return context
except Exception as e:
logger.debug("The trace extractor returned with error %s", e)

# Still want to set a DSM checkpoint even if DSM context not propagated
_dsm_set_checkpoint(None, "kinesis", source_arn)
return extract_context_from_lambda_context(lambda_context)


Expand Down Expand Up @@ -594,7 +632,7 @@ def extract_dd_trace_context(
)
elif event_source.equals(EventTypes.SNS) or event_source.equals(EventTypes.SQS):
context = extract_context_from_sqs_or_sns_event_or_context(
event, lambda_context
event, lambda_context, event_source
)
elif event_source.equals(EventTypes.EVENTBRIDGE):
context = extract_context_from_eventbridge_event(event, lambda_context)
Expand Down
3 changes: 0 additions & 3 deletions datadog_lambda/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from time import time_ns

from datadog_lambda.asm import asm_start_response, asm_start_request
from datadog_lambda.dsm import set_dsm_context
from datadog_lambda.extension import should_use_extension, flush_extension
from datadog_lambda.cold_start import (
set_cold_start,
Expand Down Expand Up @@ -237,8 +236,6 @@ def _before(self, event, context):
self.inferred_span = create_inferred_span(
event, context, event_source, config.decode_authorizer_context
)
if config.data_streams_enabled:
set_dsm_context(event, event_source)
self.span = create_function_execution_span(
context=context,
function_name=config.function_name,
Expand Down
112 changes: 0 additions & 112 deletions tests/test_dsm.py

This file was deleted.

Loading
Loading