Skip to content

Refactoring to allow lazy loading of datadog.api #163

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 39 additions & 122 deletions datadog_lambda/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
import logging

from botocore.exceptions import ClientError
import boto3
from datadog import api, initialize, statsd
from datadog.threadstats import ThreadStats
from datadog_lambda.extension import should_use_extension
from datadog_lambda.tags import get_enhanced_metrics_tags, tag_dd_lambda_layer

Expand All @@ -24,100 +21,15 @@
lambda_stats = None


class StatsWriter:
    """Interface shared by the writers that submit distribution metrics.

    Every method here raises ``NotImplementedError``; the concrete writers
    (``StatsDWriter``, ``ThreadStatsWriter``) override all three.
    """

    def distribution(self, metric_name, value, tags=None, timestamp=None):
        """Record one distribution point.

        ``tags`` defaults to ``None`` instead of a list literal so that no
        mutable object is shared between calls.
        """
        raise NotImplementedError()

    def flush(self):
        """Send any buffered points."""
        raise NotImplementedError()

    def stop(self):
        """Shut the writer down."""
        raise NotImplementedError()


class StatsDWriter(StatsWriter):
    """
    Writes distribution metrics using StatsD protocol
    """

    def __init__(self):
        # Configure the datadog statsd client to talk to 127.0.0.1:8125.
        options = {"statsd_host": "127.0.0.1", "statsd_port": 8125}
        initialize(**options)

    def distribution(self, metric_name, value, tags=None, timestamp=None):
        """Submit one distribution point through the statsd client.

        ``timestamp`` is accepted so the signature matches ``StatsWriter``,
        but it is not forwarded to ``statsd.distribution``.
        ``tags`` defaults to ``None`` rather than a shared list literal.
        """
        statsd.distribution(metric_name, value, tags=tags)

    def flush(self):
        """No-op for this writer."""
        pass

    def stop(self):
        """No-op for this writer."""
        pass


class ThreadStatsWriter(StatsWriter):
    """
    Writes distribution metrics using the ThreadStats class
    """

    def __init__(self, flush_in_thread):
        """Create and start the ThreadStats instance.

        ``flush_in_thread`` is passed straight to ``ThreadStats.start``.
        """
        self.thread_stats = ThreadStats(compress_payload=True)
        self.thread_stats.start(flush_in_thread=flush_in_thread)

    def distribution(self, metric_name, value, tags=None, timestamp=None):
        """Record one distribution point on the ThreadStats instance.

        ``tags`` defaults to ``None`` rather than a shared list literal.
        """
        self.thread_stats.distribution(
            metric_name, value, tags=tags, timestamp=timestamp
        )

    def flush(self):
        """Flush distributions from ThreadStats to Datadog.

        Modified based on `datadog.threadstats.base.ThreadStats.flush()`,
        to gain better control over exception handling.

        Failures are logged at debug level and never re-raised. A
        ``ClientError`` whose text contains "RemoteDisconnected" triggers
        exactly one retry.
        """
        _, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf"))
        count_dists = len(dists)
        if not count_dists:
            # Only logged; the flush below still runs with the empty list.
            logger.debug("No distributions to flush. Continuing.")

        self.thread_stats.flush_count += 1
        logger.debug(
            "Flush #%s sending %s distributions",
            self.thread_stats.flush_count,
            count_dists,
        )
        try:
            self.thread_stats.reporter.flush_distributions(dists)
        except Exception as e:
            # The nature of the root issue https://bugs.python.org/issue41345 is complex,
            # but comprehensive tests suggest that it is safe to retry on this specific error.
            if isinstance(
                e, api.exceptions.ClientError
            ) and "RemoteDisconnected" in str(e):
                logger.debug(
                    "Retry flush #%s due to RemoteDisconnected",
                    self.thread_stats.flush_count,
                )
                try:
                    self.thread_stats.reporter.flush_distributions(dists)
                except Exception:
                    logger.debug(
                        "Flush #%s failed after retry",
                        self.thread_stats.flush_count,
                        exc_info=True,
                    )
            else:
                logger.debug(
                    "Flush #%s failed", self.thread_stats.flush_count, exc_info=True
                )

    def stop(self):
        """Stop the underlying ThreadStats instance."""
        self.thread_stats.stop()


def init_lambda_stats():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we discussed before, it might make sense to call init_lambda_stats() in the __init__ method of the wrapper, so that it only creates the writer on cold start?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, you could check whether lambda_stats is None before creating a new writer. But I think it's probably best to do both.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reverted PR #147, as it had no effect on the /hello route.

global lambda_stats
if should_use_extension:
from datadog_lambda.statsd_writer import StatsDWriter

lambda_stats = StatsDWriter()
else:
from datadog_lambda.thread_stats_writer import ThreadStatsWriter

# Periodical flushing in a background thread is NOT guaranteed to succeed
# and leads to data loss. When disabled, metrics are only flushed at the
# end of invocation. To make metrics submitted from a long-running Lambda
Expand Down Expand Up @@ -257,34 +169,39 @@ def decrypt_kms_api_key(kms_client, ciphertext):
return plaintext


# Set API Key
# Only resolve a key when the datadog api client does not already have one.
if not api._api_key:
# Candidate key sources, all read from the environment ("" when unset).
DD_API_KEY_SECRET_ARN = os.environ.get("DD_API_KEY_SECRET_ARN", "")
DD_API_KEY_SSM_NAME = os.environ.get("DD_API_KEY_SSM_NAME", "")
DD_KMS_API_KEY = os.environ.get("DD_KMS_API_KEY", "")
# DD_API_KEY falls back to DATADOG_API_KEY.
DD_API_KEY = os.environ.get("DD_API_KEY", os.environ.get("DATADOG_API_KEY", ""))

# Precedence: Secrets Manager secret, then SSM parameter, then KMS-encrypted
# key, and finally the plain-text environment value.
if DD_API_KEY_SECRET_ARN:
api._api_key = boto3.client("secretsmanager").get_secret_value(
SecretId=DD_API_KEY_SECRET_ARN
)["SecretString"]
elif DD_API_KEY_SSM_NAME:
api._api_key = boto3.client("ssm").get_parameter(
Name=DD_API_KEY_SSM_NAME, WithDecryption=True
)["Parameter"]["Value"]
elif DD_KMS_API_KEY:
kms_client = boto3.client("kms")
api._api_key = decrypt_kms_api_key(kms_client, DD_KMS_API_KEY)
else:
api._api_key = DD_API_KEY

# Only the key's length is logged, not the key itself.
logger.debug("Setting DATADOG_API_KEY of length %d", len(api._api_key))

# Set DATADOG_HOST, to send data to a non-default Datadog datacenter
# Without DATADOG_HOST the host is "https://api." + DD_SITE (default "datadoghq.com").
api._api_host = os.environ.get(
"DATADOG_HOST", "https://api." + os.environ.get("DD_SITE", "datadoghq.com")
)
logger.debug("Setting DATADOG_HOST to %s", api._api_host)
# Set API Key only if extension is not here
if not should_use_extension:
Copy link
Collaborator

@tianchu tianchu Aug 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should put this into a function and call it in the wrapper's __init__ method? I think we can further reduce the scope here: the API key is ONLY needed when DD_FLUSH_TO_LOG is false (i.e. when we send metrics synchronously from the library).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the condition

# NOTE(review): datadog.api is imported here rather than at the top of the
# module, presumably so it is loaded only when this code path runs — confirm.
from datadog import api

# Only resolve a key when the datadog api client does not already have one.
if not api._api_key:
# boto3 is imported only on this path, where a key has to be resolved.
import boto3

# Candidate key sources, all read from the environment ("" when unset).
DD_API_KEY_SECRET_ARN = os.environ.get("DD_API_KEY_SECRET_ARN", "")
DD_API_KEY_SSM_NAME = os.environ.get("DD_API_KEY_SSM_NAME", "")
DD_KMS_API_KEY = os.environ.get("DD_KMS_API_KEY", "")
# DD_API_KEY falls back to DATADOG_API_KEY.
DD_API_KEY = os.environ.get("DD_API_KEY", os.environ.get("DATADOG_API_KEY", ""))

# Precedence: Secrets Manager secret, then SSM parameter, then KMS-encrypted
# key, and finally the plain-text environment value.
if DD_API_KEY_SECRET_ARN:
api._api_key = boto3.client("secretsmanager").get_secret_value(
SecretId=DD_API_KEY_SECRET_ARN
)["SecretString"]
elif DD_API_KEY_SSM_NAME:
api._api_key = boto3.client("ssm").get_parameter(
Name=DD_API_KEY_SSM_NAME, WithDecryption=True
)["Parameter"]["Value"]
elif DD_KMS_API_KEY:
kms_client = boto3.client("kms")
api._api_key = decrypt_kms_api_key(kms_client, DD_KMS_API_KEY)
else:
api._api_key = DD_API_KEY

# Only the key's length is logged, not the key itself.
logger.debug("Setting DATADOG_API_KEY of length %d", len(api._api_key))

# Set DATADOG_HOST, to send data to a non-default Datadog datacenter
# Without DATADOG_HOST the host is "https://api." + DD_SITE (default "datadoghq.com").
api._api_host = os.environ.get(
"DATADOG_HOST", "https://api." + os.environ.get("DD_SITE", "datadoghq.com")
)
logger.debug("Setting DATADOG_HOST to %s", api._api_host)

# Unmute exceptions from datadog api client, so we can catch and handle them
api._mute = False
# Unmute exceptions from datadog api client, so we can catch and handle them
api._mute = False
9 changes: 9 additions & 0 deletions datadog_lambda/stats_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class StatsWriter:
    """Base class defining the metric-writer interface.

    None of the methods is implemented here: each raises
    ``NotImplementedError`` and is meant to be overridden by a subclass.
    """

    def distribution(self, metric_name, value, tags=None, timestamp=None):
        """Record a single distribution point.

        The ``tags`` default is ``None`` (not ``[]``) to avoid a mutable
        default argument shared across calls.
        """
        raise NotImplementedError()

    def flush(self):
        """Send whatever the writer has buffered."""
        raise NotImplementedError()

    def stop(self):
        """Stop the writer."""
        raise NotImplementedError()
21 changes: 21 additions & 0 deletions datadog_lambda/statsd_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from datadog_lambda.stats_writer import StatsWriter
from datadog import initialize, statsd


class StatsDWriter(StatsWriter):
    """
    Writes distribution metrics using StatsD protocol
    """

    def __init__(self):
        # Point the datadog statsd client at 127.0.0.1:8125.
        options = {"statsd_host": "127.0.0.1", "statsd_port": 8125}
        initialize(**options)

    def distribution(self, metric_name, value, tags=None, timestamp=None):
        """Send one distribution point via ``statsd.distribution``.

        ``timestamp`` exists only to keep the signature aligned with
        ``StatsWriter``; it is not passed on. ``tags`` defaults to ``None``
        so that no mutable default is shared between calls.
        """
        statsd.distribution(metric_name, value, tags=tags)

    def flush(self):
        """Nothing to flush; intentionally a no-op."""
        pass

    def stop(self):
        """Nothing to stop; intentionally a no-op."""
        pass
62 changes: 62 additions & 0 deletions datadog_lambda/thread_stats_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import logging
from datadog.threadstats import ThreadStats
from datadog_lambda.stats_writer import StatsWriter

logger = logging.getLogger(__name__)


class ThreadStatsWriter(StatsWriter):
    """
    Writes distribution metrics using the ThreadStats class
    """

    def __init__(self, flush_in_thread):
        """Create and start the ThreadStats instance.

        ``flush_in_thread`` is forwarded unchanged to ``ThreadStats.start``.
        """
        self.thread_stats = ThreadStats(compress_payload=True)
        self.thread_stats.start(flush_in_thread=flush_in_thread)

    def distribution(self, metric_name, value, tags=None, timestamp=None):
        """Record one distribution point on the ThreadStats instance.

        ``tags`` defaults to ``None`` rather than a shared list literal.
        """
        self.thread_stats.distribution(
            metric_name, value, tags=tags, timestamp=timestamp
        )

    def flush(self):
        """Flush distributions from ThreadStats to Datadog.

        Modified based on `datadog.threadstats.base.ThreadStats.flush()`,
        to gain better control over exception handling.

        Any exception raised while flushing is logged at debug level and
        swallowed. An exception whose class is named ``ClientError`` and
        whose text contains "RemoteDisconnected" is retried exactly once.
        """
        _, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf"))
        count_dists = len(dists)
        if not count_dists:
            # Only logged; the flush below still runs with the empty list.
            logger.debug("No distributions to flush. Continuing.")

        self.thread_stats.flush_count += 1
        logger.debug(
            "Flush #%s sending %s distributions",
            self.thread_stats.flush_count,
            count_dists,
        )
        try:
            self.thread_stats.reporter.flush_distributions(dists)
        except Exception as e:
            # The nature of the root issue https://bugs.python.org/issue41345 is complex,
            # but comprehensive tests suggest that it is safe to retry on this specific error.
            # NOTE(review): the exception is matched by class name instead of
            # isinstance, presumably so this module does not have to import
            # datadog.api — confirm before changing it.
            if type(e).__name__ == "ClientError" and "RemoteDisconnected" in str(e):
                logger.debug(
                    "Retry flush #%s due to RemoteDisconnected",
                    self.thread_stats.flush_count,
                )
                try:
                    self.thread_stats.reporter.flush_distributions(dists)
                except Exception:
                    logger.debug(
                        "Flush #%s failed after retry",
                        self.thread_stats.flush_count,
                        exc_info=True,
                    )
            else:
                logger.debug(
                    "Flush #%s failed", self.thread_stats.flush_count, exc_info=True
                )

    def stop(self):
        """Stop the underlying ThreadStats instance."""
        self.thread_stats.stop()
2 changes: 1 addition & 1 deletion tests/test_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
from datadog_lambda.metric import (
decrypt_kms_api_key,
lambda_metric,
ThreadStatsWriter,
KMS_ENCRYPTION_CONTEXT_KEY,
)
from datadog_lambda.thread_stats_writer import ThreadStatsWriter
from datadog_lambda.tags import _format_dd_lambda_layer_tag


Expand Down
3 changes: 2 additions & 1 deletion tests/test_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from mock import patch, call, ANY, MagicMock

from datadog_lambda.wrapper import datadog_lambda_wrapper
from datadog_lambda.metric import lambda_metric, ThreadStatsWriter
from datadog_lambda.metric import lambda_metric
from datadog_lambda.thread_stats_writer import ThreadStatsWriter


def get_mock_context(
Expand Down