Skip to content

Commit 21102f2

Browse files
committed
feat(ingest/s3): add table filtering
- add tables_filter_pattern to path_spec
- add table filtering to S3Source().s3_browser()
1 parent fa4ff7b commit 21102f2

File tree

7 files changed

+2310
-1
lines changed

7 files changed

+2310
-1
lines changed

metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py

+21-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from pydantic.fields import Field
1212
from wcmatch import pathlib
1313

14-
from datahub.configuration.common import ConfigModel
14+
from datahub.configuration.common import AllowDenyPattern, ConfigModel
1515
from datahub.ingestion.source.aws.s3_util import is_s3_uri
1616
from datahub.ingestion.source.azure.abs_utils import is_abs_uri
1717
from datahub.ingestion.source.gcs.gcs_utils import is_gcs_uri
@@ -145,6 +145,11 @@ class Config:
145145
description="Include hidden folders in the traversal (folders starting with . or _",
146146
)
147147

148+
tables_filter_pattern: AllowDenyPattern = Field(
149+
default=AllowDenyPattern.allow_all(),
150+
description="The tables_filter_pattern configuration field uses regular expressions to filter the tables part of the Pathspec for ingestion, allowing fine-grained control over which tables are included or excluded based on specified patterns. The default setting allows all tables.",
151+
)
152+
148153
def is_path_hidden(self, path: str) -> bool:
149154
# Split the path into directories and filename
150155
dirs, filename = os.path.split(path)
@@ -177,6 +182,12 @@ def allowed(self, path: str, ignore_ext: bool = False) -> bool:
177182
):
178183
return False
179184
logger.debug(f"{path} is not excluded")
185+
186+
table_name, _ = self.extract_table_name_and_path(path)
187+
if not self.tables_filter_pattern.allowed(table_name):
188+
return False
189+
logger.debug(f"{path} is passed table name check")
190+
180191
ext = os.path.splitext(path)[1].strip(".")
181192

182193
if not ignore_ext:
@@ -218,6 +229,15 @@ def dir_allowed(self, path: str) -> bool:
218229
exclude_path.rstrip("/"), flags=pathlib.GLOBSTAR
219230
):
220231
return False
232+
233+
file_name_pattern = self.include.rsplit("/", 1)[1]
234+
table_name, _ = self.extract_table_name_and_path(
235+
os.path.join(path, file_name_pattern)
236+
)
237+
if not self.tables_filter_pattern.allowed(table_name):
238+
return False
239+
logger.debug(f"{path} is passed table name check")
240+
221241
return True
222242

223243
@classmethod

metadata-ingestion/src/datahub/ingestion/source/s3/source.py

+11
Original file line numberDiff line numberDiff line change
@@ -945,6 +945,17 @@ def s3_browser(self, path_spec: PathSpec, sample_size: int) -> Iterable[BrowsePa
945945
for f in list_folders(
946946
bucket_name, f"{folder}", self.source_config.aws_config
947947
):
948+
table_path = self.create_s3_path(bucket_name, f)
949+
table_name, _ = path_spec.extract_table_name_and_path(
950+
table_path
951+
)
952+
if not path_spec.tables_filter_pattern.allowed(table_name):
953+
logger.debug(
954+
f"Table '{table_name}' not allowed and skipping"
955+
)
956+
self.report.report_file_dropped(table_path)
957+
continue
958+
948959
dirs_to_process = []
949960
logger.info(f"Processing folder: {f}")
950961
if path_spec.traversal_method == FolderTraversalMethod.ALL:

0 commit comments

Comments
 (0)