Skip to content

Darcy.rayner/dd trace support #53

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Apr 3, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ WORKDIR /build
# Install datadog_lambda and dependencies from local
COPY . .
RUN pip install . -t ./python/lib/$runtime/site-packages
RUN pip install --find-links=https://s3.amazonaws.com/pypi.datadoghq.com/trace-dev/index.html ddtrace==0.35.1.dev5+g1e11b2dd -t ./python/lib/$runtime/site-packages

# Remove *.pyc files
RUN find ./python/lib/$runtime/site-packages -name \*.pyc -delete
Expand Down
7 changes: 7 additions & 0 deletions datadog_lambda/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,10 @@ class XraySubsegment(object):
NAME = "datadog-metadata"
KEY = "trace"
NAMESPACE = "datadog"


# Source of datadog context
class Source(object):
XRAY = "xray"
EVENT = "event"
DDTRACE = "ddtrace"
106 changes: 79 additions & 27 deletions datadog_lambda/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,25 @@
# Copyright 2019 Datadog, Inc.

import logging
import os

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.lambda_launcher import LambdaContext

from ddtrace import patch, tracer
from datadog_lambda.constants import SamplingPriority, TraceHeader, XraySubsegment
from datadog_lambda.constants import (
SamplingPriority,
TraceHeader,
XraySubsegment,
Source,
)
from ddtrace import tracer, patch

logger = logging.getLogger(__name__)

dd_trace_context = {}
dd_native_tracing_enabled = (
os.environ.get("DD_TRACE_ENABLED", "false").lower() == "true"
)


def _convert_xray_trace_id(xray_trace_id):
Expand Down Expand Up @@ -41,6 +50,42 @@ def _convert_xray_sampling(xray_sampled):
)


def _get_xray_trace_context():
if not is_lambda_context():
return None

xray_trace_entity = xray_recorder.get_trace_entity() # xray (sub)segment
return {
"trace-id": _convert_xray_trace_id(xray_trace_entity.trace_id),
"parent-id": _convert_xray_entity_id(xray_trace_entity.id),
"sampling-priority": _convert_xray_sampling(xray_trace_entity.sampled),
"source": Source.XRAY,
}


def _get_dd_trace_native_context():
span = tracer.current_span()
if not span:
return None

parent_id = span.context.span_id
trace_id = span.context.trace_id
return {
"parent-id": str(parent_id),
"trace-id": str(trace_id),
"sampling-priority": SamplingPriority.AUTO_KEEP,
"source": Source.DDTRACE,
}


def _context_obj_to_headers(obj):
return {
TraceHeader.TRACE_ID: str(obj.get("trace-id")),
TraceHeader.PARENT_ID: str(obj.get("parent-id")),
TraceHeader.SAMPLING_PRIORITY: str(obj.get("sampling-priority")),
}


def extract_dd_trace_context(event):
"""
Extract Datadog trace context from the Lambda `event` object.
Expand All @@ -61,23 +106,24 @@ def extract_dd_trace_context(event):
sampling_priority = lowercase_headers.get(TraceHeader.SAMPLING_PRIORITY)
if trace_id and parent_id and sampling_priority:
logger.debug("Extracted Datadog trace context from headers")
dd_trace_context = {
metadata = {
"trace-id": trace_id,
"parent-id": parent_id,
"sampling-priority": sampling_priority,
}
xray_recorder.begin_subsegment(XraySubsegment.NAME)
subsegment = xray_recorder.current_subsegment()
subsegment.put_metadata(
XraySubsegment.KEY, dd_trace_context, XraySubsegment.NAMESPACE
)

subsegment.put_metadata(XraySubsegment.KEY, metadata, XraySubsegment.NAMESPACE)
dd_trace_context = metadata.copy()
dd_trace_context["source"] = Source.EVENT
xray_recorder.end_subsegment()
else:
# AWS Lambda runtime caches global variables between invocations,
# reset to avoid using the context from the last invocation.
dd_trace_context = {}

dd_trace_context = _get_xray_trace_context()
logger.debug("extracted dd trace context %s", dd_trace_context)
return dd_trace_context


def get_dd_trace_context():
Expand All @@ -92,26 +138,29 @@ def get_dd_trace_context():
automatically, but this function can be used to manually inject the trace
context to an outgoing request.
"""
if not is_lambda_context():
logger.debug("get_dd_trace_context is only supported in LambdaContext")
return {}

global dd_trace_context
xray_trace_entity = xray_recorder.get_trace_entity() # xray (sub)segment
if dd_trace_context:
return {
TraceHeader.TRACE_ID: dd_trace_context["trace-id"],
TraceHeader.PARENT_ID: _convert_xray_entity_id(xray_trace_entity.id),
TraceHeader.SAMPLING_PRIORITY: dd_trace_context["sampling-priority"],
}
else:
return {
TraceHeader.TRACE_ID: _convert_xray_trace_id(xray_trace_entity.trace_id),
TraceHeader.PARENT_ID: _convert_xray_entity_id(xray_trace_entity.id),
TraceHeader.SAMPLING_PRIORITY: _convert_xray_sampling(
xray_trace_entity.sampled
),
}

if dd_native_tracing_enabled:
native_trace_context = _get_dd_trace_native_context()
if native_trace_context is not None:
logger.info("get_dd_trace_context using dd-trace context")
return _context_obj_to_headers(native_trace_context)

try:
trace_headers = _context_obj_to_headers(dd_trace_context)
xray_context = _get_xray_trace_context() # xray (sub)segment
if xray_context and not trace_headers:
return _context_obj_to_headers(xray_context)
if xray_context and trace_headers:
trace_headers[TraceHeader.PARENT_ID] = xray_context["parent-id"]
return trace_headers
except Exception as e:
logger.debug(
"get_dd_trace_context couldn't read from segment from x-ray, with error %s"
% e
)

return {}


def set_correlation_ids():
Expand All @@ -125,6 +174,9 @@ def set_correlation_ids():
if not is_lambda_context():
logger.debug("set_correlation_ids is only supported in LambdaContext")
return
if dd_native_tracing_enabled:
logger.debug("using ddtrace implementation for spans")
return

context = get_dd_trace_context()

Expand Down
49 changes: 44 additions & 5 deletions datadog_lambda/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import logging
import traceback

from datadog_lambda.cold_start import set_cold_start
from datadog_lambda.cold_start import set_cold_start, is_cold_start
from datadog_lambda.metric import (
lambda_stats,
submit_invocations_metric,
Expand All @@ -16,9 +16,14 @@
from datadog_lambda.patch import patch_all
from datadog_lambda.tracing import (
extract_dd_trace_context,
set_correlation_ids,
inject_correlation_ids,
get_dd_trace_context,
dd_native_tracing_enabled,
set_correlation_ids,
)
from datadog_lambda.constants import Source
from ddtrace import tracer
from ddtrace.propagation.http import HTTPPropagator


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -81,6 +86,12 @@ def __init__(self, func):
self.logs_injection = (
os.environ.get("DD_LOGS_INJECTION", "true").lower() == "true"
)
self.merge_xray_traces = (
os.environ.get("DD_MERGE_XRAY_TRACES", "false").lower() == "true"
)
self.handler_name = os.environ.get("_HANDLER", "handler")
self.function_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME", "function")
self.propagator = HTTPPropagator()

# Inject trace correlation ids to logs
if self.logs_injection:
Expand All @@ -105,19 +116,47 @@ def __call__(self, event, context, **kwargs):

def _before(self, event, context):
try:

set_cold_start()
submit_invocations_metric(context)
# Extract Datadog trace context from incoming requests
extract_dd_trace_context(event)
dd_context = extract_dd_trace_context(event)
span_context = None
if dd_context["source"] == Source.EVENT or self.merge_xray_traces:
headers = get_dd_trace_context()
span_context = self.propagator.extract(headers)

tags = {}
if context:
tags = {
"cold_start": is_cold_start(),
"function_arn": context.invoked_function_arn,
"request_id": context.aws_request_id,
"resource_names": context.function_name,
}
args = {
"service": self.function_name,
"resource": self.handler_name,
"span_type": "serverless",
"child_of": span_context,
}

self.span = None
if dd_native_tracing_enabled:
self.span = tracer.start_span("aws.lambda", **args)
if self.span:
self.span.set_tags(tags)
else:
set_correlation_ids()

# Set log correlation ids using extracted trace context
set_correlation_ids()
logger.debug("datadog_lambda_wrapper _before() done")
except Exception:
traceback.print_exc()

def _after(self, event, context):
try:
if self.span:
self.span.finish()
if not self.flush_to_log:
lambda_stats.flush(float("inf"))
logger.debug("datadog_lambda_wrapper _after() done")
Expand Down
5 changes: 5 additions & 0 deletions scripts/publish_staging.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash
set -e

./scripts/build_layers.sh
./scripts/publish_layers.sh us-east-1
3 changes: 2 additions & 1 deletion tests/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ WORKDIR /test

# Install datadog-lambda with dev dependencies from local
COPY . .
RUN pip install .[dev]
RUN pip install .[dev]
RUN pip install --find-links=https://s3.amazonaws.com/pypi.datadoghq.com/trace-dev/index.html ddtrace==0.35.1.dev5+g1e11b2dd -t .[dev]
4 changes: 4 additions & 0 deletions tests/integration/http_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

from datadog_lambda.metric import lambda_metric
from datadog_lambda.wrapper import datadog_lambda_wrapper
from ddtrace import tracer
from ddtrace.internal.writer import LogWriter

tracer.writer = LogWriter()


@datadog_lambda_wrapper
Expand Down
8 changes: 8 additions & 0 deletions tests/integration/serverless.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,31 @@ functions:
http-requests_python27:
handler: http_requests.handle
runtime: python2.7
environment:
DD_TRACE_ENABLED: true
layers:
- { Ref: Python27LambdaLayer }

http-requests_python36:
handler: http_requests.handle
runtime: python3.6
environment:
DD_TRACE_ENABLED: true
layers:
- { Ref: Python36LambdaLayer }

http-requests_python37:
handler: http_requests.handle
runtime: python3.7
environment:
DD_TRACE_ENABLED: true
layers:
- { Ref: Python37LambdaLayer }

http-requests_python38:
handler: http_requests.handle
runtime: python3.8
environment:
DD_TRACE_ENABLED: true
layers:
- { Ref: Python38LambdaLayer }
3 changes: 3 additions & 0 deletions tests/integration/snapshots/logs/http-requests_python27.log
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ START RequestId: XXXX Version: $LATEST
{"e": XXXX, "m": "aws.lambda.enhanced.invocations", "t": ["region:us-east-1", "account_id:XXXX", "functionname:integration-tester-dev-http-requests_python27", "cold_start:true", "memorysize:1024", "runtime:python2.7", "dd_lambda_layer:datadog-python27_2.XX.0"], "v": 1}
HTTP GET https://ip-ranges.datadoghq.com/ Headers: ["x-datadog-parent-id:XXXX", "x-datadog-sampling-priority:2", "x-datadog-trace-id:XXXX"] Data: {}
HTTP GET https://ip-ranges.datadoghq.eu/ Headers: ["x-datadog-parent-id:XXXX", "x-datadog-sampling-priority:2", "x-datadog-trace-id:XXXX"] Data: {}
{"traces": [[{"resource": "http_requests.handle", "name": "aws.lambda", "service": "integration-tester-dev-http-requests_python27", "start": 1585077483318449000, "trace_id": "06C0653FC881D935", "metrics": {"_sampling_priority_v1": 1, "system.pid": 1, "_dd.agent_psr": 1.0}, "parent_id": "0000000000000000", "meta": {"function_arn": "arn:aws:lambda:us-east-1:601427279990:function:integration-tester-dev-http-requests_python27", "request_id": "da026ab0-1d6c-4980-9720-1b4bdd843314", "cold_start": "True", "resource_names": "integration-tester-dev-http-requests_python27"}, "error": 0, "duration": 389814000, "type": "serverless", "span_id": "3DA8AA128268BFA1"}]]}
HTTP POST https://api.datadoghq.com/api/v1/distribution_points Headers: ["Content-Type:application/json", "x-datadog-parent-id:XXXX", "x-datadog-sampling-priority:2", "x-datadog-trace-id:XXXX"] Data: {"series": [{"tags": ["team:serverless", "role:hello", "dd_lambda_layer:datadog-python27_2.XX.0"], "metric": "hello.dog", "interval": 10, "host": null, "points": [[XXXX, [1.0]]], "device": null, "type": "distribution"}, {"tags": ["test:integration", "role:hello", "dd_lambda_layer:datadog-python27_2.XX.0"], "metric": "tests.integration.count", "interval": 10, "host": null, "points": [[XXXX, [21.0]]], "device": null, "type": "distribution"}]}
END RequestId: XXXX
REPORT RequestId: XXXX Duration: XXXX ms Billed Duration: XXXX ms Memory Size: 1024 MB Max Memory Used: XXXX MB Init Duration: XXXX ms
Expand All @@ -10,6 +11,7 @@ START RequestId: XXXX Version: $LATEST
{"e": XXXX, "m": "aws.lambda.enhanced.invocations", "t": ["region:us-east-1", "account_id:XXXX", "functionname:integration-tester-dev-http-requests_python27", "cold_start:false", "memorysize:1024", "runtime:python2.7", "dd_lambda_layer:datadog-python27_2.XX.0"], "v": 1}
HTTP GET https://ip-ranges.datadoghq.com/ Headers: ["x-datadog-parent-id:XXXX", "x-datadog-sampling-priority:2", "x-datadog-trace-id:XXXX"] Data: {}
HTTP GET https://ip-ranges.datadoghq.eu/ Headers: ["x-datadog-parent-id:XXXX", "x-datadog-sampling-priority:2", "x-datadog-trace-id:XXXX"] Data: {}
{"traces": [[{"resource": "http_requests.handle", "name": "aws.lambda", "service": "integration-tester-dev-http-requests_python27", "start": 1585077485080991000, "trace_id": "6BDE1FAE35B84C6C", "metrics": {"_sampling_priority_v1": 1, "system.pid": 1, "_dd.agent_psr": 1.0}, "parent_id": "0000000000000000", "meta": {"function_arn": "arn:aws:lambda:us-east-1:601427279990:function:integration-tester-dev-http-requests_python27", "request_id": "f1b1655b-ce14-4540-bf54-8e128af7b8ab", "cold_start": "False", "resource_names": "integration-tester-dev-http-requests_python27"}, "error": 0, "duration": 80971000, "type": "serverless", "span_id": "393ECB596ED13F61"}]]}
HTTP POST https://api.datadoghq.com/api/v1/distribution_points Headers: ["Content-Type:application/json", "x-datadog-parent-id:XXXX", "x-datadog-sampling-priority:2", "x-datadog-trace-id:XXXX"] Data: {"series": [{"tags": ["team:serverless", "role:hello", "dd_lambda_layer:datadog-python27_2.XX.0"], "metric": "hello.dog", "interval": 10, "host": null, "points": [[XXXX, [1.0]]], "device": null, "type": "distribution"}, {"tags": ["test:integration", "role:hello", "dd_lambda_layer:datadog-python27_2.XX.0"], "metric": "tests.integration.count", "interval": 10, "host": null, "points": [[XXXX, [21.0]]], "device": null, "type": "distribution"}]}
END RequestId: XXXX
REPORT RequestId: XXXX Duration: XXXX ms Billed Duration: XXXX ms Memory Size: 1024 MB Max Memory Used: XXXX MB
Expand All @@ -18,6 +20,7 @@ START RequestId: XXXX Version: $LATEST
{"e": XXXX, "m": "aws.lambda.enhanced.invocations", "t": ["region:us-east-1", "account_id:XXXX", "functionname:integration-tester-dev-http-requests_python27", "cold_start:false", "memorysize:1024", "runtime:python2.7", "dd_lambda_layer:datadog-python27_2.XX.0"], "v": 1}
HTTP GET https://ip-ranges.datadoghq.com/ Headers: ["x-datadog-parent-id:XXXX", "x-datadog-sampling-priority:2", "x-datadog-trace-id:XXXX"] Data: {}
HTTP GET https://ip-ranges.datadoghq.eu/ Headers: ["x-datadog-parent-id:XXXX", "x-datadog-sampling-priority:2", "x-datadog-trace-id:XXXX"] Data: {}
{"traces": [[{"resource": "http_requests.handle", "name": "aws.lambda", "service": "integration-tester-dev-http-requests_python27", "start": 1585077486596975000, "trace_id": "0A7F7B686FDCC310", "metrics": {"_sampling_priority_v1": 1, "system.pid": 1, "_dd.agent_psr": 1.0}, "parent_id": "0000000000000000", "meta": {"function_arn": "arn:aws:lambda:us-east-1:601427279990:function:integration-tester-dev-http-requests_python27", "request_id": "6f90344e-1fa5-48cb-8933-d7f523868746", "cold_start": "False", "resource_names": "integration-tester-dev-http-requests_python27"}, "error": 0, "duration": 100314000, "type": "serverless", "span_id": "58E9AB1D40B41342"}]]}
HTTP POST https://api.datadoghq.com/api/v1/distribution_points Headers: ["Content-Type:application/json", "x-datadog-parent-id:XXXX", "x-datadog-sampling-priority:2", "x-datadog-trace-id:XXXX"] Data: {"series": [{"tags": ["team:serverless", "role:hello", "dd_lambda_layer:datadog-python27_2.XX.0"], "metric": "hello.dog", "interval": 10, "host": null, "points": [[XXXX, [1.0]]], "device": null, "type": "distribution"}, {"tags": ["test:integration", "role:hello", "dd_lambda_layer:datadog-python27_2.XX.0"], "metric": "tests.integration.count", "interval": 10, "host": null, "points": [[XXXX, [21.0]]], "device": null, "type": "distribution"}]}
END RequestId: XXXX
REPORT RequestId: XXXX Duration: XXXX ms Billed Duration: XXXX ms Memory Size: 1024 MB Max Memory Used: XXXX MB
Expand Down
Loading