Skip to content

Commit 25d311e

Browse files
authored
Extract span attrs from RQ job object & fix tests (#3786)
1 parent a674788 commit 25d311e

File tree

5 files changed

+85
-34
lines changed

5 files changed

+85
-34
lines changed

MIGRATION_GUIDE.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,18 @@ Looking to upgrade from Sentry SDK 2.x to 3.x? Here's a comprehensive list of wh
7171
| `client` | `client.address`, `client.port` |
7272
| full URL | `url.full` |
7373

74+
- If you're using the RQ integration, the `sampling_context` argument of `traces_sampler` doesn't contain the `rq_job` object anymore. Instead, the individual properties of the scope, if available, are accessible as follows:
75+
76+
| RQ property | Sampling context key(s) |
77+
| --------------- | ---------------------------- |
78+
| `rq_job.args` | `rq.job.args` |
79+
| `rq_job.kwargs` | `rq.job.kwargs` |
80+
| `rq_job.func` | `rq.job.func` |
81+
| `queue.name` | `messaging.destination.name` |
82+
| `job.id` | `messaging.message.id` |
83+
84+
Note that `rq.job.args`, `rq.job.kwargs`, and `rq.job.func` are serialized and not the actual objects on the job.
85+
7486
### Removed
7587

7688
- Spans no longer have a `description`. Use `name` instead.

sentry_sdk/integrations/opentelemetry/potel_span_processor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,15 +179,15 @@ def _root_span_to_transaction_event(self, span):
179179

180180
transaction_name, transaction_source = extract_transaction_name_source(span)
181181
span_data = extract_span_data(span)
182-
(_, description, status, http_status, _) = span_data
183-
184182
trace_context = get_trace_context(span, span_data=span_data)
185183
contexts = {"trace": trace_context}
186184

187185
profile_context = get_profile_context(span)
188186
if profile_context:
189187
contexts["profile"] = profile_context
190188

189+
(_, description, _, http_status, _) = span_data
190+
191191
if http_status:
192192
contexts["response"] = {"status_code": http_status}
193193

sentry_sdk/integrations/opentelemetry/utils.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ def extract_span_data(span):
114114
description = span.name
115115
status, http_status = extract_span_status(span)
116116
origin = None
117-
118117
if span.attributes is None:
119118
return (op, description, status, http_status, origin)
120119

@@ -133,11 +132,23 @@ def extract_span_data(span):
133132

134133
rpc_service = span.attributes.get(SpanAttributes.RPC_SERVICE)
135134
if rpc_service:
136-
return ("rpc", description, status, http_status, origin)
135+
return (
136+
span.attributes.get(SentrySpanAttribute.OP) or "rpc",
137+
description,
138+
status,
139+
http_status,
140+
origin,
141+
)
137142

138143
messaging_system = span.attributes.get(SpanAttributes.MESSAGING_SYSTEM)
139144
if messaging_system:
140-
return ("message", description, status, http_status, origin)
145+
return (
146+
span.attributes.get(SentrySpanAttribute.OP) or "message",
147+
description,
148+
status,
149+
http_status,
150+
origin,
151+
)
141152

142153
faas_trigger = span.attributes.get(SpanAttributes.FAAS_TRIGGER)
143154
if faas_trigger:

sentry_sdk/integrations/rq.py

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from sentry_sdk.integrations.logging import ignore_logger
77
from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK
88
from sentry_sdk.utils import (
9+
_serialize_span_attribute,
910
capture_internal_exceptions,
1011
ensure_integration_enabled,
1112
event_from_exception,
@@ -35,6 +36,15 @@
3536
DEFAULT_TRANSACTION_NAME = "unknown RQ task"
3637

3738

39+
JOB_PROPERTY_TO_ATTRIBUTE = {
40+
"id": "messaging.message.id",
41+
}
42+
43+
QUEUE_PROPERTY_TO_ATTRIBUTE = {
44+
"name": "messaging.destination.name",
45+
}
46+
47+
3848
class RqIntegration(Integration):
3949
identifier = "rq"
4050
origin = f"auto.queue.{identifier}"
@@ -54,8 +64,8 @@ def setup_once():
5464
old_perform_job = Worker.perform_job
5565

5666
@ensure_integration_enabled(RqIntegration, old_perform_job)
57-
def sentry_patched_perform_job(self, job, *args, **kwargs):
58-
# type: (Any, Job, *Queue, **Any) -> bool
67+
def sentry_patched_perform_job(self, job, queue, *args, **kwargs):
68+
# type: (Any, Job, Queue, *Any, **Any) -> bool
5969
with sentry_sdk.new_scope() as scope:
6070
try:
6171
transaction_name = job.func_name or DEFAULT_TRANSACTION_NAME
@@ -76,9 +86,9 @@ def sentry_patched_perform_job(self, job, *args, **kwargs):
7686
name=transaction_name,
7787
source=TRANSACTION_SOURCE_TASK,
7888
origin=RqIntegration.origin,
79-
custom_sampling_context={"rq_job": job},
89+
attributes=_prepopulate_attributes(job, queue),
8090
):
81-
rv = old_perform_job(self, job, *args, **kwargs)
91+
rv = old_perform_job(self, job, queue, *args, **kwargs)
8292

8393
if self.is_horse:
8494
# We're inside of a forked process and RQ is
@@ -167,3 +177,30 @@ def _capture_exception(exc_info, **kwargs):
167177
)
168178

169179
sentry_sdk.capture_event(event, hint=hint)
180+
181+
182+
def _prepopulate_attributes(job, queue):
183+
# type: (Job, Queue) -> dict[str, Any]
184+
attributes = {
185+
"messaging.system": "rq",
186+
}
187+
188+
for prop, attr in JOB_PROPERTY_TO_ATTRIBUTE.items():
189+
if getattr(job, prop, None) is not None:
190+
attributes[attr] = getattr(job, prop)
191+
192+
for prop, attr in QUEUE_PROPERTY_TO_ATTRIBUTE.items():
193+
if getattr(queue, prop, None) is not None:
194+
attributes[attr] = getattr(queue, prop)
195+
196+
for key in ("args", "kwargs"):
197+
if getattr(job, key, None):
198+
attributes[f"rq.job.{key}"] = _serialize_span_attribute(getattr(job, key))
199+
200+
func = job.func
201+
if callable(func):
202+
func = func.__name__
203+
204+
attributes["rq.job.func"] = _serialize_span_attribute(func)
205+
206+
return attributes

tests/integrations/rq/test_rq.py

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,9 @@ def test_transaction_with_error(
118118
)
119119

120120
assert envelope["type"] == "transaction"
121-
assert envelope["contexts"]["trace"] == error_event["contexts"]["trace"]
121+
assert envelope["contexts"]["trace"] == DictionaryContaining(
122+
error_event["contexts"]["trace"]
123+
)
122124
assert envelope["transaction"] == error_event["transaction"]
123125
assert envelope["extra"]["rq-job"] == DictionaryContaining(
124126
{
@@ -148,10 +150,7 @@ def test_error_has_trace_context_if_tracing_disabled(
148150
assert error_event["contexts"]["trace"]
149151

150152

151-
def test_tracing_enabled(
152-
sentry_init,
153-
capture_events,
154-
):
153+
def test_tracing_enabled(sentry_init, capture_events, DictionaryContaining):
155154
sentry_init(integrations=[RqIntegration()], traces_sample_rate=1.0)
156155
events = capture_events()
157156

@@ -165,7 +164,10 @@ def test_tracing_enabled(
165164

166165
assert error_event["transaction"] == "tests.integrations.rq.test_rq.crashing_job"
167166
assert transaction["transaction"] == "tests.integrations.rq.test_rq.crashing_job"
168-
assert transaction["contexts"]["trace"] == error_event["contexts"]["trace"]
167+
assert (
168+
DictionaryContaining(error_event["contexts"]["trace"])
169+
== transaction["contexts"]["trace"]
170+
)
169171

170172

171173
def test_tracing_disabled(
@@ -218,9 +220,7 @@ def test_transaction_no_error(
218220
)
219221

220222

221-
def test_traces_sampler_gets_correct_values_in_sampling_context(
222-
sentry_init, DictionaryContaining, ObjectDescribedBy # noqa:N803
223-
):
223+
def test_traces_sampler_gets_correct_values_in_sampling_context(sentry_init):
224224
traces_sampler = mock.Mock(return_value=True)
225225
sentry_init(integrations=[RqIntegration()], traces_sampler=traces_sampler)
226226

@@ -230,22 +230,13 @@ def test_traces_sampler_gets_correct_values_in_sampling_context(
230230
queue.enqueue(do_trick, "Bodhi", trick="roll over")
231231
worker.work(burst=True)
232232

233-
traces_sampler.assert_any_call(
234-
DictionaryContaining(
235-
{
236-
"rq_job": ObjectDescribedBy(
237-
type=rq.job.Job,
238-
attrs={
239-
"description": "tests.integrations.rq.test_rq.do_trick('Bodhi', trick='roll over')",
240-
"result": "Bodhi, can you roll over? Good dog!",
241-
"func_name": "tests.integrations.rq.test_rq.do_trick",
242-
"args": ("Bodhi",),
243-
"kwargs": {"trick": "roll over"},
244-
},
245-
),
246-
}
247-
)
248-
)
233+
sampling_context = traces_sampler.call_args_list[0][0][0]
234+
assert sampling_context["messaging.system"] == "rq"
235+
assert sampling_context["rq.job.args"] == ["Bodhi"]
236+
assert sampling_context["rq.job.kwargs"] == '{"trick": "roll over"}'
237+
assert sampling_context["rq.job.func"] == "do_trick"
238+
assert sampling_context["messaging.message.id"]
239+
assert sampling_context["messaging.destination.name"] == "default"
249240

250241

251242
@pytest.mark.skipif(

0 commit comments

Comments
 (0)