Skip to content

[SLS-1824] SNS trace extractor #201

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
[![Slack](https://chat.datadoghq.com/badge.svg?bg=632CA6)](https://chat.datadoghq.com/)
[![License](https://img.shields.io/badge/license-Apache--2.0-blue)](https://github.com/DataDog/datadog-lambda-python/blob/main/LICENSE)

Datadog Lambda Library for Python (3.6, 3.7, 3.8, and 3.9) enables enhanced Lambda metrics, distributed tracing, and custom metric submission from AWS Lambda functions.
Datadog Lambda Library for Python (3.6, 3.7, 3.8, and 3.9) enables enhanced Lambda metrics, distributed tracing, and custom metric submission from AWS Lambda functions.

**IMPORTANT NOTE:** AWS Lambda is expected to receive a [breaking change](https://aws.amazon.com/blogs/compute/upcoming-changes-to-the-python-sdk-in-aws-lambda/) on **March 31, 2021**. If you are using Datadog Python Lambda layer version 7 or below, please upgrade to the latest.

Expand Down Expand Up @@ -89,11 +89,12 @@ Set to `true` to merge the X-Ray trace and the Datadog trace, when using both th

### DD_INFERRED_SPANS (experimental)

Inferred Spans are spans that Datadog can create based on incoming event metadata.
Inferred Spans are spans that Datadog can create based on incoming event metadata.
Set `DD_INFERRED_SPANS` to `true` to infer spans based on Lambda events.
Inferring upstream spans is only supported if you are using the [Datadog Lambda Extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/).
Inferring upstream spans is only supported if you are using the [Datadog Lambda Extension](https://docs.datadoghq.com/serverless/libraries_integrations/extension/).
Defaults to `false`.
Infers spans for:

- API Gateway REST events
- API Gateway websocket events
- HTTP API events
Expand Down
148 changes: 134 additions & 14 deletions datadog_lambda/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import os
import json
import base64
from datetime import datetime, timezone
from typing import Optional, Dict

Expand Down Expand Up @@ -184,23 +185,98 @@ def extract_context_from_http_event_or_context(event, lambda_context):
return trace_id, parent_id, sampling_priority


def extract_context_from_sqs_event_or_context(event, lambda_context):
def create_sns_event(message):
    """Wrap an SNS *message* payload in the standard Lambda SNS event envelope.

    Used to re-dispatch an SNS notification that arrived embedded inside an
    SQS body through the regular SNS handling path.
    """
    record = {
        "EventSource": "aws:sns",
        "EventVersion": "1.0",
        "Sns": message,
    }
    return {"Records": [record]}


def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
"""
Extract Datadog trace context from the first SQS message attributes.

Falls back to lambda context if no trace data is found in the SQS message attributes.
"""
try:
first_record = event["Records"][0]
msg_attributes = first_record.get("messageAttributes", {})
dd_json_data = msg_attributes.get("_datadog", {}).get("stringValue", r"{}")

# logic to deal with SNS => SQS event
if "body" in first_record:
body_str = first_record.get("body", {})
try:
body = json.loads(body_str)
if body.get("Type", "") == "Notification" and "TopicArn" in body:
logger.debug("Found SNS message inside SQS event")
first_record = get_first_record(create_sns_event(body))
except Exception:
first_record = event["Records"][0]
pass

msg_attributes = first_record.get(
"messageAttributes",
first_record.get("Sns", {}).get("MessageAttributes", {}),
)
dd_payload = msg_attributes.get("_datadog", {})
dd_json_data = dd_payload.get("stringValue", dd_payload.get("Value", r"{}"))
dd_data = json.loads(dd_json_data)
trace_id = dd_data.get(TraceHeader.TRACE_ID)
parent_id = dd_data.get(TraceHeader.PARENT_ID)
sampling_priority = dd_data.get(TraceHeader.SAMPLING_PRIORITY)

return trace_id, parent_id, sampling_priority
except Exception:
except Exception as e:
logger.debug("The trace extractor returned with error %s", e)
return extract_context_from_lambda_context(lambda_context)


def extract_context_from_eventbridge_event(event, lambda_context):
    """
    Extract Datadog trace context from an EventBridge message's "detail" field.

    Falls back to the Lambda context when no ``_datadog`` carrier is present
    in the detail, or when the event cannot be read at all.
    """
    try:
        carrier = event["detail"].get("_datadog")
        if carrier:
            return (
                carrier.get(TraceHeader.TRACE_ID),
                carrier.get(TraceHeader.PARENT_ID),
                carrier.get(TraceHeader.SAMPLING_PRIORITY),
            )
    except Exception as e:
        logger.debug("The trace extractor returned with error %s", e)
    return extract_context_from_lambda_context(lambda_context)


def extract_context_from_kinesis_event(event, lambda_context):
    """
    Extract Datadog trace context from a Kinesis stream record's
    base64-encoded data string.

    Falls back to the Lambda context when the record carries no data, no
    ``_datadog`` payload, or the data cannot be decoded/parsed.
    """
    try:
        record = get_first_record(event)
        data = record.get("kinesis", {}).get("data", None)
        if data:
            # Kinesis payloads are base64-encoded; the producer is expected to
            # have embedded a JSON object carrying a "_datadog" trace carrier.
            b64_bytes = data.encode("ascii")
            str_bytes = base64.b64decode(b64_bytes)
            # Decode as UTF-8: JSON payloads are not guaranteed ASCII-only,
            # and ASCII text decodes identically under UTF-8.
            data_str = str_bytes.decode("utf-8")
            data_obj = json.loads(data_str)
            dd_ctx = data_obj.get("_datadog")

            if dd_ctx:
                trace_id = dd_ctx.get(TraceHeader.TRACE_ID)
                parent_id = dd_ctx.get(TraceHeader.PARENT_ID)
                sampling_priority = dd_ctx.get(TraceHeader.SAMPLING_PRIORITY)
                return trace_id, parent_id, sampling_priority
    except Exception as e:
        logger.debug("The trace extractor returned with error %s", e)
    # Bug fix: the original fell off the end (implicitly returning None) when
    # the record had no "data" field, breaking callers that unpack a 3-tuple.
    # Always fall back to the Lambda context instead.
    return extract_context_from_lambda_context(lambda_context)


Expand Down Expand Up @@ -230,6 +306,7 @@ def extract_dd_trace_context(event, lambda_context, extractor=None):
"""
global dd_trace_context
trace_context_source = None
event_source = parse_event_source(event)

if extractor is not None:
(
Expand All @@ -243,12 +320,24 @@ def extract_dd_trace_context(event, lambda_context, extractor=None):
parent_id,
sampling_priority,
) = extract_context_from_http_event_or_context(event, lambda_context)
elif "Records" in event:
elif event_source.equals(EventTypes.SNS) or event_source.equals(EventTypes.SQS):
(
trace_id,
parent_id,
sampling_priority,
) = extract_context_from_sqs_event_or_context(event, lambda_context)
) = extract_context_from_sqs_or_sns_event_or_context(event, lambda_context)
elif event_source.equals(EventTypes.EVENTBRIDGE):
(
trace_id,
parent_id,
sampling_priority,
) = extract_context_from_eventbridge_event(event, lambda_context)
elif event_source.equals(EventTypes.KINESIS):
(
trace_id,
parent_id,
sampling_priority,
) = extract_context_from_kinesis_event(event, lambda_context)
else:
trace_id, parent_id, sampling_priority = extract_context_from_lambda_context(
lambda_context
Expand Down Expand Up @@ -556,6 +645,8 @@ def create_inferred_span_from_http_api_event(event, context):


def create_inferred_span_from_sqs_event(event, context):
trace_ctx = tracer.current_trace_context()

event_record = get_first_record(event)
event_source_arn = event_record["eventSourceARN"]
queue_name = event_source_arn.split(":")[-1]
Expand All @@ -574,11 +665,37 @@ def create_inferred_span_from_sqs_event(event, context):
"resource": queue_name,
"span_type": "web",
}
start_time = int(request_time_epoch) / 1000

# logic to deal with SNS => SQS event
sns_span = None
if "body" in event_record:
body_str = event_record.get("body", {})
try:
body = json.loads(body_str)
if body.get("Type", "") == "Notification" and "TopicArn" in body:
logger.debug("Found SNS message inside SQS event")
sns_span = create_inferred_span_from_sns_event(
create_sns_event(body), context
)
sns_span.finish(finish_time=start_time)
except Exception as e:
logger.debug(
"Unable to create SNS span from SQS message, with error %s" % e
)
pass

# trace context needs to be set again as it is reset
# when sns_span.finish executes
tracer.context_provider.activate(trace_ctx)
tracer.set_tags({"_dd.origin": "lambda"})
span = tracer.trace("aws.sqs", **args)
if span:
span.set_tags(tags)
span.start = int(request_time_epoch) / 1000
span.start = start_time
if sns_span:
span.parent_id = sns_span.span_id

return span


Expand All @@ -594,9 +711,12 @@ def create_inferred_span_from_sns_event(event, context):
"topic_arn": topic_arn,
"message_id": sns_message["MessageId"],
"type": sns_message["Type"],
"subject": sns_message["Subject"],
"event_subscription_arn": event_record["EventSubscriptionArn"],
}

# Subject not available in SNS => SQS scenario
if "Subject" in sns_message and sns_message["Subject"]:
tags["subject"] = sns_message["Subject"]

InferredSpanInfo.set_tags(tags, tag_source="self", synchronicity="async")
sns_dt_format = "%Y-%m-%dT%H:%M:%S.%fZ"
timestamp = event_record["Sns"]["Timestamp"]
Expand Down Expand Up @@ -644,7 +764,7 @@ def create_inferred_span_from_kinesis_event(event, context):
span = tracer.trace("aws.kinesis", **args)
if span:
span.set_tags(tags)
span.start = int(request_time_epoch)
span.start = request_time_epoch
return span


Expand All @@ -662,7 +782,7 @@ def create_inferred_span_from_dynamodb_event(event, context):
"event_name": event_record["eventName"],
"event_version": event_record["eventVersion"],
"stream_view_type": dynamodb_message["StreamViewType"],
"size_bytes": dynamodb_message["SizeBytes"],
"size_bytes": str(dynamodb_message["SizeBytes"]),
}
InferredSpanInfo.set_tags(tags, synchronicity="async", tag_source="self")
request_time_epoch = event_record["dynamodb"]["ApproximateCreationDateTime"]
Expand Down Expand Up @@ -690,8 +810,8 @@ def create_inferred_span_from_s3_event(event, context):
"bucketname": bucket_name,
"bucket_arn": event_record["s3"]["bucket"]["arn"],
"object_key": event_record["s3"]["object"]["key"],
"object_size": event_record["s3"]["object"]["size"],
"object_etag": event_record["s3"]["etag"],
"object_size": str(event_record["s3"]["object"]["size"]),
"object_etag": event_record["s3"]["object"]["eTag"],
}
InferredSpanInfo.set_tags(tags, synchronicity="async", tag_source="self")
dt_format = "%Y-%m-%dT%H:%M:%S.%fZ"
Expand Down Expand Up @@ -786,7 +906,7 @@ def create_function_execution_span(


class InferredSpanInfo(object):
BASE_NAME = "inferred_span"
BASE_NAME = "_inferred_span"
SYNCHRONICITY = f"{BASE_NAME}.synchronicity"
TAG_SOURCE = f"{BASE_NAME}.tag_source"

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ datadog = "^0.41.0"
wrapt = "^1.11.2"
ddtrace = "^0.50.0"
importlib_metadata = {version = "^1.0", python = "<3.8"}
typing_extensions = {version = "^4.0", python = "<3.8"}
boto3 = { version = "^1.10.33", optional = true }
typing_extensions = {version = "^4.0", python = "<3.8"}
requests = { version ="^2.22.0", optional = true }
nose2 = { version= "^0.9.1", optional = true }
flake8 = { version = "^3.7.9", optional = true }
Expand Down
4 changes: 3 additions & 1 deletion scripts/run_integration_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ for handler_name in "${LAMBDA_HANDLERS[@]}"; do
sed -E "s/(\"system\.pid\"\: )[0-9\.\-]+/\1\"XXXX\"/g" |
sed -E "s/(\"runtime-id\"\: \")[a-z0-9\.\-]+/\1XXXX/g" |
sed -E "s/(\"datadog_lambda\"\: \")([0-9]+\.[0-9]+\.[0-9])/\1X.X.X/g" |
sed -E "s/(\"dd_trace\"\: \")([0-9]+\.[0-9]+\.[0-9])/\1X.X.X/g"
sed -E "s/(\"dd_trace\"\: \")([0-9]+\.[0-9]+\.[0-9])/\1X.X.X/g" |
sed -E "/init complete at epoch/d" |
sed -E "/main started at epoch/d"
)

if [ ! -f $function_snapshot_path ]; then
Expand Down
5 changes: 4 additions & 1 deletion tests/event_samples/api-gateway-non-proxy-async.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
"X-Forwarded-For": "38.122.226.210, 70.132.52.143",
"X-Forwarded-Port": "443",
"X-Forwarded-Proto": "https",
"X-Amz-Invocation-Type": "Event"
"X-Amz-Invocation-Type": "Event",
"x-datadog-trace-id": "12345",
"x-datadog-parent-id": "67890",
"x-datadog-sampling-priority": "2"
},
"multiValueHeaders": {
"Accept": [
Expand Down
5 changes: 4 additions & 1 deletion tests/event_samples/api-gateway-non-proxy.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
"X-Amzn-Trace-Id": "Root=1-613a4da3-5012576973e2e5670d4c549a",
"X-Forwarded-For": "38.122.226.210, 70.132.52.143",
"X-Forwarded-Port": "443",
"X-Forwarded-Proto": "https"
"X-Forwarded-Proto": "https",
"x-datadog-trace-id": "12345",
"x-datadog-parent-id": "67890",
"x-datadog-sampling-priority": "2"
},
"multiValueHeaders": {
"Accept": [
Expand Down
5 changes: 4 additions & 1 deletion tests/event_samples/api-gateway-websocket-connect.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
"X-Amzn-Trace-Id": "Root=1-613b6b23-34ae0ce37f8d09ae19095835",
"X-Forwarded-For": "38.122.226.210",
"X-Forwarded-Port": "443",
"X-Forwarded-Proto": "https"
"X-Forwarded-Proto": "https",
"x-datadog-trace-id": "12345",
"x-datadog-parent-id": "67890",
"x-datadog-sampling-priority": "2"
},
"multiValueHeaders": {
"Host": [
Expand Down
23 changes: 23 additions & 0 deletions tests/event_samples/api-gateway-websocket-default.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,27 @@
{
"headers": {
"Host": "p62c47itsb.execute-api.sa-east-1.amazonaws.com",
"x-api-key": "",
"X-Forwarded-For": "",
"x-restapi": "",
"x-datadog-trace-id": "12345",
"x-datadog-parent-id": "67890",
"x-datadog-sampling-priority": "2"
},
"multiValueHeaders": {
"Host": [
"p62c47itsb.execute-api.sa-east-1.amazonaws.com"
],
"x-api-key": [
""
],
"X-Forwarded-For": [
""
],
"x-restapi": [
""
]
},
"requestContext": {
"routeKey": "$default",
"messageId": "Fc5S3coemjQCJlg=",
Expand Down
7 changes: 5 additions & 2 deletions tests/event_samples/api-gateway-websocket-disconnect.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
"Host": "p62c47itsb.execute-api.sa-east-1.amazonaws.com",
"x-api-key": "",
"X-Forwarded-For": "",
"x-restapi": ""
"x-restapi": "",
"x-datadog-trace-id": "12345",
"x-datadog-parent-id": "67890",
"x-datadog-sampling-priority": "2"
},
"multiValueHeaders": {
"Host": [
Expand Down Expand Up @@ -39,4 +42,4 @@
"apiId": "p62c47itsb"
},
"isBase64Encoded": false
}
}
3 changes: 2 additions & 1 deletion tests/event_samples/api-gateway.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
"X-Forwarded-Port": "443",
"X-Forwarded-Proto": "https",
"X-Datadog-Trace-Id": "12345",
"X-Datadog-Parent-Id": "67890"
"X-Datadog-Parent-Id": "67890",
"x-datadog-sampling-priority": "2"
},
"multiValueHeaders": {
"Accept": [
Expand Down
5 changes: 4 additions & 1 deletion tests/event_samples/application-load-balancer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
"x-forwarded-for": "72.12.164.125",
"x-forwarded-port": "80",
"x-forwarded-proto": "http",
"x-imforwards": "20"
"x-imforwards": "20",
"x-datadog-trace-id": "12345",
"x-datadog-parent-id": "67890",
"x-datadog-sampling-priority": "2"
},
"body": "",
"isBase64Encoded": false
Expand Down
Loading