@@ -102,6 +102,8 @@ def response_hook(span, instance, response):
102
102
from opentelemetry .instrumentation .redis .util import (
103
103
_extract_conn_attributes ,
104
104
_format_command_args ,
105
+ _set_span_attribute_if_value ,
106
+ _value_or_none ,
105
107
)
106
108
from opentelemetry .instrumentation .redis .version import __version__
107
109
from opentelemetry .instrumentation .utils import unwrap
@@ -126,6 +128,8 @@ def response_hook(span, instance, response):
126
128
_REDIS_CLUSTER_VERSION = (4 , 1 , 0 )
127
129
_REDIS_ASYNCIO_CLUSTER_VERSION = (4 , 3 , 2 )
128
130
131
+ _FIELD_TYPES = ["NUMERIC" , "TEXT" , "GEO" , "TAG" , "VECTOR" ]
132
+
129
133
130
134
def _set_connection_attributes (span , conn ):
131
135
if not span .is_recording () or not hasattr (conn , "connection_pool" ):
@@ -138,7 +142,12 @@ def _set_connection_attributes(span, conn):
138
142
139
143
def _build_span_name (instance , cmd_args ):
140
144
if len (cmd_args ) > 0 and cmd_args [0 ]:
141
- name = cmd_args [0 ]
145
+ if cmd_args [0 ] == "FT.SEARCH" :
146
+ name = "redis.search"
147
+ elif cmd_args [0 ] == "FT.CREATE" :
148
+ name = "redis.create_index"
149
+ else :
150
+ name = cmd_args [0 ]
142
151
else :
143
152
name = instance .connection_pool .connection_kwargs .get ("db" , 0 )
144
153
return name
@@ -181,17 +190,21 @@ def _instrument(
181
190
def _traced_execute_command (func , instance , args , kwargs ):
182
191
query = _format_command_args (args )
183
192
name = _build_span_name (instance , args )
184
-
185
193
with tracer .start_as_current_span (
186
194
name , kind = trace .SpanKind .CLIENT
187
195
) as span :
188
196
if span .is_recording ():
189
197
span .set_attribute (SpanAttributes .DB_STATEMENT , query )
190
198
_set_connection_attributes (span , instance )
191
199
span .set_attribute ("db.redis.args_length" , len (args ))
200
+ if span .name == "redis.create_index" :
201
+ _add_create_attributes (span , args )
192
202
if callable (request_hook ):
193
203
request_hook (span , instance , args , kwargs )
194
204
response = func (* args , ** kwargs )
205
+ if span .is_recording ():
206
+ if span .name == "redis.search" :
207
+ _add_search_attributes (span , response , args )
195
208
if callable (response_hook ):
196
209
response_hook (span , instance , response )
197
210
return response
@@ -202,9 +215,7 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
202
215
resource ,
203
216
span_name ,
204
217
) = _build_span_meta_data_for_pipeline (instance )
205
-
206
218
exception = None
207
-
208
219
with tracer .start_as_current_span (
209
220
span_name , kind = trace .SpanKind .CLIENT
210
221
) as span :
@@ -230,6 +241,60 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
230
241
231
242
return response
232
243
244
+ def _add_create_attributes (span , args ):
245
+ _set_span_attribute_if_value (
246
+ span , "redis.create_index.index" , _value_or_none (args , 1 )
247
+ )
248
+ # According to: https://github.com/redis/redis-py/blob/master/redis/commands/search/commands.py#L155 schema is last argument for execute command
249
+ try :
250
+ schema_index = args .index ("SCHEMA" )
251
+ except ValueError :
252
+ return
253
+ schema = args [schema_index :]
254
+ field_attribute = ""
255
+ # Schema in format:
256
+ # [first_field_name, first_field_type, first_field_some_attribute1, first_field_some_attribute2, second_field_name, ...]
257
+ field_attribute = "" .join (
258
+ f"Field(name: { schema [index - 1 ]} , type: { schema [index ]} );"
259
+ for index in range (1 , len (schema ))
260
+ if schema [index ] in _FIELD_TYPES
261
+ )
262
+ _set_span_attribute_if_value (
263
+ span ,
264
+ "redis.create_index.fields" ,
265
+ field_attribute ,
266
+ )
267
+
268
+ def _add_search_attributes (span , response , args ):
269
+ _set_span_attribute_if_value (
270
+ span , "redis.search.index" , _value_or_none (args , 1 )
271
+ )
272
+ _set_span_attribute_if_value (
273
+ span , "redis.search.query" , _value_or_none (args , 2 )
274
+ )
275
+ # Parse response from search
276
+ # https://redis.io/docs/latest/commands/ft.search/
277
+ # Response in format:
278
+ # [number_of_returned_documents, index_of_first_returned_doc, first_doc(as a list), index_of_second_returned_doc, second_doc(as a list) ...]
279
+ # Returned documents in array format:
280
+ # [first_field_name, first_field_value, second_field_name, second_field_value ...]
281
+ number_of_returned_documents = _value_or_none (response , 0 )
282
+ _set_span_attribute_if_value (
283
+ span , "redis.search.total" , number_of_returned_documents
284
+ )
285
+ if "NOCONTENT" in args or not number_of_returned_documents :
286
+ return
287
+ for document_number in range (number_of_returned_documents ):
288
+ document_index = _value_or_none (response , 1 + 2 * document_number )
289
+ if document_index :
290
+ document = response [2 + 2 * document_number ]
291
+ for attribute_name_index in range (0 , len (document ), 2 ):
292
+ _set_span_attribute_if_value (
293
+ span ,
294
+ f"redis.search.xdoc_{ document_index } .{ document [attribute_name_index ]} " ,
295
+ document [attribute_name_index + 1 ],
296
+ )
297
+
233
298
pipeline_class = (
234
299
"BasePipeline" if redis .VERSION < (3 , 0 , 0 ) else "Pipeline"
235
300
)
0 commit comments