Skip to content

Commit 1623dc0

Browse files
privatwolkexrmx
andauthored
fixed: asyncpg connection params are a namedtuple (#3253)
Follow-up on the apparently abbandonned #2114. The asyncpg instrumentation attempts to fall back on using the database name as the span name in case the first argument to the instrumented method is falsey. This has probably never worked since asyncpg defines the `_params` attribute as an instance of `ConnectionParams` (https://github.com/MagicStack/asyncpg/blob/master/asyncpg/connection.py#L62) which is a NamedTuple instance and thus don't define `get`. The proper way of safely accessing properties on a NamedTuple is using `getattr`. The only case that I've actually found which triggers this branch is if the supplied query is an empty string. This is something that causes an `AttributeError` for `Connection.execute` but is fine for `fetch()`, `fetchval()`, `fetchrow()` and `executemany()`. The tests have been expanded to check these cases. Also, more status code validation has been added where it was missing. Co-authored-by: Riccardo Magliocchetti <[email protected]>
1 parent 63e43d5 commit 1623dc0

File tree

3 files changed

+203
-41
lines changed

3 files changed

+203
-41
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2020

2121
- `opentelemetry-instrumentation-redis` Add missing entry in doc string for `def _instrument`
2222
([#3247](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3247))
23+
- `opentelemetry-instrumentation-asyncpg` Fix fallback for empty queries.
24+
([#3253](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3253))
2325

2426
## Version 1.30.0/0.51b0 (2025-02-03)
2527

instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,10 @@ def _uninstrument(self, **__):
150150

151151
async def _do_execute(self, func, instance, args, kwargs):
152152
exception = None
153-
params = getattr(instance, "_params", {})
154-
name = args[0] if args[0] else params.get("database", "postgresql")
153+
params = getattr(instance, "_params", None)
154+
name = (
155+
args[0] if args[0] else getattr(params, "database", "postgresql")
156+
)
155157

156158
try:
157159
# Strip leading comments so we get the operation name.
@@ -185,11 +187,11 @@ async def _do_execute(self, func, instance, args, kwargs):
185187
async def _do_cursor_execute(self, func, instance, args, kwargs):
186188
"""Wrap cursor based functions. For every call this will generate a new span."""
187189
exception = None
188-
params = getattr(instance._connection, "_params", {})
190+
params = getattr(instance._connection, "_params", None)
189191
name = (
190192
instance._query
191193
if instance._query
192-
else params.get("database", "postgresql")
194+
else getattr(params, "database", "postgresql")
193195
)
194196

195197
try:

tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py

+195-37
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import asyncio
22
import os
3+
from collections import namedtuple
4+
from unittest.mock import patch
35

46
import asyncpg
57

@@ -20,7 +22,26 @@ def async_call(coro):
2022
return loop.run_until_complete(coro)
2123

2224

23-
class TestFunctionalAsyncPG(TestBase):
25+
class CheckSpanMixin:
26+
def check_span(self, span, expected_db_name=POSTGRES_DB_NAME):
27+
self.assertEqual(
28+
span.attributes[SpanAttributes.DB_SYSTEM], "postgresql"
29+
)
30+
self.assertEqual(
31+
span.attributes[SpanAttributes.DB_NAME], expected_db_name
32+
)
33+
self.assertEqual(
34+
span.attributes[SpanAttributes.DB_USER], POSTGRES_USER
35+
)
36+
self.assertEqual(
37+
span.attributes[SpanAttributes.NET_PEER_NAME], POSTGRES_HOST
38+
)
39+
self.assertEqual(
40+
span.attributes[SpanAttributes.NET_PEER_PORT], POSTGRES_PORT
41+
)
42+
43+
44+
class TestFunctionalAsyncPG(TestBase, CheckSpanMixin):
2445
def setUp(self):
2546
super().setUp()
2647
self._tracer = self.tracer_provider.get_tracer(__name__)
@@ -39,25 +60,54 @@ def tearDown(self):
3960
AsyncPGInstrumentor().uninstrument()
4061
super().tearDown()
4162

42-
def check_span(self, span):
43-
self.assertEqual(
44-
span.attributes[SpanAttributes.DB_SYSTEM], "postgresql"
45-
)
46-
self.assertEqual(
47-
span.attributes[SpanAttributes.DB_NAME], POSTGRES_DB_NAME
48-
)
49-
self.assertEqual(
50-
span.attributes[SpanAttributes.DB_USER], POSTGRES_USER
51-
)
63+
def test_instrumented_execute_method_without_arguments(self, *_, **__):
64+
"""Should create a span for execute()."""
65+
async_call(self._connection.execute("SELECT 42;"))
66+
spans = self.memory_exporter.get_finished_spans()
67+
self.assertEqual(len(spans), 1)
68+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
69+
self.check_span(spans[0])
70+
self.assertEqual(spans[0].name, "SELECT")
5271
self.assertEqual(
53-
span.attributes[SpanAttributes.NET_PEER_NAME], POSTGRES_HOST
72+
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
5473
)
74+
75+
def test_instrumented_execute_method_error(self, *_, **__):
76+
"""Should create an error span for execute() with the database name as the span name."""
77+
with self.assertRaises(AttributeError):
78+
async_call(self._connection.execute(""))
79+
spans = self.memory_exporter.get_finished_spans()
80+
self.assertEqual(len(spans), 1)
81+
self.assertIs(StatusCode.ERROR, spans[0].status.status_code)
82+
self.check_span(spans[0])
83+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
84+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
85+
86+
def test_instrumented_fetch_method_without_arguments(self, *_, **__):
87+
"""Should create a span from fetch()."""
88+
async_call(self._connection.fetch("SELECT 42;"))
89+
spans = self.memory_exporter.get_finished_spans()
90+
self.assertEqual(len(spans), 1)
91+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
92+
self.check_span(spans[0])
93+
self.assertEqual(spans[0].name, "SELECT")
5594
self.assertEqual(
56-
span.attributes[SpanAttributes.NET_PEER_PORT], POSTGRES_PORT
95+
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
5796
)
5897

59-
def test_instrumented_execute_method_without_arguments(self, *_, **__):
60-
async_call(self._connection.execute("SELECT 42;"))
98+
def test_instrumented_fetch_method_empty_query(self, *_, **__):
99+
"""Should create an error span for fetch() with the database name as the span name."""
100+
async_call(self._connection.fetch(""))
101+
spans = self.memory_exporter.get_finished_spans()
102+
self.assertEqual(len(spans), 1)
103+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
104+
self.check_span(spans[0])
105+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
106+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
107+
108+
def test_instrumented_fetchval_method_without_arguments(self, *_, **__):
109+
"""Should create a span for fetchval()."""
110+
async_call(self._connection.fetchval("SELECT 42;"))
61111
spans = self.memory_exporter.get_finished_spans()
62112
self.assertEqual(len(spans), 1)
63113
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
@@ -67,17 +117,105 @@ def test_instrumented_execute_method_without_arguments(self, *_, **__):
67117
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
68118
)
69119

70-
def test_instrumented_fetch_method_without_arguments(self, *_, **__):
71-
async_call(self._connection.fetch("SELECT 42;"))
120+
def test_instrumented_fetchval_method_empty_query(self, *_, **__):
121+
"""Should create an error span for fetchval() with the database name as the span name."""
122+
async_call(self._connection.fetchval(""))
72123
spans = self.memory_exporter.get_finished_spans()
73124
self.assertEqual(len(spans), 1)
125+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
126+
self.check_span(spans[0])
127+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
128+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
129+
130+
def test_instrumented_fetchrow_method_without_arguments(self, *_, **__):
131+
"""Should create a span for fetchrow()."""
132+
async_call(self._connection.fetchrow("SELECT 42;"))
133+
spans = self.memory_exporter.get_finished_spans()
134+
self.assertEqual(len(spans), 1)
135+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
74136
self.check_span(spans[0])
75137
self.assertEqual(spans[0].name, "SELECT")
76138
self.assertEqual(
77139
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
78140
)
79141

142+
def test_instrumented_fetchrow_method_empty_query(self, *_, **__):
143+
"""Should create an error span for fetchrow() with the database name as the span name."""
144+
async_call(self._connection.fetchrow(""))
145+
spans = self.memory_exporter.get_finished_spans()
146+
self.assertEqual(len(spans), 1)
147+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
148+
self.check_span(spans[0])
149+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
150+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
151+
152+
def test_instrumented_cursor_execute_method_without_arguments(
153+
self, *_, **__
154+
):
155+
"""Should create spans for the transaction as well as the cursor fetches."""
156+
157+
async def _cursor_execute():
158+
async with self._connection.transaction():
159+
async for record in self._connection.cursor(
160+
"SELECT generate_series(0, 5);"
161+
):
162+
pass
163+
164+
async_call(_cursor_execute())
165+
spans = self.memory_exporter.get_finished_spans()
166+
167+
self.check_span(spans[0])
168+
self.assertEqual(spans[0].name, "BEGIN;")
169+
self.assertEqual(
170+
spans[0].attributes[SpanAttributes.DB_STATEMENT], "BEGIN;"
171+
)
172+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
173+
174+
for span in spans[1:-1]:
175+
self.check_span(span)
176+
self.assertEqual(span.name, "CURSOR: SELECT")
177+
self.assertEqual(
178+
span.attributes[SpanAttributes.DB_STATEMENT],
179+
"SELECT generate_series(0, 5);",
180+
)
181+
self.assertIs(StatusCode.UNSET, span.status.status_code)
182+
183+
self.check_span(spans[-1])
184+
self.assertEqual(spans[-1].name, "COMMIT;")
185+
self.assertEqual(
186+
spans[-1].attributes[SpanAttributes.DB_STATEMENT], "COMMIT;"
187+
)
188+
189+
def test_instrumented_cursor_execute_method_empty_query(self, *_, **__):
190+
"""Should create spans for the transaction and cursor fetches with the database name as the span name."""
191+
192+
async def _cursor_execute():
193+
async with self._connection.transaction():
194+
async for record in self._connection.cursor(""):
195+
pass
196+
197+
async_call(_cursor_execute())
198+
spans = self.memory_exporter.get_finished_spans()
199+
self.assertEqual(len(spans), 3)
200+
201+
self.check_span(spans[0])
202+
self.assertEqual(
203+
spans[0].attributes[SpanAttributes.DB_STATEMENT], "BEGIN;"
204+
)
205+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
206+
207+
self.check_span(spans[1])
208+
self.assertEqual(spans[1].name, f"CURSOR: {POSTGRES_DB_NAME}")
209+
self.assertEqual(spans[1].attributes[SpanAttributes.DB_STATEMENT], "")
210+
self.assertIs(StatusCode.UNSET, spans[1].status.status_code)
211+
212+
self.check_span(spans[2])
213+
self.assertEqual(
214+
spans[2].attributes[SpanAttributes.DB_STATEMENT], "COMMIT;"
215+
)
216+
80217
def test_instrumented_remove_comments(self, *_, **__):
218+
"""Should remove comments from the query and set the span name correctly."""
81219
async_call(self._connection.fetch("/* leading comment */ SELECT 42;"))
82220
async_call(
83221
self._connection.fetch(
@@ -88,25 +226,30 @@ def test_instrumented_remove_comments(self, *_, **__):
88226
spans = self.memory_exporter.get_finished_spans()
89227
self.assertEqual(len(spans), 3)
90228
self.check_span(spans[0])
229+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
91230
self.assertEqual(spans[0].name, "SELECT")
92231
self.assertEqual(
93232
spans[0].attributes[SpanAttributes.DB_STATEMENT],
94233
"/* leading comment */ SELECT 42;",
95234
)
96235
self.check_span(spans[1])
236+
self.assertIs(StatusCode.UNSET, spans[1].status.status_code)
97237
self.assertEqual(spans[1].name, "SELECT")
98238
self.assertEqual(
99239
spans[1].attributes[SpanAttributes.DB_STATEMENT],
100240
"/* leading comment */ SELECT 42; /* trailing comment */",
101241
)
102242
self.check_span(spans[2])
243+
self.assertIs(StatusCode.UNSET, spans[2].status.status_code)
103244
self.assertEqual(spans[2].name, "SELECT")
104245
self.assertEqual(
105246
spans[2].attributes[SpanAttributes.DB_STATEMENT],
106247
"SELECT 42; /* trailing comment */",
107248
)
108249

109250
def test_instrumented_transaction_method(self, *_, **__):
251+
"""Should create spans for the transaction and the inner execute()."""
252+
110253
async def _transaction_execute():
111254
async with self._connection.transaction():
112255
await self._connection.execute("SELECT 42;")
@@ -134,6 +277,8 @@ async def _transaction_execute():
134277
self.assertIs(StatusCode.UNSET, spans[2].status.status_code)
135278

136279
def test_instrumented_failed_transaction_method(self, *_, **__):
280+
"""Should create spans for the transaction as well as an error span for execute()."""
281+
137282
async def _transaction_execute():
138283
async with self._connection.transaction():
139284
await self._connection.execute("SELECT 42::uuid;")
@@ -164,6 +309,7 @@ async def _transaction_execute():
164309
self.assertIs(StatusCode.UNSET, spans[2].status.status_code)
165310

166311
def test_instrumented_method_doesnt_capture_parameters(self, *_, **__):
312+
"""Should not capture parameters when capture_parameters is False."""
167313
async_call(self._connection.execute("SELECT $1;", "1"))
168314
spans = self.memory_exporter.get_finished_spans()
169315
self.assertEqual(len(spans), 1)
@@ -174,7 +320,7 @@ def test_instrumented_method_doesnt_capture_parameters(self, *_, **__):
174320
)
175321

176322

177-
class TestFunctionalAsyncPG_CaptureParameters(TestBase):
323+
class TestFunctionalAsyncPG_CaptureParameters(TestBase, CheckSpanMixin):
178324
def setUp(self):
179325
super().setUp()
180326
self._tracer = self.tracer_provider.get_tracer(__name__)
@@ -195,24 +341,8 @@ def tearDown(self):
195341
AsyncPGInstrumentor().uninstrument()
196342
super().tearDown()
197343

198-
def check_span(self, span):
199-
self.assertEqual(
200-
span.attributes[SpanAttributes.DB_SYSTEM], "postgresql"
201-
)
202-
self.assertEqual(
203-
span.attributes[SpanAttributes.DB_NAME], POSTGRES_DB_NAME
204-
)
205-
self.assertEqual(
206-
span.attributes[SpanAttributes.DB_USER], POSTGRES_USER
207-
)
208-
self.assertEqual(
209-
span.attributes[SpanAttributes.NET_PEER_NAME], POSTGRES_HOST
210-
)
211-
self.assertEqual(
212-
span.attributes[SpanAttributes.NET_PEER_PORT], POSTGRES_PORT
213-
)
214-
215344
def test_instrumented_execute_method_with_arguments(self, *_, **__):
345+
"""Should create a span for execute() with captured parameters."""
216346
async_call(self._connection.execute("SELECT $1;", "1"))
217347
spans = self.memory_exporter.get_finished_spans()
218348
self.assertEqual(len(spans), 1)
@@ -228,6 +358,7 @@ def test_instrumented_execute_method_with_arguments(self, *_, **__):
228358
)
229359

230360
def test_instrumented_fetch_method_with_arguments(self, *_, **__):
361+
"""Should create a span for fetch() with captured parameters."""
231362
async_call(self._connection.fetch("SELECT $1;", "1"))
232363
spans = self.memory_exporter.get_finished_spans()
233364
self.assertEqual(len(spans), 1)
@@ -242,10 +373,11 @@ def test_instrumented_fetch_method_with_arguments(self, *_, **__):
242373
)
243374

244375
def test_instrumented_executemany_method_with_arguments(self, *_, **__):
376+
"""Should create a span for executemany with captured parameters."""
245377
async_call(self._connection.executemany("SELECT $1;", [["1"], ["2"]]))
246378
spans = self.memory_exporter.get_finished_spans()
247379
self.assertEqual(len(spans), 1)
248-
380+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
249381
self.check_span(spans[0])
250382
self.assertEqual(
251383
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT $1;"
@@ -255,15 +387,41 @@ def test_instrumented_executemany_method_with_arguments(self, *_, **__):
255387
)
256388

257389
def test_instrumented_execute_interface_error_method(self, *_, **__):
390+
"""Should create an error span for execute() with captured parameters."""
258391
with self.assertRaises(asyncpg.InterfaceError):
259392
async_call(self._connection.execute("SELECT 42;", 1, 2, 3))
260393
spans = self.memory_exporter.get_finished_spans()
261394
self.assertEqual(len(spans), 1)
262-
395+
self.assertIs(StatusCode.ERROR, spans[0].status.status_code)
263396
self.check_span(spans[0])
264397
self.assertEqual(
265398
spans[0].attributes[SpanAttributes.DB_STATEMENT], "SELECT 42;"
266399
)
267400
self.assertEqual(
268401
spans[0].attributes["db.statement.parameters"], "(1, 2, 3)"
269402
)
403+
404+
def test_instrumented_executemany_method_empty_query(self, *_, **__):
405+
"""Should create a span for executemany() with captured parameters."""
406+
async_call(self._connection.executemany("", []))
407+
spans = self.memory_exporter.get_finished_spans()
408+
self.assertEqual(len(spans), 1)
409+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
410+
self.check_span(spans[0])
411+
self.assertEqual(spans[0].name, POSTGRES_DB_NAME)
412+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")
413+
self.assertEqual(
414+
spans[0].attributes["db.statement.parameters"], "([],)"
415+
)
416+
417+
def test_instrumented_fetch_method_broken_asyncpg(self, *_, **__):
418+
"""Should create a span for fetch() with "postgresql" as the span name."""
419+
with patch.object(
420+
self._connection, "_params", namedtuple("ConnectionParams", [])
421+
):
422+
async_call(self._connection.fetch(""))
423+
spans = self.memory_exporter.get_finished_spans()
424+
self.assertEqual(len(spans), 1)
425+
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
426+
self.assertEqual(spans[0].name, "postgresql")
427+
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")

0 commit comments

Comments
 (0)