From c43b5b53a489f3baf13dc3f77a57860b7e39da4a Mon Sep 17 00:00:00 2001 From: DarcyRaynerDD Date: Fri, 25 Sep 2020 10:14:32 -0400 Subject: [PATCH 1/4] Implement statsd protocol --- datadog_lambda/extension.py | 30 +++++++++++++++++++++++ datadog_lambda/metric.py | 49 ++++++++++++++++++++++++++----------- datadog_lambda/tags.py | 1 - datadog_lambda/wrapper.py | 6 ++++- scripts/publish_staging.sh | 1 - tests/test_wrapper.py | 13 +++++++--- 6 files changed, 80 insertions(+), 20 deletions(-) create mode 100644 datadog_lambda/extension.py diff --git a/datadog_lambda/extension.py b/datadog_lambda/extension.py new file mode 100644 index 00000000..bc5d9b99 --- /dev/null +++ b/datadog_lambda/extension.py @@ -0,0 +1,30 @@ +import logging +import requests + +AGENT_URL = "http://127.0.0.1:8124" +HELLO_PATH = "/lambda/hello" +FLUSH_PATH = "/lambda/flush" + +logger = logging.getLogger(__name__) + + +def is_extension_running(): + try: + requests.get(AGENT_URL + HELLO_PATH) + except Exception as e: + logger.debug("Extension is not running, returned with error %s", e) + return False + return True + + +def flush_extension(): + try: + requests.post(AGENT_URL + FLUSH_PATH, data={}) + except Exception as e: + logger.debug("Failed to flush extension, returned with error %s", e) + return False + return True + + +use_extension = is_extension_running() + diff --git a/datadog_lambda/metric.py b/datadog_lambda/metric.py index 0c95650c..6a561cf6 100644 --- a/datadog_lambda/metric.py +++ b/datadog_lambda/metric.py @@ -10,8 +10,9 @@ import logging import boto3 -from datadog import api +from datadog import api, initialize, statsd from datadog.threadstats import ThreadStats +from datadog_lambda.extension import use_extension from datadog_lambda.tags import get_enhanced_metrics_tags, tag_dd_lambda_layer @@ -19,11 +20,32 @@ logger = logging.getLogger(__name__) -lambda_stats = ThreadStats() -lambda_stats.start() + +class StatsDWrapper: + """ + Wraps StatsD calls, to give an identical to ThreadStats + """ + + def __init__(self): + options = {"statsd_host": "127.0.0.1", "statsd_port": 8125} + initialize(**options) + + def distribution(self, metric_name, value, tags=[], timestamp=None): + statsd.distribution(metric_name, value, tags=tags) + + def flush(self, value): + pass -def lambda_metric(metric_name, value, timestamp=None, tags=None): +lambda_stats = None +if use_extension: + lambda_stats = StatsDWrapper() +else: + lambda_stats = ThreadStats() + lambda_stats.start() + + +def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=False): """ Submit a data point to Datadog distribution metrics. https://docs.datadoghq.com/graphing/metrics/distributions/ @@ -36,12 +58,14 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None): periodically and at the end of the function execution in a background thread. """ + flush_to_logs = os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true" tags = tag_dd_lambda_layer(tags) - if os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true": - write_metric_point_to_stdout(metric_name, value, timestamp, tags) + + if flush_to_logs or (force_async and not use_extension): + write_metric_point_to_stdout(metric_name, value, timestamp=timestamp, tags=tags) else: logger.debug("Sending metric %s to Datadog via lambda layer", metric_name) - lambda_stats.distribution(metric_name, value, timestamp=timestamp, tags=tags) + lambda_stats.distribution(metric_name, value, tags=tags, timestamp=timestamp) def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]): @@ -85,13 +109,10 @@ def submit_enhanced_metric(metric_name, lambda_context): metric_name, ) return - - # Enhanced metrics are always written to logs - write_metric_point_to_stdout( - "{}.{}".format(ENHANCED_METRICS_NAMESPACE_PREFIX, metric_name), - 1, - tags=get_enhanced_metrics_tags(lambda_context), - ) + tags = get_enhanced_metrics_tags(lambda_context) + metric_name = "aws.lambda.enhanced." + metric_name + # Enhanced metrics always use an async submission method, (eg logs or extension). + lambda_metric(metric_name, 1, timestamp=None, tags=tags, force_async=True) def submit_invocations_metric(lambda_context): diff --git a/datadog_lambda/tags.py b/datadog_lambda/tags.py index 0d85ed73..acef9691 100644 --- a/datadog_lambda/tags.py +++ b/datadog_lambda/tags.py @@ -85,7 +85,6 @@ def get_enhanced_metrics_tags(lambda_context): get_cold_start_tag(), "memorysize:{}".format(lambda_context.memory_limit_in_mb), get_runtime_tag(), - _format_dd_lambda_layer_tag(), ] diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py index 7ddb4340..cdb35ee6 100644 --- a/datadog_lambda/wrapper.py +++ b/datadog_lambda/wrapper.py @@ -7,6 +7,7 @@ import logging import traceback +from datadog_lambda.extension import use_extension, flush_extension from datadog_lambda.cold_start import set_cold_start, is_cold_start from datadog_lambda.metric import ( lambda_stats, @@ -137,8 +138,11 @@ def _before(self, event, context): def _after(self, event, context): try: - if not self.flush_to_log: + if not self.flush_to_log or use_extension: lambda_stats.flush(float("inf")) + if use_extension: + flush_extension() + if self.span: self.span.finish() logger.debug("datadog_lambda_wrapper _after() done") diff --git a/scripts/publish_staging.sh b/scripts/publish_staging.sh index 4f3ccebf..4d4787ed 100755 --- a/scripts/publish_staging.sh +++ b/scripts/publish_staging.sh @@ -1,6 +1,5 @@ #!/bin/bash set -e -./scripts/list_layers.sh ./scripts/build_layers.sh ./scripts/publish_layers.sh us-east-1 \ No newline at end of file diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py index ca4fad69..e3de2000 100644 --- a/tests/test_wrapper.py +++ b/tests/test_wrapper.py @@ -38,9 +38,9 @@ def setUp(self): self.mock_wrapper_lambda_stats = patcher.start() self.addCleanup(patcher.stop) - patcher = patch("datadog_lambda.metric.lambda_metric") - self.mock_lambda_metric = patcher.start() - self.addCleanup(patcher.stop) + # patcher = patch("datadog_lambda.metric.lambda_metric") + # self.mock_lambda_metric = patcher.start() + # self.addCleanup(patcher.stop) patcher = patch("datadog_lambda.wrapper.extract_dd_trace_context") self.mock_extract_dd_trace_context = patcher.start() @@ -152,6 +152,7 @@ def lambda_handler(event, context): "runtime:python2.7", "dd_lambda_layer:datadog-python27_0.1.0", ], + timestamp=None, ) ] ) @@ -181,6 +182,7 @@ def lambda_handler(event, context): "runtime:python2.7", "dd_lambda_layer:datadog-python27_0.1.0", ], + timestamp=None, ), call( "aws.lambda.enhanced.errors", @@ -195,6 +197,7 @@ def lambda_handler(event, context): "runtime:python2.7", "dd_lambda_layer:datadog-python27_0.1.0", ], + timestamp=None, ), ] ) @@ -229,6 +232,7 @@ def lambda_handler(event, context): "runtime:python2.7", "dd_lambda_layer:datadog-python27_0.1.0", ], + timestamp=None, ), call( "aws.lambda.enhanced.invocations", @@ -243,6 +247,7 @@ def lambda_handler(event, context): "runtime:python2.7", "dd_lambda_layer:datadog-python27_0.1.0", ], + timestamp=None, ), ] ) @@ -275,6 +280,7 @@ def lambda_handler(event, context): "runtime:python2.7", "dd_lambda_layer:datadog-python27_0.1.0", ], + timestamp=None, ) ] ) @@ -307,6 +313,7 @@ def lambda_handler(event, context): "runtime:python2.7", "dd_lambda_layer:datadog-python27_0.1.0", ], + timestamp=None, ) ] ) From 041bb51af2db83f4c9a6461cc4f889c64231814e Mon Sep 17 00:00:00 2001 From: DarcyRaynerDD Date: Thu, 8 Oct 2020 13:24:55 -0400 Subject: [PATCH 2/4] Bump version to 2.22.0 --- datadog_lambda/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datadog_lambda/__init__.py b/datadog_lambda/__init__.py index 758d3fac..89798587 100644 --- a/datadog_lambda/__init__.py +++ b/datadog_lambda/__init__.py @@ -1,6 +1,6 @@ # The minor version corresponds to the Lambda layer version. # E.g.,, version 0.5.0 gets packaged into layer version 5. -__version__ = "2.21.0" +__version__ = "2.22.0" import os From 866a75e8fdf2494127e607c986a6a9f0c96305cf Mon Sep 17 00:00:00 2001 From: DarcyRaynerDD Date: Thu, 8 Oct 2020 13:34:18 -0400 Subject: [PATCH 3/4] Remove redundant whitespace --- datadog_lambda/extension.py | 1 - 1 file changed, 1 deletion(-) diff --git a/datadog_lambda/extension.py b/datadog_lambda/extension.py index bc5d9b99..95546569 100644 --- a/datadog_lambda/extension.py +++ b/datadog_lambda/extension.py @@ -27,4 +27,3 @@ def flush_extension(): use_extension = is_extension_running() - From 7883a6d0d88dc43034bf610640ffa16363d77912 Mon Sep 17 00:00:00 2001 From: DarcyRaynerDD Date: Thu, 8 Oct 2020 14:20:54 -0400 Subject: [PATCH 4/4] Clean up from PR review --- datadog_lambda/extension.py | 2 +- datadog_lambda/metric.py | 8 ++++---- datadog_lambda/wrapper.py | 6 +++--- tests/test_wrapper.py | 4 ---- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/datadog_lambda/extension.py b/datadog_lambda/extension.py index 95546569..f897eab6 100644 --- a/datadog_lambda/extension.py +++ b/datadog_lambda/extension.py @@ -26,4 +26,4 @@ def flush_extension(): return True -use_extension = is_extension_running() +should_use_extension = is_extension_running() diff --git a/datadog_lambda/metric.py b/datadog_lambda/metric.py index 6a561cf6..90e30ca4 100644 --- a/datadog_lambda/metric.py +++ b/datadog_lambda/metric.py @@ -12,7 +12,7 @@ import boto3 from datadog import api, initialize, statsd from datadog.threadstats import ThreadStats -from datadog_lambda.extension import use_extension +from datadog_lambda.extension import should_use_extension from datadog_lambda.tags import get_enhanced_metrics_tags, tag_dd_lambda_layer @@ -23,7 +23,7 @@ class StatsDWrapper: """ - Wraps StatsD calls, to give an identical to ThreadStats + Wraps StatsD calls, to give an identical interface to ThreadStats """ def __init__(self): @@ -38,7 +38,7 @@ def flush(self, value): lambda_stats = None -if use_extension: +if should_use_extension: lambda_stats = StatsDWrapper() else: lambda_stats = ThreadStats() @@ -61,7 +61,7 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=Fal flush_to_logs = os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true" tags = tag_dd_lambda_layer(tags) - if flush_to_logs or (force_async and not use_extension): + if flush_to_logs or (force_async and not should_use_extension): write_metric_point_to_stdout(metric_name, value, timestamp=timestamp, tags=tags) else: logger.debug("Sending metric %s to Datadog via lambda layer", metric_name) diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py index cdb35ee6..d0709d3a 100644 --- a/datadog_lambda/wrapper.py +++ b/datadog_lambda/wrapper.py @@ -7,7 +7,7 @@ import logging import traceback -from datadog_lambda.extension import use_extension, flush_extension +from datadog_lambda.extension import should_use_extension, flush_extension from datadog_lambda.cold_start import set_cold_start, is_cold_start from datadog_lambda.metric import ( lambda_stats, @@ -138,9 +138,9 @@ def _before(self, event, context): def _after(self, event, context): try: - if not self.flush_to_log or use_extension: + if not self.flush_to_log or should_use_extension: lambda_stats.flush(float("inf")) - if use_extension: + if should_use_extension: flush_extension() if self.span: diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py index e3de2000..a79297ce 100644 --- a/tests/test_wrapper.py +++ b/tests/test_wrapper.py @@ -38,10 +38,6 @@ def setUp(self): self.mock_wrapper_lambda_stats = patcher.start() self.addCleanup(patcher.stop) - # patcher = patch("datadog_lambda.metric.lambda_metric") - # self.mock_lambda_metric = patcher.start() - # self.addCleanup(patcher.stop) - patcher = patch("datadog_lambda.wrapper.extract_dd_trace_context") self.mock_extract_dd_trace_context = patcher.start() self.addCleanup(patcher.stop)