35
35
run method or the executor's worker thread."
36
36
"""
37
37
38
+ from __future__ import annotations
39
+
38
40
import threading
39
41
from concurrent import futures
40
- from typing import Collection
42
+ from typing import TYPE_CHECKING , Any , Callable , Collection
41
43
42
- from wrapt import wrap_function_wrapper
44
+ from wrapt import (
45
+ wrap_function_wrapper , # type: ignore[reportUnknownVariableType]
46
+ )
43
47
44
48
from opentelemetry import context
45
49
from opentelemetry .instrumentation .instrumentor import BaseInstrumentor
46
50
from opentelemetry .instrumentation .threading .package import _instruments
47
51
from opentelemetry .instrumentation .utils import unwrap
48
52
53
+ if TYPE_CHECKING :
54
+ from typing import Protocol , TypeVar
55
+
56
+ R = TypeVar ("R" )
57
+
58
+ class HasOtelContext (Protocol ):
59
+ _otel_context : context .Context
60
+
49
61
50
62
class ThreadingInstrumentor (BaseInstrumentor ):
51
63
__WRAPPER_START_METHOD = "start"
@@ -55,12 +67,12 @@ class ThreadingInstrumentor(BaseInstrumentor):
55
67
def instrumentation_dependencies (self ) -> Collection [str ]:
56
68
return _instruments
57
69
58
- def _instrument (self , ** kwargs ):
70
+ def _instrument (self , ** kwargs : Any ):
59
71
self ._instrument_thread ()
60
72
self ._instrument_timer ()
61
73
self ._instrument_thread_pool ()
62
74
63
- def _uninstrument (self , ** kwargs ):
75
+ def _uninstrument (self , ** kwargs : Any ):
64
76
self ._uninstrument_thread ()
65
77
self ._uninstrument_timer ()
66
78
self ._uninstrument_thread_pool ()
@@ -117,12 +129,22 @@ def _uninstrument_thread_pool():
117
129
)
118
130
119
131
@staticmethod
120
- def __wrap_threading_start (call_wrapped , instance , args , kwargs ):
132
+ def __wrap_threading_start (
133
+ call_wrapped : Callable [[], None ],
134
+ instance : HasOtelContext ,
135
+ args : ...,
136
+ kwargs : ...,
137
+ ) -> None :
121
138
instance ._otel_context = context .get_current ()
122
139
return call_wrapped (* args , ** kwargs )
123
140
124
141
@staticmethod
125
- def __wrap_threading_run (call_wrapped , instance , args , kwargs ):
142
+ def __wrap_threading_run (
143
+ call_wrapped : Callable [..., R ],
144
+ instance : HasOtelContext ,
145
+ args : tuple [Any , ...],
146
+ kwargs : dict [str , Any ],
147
+ ) -> R :
126
148
token = None
127
149
try :
128
150
token = context .attach (instance ._otel_context )
@@ -131,12 +153,17 @@ def __wrap_threading_run(call_wrapped, instance, args, kwargs):
131
153
context .detach (token )
132
154
133
155
@staticmethod
134
- def __wrap_thread_pool_submit (call_wrapped , instance , args , kwargs ):
156
+ def __wrap_thread_pool_submit (
157
+ call_wrapped : Callable [..., R ],
158
+ instance : futures .ThreadPoolExecutor ,
159
+ args : tuple [Callable [..., Any ], ...],
160
+ kwargs : dict [str , Any ],
161
+ ) -> R :
135
162
# obtain the original function and wrapped kwargs
136
163
original_func = args [0 ]
137
164
otel_context = context .get_current ()
138
165
139
- def wrapped_func (* func_args , ** func_kwargs ) :
166
+ def wrapped_func (* func_args : Any , ** func_kwargs : Any ) -> R :
140
167
token = None
141
168
try :
142
169
token = context .attach (otel_context )
@@ -145,5 +172,5 @@ def wrapped_func(*func_args, **func_kwargs):
145
172
context .detach (token )
146
173
147
174
# replace the original function with the wrapped function
148
- new_args = (wrapped_func ,) + args [1 :]
175
+ new_args : tuple [ Callable [..., Any ], ...] = (wrapped_func ,) + args [1 :]
149
176
return call_wrapped (* new_args , ** kwargs )
0 commit comments