Skip to content

Commit f63cd29

Browse files
committed
setup cooperative timeout
1 parent e016621 commit f63cd29

File tree

3 files changed

+90
-14
lines changed

3 files changed

+90
-14
lines changed

metadata-ingestion/setup.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,7 @@
9494
sqlglot_lib = {
9595
# Using an Acryl fork of sqlglot.
9696
# https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:hsheth?expand=1
97-
"acryl-sqlglot==21.1.2.dev9",
98-
"wrapt_timeout_decorator",
97+
"acryl-sqlglot==21.1.2.dev10",
9998
}
10099

101100
sql_common = (

metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py

+20-12
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import sqlglot.optimizer.annotate_types
1313
import sqlglot.optimizer.optimizer
1414
import sqlglot.optimizer.qualify
15-
import wrapt_timeout_decorator
1615

1716
from datahub.cli.env_utils import get_boolean_env_variable
1817
from datahub.ingestion.graph.client import DataHubGraph
@@ -46,6 +45,10 @@
4645
is_dialect_instance,
4746
parse_statement,
4847
)
48+
from datahub.utilities.cooperative_timeout import (
49+
CooperativeTimeout,
50+
CooperativeTimeoutError,
51+
)
4952

5053
logger = logging.getLogger(__name__)
5154

@@ -311,9 +314,6 @@ class SqlUnderstandingError(Exception):
311314

312315

313316
# TODO: Break this up into smaller functions.
314-
@wrapt_timeout_decorator.timeout(
315-
SQL_LINEAGE_TIMEOUT_SECONDS if SQL_LINEAGE_TIMEOUT_ENABLED else None,
316-
)
317317
def _column_level_lineage( # noqa: C901
318318
statement: sqlglot.exp.Expression,
319319
dialect: sqlglot.Dialect,
@@ -873,19 +873,27 @@ def _sqlglot_lineage_inner(
873873
column_lineage: Optional[List[_ColumnLineageInfo]] = None
874874
try:
875875
if select_statement is not None:
876-
column_lineage = _column_level_lineage(
877-
select_statement,
878-
dialect=dialect,
879-
table_schemas=table_name_schema_mapping,
880-
output_table=downstream_table,
881-
default_db=default_db,
882-
default_schema=default_schema,
883-
)
876+
with CooperativeTimeout(
877+
timeout=SQL_LINEAGE_TIMEOUT_SECONDS
878+
if SQL_LINEAGE_TIMEOUT_ENABLED
879+
else None
880+
):
881+
column_lineage = _column_level_lineage(
882+
select_statement,
883+
dialect=dialect,
884+
table_schemas=table_name_schema_mapping,
885+
output_table=downstream_table,
886+
default_db=default_db,
887+
default_schema=default_schema,
888+
)
884889
except UnsupportedStatementTypeError as e:
885890
# Inject details about the outer statement type too.
886891
e.args = (f"{e.args[0]} (outer statement type: {type(statement)})",)
887892
debug_info.column_error = e
888893
logger.debug(debug_info.column_error)
894+
except CooperativeTimeoutError as e:
895+
logger.debug(f"Timed out while generating column-level lineage: {e}")
896+
debug_info.column_error = e
889897
except SqlUnderstandingError as e:
890898
logger.debug(f"Failed to generate column-level lineage: {e}", exc_info=True)
891899
debug_info.column_error = e
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import threading
2+
import time
3+
from contextlib import AbstractContextManager
4+
from types import TracebackType
5+
from typing import Optional, Type
6+
7+
_cooperation = threading.local()
8+
9+
10+
class CooperativeTimeoutError(TimeoutError):
11+
"""An exception raised when a cooperative timeout is exceeded."""
12+
13+
14+
def cooperate() -> None:
15+
"""Method to be called periodically to cooperate with the timeout mechanism."""
16+
17+
deadline = getattr(_cooperation, "deadline", None)
18+
if deadline is not None and deadline < time.perf_counter_ns():
19+
raise CooperativeTimeoutError("CooperativeTimeout deadline exceeded")
20+
21+
22+
class CooperativeTimeout(AbstractContextManager):
23+
"""A cooperative timeout mechanism.
24+
25+
Getting code to time out in Python is actually rather tricky. Common approaches include:
26+
27+
- Using the signal module to set a signal handler that raises an exception
28+
after a certain time. Unfortunately, this approach only works on the main
29+
thread, and is not available on Windows.
30+
- Creating a separate process to run the code and then killing it if it hasn't
31+
finished by the deadline. This usually requires that all arguments/return
32+
types are pickleable so that they can be passed between processes. Overall,
33+
this approach is heavy-handed and can be tricky to implement correctly.
34+
- Using `threading` is not an option, since Python threads are not interruptible
35+
(unless you're willing to use some hacks https://stackoverflow.com/a/61528202).
36+
Attempting to forcibly terminate a thread can deadlock on the GIL.
37+
38+
In cases where (1) we have control over the code that we want to time out and
39+
(2) we can modify it to regularly and reliably call a specific function, we can
40+
use a cooperative timeout mechanism instead.
41+
42+
This is not reentrant and cannot be used in nested contexts. It can be used
43+
in multi-threaded contexts, so long as the cooperative function is called
44+
from the same thread that created the timeout.
45+
46+
Args:
47+
timeout: The timeout in seconds. If None, the timeout is disabled.
48+
"""
49+
50+
def __init__(self, timeout: Optional[None] = None):
51+
self.timeout = timeout
52+
53+
def __enter__(self) -> "CooperativeTimeout":
54+
if hasattr(_cooperation, "deadline"):
55+
raise RuntimeError("CooperativeTimeout already active")
56+
if self.timeout is not None:
57+
_cooperation.deadline = (
58+
time.perf_counter_ns() + self.timeout * 1_000_000_000
59+
)
60+
return self
61+
62+
def __exit__(
63+
self,
64+
exc_type: Optional[Type[BaseException]],
65+
exc_val: Optional[BaseException],
66+
exc_tb: Optional[TracebackType],
67+
) -> None:
68+
if self.timeout is not None:
69+
del _cooperation.deadline

0 commit comments

Comments
 (0)