21
21
import io
22
22
import json
23
23
import logging
24
+ from timeit import default_timer
24
25
from typing import Any
25
26
26
27
from botocore .eventstream import EventStream
39
40
_BotoClientErrorT ,
40
41
_BotocoreInstrumentorContext ,
41
42
)
43
+ from opentelemetry .metrics import Instrument , Meter
42
44
from opentelemetry .semconv ._incubating .attributes .error_attributes import (
43
45
ERROR_TYPE ,
44
46
)
51
53
GEN_AI_REQUEST_TOP_P ,
52
54
GEN_AI_RESPONSE_FINISH_REASONS ,
53
55
GEN_AI_SYSTEM ,
56
+ GEN_AI_TOKEN_TYPE ,
54
57
GEN_AI_USAGE_INPUT_TOKENS ,
55
58
GEN_AI_USAGE_OUTPUT_TOKENS ,
56
59
GenAiOperationNameValues ,
57
60
GenAiSystemValues ,
61
+ GenAiTokenTypeValues ,
62
+ )
63
+ from opentelemetry .semconv ._incubating .metrics .gen_ai_metrics import (
64
+ GEN_AI_CLIENT_OPERATION_DURATION ,
65
+ GEN_AI_CLIENT_TOKEN_USAGE ,
58
66
)
59
67
from opentelemetry .trace .span import Span
60
68
from opentelemetry .trace .status import Status , StatusCode
61
69
62
70
_logger = logging .getLogger (__name__ )
63
71
72
+ _GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [
73
+ 0.01 ,
74
+ 0.02 ,
75
+ 0.04 ,
76
+ 0.08 ,
77
+ 0.16 ,
78
+ 0.32 ,
79
+ 0.64 ,
80
+ 1.28 ,
81
+ 2.56 ,
82
+ 5.12 ,
83
+ 10.24 ,
84
+ 20.48 ,
85
+ 40.96 ,
86
+ 81.92 ,
87
+ ]
88
+
89
+ _GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS = [
90
+ 1 ,
91
+ 4 ,
92
+ 16 ,
93
+ 64 ,
94
+ 256 ,
95
+ 1024 ,
96
+ 4096 ,
97
+ 16384 ,
98
+ 65536 ,
99
+ 262144 ,
100
+ 1048576 ,
101
+ 4194304 ,
102
+ 16777216 ,
103
+ 67108864 ,
104
+ ]
105
+
64
106
_MODEL_ID_KEY : str = "modelId"
65
107
66
108
@@ -88,6 +130,40 @@ def should_end_span_on_exit(self):
88
130
not in self ._DONT_CLOSE_SPAN_ON_END_OPERATIONS
89
131
)
90
132
133
+ def setup_metrics (self , meter : Meter , metrics : dict [str , Instrument ]):
134
+ metrics [GEN_AI_CLIENT_OPERATION_DURATION ] = meter .create_histogram (
135
+ name = GEN_AI_CLIENT_OPERATION_DURATION ,
136
+ description = "GenAI operation duration" ,
137
+ unit = "s" ,
138
+ explicit_bucket_boundaries_advisory = _GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS ,
139
+ )
140
+ metrics [GEN_AI_CLIENT_TOKEN_USAGE ] = meter .create_histogram (
141
+ name = GEN_AI_CLIENT_TOKEN_USAGE ,
142
+ description = "Measures number of input and output tokens used" ,
143
+ unit = "{token}" ,
144
+ explicit_bucket_boundaries_advisory = _GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS ,
145
+ )
146
+
147
+ def _extract_metrics_attributes (self ) -> _AttributeMapT :
148
+ attributes = {GEN_AI_SYSTEM : GenAiSystemValues .AWS_BEDROCK .value }
149
+
150
+ model_id = self ._call_context .params .get (_MODEL_ID_KEY )
151
+ if not model_id :
152
+ return attributes
153
+
154
+ attributes [GEN_AI_REQUEST_MODEL ] = model_id
155
+
156
+ # titan in invoke model is a text completion one
157
+ if "body" in self ._call_context .params and "amazon.titan" in model_id :
158
+ attributes [GEN_AI_OPERATION_NAME ] = (
159
+ GenAiOperationNameValues .TEXT_COMPLETION .value
160
+ )
161
+ else :
162
+ attributes [GEN_AI_OPERATION_NAME ] = (
163
+ GenAiOperationNameValues .CHAT .value
164
+ )
165
+ return attributes
166
+
91
167
def extract_attributes (self , attributes : _AttributeMapT ):
92
168
if self ._call_context .operation not in self ._HANDLED_OPERATIONS :
93
169
return
@@ -251,16 +327,18 @@ def before_service_call(
251
327
for event in message_to_event (message , capture_content ):
252
328
event_logger .emit (event )
253
329
254
- if not span .is_recording ():
255
- return
330
+ if span .is_recording ():
331
+ operation_name = span .attributes .get (GEN_AI_OPERATION_NAME , "" )
332
+ request_model = span .attributes .get (GEN_AI_REQUEST_MODEL , "" )
333
+ # avoid setting to an empty string if are not available
334
+ if operation_name and request_model :
335
+ span .update_name (f"{ operation_name } { request_model } " )
256
336
257
- operation_name = span .attributes .get (GEN_AI_OPERATION_NAME , "" )
258
- request_model = span .attributes .get (GEN_AI_REQUEST_MODEL , "" )
259
- # avoid setting to an empty string if are not available
260
- if operation_name and request_model :
261
- span .update_name (f"{ operation_name } { request_model } " )
337
+ # this is used to calculate the operation duration metric, duration may be skewed by request_hook
338
+ # pylint: disable=attribute-defined-outside-init
339
+ self ._operation_start = default_timer ()
262
340
263
- # pylint: disable=no-self-use
341
+ # pylint: disable=no-self-use,too-many-locals
264
342
def _converse_on_success (
265
343
self ,
266
344
span : Span ,
@@ -300,6 +378,37 @@ def _converse_on_success(
300
378
)
301
379
)
302
380
381
+ metrics = instrumentor_context .metrics
382
+ metrics_attributes = self ._extract_metrics_attributes ()
383
+ if operation_duration_histogram := metrics .get (
384
+ GEN_AI_CLIENT_OPERATION_DURATION
385
+ ):
386
+ duration = max ((default_timer () - self ._operation_start ), 0 )
387
+ operation_duration_histogram .record (
388
+ duration ,
389
+ attributes = metrics_attributes ,
390
+ )
391
+
392
+ if token_usage_histogram := metrics .get (GEN_AI_CLIENT_TOKEN_USAGE ):
393
+ if usage := result .get ("usage" ):
394
+ if input_tokens := usage .get ("inputTokens" ):
395
+ input_attributes = {
396
+ ** metrics_attributes ,
397
+ GEN_AI_TOKEN_TYPE : GenAiTokenTypeValues .INPUT .value ,
398
+ }
399
+ token_usage_histogram .record (
400
+ input_tokens , input_attributes
401
+ )
402
+
403
+ if output_tokens := usage .get ("outputTokens" ):
404
+ output_attributes = {
405
+ ** metrics_attributes ,
406
+ GEN_AI_TOKEN_TYPE : GenAiTokenTypeValues .COMPLETION .value ,
407
+ }
408
+ token_usage_histogram .record (
409
+ output_tokens , output_attributes
410
+ )
411
+
303
412
def _invoke_model_on_success (
304
413
self ,
305
414
span : Span ,
@@ -338,12 +447,31 @@ def _invoke_model_on_success(
338
447
if original_body is not None :
339
448
original_body .close ()
340
449
341
- def _on_stream_error_callback (self , span : Span , exception ):
450
+ def _on_stream_error_callback (
451
+ self ,
452
+ span : Span ,
453
+ exception ,
454
+ instrumentor_context : _BotocoreInstrumentorContext ,
455
+ ):
342
456
span .set_status (Status (StatusCode .ERROR , str (exception )))
343
457
if span .is_recording ():
344
458
span .set_attribute (ERROR_TYPE , type (exception ).__qualname__ )
345
459
span .end ()
346
460
461
+ metrics = instrumentor_context .metrics
462
+ metrics_attributes = {
463
+ ** self ._extract_metrics_attributes (),
464
+ ERROR_TYPE : type (exception ).__qualname__ ,
465
+ }
466
+ if operation_duration_histogram := metrics .get (
467
+ GEN_AI_CLIENT_OPERATION_DURATION
468
+ ):
469
+ duration = max ((default_timer () - self ._operation_start ), 0 )
470
+ operation_duration_histogram .record (
471
+ duration ,
472
+ attributes = metrics_attributes ,
473
+ )
474
+
347
475
def on_success (
348
476
self ,
349
477
span : Span ,
@@ -367,7 +495,9 @@ def stream_done_callback(response):
367
495
span .end ()
368
496
369
497
def stream_error_callback (exception ):
370
- self ._on_stream_error_callback (span , exception )
498
+ self ._on_stream_error_callback (
499
+ span , exception , instrumentor_context
500
+ )
371
501
372
502
result ["stream" ] = ConverseStreamWrapper (
373
503
result ["stream" ],
@@ -405,7 +535,9 @@ def invoke_model_stream_done_callback(response):
405
535
span .end ()
406
536
407
537
def invoke_model_stream_error_callback (exception ):
408
- self ._on_stream_error_callback (span , exception )
538
+ self ._on_stream_error_callback (
539
+ span , exception , instrumentor_context
540
+ )
409
541
410
542
result ["body" ] = InvokeModelWithResponseStreamWrapper (
411
543
result ["body" ],
@@ -415,7 +547,7 @@ def invoke_model_stream_error_callback(exception):
415
547
)
416
548
return
417
549
418
- # pylint: disable=no-self-use
550
+ # pylint: disable=no-self-use,too-many-locals
419
551
def _handle_amazon_titan_response (
420
552
self ,
421
553
span : Span ,
@@ -445,7 +577,38 @@ def _handle_amazon_titan_response(
445
577
)
446
578
event_logger .emit (choice .to_choice_event ())
447
579
448
- # pylint: disable=no-self-use
580
+ metrics = instrumentor_context .metrics
581
+ metrics_attributes = self ._extract_metrics_attributes ()
582
+ if operation_duration_histogram := metrics .get (
583
+ GEN_AI_CLIENT_OPERATION_DURATION
584
+ ):
585
+ duration = max ((default_timer () - self ._operation_start ), 0 )
586
+ operation_duration_histogram .record (
587
+ duration ,
588
+ attributes = metrics_attributes ,
589
+ )
590
+
591
+ if token_usage_histogram := metrics .get (GEN_AI_CLIENT_TOKEN_USAGE ):
592
+ if input_tokens := response_body .get ("inputTextTokenCount" ):
593
+ input_attributes = {
594
+ ** metrics_attributes ,
595
+ GEN_AI_TOKEN_TYPE : GenAiTokenTypeValues .INPUT .value ,
596
+ }
597
+ token_usage_histogram .record (
598
+ input_tokens , input_attributes
599
+ )
600
+
601
+ if results := response_body .get ("results" ):
602
+ if output_tokens := results [0 ].get ("tokenCount" ):
603
+ output_attributes = {
604
+ ** metrics_attributes ,
605
+ GEN_AI_TOKEN_TYPE : GenAiTokenTypeValues .COMPLETION .value ,
606
+ }
607
+ token_usage_histogram .record (
608
+ output_tokens , output_attributes
609
+ )
610
+
611
+ # pylint: disable=no-self-use,too-many-locals
449
612
def _handle_amazon_nova_response (
450
613
self ,
451
614
span : Span ,
@@ -472,6 +635,37 @@ def _handle_amazon_nova_response(
472
635
choice = _Choice .from_converse (response_body , capture_content )
473
636
event_logger .emit (choice .to_choice_event ())
474
637
638
+ metrics = instrumentor_context .metrics
639
+ metrics_attributes = self ._extract_metrics_attributes ()
640
+ if operation_duration_histogram := metrics .get (
641
+ GEN_AI_CLIENT_OPERATION_DURATION
642
+ ):
643
+ duration = max ((default_timer () - self ._operation_start ), 0 )
644
+ operation_duration_histogram .record (
645
+ duration ,
646
+ attributes = metrics_attributes ,
647
+ )
648
+
649
+ if token_usage_histogram := metrics .get (GEN_AI_CLIENT_TOKEN_USAGE ):
650
+ if usage := response_body .get ("usage" ):
651
+ if input_tokens := usage .get ("inputTokens" ):
652
+ input_attributes = {
653
+ ** metrics_attributes ,
654
+ GEN_AI_TOKEN_TYPE : GenAiTokenTypeValues .INPUT .value ,
655
+ }
656
+ token_usage_histogram .record (
657
+ input_tokens , input_attributes
658
+ )
659
+
660
+ if output_tokens := usage .get ("outputTokens" ):
661
+ output_attributes = {
662
+ ** metrics_attributes ,
663
+ GEN_AI_TOKEN_TYPE : GenAiTokenTypeValues .COMPLETION .value ,
664
+ }
665
+ token_usage_histogram .record (
666
+ output_tokens , output_attributes
667
+ )
668
+
475
669
# pylint: disable=no-self-use
476
670
def _handle_anthropic_claude_response (
477
671
self ,
@@ -500,6 +694,37 @@ def _handle_anthropic_claude_response(
500
694
)
501
695
event_logger .emit (choice .to_choice_event ())
502
696
697
+ metrics = instrumentor_context .metrics
698
+ metrics_attributes = self ._extract_metrics_attributes ()
699
+ if operation_duration_histogram := metrics .get (
700
+ GEN_AI_CLIENT_OPERATION_DURATION
701
+ ):
702
+ duration = max ((default_timer () - self ._operation_start ), 0 )
703
+ operation_duration_histogram .record (
704
+ duration ,
705
+ attributes = metrics_attributes ,
706
+ )
707
+
708
+ if token_usage_histogram := metrics .get (GEN_AI_CLIENT_TOKEN_USAGE ):
709
+ if usage := response_body .get ("usage" ):
710
+ if input_tokens := usage .get ("input_tokens" ):
711
+ input_attributes = {
712
+ ** metrics_attributes ,
713
+ GEN_AI_TOKEN_TYPE : GenAiTokenTypeValues .INPUT .value ,
714
+ }
715
+ token_usage_histogram .record (
716
+ input_tokens , input_attributes
717
+ )
718
+
719
+ if output_tokens := usage .get ("output_tokens" ):
720
+ output_attributes = {
721
+ ** metrics_attributes ,
722
+ GEN_AI_TOKEN_TYPE : GenAiTokenTypeValues .COMPLETION .value ,
723
+ }
724
+ token_usage_histogram .record (
725
+ output_tokens , output_attributes
726
+ )
727
+
503
728
def on_error (
504
729
self ,
505
730
span : Span ,
@@ -515,3 +740,17 @@ def on_error(
515
740
516
741
if not self .should_end_span_on_exit ():
517
742
span .end ()
743
+
744
+ metrics = instrumentor_context .metrics
745
+ metrics_attributes = {
746
+ ** self ._extract_metrics_attributes (),
747
+ ERROR_TYPE : type (exception ).__qualname__ ,
748
+ }
749
+ if operation_duration_histogram := metrics .get (
750
+ GEN_AI_CLIENT_OPERATION_DURATION
751
+ ):
752
+ duration = max ((default_timer () - self ._operation_start ), 0 )
753
+ operation_duration_histogram .record (
754
+ duration ,
755
+ attributes = metrics_attributes ,
756
+ )
0 commit comments