Skip to content

Commit 554c375

Browse files
hsheth2 authored and sleeperdeep committed
refactor(ingest/sql): add _get_view_definition helper method (datahub-project#12033)
1 parent 4abbfea commit 554c375

File tree

2 files changed

+32
-23
lines changed

2 files changed

+32
-23
lines changed

metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py

-2
Original file line number | Diff line number | Diff line change
@@ -6,8 +6,6 @@
66

77
import pydantic
88
import sqlalchemy.dialects.mssql
9-
10-
# This import verifies that the dependencies are available.
119
from pydantic.fields import Field
1210
from sqlalchemy import create_engine, inspect
1311
from sqlalchemy.engine.base import Connection

metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py

+32 -21
Original file line number | Diff line number | Diff line change
@@ -589,6 +589,8 @@ def get_view_lineage(self) -> Iterable[MetadataWorkUnit]:
589589
generate_operations=False,
590590
)
591591
for dataset_name in self._view_definition_cache.keys():
592+
# TODO: Ensure that the lineage generated from the view definition
593+
# matches the dataset_name.
592594
view_definition = self._view_definition_cache[dataset_name]
593595
result = self._run_sql_parser(
594596
dataset_name,
@@ -1094,6 +1096,20 @@ def loop_views(
10941096
exc=e,
10951097
)
10961098

1099+
def _get_view_definition(self, inspector: Inspector, schema: str, view: str) -> str:
1100+
try:
1101+
view_definition = inspector.get_view_definition(view, schema)
1102+
if view_definition is None:
1103+
view_definition = ""
1104+
else:
1105+
# Some dialects return a TextClause instead of a raw string,
1106+
# so we need to convert them to a string.
1107+
view_definition = str(view_definition)
1108+
except NotImplementedError:
1109+
view_definition = ""
1110+
1111+
return view_definition
1112+
10971113
def _process_view(
10981114
self,
10991115
dataset_name: str,
@@ -1112,7 +1128,10 @@ def _process_view(
11121128
columns = inspector.get_columns(view, schema)
11131129
except KeyError:
11141130
# For certain types of views, we are unable to fetch the list of columns.
1115-
self.warn(logger, dataset_name, "unable to get schema for this view")
1131+
self.report.warning(
1132+
message="Unable to get schema for a view",
1133+
context=f"{dataset_name}",
1134+
)
11161135
schema_metadata = None
11171136
else:
11181137
schema_fields = self.get_schema_fields(dataset_name, columns, inspector)
@@ -1126,19 +1145,12 @@ def _process_view(
11261145
if self._save_schema_to_resolver():
11271146
self.schema_resolver.add_schema_metadata(dataset_urn, schema_metadata)
11281147
self.discovered_datasets.add(dataset_name)
1148+
11291149
description, properties, _ = self.get_table_properties(inspector, schema, view)
1130-
try:
1131-
view_definition = inspector.get_view_definition(view, schema)
1132-
if view_definition is None:
1133-
view_definition = ""
1134-
else:
1135-
# Some dialects return a TextClause instead of a raw string,
1136-
# so we need to convert them to a string.
1137-
view_definition = str(view_definition)
1138-
except NotImplementedError:
1139-
view_definition = ""
1140-
properties["view_definition"] = view_definition
11411150
properties["is_view"] = "True"
1151+
1152+
view_definition = self._get_view_definition(inspector, schema, view)
1153+
properties["view_definition"] = view_definition
11421154
if view_definition and self.config.include_view_lineage:
11431155
self._view_definition_cache[dataset_name] = view_definition
11441156

@@ -1170,15 +1182,14 @@ def _process_view(
11701182
entityUrn=dataset_urn,
11711183
aspect=SubTypesClass(typeNames=[DatasetSubTypes.VIEW]),
11721184
).as_workunit()
1173-
if "view_definition" in properties:
1174-
view_definition_string = properties["view_definition"]
1175-
view_properties_aspect = ViewPropertiesClass(
1176-
materialized=False, viewLanguage="SQL", viewLogic=view_definition_string
1177-
)
1178-
yield MetadataChangeProposalWrapper(
1179-
entityUrn=dataset_urn,
1180-
aspect=view_properties_aspect,
1181-
).as_workunit()
1185+
1186+
view_properties_aspect = ViewPropertiesClass(
1187+
materialized=False, viewLanguage="SQL", viewLogic=view_definition
1188+
)
1189+
yield MetadataChangeProposalWrapper(
1190+
entityUrn=dataset_urn,
1191+
aspect=view_properties_aspect,
1192+
).as_workunit()
11821193

11831194
if self.config.domain and self.domain_registry:
11841195
yield from get_domain_wu(

0 commit comments

Comments
 (0)