From 0f923341a9a1f838c3ffbcc044a7930cffa6d28b Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Fri, 8 Mar 2024 15:32:03 -0500 Subject: [PATCH 1/4] java upstream sqs trace context propagation --- datadog_lambda/tracing.py | 70 +++++++++++++++------- tests/event_samples/sqs-java-upstream.json | 22 +++++++ tests/test_cold_start.py | 1 - tests/test_tracing.py | 11 ++++ 4 files changed, 82 insertions(+), 22 deletions(-) create mode 100644 tests/event_samples/sqs-java-upstream.json diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index f032059b..5543d9e3 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -23,6 +23,7 @@ TraceContextSource, XrayDaemon, Headers, + TraceHeader, ) from datadog_lambda.xray import ( send_segment, @@ -248,28 +249,55 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context): first_record.get("Sns", {}).get("MessageAttributes", {}), ) dd_payload = msg_attributes.get("_datadog", {}) - # SQS uses dataType and binaryValue/stringValue - # SNS uses Type and Value - dd_json_data_type = dd_payload.get("Type", dd_payload.get("dataType", "")) - if dd_json_data_type == "Binary": - dd_json_data = dd_payload.get( - "binaryValue", - dd_payload.get("Value", r"{}"), - ) - dd_json_data = base64.b64decode(dd_json_data) - elif dd_json_data_type == "String": - dd_json_data = dd_payload.get( - "stringValue", - dd_payload.get("Value", r"{}"), - ) + if dd_payload: + # SQS uses dataType and binaryValue/stringValue + # SNS uses Type and Value + dd_json_data = None + dd_json_data_type = dd_payload.get("Type", dd_payload.get("dataType", "")) + if dd_json_data_type == "Binary": + dd_json_data = dd_payload.get( + "binaryValue", + dd_payload.get("Value", r"{}"), + ) + dd_json_data = base64.b64decode(dd_json_data) + elif dd_json_data_type == "String": + dd_json_data = dd_payload.get( + "stringValue", + dd_payload.get("Value", r"{}"), + ) + else: + logger.debug( + "Datadog Lambda Python only supports extracting trace" + "context from String or Binary SQS/SNS message attributes" + ) + + if dd_json_data: + dd_data = json.loads(dd_json_data) + return propagator.extract(dd_data) else: - logger.debug( - "Datadog Lambda Python only supports extracting trace" - "context from String or Binary SQS/SNS message attributes" - ) - return extract_context_from_lambda_context(lambda_context) - dd_data = json.loads(dd_json_data) - return propagator.extract(dd_data) + # Handle case where trace context is injected into attributes.AWSTraceHeader + # example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1 + x_ray_header = first_record.get("attributes", {}).get("AWSTraceHeader") + if x_ray_header: + x_ray_context = parse_xray_header(x_ray_header) + trace_id_parts = x_ray_context.get("trace_id", "").split("-") + if len(trace_id_parts) > 2 and trace_id_parts[2].startswith("00000000"): + # If it starts with eight 0's padding, + # then this AWSTraceHeader contains Datadog injected trace context + logger.debug( + "Found dd-trace injected trace context from AWSTraceHeader" + ) + dd_data = { + TraceHeader.TRACE_ID: str( + int(trace_id_parts[2][8:], 16) + ), # remove padding and convert the hex str to a decimal str + TraceHeader.PARENT_ID: str( + int(x_ray_context["parent_id"], 16) + ), # convert the hex str to a decimal str + TraceHeader.SAMPLING_PRIORITY: x_ray_context["sampled"], + } + return propagator.extract(dd_data) + return extract_context_from_lambda_context(lambda_context) except Exception as e: logger.debug("The trace extractor returned with error %s", e) return extract_context_from_lambda_context(lambda_context) diff --git a/tests/event_samples/sqs-java-upstream.json b/tests/event_samples/sqs-java-upstream.json new file mode 100644 index 00000000..23ab6881 --- /dev/null +++ b/tests/event_samples/sqs-java-upstream.json @@ -0,0 +1,22 @@ +{ + "Records": [ + { + "messageId": "f7e888aa-1368-484c-8e15-fc3f0f7c6fea", + "receiptHandle": "AQEBN1aYTQ1c5huZh9bkhBYqcMMnqTUMRh8MfUPyGXkEolcn23rvM9saGEg3wTK/7JnJ1s3Uk107uLjaP6yV6+zS3oQRU0vMG2LfyTgHovWhYQ8TnrpC7XpYL+Uf+oc9KoILQopiYi4wsFnOWQqy82yQmlOA3W+CZ3Rvq8N6rNcmyaZEXVdozHG+FyMCMQ8QdTcCHhzR9YKnkZ87Y40+LhysUR57VNPVtRwENI8H1uMEfmxaCkW+CAkdCGoXeX+KioT7pHJDZaEutXM3VRmGXDDzCXvfUJQ9JQIlP5xe66JO8/cpCyl5sDoHsCjLy6X/XCmfG2+XclPObGHBzcMSjG1RQtHsEGTOAJrLREucqf/oj0Ab4svpxz6lR4UXrICygZ2x0NZcNFXcZx3GV2QL9nHmJxzrO2lnNTEOMuYB4SnqtIhsaDTcmkYHumaAJdRHl5BksFcU5qpS7BQrnRvXn5Sz3hYdR2KuYKN5Oq6W1vuT16o=", + "body": "{\"hello\":\"world\"}", + "attributes": { + "ApproximateReceiveCount": "1", + "AWSTraceHeader": "Root=1-65eb7350-000000006dfd06bf489aa4e5;Parent=48cc02b6aafae897;Sampled=1", + "SentTimestamp": "1709929297382", + "SenderId": "AROAWGCM4HXUTHYJKOM7M:DdTraceXLambda-sqsjavache-sqsjavaproducerforPython-moi7s7Hu7Ppy", + "ApproximateFirstReceiveTimestamp": "1709929297387" + }, + "messageAttributes": {}, + "md5OfMessageAttributes": "", + "md5OfBody": "fbc24bcc7a1794758fc1327fcfebdaf6", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-west-2:444442222111:DdTraceXLambda-sqsjavachecksNeste-sqsJavaProducer2sqsjavaproducerfo-dwpHQF6fcZT4", + "awsRegion": "us-west-2" + } + ] +} diff --git a/tests/test_cold_start.py b/tests/test_cold_start.py index 2ce37e7c..c7444c49 100644 --- a/tests/test_cold_start.py +++ b/tests/test_cold_start.py @@ -240,7 +240,6 @@ def test_trace_ignore_libs(self): def test_lazy_loaded_package_imports(monkeypatch): - spans = [] def finish(span): diff --git a/tests/test_tracing.py b/tests/test_tracing.py index d51f7b3f..e30d202c 100644 --- a/tests/test_tracing.py +++ b/tests/test_tracing.py @@ -1963,6 +1963,17 @@ def test_extract_context_from_sqs_batch_event(self): self.assertEqual(context.span_id, 7431398482019833808) self.assertEqual(context.sampling_priority, 1) + def test_extract_context_from_sqs_java_upstream_event(self): + event_sample_source = "sqs-java-upstream" + test_file = event_samples + event_sample_source + ".json" + with open(test_file, "r") as event: + event = json.load(event) + ctx = get_mock_context() + context, source, event_type = extract_dd_trace_context(event, ctx) + self.assertEqual(context.trace_id, 7925498337868555493) + self.assertEqual(context.span_id, 5245570649555658903) + self.assertEqual(context.sampling_priority, 1) + def test_extract_context_from_sns_event_with_string_msg_attr(self): event_sample_source = "sns-string-msg-attribute" test_file = event_samples + event_sample_source + ".json" From 8b9143412ee80c4354e3875221804fe6875a49a8 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Wed, 27 Mar 2024 14:16:36 -0400 Subject: [PATCH 2/4] add DD_TRACE_JAVA_TRACE_ID_PADDING --- datadog_lambda/tracing.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 5543d9e3..b8a4aed7 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -69,6 +69,8 @@ propagator = HTTPPropagator() +DD_TRACE_JAVA_TRACE_ID_PADDING = "00000000" + def _convert_xray_trace_id(xray_trace_id): """ @@ -281,7 +283,9 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context): if x_ray_header: x_ray_context = parse_xray_header(x_ray_header) trace_id_parts = x_ray_context.get("trace_id", "").split("-") - if len(trace_id_parts) > 2 and trace_id_parts[2].startswith("00000000"): + if len(trace_id_parts) > 2 and trace_id_parts[2].startswith( + DD_TRACE_JAVA_TRACE_ID_PADDING + ): # If it starts with eight 0's padding, # then this AWSTraceHeader contains Datadog injected trace context logger.debug( From 496a673829cd2f105bbe3452ab1c4519bc23d34d Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Wed, 27 Mar 2024 14:50:01 -0400 Subject: [PATCH 3/4] fix support for w3c case --- datadog_lambda/tracing.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index b8a4aed7..a4865d79 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -291,16 +291,11 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context): logger.debug( "Found dd-trace injected trace context from AWSTraceHeader" ) - dd_data = { - TraceHeader.TRACE_ID: str( - int(trace_id_parts[2][8:], 16) - ), # remove padding and convert the hex str to a decimal str - TraceHeader.PARENT_ID: str( - int(x_ray_context["parent_id"], 16) - ), # convert the hex str to a decimal str - TraceHeader.SAMPLING_PRIORITY: x_ray_context["sampled"], - } - return propagator.extract(dd_data) + return Context( + trace_id=int(trace_id_parts[2][8:], 16), + span_id=int(int(x_ray_context["parent_id"], 16)), + sampling_priority=float(x_ray_context["sampled"]), + ) return extract_context_from_lambda_context(lambda_context) except Exception as e: logger.debug("The trace extractor returned with error %s", e) From 564fb67704c6745df18a68cb1a30433bbb96f75b Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Wed, 27 Mar 2024 14:53:56 -0400 Subject: [PATCH 4/4] lint --- datadog_lambda/tracing.py | 1 - 1 file changed, 1 deletion(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index a4865d79..5e338253 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -23,7 +23,6 @@ TraceContextSource, XrayDaemon, Headers, - TraceHeader, ) from datadog_lambda.xray import ( send_segment,