Infer API Gateway spans #172
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -6,6 +6,7 @@
import logging
import os
import json
from enum import Enum

from datadog_lambda.constants import (
    SamplingPriority,

@@ -19,17 +20,48 @@
)
from ddtrace import tracer, patch
from ddtrace import __version__ as ddtrace_version
from ddtrace.filters import TraceFilter
from ddtrace.propagation.http import HTTPPropagator
from datadog_lambda import __version__ as datadog_lambda_version

logger = logging.getLogger(__name__)


SPAN_TYPE_TAG = "_dd.span_type"
SPAN_TYPE_INFERRED = "inferred"


class InferredSpanFilter(TraceFilter):
Review comment: I think this class definitely deserves a comprehensive docstring to explain why it's needed and the background.
Review comment: I think this warrants a comment explaining why we are using this.
    def process_trace(self, trace):
        logger.debug("InferredSpanFilter got a trace of length {}".format(len(trace)))
        trace_to_send = []
        for span in trace:
            if span.get_tag(SPAN_TYPE_TAG) == SPAN_TYPE_INFERRED and len(trace) > 1:
                logger.debug(
                    "Found an inferred span. Filtering it out and writing it separately."
                )
                tracer.write([span])
            else:
                logger.debug("Appending a span to the returned trace")
                trace_to_send.append(span)
        return trace_to_send
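
A minimal sketch of the splitting behaviour in `process_trace`, runnable without `ddtrace`. `FakeSpan`, `split_inferred_spans`, and the `write` callback are hypothetical stand-ins for the real span type, the filter method, and `tracer.write`; they are not part of the PR. The `len(trace) > 1` guard presumably lets a single inferred span that was written on its own pass through the filter unchanged instead of being re-written again.

```python
SPAN_TYPE_TAG = "_dd.span_type"
SPAN_TYPE_INFERRED = "inferred"


class FakeSpan:
    """Hypothetical stand-in for a ddtrace span; only get_tag is modelled."""

    def __init__(self, name, tags=None):
        self.name = name
        self._tags = dict(tags or {})

    def get_tag(self, key):
        return self._tags.get(key)


def split_inferred_spans(trace, write):
    """Return the spans to keep; hand each inferred span to `write` on its own."""
    kept = []
    for span in trace:
        if span.get_tag(SPAN_TYPE_TAG) == SPAN_TYPE_INFERRED and len(trace) > 1:
            # Inferred span inside a larger trace: send it as a separate payload.
            write([span])
        else:
            kept.append(span)
    return kept


written = []  # collects the payloads that would go to tracer.write
trace = [
    FakeSpan("aws.apigateway", {SPAN_TYPE_TAG: SPAN_TYPE_INFERRED}),
    FakeSpan("aws.lambda"),
]
kept = split_inferred_spans(trace, written.append)

# A trace holding only the inferred span is returned as-is.
alone = split_inferred_spans(written[0], written.append)
```

Here `kept` holds only the `aws.lambda` span, while the inferred span ends up in its own single-span payload.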


tracer.configure(settings={"FILTERS": [InferredSpanFilter()]})
dd_trace_context = {}
dd_tracing_enabled = os.environ.get("DD_TRACE_ENABLED", "false").lower() == "true"

propagator = HTTPPropagator()


class ManagedService(Enum):
    UNKNOWN = 0
    API_GATEWAY = 1
    API_GATEWAY_WEBSOCKET = 2
    HTTP_API = 3
    APPSYNC = 4


def _convert_xray_trace_id(xray_trace_id):
    """
    Convert X-Ray trace id (hex)'s last 63 bits to a Datadog trace id (int).

@@ -377,13 +409,125 @@ def set_dd_trace_py_root(trace_context_source, merge_xray_traces):
        )


def create_inferred_span(event, context, function_name):
    managed_service = detect_inferrable_span_type(event)
    try:
        if managed_service == ManagedService.API_GATEWAY:
            logger.debug("API Gateway event detected. Inferring a span")
            return create_inferred_span_from_api_gateway_event(event, context)
        elif managed_service == ManagedService.HTTP_API:
            logger.debug("HTTP API event detected. Inferring a span")
            return create_inferred_span_from_http_api_event(event, context)
        elif managed_service == ManagedService.API_GATEWAY_WEBSOCKET:
            logger.debug("API Gateway Websocket event detected. Inferring a span")
            return create_inferred_span_from_api_gateway_websocket_event(event, context)
    except Exception as e:
        logger.debug(
            "Unable to infer span. Detected type: %s. Reason: %s", managed_service, e
        )
        return None
    logger.debug("Unable to infer a span: unknown event type")
    return None
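
For reference, the standard `logging` module interpolates deferred arguments with %-style placeholders, so a debug call that passes `managed_service` and `e` as extra arguments needs `%s` markers rather than `{}`. A minimal sketch, using a hypothetical `ListHandler` to capture the message and a made-up `KeyError` as the failure:

```python
import logging

logger = logging.getLogger("inferred_span_example")
logger.setLevel(logging.DEBUG)


class ListHandler(logging.Handler):
    """Hypothetical handler that collects formatted messages for inspection."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        # getMessage() applies the %-style arguments to the message template.
        self.messages.append(record.getMessage())


handler = ListHandler()
logger.addHandler(handler)

try:
    {}["requestContext"]  # simulate an event that lacks the expected key
except Exception as e:
    logger.debug(
        "Unable to infer span. Detected type: %s. Reason: %s", "API_GATEWAY", e
    )
```

The arguments are only formatted when a handler actually processes the record, which keeps the call cheap when debug logging is disabled.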


def detect_inferrable_span_type(event):
Review comment: Can you reuse (maybe some refactoring required) the same logic from https://github.com/DataDog/datadog-lambda-python/blob/main/datadog_lambda/trigger.py? Want to avoid inconsistency between what's shown by the trace and the invocation.
Review comment: Is there a way we can plug this into the existing trigger detection logic? I'm wondering if we are duplicating the functionality somewhat.
Review comment: You and Tian are really on the same brain wave here 😄
    if "httpMethod" in event:  # likely some kind of API Gateway event
        return ManagedService.API_GATEWAY
    if "routeKey" in event:  # likely HTTP API
        return ManagedService.HTTP_API
    if (
        "requestContext" in event and "messageDirection" in event["requestContext"]
    ):  # likely a websocket API
        return ManagedService.API_GATEWAY_WEBSOCKET
    return ManagedService.UNKNOWN
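
A small self-contained sketch of how the detection above classifies events. The enum and function are copied from the diff so the block runs on its own; the three sample events are hypothetical, trimmed-down payloads that contain only the keys the checks look at, not full API Gateway events.

```python
from enum import Enum


class ManagedService(Enum):
    UNKNOWN = 0
    API_GATEWAY = 1
    API_GATEWAY_WEBSOCKET = 2
    HTTP_API = 3
    APPSYNC = 4


def detect_inferrable_span_type(event):
    if "httpMethod" in event:  # likely some kind of API Gateway event
        return ManagedService.API_GATEWAY
    if "routeKey" in event:  # likely HTTP API
        return ManagedService.HTTP_API
    if (
        "requestContext" in event and "messageDirection" in event["requestContext"]
    ):  # likely a websocket API
        return ManagedService.API_GATEWAY_WEBSOCKET
    return ManagedService.UNKNOWN


# Hypothetical minimal events, one per supported trigger.
rest_event = {"httpMethod": "GET", "path": "/users", "requestContext": {}}
http_api_event = {"routeKey": "GET /users", "rawPath": "/users", "requestContext": {}}
websocket_event = {"requestContext": {"routeKey": "$connect", "messageDirection": "IN"}}

results = [
    detect_inferrable_span_type(rest_event),
    detect_inferrable_span_type(http_api_event),
    detect_inferrable_span_type(websocket_event),
    detect_inferrable_span_type({"Records": []}),
]
```

Note that the websocket sample carries `routeKey` inside `requestContext`, not at the top level, so it is not mistaken for an HTTP API event.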


def create_inferred_span_from_api_gateway_websocket_event(event, context):
    domain = event["requestContext"]["domainName"]
    endpoint = event["requestContext"]["routeKey"]
    tags = {
        "operation_name": "aws.apigateway.websocket",
        "service.name": domain,
        "url": domain + endpoint,
Review comment: http.url?
        "endpoint": endpoint,
        "resource_name": domain + endpoint,
        "request_id": context.aws_request_id,
        "connection_id": event["requestContext"]["connectionId"],
        SPAN_TYPE_TAG: SPAN_TYPE_INFERRED,
    }
    request_time_epoch = event["requestContext"]["requestTimeEpoch"]
    args = {
        "resource": domain + endpoint,
        "span_type": "web",
    }
    tracer.set_tags({"_dd.origin": "lambda"})
    span = tracer.trace("aws.apigateway.websocket", **args)
    if span:
        span.set_tags(tags)
    span.start = request_time_epoch / 1000
    return span


def create_inferred_span_from_api_gateway_event(event, context):
    domain = event["requestContext"]["domainName"]
    path = event["path"]
    tags = {
        "operation_name": "aws.apigateway.rest",
        "service.name": domain,
        "url": domain + path,
        "endpoint": path,
        "http.method": event["httpMethod"],
        "resource_name": domain + path,
        "request_id": context.aws_request_id,
        SPAN_TYPE_TAG: SPAN_TYPE_INFERRED,
    }
    request_time_epoch = event["requestContext"]["requestTimeEpoch"]
    args = {
        "resource": domain + path,
        "span_type": "http",
    }
    tracer.set_tags({"_dd.origin": "lambda"})
    span = tracer.trace("aws.apigateway", **args)
    if span:
        span.set_tags(tags)
    span.start = request_time_epoch / 1000
    return span


def create_inferred_span_from_http_api_event(event, context):
    domain = event["requestContext"]["domainName"]
    path = event["rawPath"]
    tags = {
        "operation_name": "aws.httpapi",
        "service.name": domain,
        "url": domain + path,
        "endpoint": path,
        "http.method": event["requestContext"]["http"]["method"],
        "resource_name": domain + path,
        "request_id": context.aws_request_id,
        SPAN_TYPE_TAG: SPAN_TYPE_INFERRED,
    }
    request_time_epoch = event["requestContext"]["timeEpoch"]
    args = {
        "resource": domain + path,
        "span_type": "http",
    }
    tracer.set_tags({"_dd.origin": "lambda"})
    span = tracer.trace("aws.httpapi", **args)
    if span:
        span.set_tags(tags)
    span.start = request_time_epoch / 1000
    return span
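
All three helpers backdate the span with `request_time_epoch / 1000`, which reads as a conversion from the millisecond epoch in the event to a start time in seconds. A minimal sketch of that arithmetic, assuming a made-up sample timestamp (the value below is illustrative, not taken from the PR):

```python
from datetime import datetime, timezone

# Hypothetical requestTimeEpoch / timeEpoch value, in milliseconds.
request_time_epoch = 1583349317135

# Same conversion as in the helpers above: milliseconds to fractional seconds.
start_seconds = request_time_epoch / 1000

# Interpreting the result as a UTC timestamp confirms the unit is seconds.
started_at = datetime.fromtimestamp(start_seconds, tz=timezone.utc)
```

Setting the start this way makes the inferred span begin when the gateway received the request, before the function itself started running.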


def create_function_execution_span(
    context,
    function_name,
    is_cold_start,
    trace_context_source,
    merge_xray_traces,
    trigger_tags,
    upstream=None,
Review comment: Is "parent" a better name?
):
    tags = {}
    if context:

@@ -402,6 +546,7 @@ def create_function_execution_span(
            else None,
            "datadog_lambda": datadog_lambda_version,
            "dd_trace": ddtrace_version,
            "span.name": "aws.lambda",
Review comment: Why do you need this? Doesn't
        }
    if trace_context_source == TraceContextSource.XRAY and merge_xray_traces:
        tags["_dd.parent_source"] = trace_context_source

@@ -415,4 +560,6 @@ def create_function_execution_span(
    span = tracer.trace("aws.lambda", **args)
    if span:
        span.set_tags(tags)
    if upstream:
        span.parent_id = upstream.span_id
    return span
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,12 +26,12 @@
    set_correlation_ids,
    set_dd_trace_py_root,
    create_function_execution_span,
    create_inferred_span,
)
from datadog_lambda.trigger import extract_trigger_tags, extract_http_status_code_tag

logger = logging.getLogger(__name__)


"""
Usage:

@@ -96,6 +96,11 @@ def __init__(self, func):
            self.extractor_env = os.environ.get("DD_TRACE_EXTRACTOR", None)
            self.trace_extractor = None
            self.span = None
            self.inferred_span = None
            self.make_inferred_span = (
                os.environ.get("DD_INFERRED_SPANS", "false").lower() == "true"
                and should_use_extension
Review comment: Any reason to not support inferred spans when using the forwarder?
Review comment: The forwarder adds tags from the tag cache to the trace payload(s) being sent back to the trace intake endpoint. This is an additional variable and an additional thing we would need to fix before the inferred spans show up as their own services. We can always remove this check and add forwarder support later.
            )
            self.response = None

            if self.extractor_env:

@@ -138,6 +143,7 @@ def __call__(self, event, context, **kwargs):

    def _before(self, event, context):
        try:

            set_cold_start()
            submit_invocations_metric(context)
            self.trigger_tags = extract_trigger_tags(event, context)

@@ -153,13 +159,18 @@ def _before(self, event, context):

            if dd_tracing_enabled:
                set_dd_trace_py_root(trace_context_source, self.merge_xray_traces)
                if self.make_inferred_span:
                    self.inferred_span = create_inferred_span(
                        event, context, self.function_name
                    )
                self.span = create_function_execution_span(
                    context,
                    self.function_name,
                    is_cold_start(),
                    trace_context_source,
                    self.merge_xray_traces,
                    self.trigger_tags,
                    upstream=self.inferred_span,
                )
            else:
                set_correlation_ids()

@@ -180,16 +191,20 @@ def _after(self, event, context):
                    self.trigger_tags, XraySubsegment.LAMBDA_FUNCTION_TAGS_KEY
                )

            if not self.flush_to_log or should_use_extension:
                flush_stats()
            if should_use_extension:
                flush_extension()

            if self.span:
                if status_code:
                    self.span.set_tag("http.status_code", status_code)
                self.span.finish()
            if self.inferred_span:
                if status_code:
                    self.inferred_span.set_tag("http.status_code", status_code)
                self.inferred_span.finish()
            logger.debug("datadog_lambda_wrapper _after() done")
Review comment: This log line is no longer at the right place.

            if not self.flush_to_log or should_use_extension:
                flush_stats()
            if should_use_extension:
                flush_extension()
        except Exception:
            traceback.print_exc()

Review comment: Is this actually true? I know we said we wouldn't do any specific work to make it function in the forwarder, but the span will still get generated. I think originally I said the forwarder might remap the service name, but I don't know if we verified that.

Review comment: We did verify it. The forwarder adds tags from the tags cache to the trace payloads it sends back. If we want, we can always fix this and add forwarder support later on.

Review comment: Hmm, I guess the forwarder applies the service tag to all the trace payloads from the same event object? If so, even though the trace payload carrying the inferred span has no Lambda function ARN, it still assumes it's from the same function as the other payloads and applies the tag? If yes, we would need to make some changes in the forwarder, hopefully a small one.