Skip to content

Commit f16d23f

Browse files
committed
refactor(metadata-ingestion): add cachetool to operation_config
1 parent 5eaf6ce commit f16d23f

File tree

4 files changed

+19
-23
lines changed

4 files changed

+19
-23
lines changed

metadata-ingestion/setup.py

+7 -5
Original file line number | Diff line number | Diff line change
@@ -251,6 +251,7 @@
251251
# Iceberg Python SDK
252252
# Kept at 0.4.0 due to higher versions requiring pydantic>2, as soon as we are fine with it, bump this dependency
253253
"pyiceberg>=0.4.0",
254+
*cachetools_lib,
254255
}
255256

256257
mssql_common = {
@@ -404,13 +405,14 @@
404405
# UnsupportedProductError
405406
# https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/release-notes.html#rn-7-14-0
406407
# https://github.com/elastic/elasticsearch-py/issues/1639#issuecomment-883587433
407-
"elasticsearch": {"elasticsearch==7.13.4"},
408+
"elasticsearch": {"elasticsearch==7.13.4", *cachetools_lib},
408409
"cassandra": {
409410
"cassandra-driver>=3.28.0",
410411
# We were seeing an error like this `numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject`
411412
# with numpy 2.0. This likely indicates a mismatch between scikit-learn and numpy versions.
412413
# https://stackoverflow.com/questions/40845304/runtimewarning-numpy-dtype-size-changed-may-indicate-binary-incompatibility
413414
"numpy<2",
415+
cachetools_lib,
414416
},
415417
"feast": {
416418
"feast>=0.34.0,<1",
@@ -422,7 +424,7 @@
422424
"numpy<2",
423425
},
424426
"grafana": {"requests"},
425-
"glue": aws_common,
427+
"glue": aws_common | cachetools_lib,
426428
# hdbcli is supported officially by SAP, sqlalchemy-hana is built on top but not officially supported
427429
"hana": sql_common
428430
| {
@@ -479,11 +481,11 @@
479481
| classification_lib
480482
| {"db-dtypes"} # Pandas extension data types
481483
| cachetools_lib,
482-
"s3": {*s3_base, *data_lake_profiling},
484+
"s3": {*s3_base, *data_lake_profiling, *cachetools_lib},
483485
"gcs": {*s3_base, *data_lake_profiling},
484-
"abs": {*abs_base, *data_lake_profiling},
486+
"abs": {*abs_base, *data_lake_profiling, *cachetools_lib},
485487
"sagemaker": aws_common,
486-
"salesforce": {"simple-salesforce"},
488+
"salesforce": {"simple-salesforce", *cachetools_lib},
487489
"snowflake": snowflake_common | usage_common | sqlglot_lib,
488490
"snowflake-summary": snowflake_common | usage_common | sqlglot_lib,
489491
"snowflake-queries": snowflake_common | usage_common | sqlglot_lib,

metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py

-2
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,6 @@
66
from pydantic import Field, validator
77
from pyiceberg.catalog import Catalog, load_catalog
88
from sortedcontainers import SortedList
9-
from functools import lru_cache
109
from datahub.configuration.common import AllowDenyPattern, ConfigModel
1110
from datahub.configuration.source_common import DatasetSourceConfigMixin
1211
from datahub.ingestion.source.state.stale_entity_removal_handler import (
@@ -125,7 +124,6 @@ def validate_catalog_size(cls, value):
125124

126125
return value
127126

128-
@lru_cache()
129127
def is_profiling_enabled(self) -> bool:
130128
return self.profiling.enabled and is_profiling_enabled(
131129
self.profiling.operation_config

metadata-ingestion/src/datahub/ingestion/source/sql/sql_config.py

+3 -10
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22
from abc import abstractmethod
33
from typing import Any, Dict, Optional
44

5-
import cachetools
5+
66
import cachetools.keys
77
import pydantic
88
from pydantic import Field
@@ -29,7 +29,7 @@
2929
StatefulIngestionConfigBase,
3030
)
3131
from datahub.ingestion.source_config.operation_config import is_profiling_enabled
32-
from datahub.utilities.cachetools_keys import self_methodkey
32+
3333

3434
logger: logging.Logger = logging.getLogger(__name__)
3535

@@ -117,14 +117,7 @@ class SQLCommonConfig(
117117
profiling: GEProfilingConfig = GEProfilingConfig()
118118
# Custom Stateful Ingestion settings
119119
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
120-
121-
# TRICKY: The operation_config is time-dependent. Because we don't want to change
122-
# whether or not we're running profiling mid-ingestion, we cache the result of this method.
123-
# TODO: This decorator should be moved to the is_profiling_enabled(operation_config) method.
124-
@cachetools.cached(
125-
cache=cachetools.LRUCache(maxsize=1),
126-
key=self_methodkey,
127-
)
120+
128121
def is_profiling_enabled(self) -> bool:
129122
return self.profiling.enabled and is_profiling_enabled(
130123
self.profiling.operation_config

metadata-ingestion/src/datahub/ingestion/source_config/operation_config.py

+9 -6
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,10 @@
11
import datetime
22
import logging
33
from typing import Any, Dict, Optional
4-
4+
import cachetools
55
import pydantic
66
from pydantic.fields import Field
7-
7+
from datahub.utilities.cachetools_keys import self_methodkey
88
from datahub.configuration.common import ConfigModel
99

1010
logger = logging.getLogger(__name__)
@@ -61,25 +61,28 @@ def validate_profile_date_of_month(cls, v: Optional[int]) -> Optional[int]:
6161
)
6262
return profile_date_of_month
6363

64-
64+
@cachetools.cached(
65+
cache=cachetools.LRUCache(maxsize=1),
66+
key=self_methodkey,
67+
)
6568
def is_profiling_enabled(operation_config: OperationConfig) -> bool:
6669
if operation_config.lower_freq_profile_enabled is False:
6770
return True
68-
logger.debug("Lower freq profiling setting is enabled.")
71+
logger.info("Lower freq profiling setting is enabled.")
6972
today = datetime.date.today()
7073
if (
7174
operation_config.profile_day_of_week is not None
7275
and operation_config.profile_day_of_week != today.weekday()
7376
):
74-
logger.debug(
77+
logger.info(
7578
"Profiling won't be done because weekday does not match config profile_day_of_week.",
7679
)
7780
return False
7881
if (
7982
operation_config.profile_date_of_month is not None
8083
and operation_config.profile_date_of_month != today.day
8184
):
82-
logger.debug(
85+
logger.info(
8386
"Profiling won't be done because date of month does not match config profile_date_of_month.",
8487
)
8588
return False

0 commit comments

Comments
 (0)