Skip to content

Statsd integration #89

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 8, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datadog_lambda/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# The minor version corresponds to the Lambda layer version.
# E.g.,, version 0.5.0 gets packaged into layer version 5.
__version__ = "2.21.0"
__version__ = "2.22.0"


import os
Expand Down
29 changes: 29 additions & 0 deletions datadog_lambda/extension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import logging
import requests

AGENT_URL = "http://127.0.0.1:8124"
HELLO_PATH = "/lambda/hello"
FLUSH_PATH = "/lambda/flush"

logger = logging.getLogger(__name__)


def is_extension_running():
try:
requests.get(AGENT_URL + HELLO_PATH)
except Exception as e:
logger.debug("Extension is not running, returned with error %s", e)
return False
return True


def flush_extension():
try:
requests.post(AGENT_URL + FLUSH_PATH, data={})
except Exception as e:
logger.debug("Failed to flush extension, returned with error %s", e)
return False
return True


use_extension = is_extension_running()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me, use_extension sounds like a verb, i.e. if I call it, then the extension will be used. But it seems like it represents whether the extension should be used. Maybe rename this to is_using_extension or should_use_extension or something along those lines?

49 changes: 35 additions & 14 deletions datadog_lambda/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,42 @@
import logging

import boto3
from datadog import api
from datadog import api, initialize, statsd
from datadog.threadstats import ThreadStats
from datadog_lambda.extension import use_extension
from datadog_lambda.tags import get_enhanced_metrics_tags, tag_dd_lambda_layer


ENHANCED_METRICS_NAMESPACE_PREFIX = "aws.lambda.enhanced"

logger = logging.getLogger(__name__)

lambda_stats = ThreadStats()
lambda_stats.start()

class StatsDWrapper:
"""
Wraps StatsD calls, to give an identical to ThreadStats
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to give an identical what?

"""

def __init__(self):
options = {"statsd_host": "127.0.0.1", "statsd_port": 8125}
initialize(**options)

def distribution(self, metric_name, value, tags=[], timestamp=None):
statsd.distribution(metric_name, value, tags=tags)

def flush(self, value):
pass


def lambda_metric(metric_name, value, timestamp=None, tags=None):
lambda_stats = None
if use_extension:
lambda_stats = StatsDWrapper()
else:
lambda_stats = ThreadStats()
lambda_stats.start()


def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=False):
"""
Submit a data point to Datadog distribution metrics.
https://docs.datadoghq.com/graphing/metrics/distributions/
Expand All @@ -36,12 +58,14 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None):
periodically and at the end of the function execution in a
background thread.
"""
flush_to_logs = os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true"
tags = tag_dd_lambda_layer(tags)
if os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true":
write_metric_point_to_stdout(metric_name, value, timestamp, tags)

if flush_to_logs or (force_async and not use_extension):
write_metric_point_to_stdout(metric_name, value, timestamp=timestamp, tags=tags)
else:
logger.debug("Sending metric %s to Datadog via lambda layer", metric_name)
lambda_stats.distribution(metric_name, value, timestamp=timestamp, tags=tags)
lambda_stats.distribution(metric_name, value, tags=tags, timestamp=timestamp)


def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):
Expand Down Expand Up @@ -85,13 +109,10 @@ def submit_enhanced_metric(metric_name, lambda_context):
metric_name,
)
return

# Enhanced metrics are always written to logs
write_metric_point_to_stdout(
"{}.{}".format(ENHANCED_METRICS_NAMESPACE_PREFIX, metric_name),
1,
tags=get_enhanced_metrics_tags(lambda_context),
)
tags = get_enhanced_metrics_tags(lambda_context)
metric_name = "aws.lambda.enhanced." + metric_name
# Enhanced metrics always use an async submission method, (eg logs or extension).
lambda_metric(metric_name, 1, timestamp=None, tags=tags, force_async=True)


def submit_invocations_metric(lambda_context):
Expand Down
1 change: 0 additions & 1 deletion datadog_lambda/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ def get_enhanced_metrics_tags(lambda_context):
get_cold_start_tag(),
"memorysize:{}".format(lambda_context.memory_limit_in_mb),
get_runtime_tag(),
_format_dd_lambda_layer_tag(),
]


Expand Down
6 changes: 5 additions & 1 deletion datadog_lambda/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
import traceback

from datadog_lambda.extension import use_extension, flush_extension
from datadog_lambda.cold_start import set_cold_start, is_cold_start
from datadog_lambda.metric import (
lambda_stats,
Expand Down Expand Up @@ -137,8 +138,11 @@ def _before(self, event, context):

def _after(self, event, context):
try:
if not self.flush_to_log:
if not self.flush_to_log or use_extension:
lambda_stats.flush(float("inf"))
if use_extension:
flush_extension()

if self.span:
self.span.finish()
logger.debug("datadog_lambda_wrapper _after() done")
Expand Down
1 change: 0 additions & 1 deletion scripts/publish_staging.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#!/bin/bash
set -e

./scripts/list_layers.sh
./scripts/build_layers.sh
./scripts/publish_layers.sh us-east-1
13 changes: 10 additions & 3 deletions tests/test_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ def setUp(self):
self.mock_wrapper_lambda_stats = patcher.start()
self.addCleanup(patcher.stop)

patcher = patch("datadog_lambda.metric.lambda_metric")
self.mock_lambda_metric = patcher.start()
self.addCleanup(patcher.stop)
# patcher = patch("datadog_lambda.metric.lambda_metric")
# self.mock_lambda_metric = patcher.start()
# self.addCleanup(patcher.stop)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were patching the datadog_lambda.metric.lambda_metric out for testing, but the new code uses it internally, so I unpatched it. I'll remove this snippet.


patcher = patch("datadog_lambda.wrapper.extract_dd_trace_context")
self.mock_extract_dd_trace_context = patcher.start()
Expand Down Expand Up @@ -152,6 +152,7 @@ def lambda_handler(event, context):
"runtime:python2.7",
"dd_lambda_layer:datadog-python27_0.1.0",
],
timestamp=None,
)
]
)
Expand Down Expand Up @@ -181,6 +182,7 @@ def lambda_handler(event, context):
"runtime:python2.7",
"dd_lambda_layer:datadog-python27_0.1.0",
],
timestamp=None,
),
call(
"aws.lambda.enhanced.errors",
Expand All @@ -195,6 +197,7 @@ def lambda_handler(event, context):
"runtime:python2.7",
"dd_lambda_layer:datadog-python27_0.1.0",
],
timestamp=None,
),
]
)
Expand Down Expand Up @@ -229,6 +232,7 @@ def lambda_handler(event, context):
"runtime:python2.7",
"dd_lambda_layer:datadog-python27_0.1.0",
],
timestamp=None,
),
call(
"aws.lambda.enhanced.invocations",
Expand All @@ -243,6 +247,7 @@ def lambda_handler(event, context):
"runtime:python2.7",
"dd_lambda_layer:datadog-python27_0.1.0",
],
timestamp=None,
),
]
)
Expand Down Expand Up @@ -275,6 +280,7 @@ def lambda_handler(event, context):
"runtime:python2.7",
"dd_lambda_layer:datadog-python27_0.1.0",
],
timestamp=None,
)
]
)
Expand Down Expand Up @@ -307,6 +313,7 @@ def lambda_handler(event, context):
"runtime:python2.7",
"dd_lambda_layer:datadog-python27_0.1.0",
],
timestamp=None,
)
]
)
Expand Down