26
26
logger = logging .getLogger (__name__ )
27
27
28
28
29
- def process_message (message : Message [KafkaPayload ]) -> list [Span ]:
30
- value = message .payload .value
31
- segment = BUFFERED_SEGMENT_SCHEMA .decode (value )
32
- return process_segment (cast (list [Span ], segment ["spans" ]))
33
-
34
-
35
- def _process_message (message : Message [KafkaPayload ]) -> list [Span ]:
36
- if not options .get ("standalone-spans.process-segments-consumer.enable" ):
37
- return []
38
-
39
- try :
40
- return process_message (message )
41
- except Exception : # NOQA
42
- raise
43
- # TODO: Implement error handling
44
- # sentry_sdk.capture_exception()
45
- # assert isinstance(message.value, BrokerValue)
46
- # raise InvalidMessage(message.value.partition, message.value.offset)
47
-
48
-
49
- def explode_segment (message : tuple [list [Span ], Mapping [Partition , int ]]):
50
- spans , committable = message
51
- last = len (spans ) - 1
52
- for i , span in enumerate (spans ):
53
- if span is not None :
54
- yield Value (
55
- payload = KafkaPayload (key = None , value = orjson .dumps (span ), headers = []),
56
- committable = committable if i == last else {},
57
- timestamp = None ,
58
- )
59
-
60
-
61
29
class DetectPerformanceIssuesStrategyFactory (ProcessingStrategyFactory [KafkaPayload ]):
62
30
def __init__ (
63
31
self ,
@@ -104,7 +72,7 @@ def create_with_partitions(
104
72
# the generator function.
105
73
zip_commit = RunTask (
106
74
function = lambda m : (m .payload , m .committable ),
107
- next_step = Unfold (generator = explode_segment , next_step = produce_step ),
75
+ next_step = Unfold (generator = _unfold_segment , next_step = produce_step ),
108
76
)
109
77
110
78
return run_task_with_multiprocessing (
@@ -119,3 +87,31 @@ def create_with_partitions(
119
87
120
88
def shutdown (self ):
121
89
self .pool .close ()
90
+
91
+
92
+ def _process_message (message : Message [KafkaPayload ]) -> list [Span ]:
93
+ if not options .get ("standalone-spans.process-segments-consumer.enable" ):
94
+ return []
95
+
96
+ try :
97
+ value = message .payload .value
98
+ segment = BUFFERED_SEGMENT_SCHEMA .decode (value )
99
+ return process_segment (cast (list [Span ], segment ["spans" ]))
100
+ except Exception : # NOQA
101
+ raise
102
+ # TODO: Implement error handling
103
+ # sentry_sdk.capture_exception()
104
+ # assert isinstance(message.value, BrokerValue)
105
+ # raise InvalidMessage(message.value.partition, message.value.offset)
106
+
107
+
108
+ def _unfold_segment (message : tuple [list [Span ], Mapping [Partition , int ]]):
109
+ spans , committable = message
110
+ last = len (spans ) - 1
111
+ for i , span in enumerate (spans ):
112
+ if span is not None :
113
+ yield Value (
114
+ payload = KafkaPayload (key = None , value = orjson .dumps (span ), headers = []),
115
+ committable = committable if i == last else {},
116
+ timestamp = None ,
117
+ )
0 commit comments