Skip to content

Commit e2e05f7

Browse files
committed
use function instead of class
1 parent f63cd29 commit e2e05f7

File tree

2 files changed

+36
-42
lines changed

2 files changed

+36
-42
lines changed

metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@
4646
parse_statement,
4747
)
4848
from datahub.utilities.cooperative_timeout import (
49-
CooperativeTimeout,
5049
CooperativeTimeoutError,
50+
cooperative_timeout,
5151
)
5252

5353
logger = logging.getLogger(__name__)
@@ -873,7 +873,7 @@ def _sqlglot_lineage_inner(
873873
column_lineage: Optional[List[_ColumnLineageInfo]] = None
874874
try:
875875
if select_statement is not None:
876-
with CooperativeTimeout(
876+
with cooperative_timeout(
877877
timeout=SQL_LINEAGE_TIMEOUT_SECONDS
878878
if SQL_LINEAGE_TIMEOUT_ENABLED
879879
else None
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1+
import contextlib
12
import threading
23
import time
3-
from contextlib import AbstractContextManager
4-
from types import TracebackType
5-
from typing import Optional, Type
4+
from typing import Iterator, Optional
65

76
_cooperation = threading.local()
87

@@ -19,51 +18,46 @@ def cooperate() -> None:
1918
raise CooperativeTimeoutError("CooperativeTimeout deadline exceeded")
2019

2120

22-
class CooperativeTimeout(AbstractContextManager):
21+
@contextlib.contextmanager
22+
def cooperative_timeout(timeout: Optional[float] = None) -> Iterator[None]:
2323
"""A cooperative timeout mechanism.
2424
25-
Getting code to time out in Python is actually rather tricky. Common approaches include:
26-
27-
- Using the signal module to set a signal handler that raises an exception
28-
after a certain time. Unfortunately, this approach only works on the main
29-
thread, and is not available on Windows.
30-
- Creating a separate process to run the code and then killing it if it hasn't
31-
finished by the deadline. This usually requires that all arguments/return
32-
types are pickleable so that they can be passed between processes. Overall,
33-
this approach is heavy-handed and can be tricky to implement correctly.
34-
- Using `threading` is not an option, since Python threads are not interruptible
35-
(unless you're willing to use some hacks https://stackoverflow.com/a/61528202).
36-
Attempting to forcibly terminate a thread can deadlock on the GIL.
37-
38-
In cases where (1) we have control over the code that we want to time out and
39-
(2) we can modify it to regularly and reliably call a specific function, we can
40-
use a cooperative timeout mechanism instead.
25+
Useful in cases where (1) we have control over the code that we want to time out
26+
and (2) we can modify it to regularly and reliably call a specific function.
4127
4228
This is not reentrant and cannot be used in nested contexts. It can be used
4329
in multi-threaded contexts, so long as the cooperative function is called
4430
from the same thread that created the timeout.
4531
4632
Args:
4733
timeout: The timeout in seconds. If None, the timeout is disabled.
34+
35+
Raises:
36+
RuntimeError: If a cooperative timeout is already active.
37+
CooperativeTimeoutError: If the cooperative timeout is exceeded.
4838
"""
4939

50-
def __init__(self, timeout: Optional[None] = None):
51-
self.timeout = timeout
52-
53-
def __enter__(self) -> "CooperativeTimeout":
54-
if hasattr(_cooperation, "deadline"):
55-
raise RuntimeError("CooperativeTimeout already active")
56-
if self.timeout is not None:
57-
_cooperation.deadline = (
58-
time.perf_counter_ns() + self.timeout * 1_000_000_000
59-
)
60-
return self
61-
62-
def __exit__(
63-
self,
64-
exc_type: Optional[Type[BaseException]],
65-
exc_val: Optional[BaseException],
66-
exc_tb: Optional[TracebackType],
67-
) -> None:
68-
if self.timeout is not None:
69-
del _cooperation.deadline
40+
# Getting code to time out in Python is actually rather tricky, and so this felt
41+
# like the most straightforward approach. Other approaches include:
42+
# - Using the signal module to set a signal handler that raises an exception
43+
# after a certain time. Unfortunately, this approach only works on the main
44+
# thread, and is not available on Windows.
45+
# - Creating a separate process to run the code and then killing it if it hasn't
46+
# finished by the deadline. This usually requires that all arguments/return
47+
# types are pickleable so that they can be passed between processes. Overall,
48+
# this approach is heavy-handed and can be tricky to implement correctly.
49+
# - Using `threading` is not an option, since Python threads are not interruptible
50+
# (unless you're willing to use some hacks https://stackoverflow.com/a/61528202).
51+
# Attempting to forcibly terminate a thread can deadlock on the GIL.
52+
53+
if hasattr(_cooperation, "deadline"):
54+
raise RuntimeError("cooperative timeout already active")
55+
56+
if timeout is not None:
57+
_cooperation.deadline = time.perf_counter_ns() + timeout * 1_000_000_000
58+
yield
59+
del _cooperation.deadline
60+
61+
else:
62+
# No-op.
63+
yield

0 commit comments

Comments
 (0)