1
+ from concurrent .futures import ThreadPoolExecutor
1
2
from datetime import datetime
2
- from typing import Any , TypedDict
3
+ from typing import Any , NotRequired , TypedDict
3
4
4
5
import sentry_sdk
5
6
from django .http import HttpRequest , HttpResponse
16
17
from sentry .models .project import Project
17
18
from sentry .organizations .services .organization import RpcOrganization
18
19
from sentry .search .eap .types import SearchResolverConfig
19
- from sentry .search .events .types import SnubaParams
20
+ from sentry .search .events .builder .discover import DiscoverQueryBuilder
21
+ from sentry .search .events .types import QueryBuilderConfig , SnubaParams
22
+ from sentry .snuba .dataset import Dataset
20
23
from sentry .snuba .referrer import Referrer
21
24
from sentry .snuba .spans_rpc import run_trace_query
22
25
26
# Module-level pool used by query_trace_data to run its per-trace queries in parallel.
# 1 worker each for spans, errors, performance issues
# NOTE(review): the pool is created once at import time, so every caller in this process
# shares these 3 workers rather than getting 3 each -- confirm that is intended.
_query_thread_pool = ThreadPoolExecutor(max_workers=3)
28
+
23
29
24
30
class SerializedEvent(TypedDict):
    """Serialized shape of a single event (span or error) in the trace response.

    The first group of keys is set for every event. The ``NotRequired`` keys are
    only populated by the span branch of ``serialize_rpc_event``; the error branch
    leaves them out entirely.
    """

    # Keys populated for both spans and errors
    description: str
    event_id: str
    event_type: str  # "span" or "error"
    is_transaction: bool  # always False for errors
    project_id: int
    project_slug: str
    start_timestamp: datetime
    transaction: str
    # Keys populated for spans only
    children: NotRequired[list["SerializedEvent"]]  # nested events, serialized recursively
    duration: NotRequired[float]
    end_timestamp: NotRequired[datetime]
    op: NotRequired[str]
    parent_span_id: NotRequired[str | None]  # None when the upstream parent id is sixteen zeros
38
44
39
45
40
46
@region_silo_endpoint
41
47
class OrganizationTraceEndpoint (OrganizationEventsV2EndpointBase ):
48
+ """Replaces OrganizationEventsTraceEndpoint"""
49
+
42
50
publish_status = {
43
51
"GET" : ApiPublishStatus .PRIVATE ,
44
52
}
@@ -65,37 +73,100 @@ def get_projects(
65
73
include_all_accessible = True ,
66
74
)
67
75
68
- def serialize_rpc_span (self , span : dict [str , Any ]) -> SerializedEvent :
69
- return SerializedEvent (
70
- children = [self .serialize_rpc_span (child ) for child in span ["children" ]],
71
- event_id = span ["id" ],
72
- project_id = span ["project.id" ],
73
- project_slug = span ["project.slug" ],
74
- parent_span_id = None if span ["parent_span" ] == "0" * 16 else span ["parent_span" ],
75
- start_timestamp = span ["precise.start_ts" ],
76
- end_timestamp = span ["precise.finish_ts" ],
77
- duration = span ["span.duration" ],
78
- transaction = span ["transaction" ],
79
- is_transaction = span ["is_transaction" ],
80
- description = span ["description" ],
81
- op = span ["span.op" ],
82
- event_type = "span" ,
76
def serialize_rpc_event(self, event: dict[str, Any]) -> SerializedEvent:
    """Convert one raw trace row into a ``SerializedEvent``.

    Dispatches on ``event["event_type"]``:

    * ``"span"``: serializes the span and, recursively, everything in
      ``event["children"]`` (which may contain both spans and errors).
    * ``"error"``: serializes the error; none of the span-only keys are set.

    Raises:
        Exception: if ``event_type`` is missing or is neither ``"span"`` nor ``"error"``.
    """
    kind = event.get("event_type")

    if kind == "span":
        # An all-zero parent id is treated as "no parent"
        no_parent_id = "0" * 16
        return SerializedEvent(
            children=[self.serialize_rpc_event(nested) for nested in event["children"]],
            event_id=event["id"],
            project_id=event["project.id"],
            project_slug=event["project.slug"],
            parent_span_id=(
                event["parent_span"] if event["parent_span"] != no_parent_id else None
            ),
            start_timestamp=event["precise.start_ts"],
            end_timestamp=event["precise.finish_ts"],
            duration=event["span.duration"],
            transaction=event["transaction"],
            is_transaction=event["is_transaction"],
            description=event["description"],
            op=event["span.op"],
            event_type="span",
        )

    if kind == "error":
        return SerializedEvent(
            event_id=event["id"],
            project_id=event["project.id"],
            # Error rows carry the slug under "project.name" rather than "project.slug"
            project_slug=event["project.name"],
            start_timestamp=event["timestamp"],
            is_transaction=False,
            transaction=event["transaction"],
            description=event["message"],
            event_type="error",
        )

    raise Exception(f"Unknown event encountered in trace: {kind}")
106
+
107
def run_errors_query(self, snuba_params: SnubaParams, trace_id: str):
    """Fetch every error belonging to ``trace_id`` from the Events dataset.

    Returns the processed result rows, each tagged in place with
    ``event_type = "error"`` so they can be told apart from spans later on.
    At most 10,000 rows are requested.
    """
    # TODO: replace this with EAP calls, this query is copied from the old trace view
    columns = [
        "id",
        "project.name",
        "project.id",
        "timestamp",
        "trace.span",
        "transaction",
        "issue",
        "title",
        "message",
        "tags[level]",
    ]
    builder = DiscoverQueryBuilder(
        Dataset.Events,
        params={},
        snuba_params=snuba_params,
        query=f"trace:{trace_id}",
        selected_columns=columns,
        # Don't add timestamp to this orderby as snuba will have to split the time range up and make multiple queries
        orderby=["id"],
        limit=10_000,
        config=QueryBuilderConfig(auto_fields=True),
    )
    raw_result = builder.run_query(Referrer.API_TRACE_VIEW_GET_EVENTS.value)
    rows = builder.process_results(raw_result)["data"]
    # Mark each row so serialize_rpc_event takes its error branch
    for row in rows:
        row["event_type"] = "error"
    return rows
84
139
85
140
@sentry_sdk.tracing.trace
def query_trace_data(self, snuba_params: SnubaParams, trace_id: str) -> list[SerializedEvent]:
    """Query span and error data for a trace and assemble it into a tree.

    The span and error queries are submitted to the shared thread pool so they run
    concurrently. Spans are nested under their parent span when the parent is part
    of the result, otherwise they become roots. Every error is attached as a child
    of the span whose id equals the error's ``trace.span``; errors that match no
    returned span are emitted as top-level events so they are not lost.

    Returns:
        The serialized root events of the trace.

    Raises:
        Whatever either query raises; ``Future.result()`` re-raises it here.
    """
    # This is a hack, long term EAP will store both errors and performance_issues eventually but is not ready
    # currently. But we want to move performance data off the old tables immediately. To keep the code simpler I'm
    # parallelizing the queries here, but ideally this parallelization lives in the spans_rpc module instead
    spans_future = _query_thread_pool.submit(
        run_trace_query,
        trace_id,
        snuba_params,
        Referrer.API_TRACE_VIEW_GET_EVENTS.value,
        SearchResolverConfig(),
    )
    errors_future = _query_thread_pool.submit(self.run_errors_query, snuba_params, trace_id)
    # Both queries are already in flight, so waiting on them in sequence costs nothing extra
    spans_data = spans_future.result()
    errors_data = errors_future.result()

    id_to_span = {event["id"]: event for event in spans_data}
    # More than one error can be recorded against the same span, so group them per span id.
    # A plain {span_id: error} mapping would silently keep only the last error for each span.
    errors_by_span: dict[Any, list[dict[str, Any]]] = {}
    for error in errors_data:
        errors_by_span.setdefault(error["trace.span"], []).append(error)

    result = []
    for span in spans_data:
        # NOTE(review): assumes every span row already carries a "children" list -- confirm in run_trace_query
        if span["parent_span"] in id_to_span:
            id_to_span[span["parent_span"]]["children"].append(span)
        else:
            result.append(span)
        # pop() so that whatever is left afterwards is exactly the set of unmatched errors
        span["children"].extend(errors_by_span.pop(span["id"], []))

    # Errors whose span was not returned (or that have no span id) would otherwise vanish from the response
    for orphaned_errors in errors_by_span.values():
        result.extend(orphaned_errors)

    return [self.serialize_rpc_event(root) for root in result]
99
170
100
171
def has_feature (self , organization : Organization , request : Request ) -> bool :
101
172
return bool (
0 commit comments