Commit b1f714e

botocore: send choice events for bedrock chat completion (#3275)

* botocore: send choice events for bedrock chat completion
* Please pylint
* Add CHANGELOG
* Always call done stream callback
* Move choice event creation to _Choice and add types

1 parent 3c2599c commit b1f714e
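For context, a GenAI "choice event" is the `gen_ai.choice` event defined by the OpenTelemetry GenAI semantic conventions: one event per response candidate, carrying the finish reason, choice index, and, when content capture is enabled, the message. Below is a minimal sketch of emitting such an event through the experimental events API; the body shown is illustrative, not the exact payload this instrumentation builds.

```python
# Illustrative sketch only: the shape of a gen_ai.choice event emitted through
# the experimental opentelemetry._events API. The real payload is assembled by
# the _Choice helper in bedrock_utils (see the diff below).
from opentelemetry._events import Event, get_event_logger

event_logger = get_event_logger(__name__)

event_logger.emit(
    Event(
        name="gen_ai.choice",
        attributes={"gen_ai.system": "aws.bedrock"},
        body={
            "index": 0,
            "finish_reason": "end_turn",
            # message content is only included when content capture is enabled
            "message": {"role": "assistant", "content": "Hello!"},
        },
    )
)
```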

File tree

5 files changed: +456 −97 lines changed


CHANGELOG.md (+2)

@@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
   ([#3258](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3258))
 - `opentelemetry-instrumentation-botocore` Add support for GenAI system events
   ([#3266](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3266))
+- `opentelemetry-instrumentation-botocore` Add support for GenAI choice events
+  ([#3275](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3275))

 ### Fixed


instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock.py (+143 −73)
@@ -29,6 +29,7 @@
 from opentelemetry.instrumentation.botocore.extensions.bedrock_utils import (
     ConverseStreamWrapper,
     InvokeModelWithResponseStreamWrapper,
+    _Choice,
     genai_capture_message_content,
     message_to_event,
 )
@@ -242,12 +243,12 @@ def before_service_call(
         if self._call_context.operation not in self._HANDLED_OPERATIONS:
             return

-        _capture_content = genai_capture_message_content()
+        capture_content = genai_capture_message_content()

         messages = self._get_request_messages()
         for message in messages:
             event_logger = instrumentor_context.event_logger
-            event_logger.emit(message_to_event(message, _capture_content))
+            event_logger.emit(message_to_event(message, capture_content))

         if not span.is_recording():
             return
@@ -259,27 +260,52 @@ def before_service_call(
         span.update_name(f"{operation_name} {request_model}")

     # pylint: disable=no-self-use
-    def _converse_on_success(self, span: Span, result: dict[str, Any]):
-        if usage := result.get("usage"):
-            if input_tokens := usage.get("inputTokens"):
-                span.set_attribute(
-                    GEN_AI_USAGE_INPUT_TOKENS,
-                    input_tokens,
-                )
-            if output_tokens := usage.get("outputTokens"):
+    def _converse_on_success(
+        self,
+        span: Span,
+        result: dict[str, Any],
+        instrumentor_context: _BotocoreInstrumentorContext,
+        capture_content,
+    ):
+        if span.is_recording():
+            if usage := result.get("usage"):
+                if input_tokens := usage.get("inputTokens"):
+                    span.set_attribute(
+                        GEN_AI_USAGE_INPUT_TOKENS,
+                        input_tokens,
+                    )
+                if output_tokens := usage.get("outputTokens"):
+                    span.set_attribute(
+                        GEN_AI_USAGE_OUTPUT_TOKENS,
+                        output_tokens,
+                    )
+
+            if stop_reason := result.get("stopReason"):
                 span.set_attribute(
-                    GEN_AI_USAGE_OUTPUT_TOKENS,
-                    output_tokens,
+                    GEN_AI_RESPONSE_FINISH_REASONS,
+                    [stop_reason],
                 )

-        if stop_reason := result.get("stopReason"):
-            span.set_attribute(
-                GEN_AI_RESPONSE_FINISH_REASONS,
-                [stop_reason],
+        event_logger = instrumentor_context.event_logger
+        choice = _Choice.from_converse(result, capture_content)
+        # this path is used by streaming apis, in that case we are already out of the span
+        # context so need to add the span context manually
+        span_ctx = span.get_span_context()
+        event_logger.emit(
+            choice.to_choice_event(
+                trace_id=span_ctx.trace_id,
+                span_id=span_ctx.span_id,
+                trace_flags=span_ctx.trace_flags,
             )
+        )

     def _invoke_model_on_success(
-        self, span: Span, result: dict[str, Any], model_id: str
+        self,
+        span: Span,
+        result: dict[str, Any],
+        model_id: str,
+        instrumentor_context: _BotocoreInstrumentorContext,
+        capture_content,
     ):
         original_body = None
         try:
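As the inline comment in `_converse_on_success` notes, the streaming code paths call this method from a stream-completion callback, after the span's context has been exited, so the span context is attached to the emitted event explicitly. A minimal sketch of that correlation pattern, under the assumption that the experimental `opentelemetry._events.Event` type accepts `trace_id`, `span_id`, and `trace_flags` keywords:

```python
# Sketch: correlating an event with a span when emitting outside its context.
# Assumes the experimental opentelemetry._events.Event accepts trace_id,
# span_id, and trace_flags keyword arguments, as the diff above suggests.
from opentelemetry import trace
from opentelemetry._events import Event, get_event_logger

tracer = trace.get_tracer(__name__)
event_logger = get_event_logger(__name__)

# The span is started but not made current, mimicking the streaming case
# where the callback fires after the span context has already been exited.
span = tracer.start_span("chat example-model")


def stream_done_callback(response: dict) -> None:
    span_ctx = span.get_span_context()
    event_logger.emit(
        Event(
            name="gen_ai.choice",
            body={"index": 0, "finish_reason": response.get("stopReason")},
            trace_id=span_ctx.trace_id,
            span_id=span_ctx.span_id,
            trace_flags=span_ctx.trace_flags,
        )
    )
    span.end()


stream_done_callback({"stopReason": "end_turn"})
```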
@@ -292,12 +318,17 @@ def _invoke_model_on_success(

             response_body = json.loads(body_content.decode("utf-8"))
             if "amazon.titan" in model_id:
-                self._handle_amazon_titan_response(span, response_body)
+                self._handle_amazon_titan_response(
+                    span, response_body, instrumentor_context, capture_content
+                )
             elif "amazon.nova" in model_id:
-                self._handle_amazon_nova_response(span, response_body)
+                self._handle_amazon_nova_response(
+                    span, response_body, instrumentor_context, capture_content
+                )
             elif "anthropic.claude" in model_id:
-                self._handle_anthropic_claude_response(span, response_body)
-
+                self._handle_anthropic_claude_response(
+                    span, response_body, instrumentor_context, capture_content
+                )
         except json.JSONDecodeError:
             _logger.debug("Error: Unable to parse the response body as JSON")
         except Exception as exc:  # pylint: disable=broad-exception-caught
@@ -321,80 +352,105 @@ def on_success(
         if self._call_context.operation not in self._HANDLED_OPERATIONS:
             return

-        if not span.is_recording():
-            if not self.should_end_span_on_exit():
-                span.end()
-            return
+        capture_content = genai_capture_message_content()

-        # ConverseStream
-        if "stream" in result and isinstance(result["stream"], EventStream):
+        if self._call_context.operation == "ConverseStream":
+            if "stream" in result and isinstance(
+                result["stream"], EventStream
+            ):

-            def stream_done_callback(response):
-                self._converse_on_success(span, response)
-                span.end()
+                def stream_done_callback(response):
+                    self._converse_on_success(
+                        span, response, instrumentor_context, capture_content
+                    )
+                    span.end()

-            def stream_error_callback(exception):
-                self._on_stream_error_callback(span, exception)
+                def stream_error_callback(exception):
+                    self._on_stream_error_callback(span, exception)

-            result["stream"] = ConverseStreamWrapper(
-                result["stream"], stream_done_callback, stream_error_callback
+                result["stream"] = ConverseStreamWrapper(
+                    result["stream"],
+                    stream_done_callback,
+                    stream_error_callback,
+                )
+                return
+        elif self._call_context.operation == "Converse":
+            self._converse_on_success(
+                span, result, instrumentor_context, capture_content
             )
-            return
-
-        # Converse
-        self._converse_on_success(span, result)

         model_id = self._call_context.params.get(_MODEL_ID_KEY)
         if not model_id:
             return

-        # InvokeModel
-        if "body" in result and isinstance(result["body"], StreamingBody):
-            self._invoke_model_on_success(span, result, model_id)
-            return
-
-        # InvokeModelWithResponseStream
-        if "body" in result and isinstance(result["body"], EventStream):
-
-            def invoke_model_stream_done_callback(response):
-                # the callback gets data formatted as the simpler converse API
-                self._converse_on_success(span, response)
-                span.end()
+        if self._call_context.operation == "InvokeModel":
+            if "body" in result and isinstance(result["body"], StreamingBody):
+                self._invoke_model_on_success(
+                    span,
+                    result,
+                    model_id,
+                    instrumentor_context,
+                    capture_content,
+                )
+                return
+        elif self._call_context.operation == "InvokeModelWithResponseStream":
+            if "body" in result and isinstance(result["body"], EventStream):
+
+                def invoke_model_stream_done_callback(response):
+                    # the callback gets data formatted as the simpler converse API
+                    self._converse_on_success(
+                        span, response, instrumentor_context, capture_content
+                    )
+                    span.end()

-            def invoke_model_stream_error_callback(exception):
-                self._on_stream_error_callback(span, exception)
+                def invoke_model_stream_error_callback(exception):
+                    self._on_stream_error_callback(span, exception)

-            result["body"] = InvokeModelWithResponseStreamWrapper(
-                result["body"],
-                invoke_model_stream_done_callback,
-                invoke_model_stream_error_callback,
-                model_id,
-            )
-            return
+                result["body"] = InvokeModelWithResponseStreamWrapper(
+                    result["body"],
+                    invoke_model_stream_done_callback,
+                    invoke_model_stream_error_callback,
+                    model_id,
+                )
+                return

     # pylint: disable=no-self-use
     def _handle_amazon_titan_response(
-        self, span: Span, response_body: dict[str, Any]
+        self,
+        span: Span,
+        response_body: dict[str, Any],
+        instrumentor_context: _BotocoreInstrumentorContext,
+        capture_content: bool,
     ):
         if "inputTextTokenCount" in response_body:
             span.set_attribute(
                 GEN_AI_USAGE_INPUT_TOKENS, response_body["inputTextTokenCount"]
             )
-        if "results" in response_body and response_body["results"]:
-            result = response_body["results"][0]
-            if "tokenCount" in result:
-                span.set_attribute(
-                    GEN_AI_USAGE_OUTPUT_TOKENS, result["tokenCount"]
-                )
-            if "completionReason" in result:
-                span.set_attribute(
-                    GEN_AI_RESPONSE_FINISH_REASONS,
-                    [result["completionReason"]],
-                )
+            if "results" in response_body and response_body["results"]:
+                result = response_body["results"][0]
+                if "tokenCount" in result:
+                    span.set_attribute(
+                        GEN_AI_USAGE_OUTPUT_TOKENS, result["tokenCount"]
+                    )
+                if "completionReason" in result:
+                    span.set_attribute(
+                        GEN_AI_RESPONSE_FINISH_REASONS,
+                        [result["completionReason"]],
+                    )
+
+            event_logger = instrumentor_context.event_logger
+            choice = _Choice.from_invoke_amazon_titan(
+                response_body, capture_content
+            )
+            event_logger.emit(choice.to_choice_event())

     # pylint: disable=no-self-use
     def _handle_amazon_nova_response(
-        self, span: Span, response_body: dict[str, Any]
+        self,
+        span: Span,
+        response_body: dict[str, Any],
+        instrumentor_context: _BotocoreInstrumentorContext,
+        capture_content: bool,
     ):
         if "usage" in response_body:
             usage = response_body["usage"]
@@ -411,9 +467,17 @@ def _handle_amazon_nova_response(
                 GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stopReason"]]
             )

+        event_logger = instrumentor_context.event_logger
+        choice = _Choice.from_converse(response_body, capture_content)
+        event_logger.emit(choice.to_choice_event())
+
     # pylint: disable=no-self-use
     def _handle_anthropic_claude_response(
-        self, span: Span, response_body: dict[str, Any]
+        self,
+        span: Span,
+        response_body: dict[str, Any],
+        instrumentor_context: _BotocoreInstrumentorContext,
+        capture_content: bool,
     ):
         if usage := response_body.get("usage"):
             if "input_tokens" in usage:
@@ -429,6 +493,12 @@ def _handle_anthropic_claude_response(
                 GEN_AI_RESPONSE_FINISH_REASONS, [response_body["stop_reason"]]
             )

+        event_logger = instrumentor_context.event_logger
+        choice = _Choice.from_invoke_anthropic_claude(
+            response_body, capture_content
+        )
+        event_logger.emit(choice.to_choice_event())
+
     def on_error(
         self,
         span: Span,
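The `_Choice` helper referenced throughout (`from_converse`, `from_invoke_amazon_titan`, `from_invoke_anthropic_claude`, `to_choice_event`) lives in `bedrock_utils.py`, whose diff is not part of this excerpt. The sketch below is a hypothetical reconstruction of the shape implied by the call sites above; field names and payload layout are assumptions for illustration, not the actual implementation.

```python
# Hypothetical sketch of the _Choice helper implied by the call sites above.
# The real implementation lives in bedrock_utils.py and may differ.
from __future__ import annotations

from typing import Any, Optional

from opentelemetry._events import Event


class _Choice:
    def __init__(
        self,
        message: dict[str, Any],
        finish_reason: Optional[str],
        index: int,
    ):
        self.message = message
        self.finish_reason = finish_reason
        self.index = index

    @classmethod
    def from_converse(
        cls, response: dict[str, Any], capture_content: bool
    ) -> "_Choice":
        # Converse and ConverseStream results expose output.message and stopReason
        orig_message = response.get("output", {}).get("message", {})
        message = {"role": orig_message.get("role", "assistant")}
        if capture_content:
            message["content"] = orig_message.get("content")
        return cls(message, response.get("stopReason"), index=0)

    def to_choice_event(self, **event_kwargs: Any) -> Event:
        # trace_id / span_id / trace_flags are forwarded when the caller is
        # outside the span context (streaming callbacks).
        return Event(
            name="gen_ai.choice",
            attributes={"gen_ai.system": "aws.bedrock"},
            body={
                "index": self.index,
                "finish_reason": self.finish_reason,
                "message": self.message,
            },
            **event_kwargs,
        )
```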
