@@ -75,11 +75,14 @@ pub fn telem(event: events::TelemetryEvent) {
75
75
}
76
76
}
77
77
78
- fn init (
78
+ fn init < C > (
79
79
mut config : TelemetryConfig ,
80
- client : impl telemetry :: TelemetryClient + Clone + Send + Sync + ' static ,
80
+ client : impl Send + FnOnce ( ) -> Option < C > + ' static ,
81
81
color_config : ColorConfig ,
82
- ) -> Result < ( TelemetryHandle , TelemetrySender ) , Box < dyn std:: error:: Error > > {
82
+ ) -> Result < ( TelemetryHandle , TelemetrySender ) , Box < dyn std:: error:: Error > >
83
+ where
84
+ C : telemetry:: TelemetryClient + Clone + Send + Sync + ' static ,
85
+ {
83
86
let ( tx, rx) = mpsc:: unbounded_channel ( ) ;
84
87
let ( cancel_tx, cancel_rx) = oneshot:: channel ( ) ;
85
88
config. show_alert ( color_config) ;
@@ -90,13 +93,12 @@ fn init(
90
93
buffer : Vec :: new ( ) ,
91
94
senders : FuturesUnordered :: new ( ) ,
92
95
exit_ch : cancel_tx,
93
- client,
94
96
session_id : session_id. to_string ( ) ,
95
97
telemetry_id : config. get_id ( ) . to_string ( ) ,
96
98
enabled : config. is_enabled ( ) ,
97
99
color_config,
98
100
} ;
99
- let handle = worker. start ( ) ;
101
+ let handle = worker. start ( client ) ;
100
102
101
103
let telemetry_handle = TelemetryHandle {
102
104
exit_ch : cancel_rx,
@@ -113,10 +115,13 @@ fn init(
113
115
/// We have two different types because the TelemetrySender should be shared
114
116
/// across threads (i.e. Clone + Send), while the TelemetryHandle cannot be
115
117
/// shared since it contains the structs necessary to shut down the worker.
116
- pub fn init_telemetry (
117
- client : impl telemetry :: TelemetryClient + Clone + Send + Sync + ' static ,
118
+ pub fn init_telemetry < C > (
119
+ client : impl Send + FnOnce ( ) -> Option < C > + ' static ,
118
120
color_config : ColorConfig ,
119
- ) -> Result < TelemetryHandle , Box < dyn std:: error:: Error > > {
121
+ ) -> Result < TelemetryHandle , Box < dyn std:: error:: Error > >
122
+ where
123
+ C : telemetry:: TelemetryClient + Clone + Send + Sync + ' static ,
124
+ {
120
125
// make sure we're not already initialized
121
126
if SENDER_INSTANCE . get ( ) . is_some ( ) {
122
127
debug ! ( "telemetry already initialized" ) ;
@@ -147,22 +152,29 @@ impl TelemetryHandle {
147
152
}
148
153
}
149
154
150
- struct Worker < C > {
155
+ struct Worker {
151
156
rx : mpsc:: UnboundedReceiver < TelemetryEvent > ,
152
157
buffer : Vec < TelemetryEvent > ,
153
158
senders : FuturesUnordered < JoinHandle < ( ) > > ,
154
159
// Used to cancel the worker
155
160
exit_ch : oneshot:: Sender < ( ) > ,
156
- client : C ,
157
161
telemetry_id : String ,
158
162
session_id : String ,
159
163
enabled : bool ,
160
164
color_config : ColorConfig ,
161
165
}
162
166
163
- impl < C : telemetry:: TelemetryClient + Clone + Send + Sync + ' static > Worker < C > {
164
- pub fn start ( mut self ) -> JoinHandle < ( ) > {
167
+ impl Worker {
168
+ pub fn start < C > ( mut self , client : impl FnOnce ( ) -> Option < C > + Send + ' static ) -> JoinHandle < ( ) >
169
+ where
170
+ C : telemetry:: TelemetryClient + Clone + Send + Sync + ' static ,
171
+ {
165
172
tokio:: spawn ( async move {
173
+ // Constructing a HTTPS client is almost always a blocking operation
174
+ let Ok ( Some ( client) ) = tokio:: task:: spawn_blocking ( client) . await else {
175
+ // If constructing telemetry client panics, shut down
176
+ return ;
177
+ } ;
166
178
let mut timeout = tokio:: time:: sleep ( NO_TIMEOUT ) ;
167
179
loop {
168
180
select ! {
@@ -176,22 +188,22 @@ impl<C: telemetry::TelemetryClient + Clone + Send + Sync + 'static> Worker<C> {
176
188
break ;
177
189
}
178
190
if self . buffer. len( ) == BUFFER_THRESHOLD {
179
- self . flush_events( ) ;
191
+ self . flush_events( & client ) ;
180
192
timeout = tokio:: time:: sleep( NO_TIMEOUT ) ;
181
193
} else {
182
194
timeout = tokio:: time:: sleep( EVENT_TIMEOUT ) ;
183
195
}
184
196
}
185
197
_ = timeout => {
186
- self . flush_events( ) ;
198
+ self . flush_events( & client ) ;
187
199
timeout = tokio:: time:: sleep( NO_TIMEOUT ) ;
188
200
}
189
201
_ = self . exit_ch. closed( ) => {
190
202
break ;
191
203
}
192
204
}
193
205
}
194
- self . flush_events ( ) ;
206
+ self . flush_events ( & client ) ;
195
207
while let Some ( result) = self . senders . next ( ) . await {
196
208
if let Err ( err) = result {
197
209
debug ! ( "failed to send telemetry event. error: {}" , err)
@@ -200,11 +212,18 @@ impl<C: telemetry::TelemetryClient + Clone + Send + Sync + 'static> Worker<C> {
200
212
} )
201
213
}
202
214
203
- pub fn flush_events ( & mut self ) {
215
+ pub fn flush_events < C > ( & mut self , client : & C )
216
+ where
217
+ C : telemetry:: TelemetryClient + Clone + Send + Sync + ' static ,
218
+ {
204
219
if !self . buffer . is_empty ( ) {
205
220
let events = std:: mem:: take ( & mut self . buffer ) ;
206
221
let num_events = events. len ( ) ;
207
- let handle = self . send_events ( events) ;
222
+ debug ! (
223
+ "Starting telemetry event queue flush (num_events={:?})" ,
224
+ num_events
225
+ ) ;
226
+ let handle = self . send_events ( client, events) ;
208
227
if let Some ( handle) = handle {
209
228
self . senders . push ( handle) ;
210
229
}
@@ -215,7 +234,10 @@ impl<C: telemetry::TelemetryClient + Clone + Send + Sync + 'static> Worker<C> {
215
234
}
216
235
}
217
236
218
- fn send_events ( & self , events : Vec < TelemetryEvent > ) -> Option < JoinHandle < ( ) > > {
237
+ fn send_events < C > ( & self , client : & C , events : Vec < TelemetryEvent > ) -> Option < JoinHandle < ( ) > >
238
+ where
239
+ C : telemetry:: TelemetryClient + Clone + Send + Sync + ' static ,
240
+ {
219
241
if !self . enabled {
220
242
return None ;
221
243
}
@@ -232,7 +254,7 @@ impl<C: telemetry::TelemetryClient + Clone + Send + Sync + 'static> Worker<C> {
232
254
}
233
255
}
234
256
235
- let client = self . client . clone ( ) ;
257
+ let client = client. clone ( ) ;
236
258
let session_id = self . session_id . clone ( ) ;
237
259
let telemetry_id = self . telemetry_id . clone ( ) ;
238
260
Some ( tokio:: spawn ( async move {
@@ -341,8 +363,10 @@ mod tests {
341
363
events : Default :: default ( ) ,
342
364
tx,
343
365
} ;
366
+ let client_copy = client. clone ( ) ;
367
+ let client_builder = move || Some ( client_copy) ;
344
368
345
- let result = init ( config, client . clone ( ) , ColorConfig :: new ( false ) ) ;
369
+ let result = init ( config, client_builder , ColorConfig :: new ( false ) ) ;
346
370
347
371
let ( telemetry_handle, telemetry_sender) = result. unwrap ( ) ;
348
372
@@ -381,8 +405,10 @@ mod tests {
381
405
events : Default :: default ( ) ,
382
406
tx,
383
407
} ;
408
+ let client_copy = client. clone ( ) ;
409
+ let client_builder = move || Some ( client_copy) ;
384
410
385
- let result = init ( config, client . clone ( ) , ColorConfig :: new ( false ) ) ;
411
+ let result = init ( config, client_builder , ColorConfig :: new ( false ) ) ;
386
412
387
413
let ( telemetry_handle, telemetry_sender) = result. unwrap ( ) ;
388
414
@@ -427,8 +453,10 @@ mod tests {
427
453
events : Default :: default ( ) ,
428
454
tx,
429
455
} ;
456
+ let client_copy = client. clone ( ) ;
457
+ let client_builder = move || Some ( client_copy) ;
430
458
431
- let result = init ( config, client . clone ( ) , ColorConfig :: new ( false ) ) ;
459
+ let result = init ( config, client_builder , ColorConfig :: new ( false ) ) ;
432
460
433
461
let ( telemetry_handle, telemetry_sender) = result. unwrap ( ) ;
434
462
0 commit comments