Refactoring to allow lazy loading of datadog.api #163
datadog_lambda/metric.py:

```diff
@@ -10,9 +10,6 @@
 import logging

-from botocore.exceptions import ClientError
-import boto3
-from datadog import api, initialize, statsd
-from datadog.threadstats import ThreadStats
 from datadog_lambda.extension import should_use_extension
 from datadog_lambda.tags import get_enhanced_metrics_tags, tag_dd_lambda_layer

@@ -24,100 +21,15 @@
 lambda_stats = None

-
-class StatsWriter:
-    def distribution(self, metric_name, value, tags=[], timestamp=None):
-        raise NotImplementedError()
-
-    def flush(self):
-        raise NotImplementedError()
-
-    def stop(self):
-        raise NotImplementedError()
-
-
-class StatsDWriter(StatsWriter):
-    """
-    Writes distribution metrics using StatsD protocol
-    """
-
-    def __init__(self):
-        options = {"statsd_host": "127.0.0.1", "statsd_port": 8125}
-        initialize(**options)
-
-    def distribution(self, metric_name, value, tags=[], timestamp=None):
-        statsd.distribution(metric_name, value, tags=tags)
-
-    def flush(self):
-        pass
-
-    def stop(self):
-        pass
-
-
-class ThreadStatsWriter(StatsWriter):
-    """
-    Writes distribution metrics using the ThreadStats class
-    """
-
-    def __init__(self, flush_in_thread):
-        self.thread_stats = ThreadStats(compress_payload=True)
-        self.thread_stats.start(flush_in_thread=flush_in_thread)
-
-    def distribution(self, metric_name, value, tags=[], timestamp=None):
-        self.thread_stats.distribution(
-            metric_name, value, tags=tags, timestamp=timestamp
-        )
-
-    def flush(self):
-        """Flush distributions from ThreadStats to Datadog.
-        Modified based on `datadog.threadstats.base.ThreadStats.flush()`,
-        to gain better control over exception handling.
-        """
-        _, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf"))
-        count_dists = len(dists)
-        if not count_dists:
-            logger.debug("No distributions to flush. Continuing.")
-
-        self.thread_stats.flush_count += 1
-        logger.debug(
-            "Flush #%s sending %s distributions",
-            self.thread_stats.flush_count,
-            count_dists,
-        )
-        try:
-            self.thread_stats.reporter.flush_distributions(dists)
-        except Exception as e:
-            # The nature of the root issue https://bugs.python.org/issue41345 is complex,
-            # but comprehensive tests suggest that it is safe to retry on this specific error.
-            if isinstance(
-                e, api.exceptions.ClientError
-            ) and "RemoteDisconnected" in str(e):
-                logger.debug(
-                    "Retry flush #%s due to RemoteDisconnected",
-                    self.thread_stats.flush_count,
-                )
-                try:
-                    self.thread_stats.reporter.flush_distributions(dists)
-                except Exception:
-                    logger.debug(
-                        "Flush #%s failed after retry",
-                        self.thread_stats.flush_count,
-                        exc_info=True,
-                    )
-            else:
-                logger.debug(
-                    "Flush #%s failed", self.thread_stats.flush_count, exc_info=True
-                )
-
-    def stop(self):
-        self.thread_stats.stop()
-
-
+def init_lambda_stats():
+    global lambda_stats
+    if should_use_extension:
+        from datadog_lambda.statsd_writer import StatsDWriter
+
+        lambda_stats = StatsDWriter()
+    else:
+        from datadog_lambda.thread_stats_writer import ThreadStatsWriter
+
+        # Periodical flushing in a background thread is NOT guaranteed to succeed
+        # and leads to data loss. When disabled, metrics are only flushed at the
+        # end of invocation. To make metrics submitted from a long-running Lambda
@@ -257,34 +169,39 @@ def decrypt_kms_api_key(kms_client, ciphertext):
     return plaintext


-# Set API Key
-if not api._api_key:
-    DD_API_KEY_SECRET_ARN = os.environ.get("DD_API_KEY_SECRET_ARN", "")
-    DD_API_KEY_SSM_NAME = os.environ.get("DD_API_KEY_SSM_NAME", "")
-    DD_KMS_API_KEY = os.environ.get("DD_KMS_API_KEY", "")
-    DD_API_KEY = os.environ.get("DD_API_KEY", os.environ.get("DATADOG_API_KEY", ""))
-
-    if DD_API_KEY_SECRET_ARN:
-        api._api_key = boto3.client("secretsmanager").get_secret_value(
-            SecretId=DD_API_KEY_SECRET_ARN
-        )["SecretString"]
-    elif DD_API_KEY_SSM_NAME:
-        api._api_key = boto3.client("ssm").get_parameter(
-            Name=DD_API_KEY_SSM_NAME, WithDecryption=True
-        )["Parameter"]["Value"]
-    elif DD_KMS_API_KEY:
-        kms_client = boto3.client("kms")
-        api._api_key = decrypt_kms_api_key(kms_client, DD_KMS_API_KEY)
-    else:
-        api._api_key = DD_API_KEY
-
-    logger.debug("Setting DATADOG_API_KEY of length %d", len(api._api_key))
-
-# Set DATADOG_HOST, to send data to a non-default Datadog datacenter
-api._api_host = os.environ.get(
-    "DATADOG_HOST", "https://api." + os.environ.get("DD_SITE", "datadoghq.com")
-)
-logger.debug("Setting DATADOG_HOST to %s", api._api_host)
-
-# Unmute exceptions from datadog api client, so we can catch and handle them
-api._mute = False
+# Set API Key only if extension is not here
+if not should_use_extension:
+    from datadog import api
+
+    if not api._api_key:
+        import boto3
+
+        DD_API_KEY_SECRET_ARN = os.environ.get("DD_API_KEY_SECRET_ARN", "")
+        DD_API_KEY_SSM_NAME = os.environ.get("DD_API_KEY_SSM_NAME", "")
+        DD_KMS_API_KEY = os.environ.get("DD_KMS_API_KEY", "")
+        DD_API_KEY = os.environ.get("DD_API_KEY", os.environ.get("DATADOG_API_KEY", ""))
+
+        if DD_API_KEY_SECRET_ARN:
+            api._api_key = boto3.client("secretsmanager").get_secret_value(
+                SecretId=DD_API_KEY_SECRET_ARN
+            )["SecretString"]
+        elif DD_API_KEY_SSM_NAME:
+            api._api_key = boto3.client("ssm").get_parameter(
+                Name=DD_API_KEY_SSM_NAME, WithDecryption=True
+            )["Parameter"]["Value"]
+        elif DD_KMS_API_KEY:
+            kms_client = boto3.client("kms")
+            api._api_key = decrypt_kms_api_key(kms_client, DD_KMS_API_KEY)
+        else:
+            api._api_key = DD_API_KEY
+
+        logger.debug("Setting DATADOG_API_KEY of length %d", len(api._api_key))
+
+    # Set DATADOG_HOST, to send data to a non-default Datadog datacenter
+    api._api_host = os.environ.get(
+        "DATADOG_HOST", "https://api." + os.environ.get("DD_SITE", "datadoghq.com")
+    )
+    logger.debug("Setting DATADOG_HOST to %s", api._api_host)
+
+    # Unmute exceptions from datadog api client, so we can catch and handle them
+    api._mute = False
```

Review comment on the new `if not should_use_extension:` condition:

> Perhaps we should put them into a function, and call it in the wrapper's `__init__`?

Reply:

> I've updated the condition
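The point of the change above is cold-start time: when the extension handles metrics, `datadog.api` and `boto3` are never imported at all. A rough, illustrative way to see what those module-level imports cost (not part of the PR; assumes both packages are installed, and timings vary by runtime and memory size):

```python
import importlib
import time

# Measure the one-time import cost that lazy loading keeps off the
# extension path. Purely illustrative.
for name in ("boto3", "datadog.api"):
    start = time.perf_counter()
    importlib.import_module(name)
    elapsed = time.perf_counter() - start
    print(f"import {name}: {elapsed * 1000:.1f} ms")
```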
datadog_lambda/stats_writer.py (new file):

```python
class StatsWriter:
    def distribution(self, metric_name, value, tags=[], timestamp=None):
        raise NotImplementedError()

    def flush(self):
        raise NotImplementedError()

    def stop(self):
        raise NotImplementedError()
```
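Since callers only talk to this small interface, any object with these three methods can stand in for the real writers. For instance, a hypothetical in-memory double for tests (not part of the PR):

```python
from datadog_lambda.stats_writer import StatsWriter


# Hypothetical test double: records every distribution instead of sending it.
class CapturingStatsWriter(StatsWriter):
    def __init__(self):
        self.records = []

    def distribution(self, metric_name, value, tags=None, timestamp=None):
        self.records.append((metric_name, value, tuple(tags or ()), timestamp))

    def flush(self):
        pass

    def stop(self):
        pass
```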
datadog_lambda/statsd_writer.py (new file):

```python
from datadog_lambda.stats_writer import StatsWriter
from datadog import initialize, statsd


class StatsDWriter(StatsWriter):
    """
    Writes distribution metrics using StatsD protocol
    """

    def __init__(self):
        options = {"statsd_host": "127.0.0.1", "statsd_port": 8125}
        initialize(**options)

    def distribution(self, metric_name, value, tags=[], timestamp=None):
        statsd.distribution(metric_name, value, tags=tags)

    def flush(self):
        pass

    def stop(self):
        pass
```
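`flush()` and `stop()` are deliberately no-ops: each `statsd.distribution()` call is a fire-and-forget UDP write to the extension's DogStatsD listener on 127.0.0.1:8125, which owns buffering and flushing. Note that `timestamp` is accepted for interface compatibility but dropped, since the StatsD protocol carries no per-point timestamp. A minimal usage sketch, assuming the extension is running:

```python
# Illustrative only: each call emits a UDP packet immediately.
writer = StatsDWriter()
writer.distribution("hello.dog", 1, tags=["env:dev"])
writer.flush()  # nothing is buffered client-side, so nothing to do
writer.stop()
```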
datadog_lambda/thread_stats_writer.py (new file):

```python
import logging
from datadog.threadstats import ThreadStats
from datadog_lambda.stats_writer import StatsWriter

logger = logging.getLogger(__name__)


class ThreadStatsWriter(StatsWriter):
    """
    Writes distribution metrics using the ThreadStats class
    """

    def __init__(self, flush_in_thread):
        self.thread_stats = ThreadStats(compress_payload=True)
        self.thread_stats.start(flush_in_thread=flush_in_thread)

    def distribution(self, metric_name, value, tags=[], timestamp=None):
        self.thread_stats.distribution(
            metric_name, value, tags=tags, timestamp=timestamp
        )

    def flush(self):
        """Flush distributions from ThreadStats to Datadog.
        Modified based on `datadog.threadstats.base.ThreadStats.flush()`,
        to gain better control over exception handling.
        """
        _, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf"))
        count_dists = len(dists)
        if not count_dists:
            logger.debug("No distributions to flush. Continuing.")

        self.thread_stats.flush_count += 1
        logger.debug(
            "Flush #%s sending %s distributions",
            self.thread_stats.flush_count,
            count_dists,
        )
        try:
            self.thread_stats.reporter.flush_distributions(dists)
        except Exception as e:
            # The nature of the root issue https://bugs.python.org/issue41345 is complex,
            # but comprehensive tests suggest that it is safe to retry on this specific error.
            if type(e).__name__ == "ClientError" and "RemoteDisconnected" in str(e):
                logger.debug(
                    "Retry flush #%s due to RemoteDisconnected",
                    self.thread_stats.flush_count,
                )
                try:
                    self.thread_stats.reporter.flush_distributions(dists)
                except Exception:
                    logger.debug(
                        "Flush #%s failed after retry",
                        self.thread_stats.flush_count,
                        exc_info=True,
                    )
            else:
                logger.debug(
                    "Flush #%s failed", self.thread_stats.flush_count, exc_info=True
                )

    def stop(self):
        self.thread_stats.stop()
```

The `type(e).__name__ == "ClientError"` check replaces the `isinstance(e, api.exceptions.ClientError)` test from metric.py, so this module never has to import `datadog.api` itself.
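Per invocation, the wrapper would call `distribution()` during the handler and `flush()` at the end; with `flush_in_thread=False`, nothing leaves the process until that final flush. A sketch of that lifecycle (the wiring here is illustrative, not the repo's exact code):

```python
# Illustrative lifecycle of ThreadStatsWriter across one invocation.
writer = ThreadStatsWriter(flush_in_thread=False)  # buffer in-process only
writer.distribution("my.metric", 1, tags=["env:dev"], timestamp=None)
writer.flush()  # one HTTP call to the Datadog API, with the retry above
writer.stop()   # stop the flush thread on shutdown, if one was started
```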
Review comments:

> As we discussed before, might make sense to call `init_lambda_stats()` in the `__init__` method of the wrapper? So it only creates the writer on cold start?

> Alternatively you could check `lambda_stats is None` before creating a new writer? But I think it's probably best that we have both.

> I've reverted PR #147 as it had no effect on the /hello route.
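A sketch of the two suggestions combined (the decorator shape and names are assumed for illustration, not taken from this diff):

```python
from datadog_lambda import metric


# Hypothetical wrapper wiring: create the writer once per sandbox (cold
# start) and guard against re-initialization on warm invocations.
class _HypotheticalWrapper:
    def __init__(self, func):
        self.func = func
        if metric.lambda_stats is None:  # suggestion 2: idempotence guard
            metric.init_lambda_stats()   # suggestion 1: init on cold start

    def __call__(self, event, context):
        try:
            return self.func(event, context)
        finally:
            metric.lambda_stats.flush()  # flush at the end of every invocation
```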