
Commit c7fd0c5

Merge branch 'master' into hive-improvements

2 parents: e72e425 + bd7649e
File tree

8 files changed: +70 -81 lines changed

metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/data_classes.py

+2 -13

@@ -1,5 +1,4 @@
 import os
-from abc import ABC
 from dataclasses import dataclass
 from enum import Enum
 from typing import Any, Dict, List, Optional
@@ -12,18 +11,8 @@
 TRACE_POWERBI_MQUERY_PARSER = os.getenv("DATAHUB_TRACE_POWERBI_MQUERY_PARSER", False)


-class AbstractIdentifierAccessor(ABC):  # To pass lint
-    pass
-
-
-# @dataclass
-# class ItemSelector:
-#     items: Dict[str, Any]
-#     next: Optional[AbstractIdentifierAccessor]
-
-
 @dataclass
-class IdentifierAccessor(AbstractIdentifierAccessor):
+class IdentifierAccessor:
     """
     statement
     public_order_date = Source{[Schema="public",Item="order_date"]}[Data]
@@ -40,7 +29,7 @@ class IdentifierAccessor(AbstractIdentifierAccessor):

     identifier: str
     items: Dict[str, Any]
-    next: Optional[AbstractIdentifierAccessor]
+    next: Optional["IdentifierAccessor"]


 @dataclass
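
Side note on the `next: Optional["IdentifierAccessor"]` annotation: with the abstract base removed, the dataclass now refers to itself, which Python only allows via a string forward reference (or `from __future__ import annotations`). A minimal sketch of the pattern; the `walk` helper is hypothetical and not part of the module:

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class IdentifierAccessor:
    identifier: str
    items: Dict[str, Any]
    # The quoted annotation is a forward reference: the class name is not
    # yet bound while its own body is being evaluated.
    next: Optional["IdentifierAccessor"]


# Hypothetical traversal helper: with a single concrete node type, the
# old isinstance() check against the abstract base is no longer needed.
def walk(accessor: Optional[IdentifierAccessor]) -> None:
    while accessor is not None:
        print(accessor.identifier, accessor.items)
        accessor = accessor.next
```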

metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/pattern_handler.py

+19 -27

@@ -1,7 +1,7 @@
 import logging
 from abc import ABC, abstractmethod
 from enum import Enum
-from typing import Dict, List, Optional, Tuple, Type, Union, cast
+from typing import Dict, List, Optional, Tuple, Type, cast

 from lark import Tree

@@ -22,7 +22,6 @@
 )
 from datahub.ingestion.source.powerbi.m_query import native_sql_parser, tree_function
 from datahub.ingestion.source.powerbi.m_query.data_classes import (
-    AbstractIdentifierAccessor,
     DataAccessFunctionDetail,
     DataPlatformTable,
     FunctionName,
@@ -412,33 +411,25 @@ def create_lineage(
         )
         table_detail: Dict[str, str] = {}
         temp_accessor: Optional[
-            Union[IdentifierAccessor, AbstractIdentifierAccessor]
+            IdentifierAccessor
         ] = data_access_func_detail.identifier_accessor

         while temp_accessor:
-            if isinstance(temp_accessor, IdentifierAccessor):
-                # Condition to handle databricks M-query pattern where table, schema and database all are present in
-                # the same invoke statement
-                if all(
-                    element in temp_accessor.items
-                    for element in ["Item", "Schema", "Catalog"]
-                ):
-                    table_detail["Schema"] = temp_accessor.items["Schema"]
-                    table_detail["Table"] = temp_accessor.items["Item"]
-                else:
-                    table_detail[temp_accessor.items["Kind"]] = temp_accessor.items[
-                        "Name"
-                    ]
-
-                if temp_accessor.next is not None:
-                    temp_accessor = temp_accessor.next
-                else:
-                    break
+            # Condition to handle databricks M-query pattern where table, schema and database all are present in
+            # the same invoke statement
+            if all(
+                element in temp_accessor.items
+                for element in ["Item", "Schema", "Catalog"]
+            ):
+                table_detail["Schema"] = temp_accessor.items["Schema"]
+                table_detail["Table"] = temp_accessor.items["Item"]
             else:
-                logger.debug(
-                    "expecting instance to be IdentifierAccessor, please check if parsing is done properly"
-                )
-                return Lineage.empty()
+                table_detail[temp_accessor.items["Kind"]] = temp_accessor.items["Name"]
+
+            if temp_accessor.next is not None:
+                temp_accessor = temp_accessor.next
+            else:
+                break

         table_reference = self.create_reference_table(
             arg_list=data_access_func_detail.arg_list,
@@ -786,9 +777,10 @@ def get_db_name(self, data_access_tokens: List[str]) -> Optional[str]:
     def create_lineage(
         self, data_access_func_detail: DataAccessFunctionDetail
     ) -> Lineage:
-        t1: Tree = cast(
-            Tree, tree_function.first_arg_list_func(data_access_func_detail.arg_list)
+        t1: Optional[Tree] = tree_function.first_arg_list_func(
+            data_access_func_detail.arg_list
         )
+        assert t1 is not None
        flat_argument_list: List[Tree] = tree_function.flat_argument_list(t1)

        if len(flat_argument_list) != 2:
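
The `cast()` removals in this file all follow one pattern: take the `Optional` return type at face value and narrow it with an `assert`, so a `None` from the parser fails fast instead of being silently relabeled. A minimal sketch of the difference, using a stand-in `parse` function rather than `tree_function.first_arg_list_func`:

```python
from typing import Optional, cast


def parse(text: str) -> Optional[str]:
    # Stand-in for a tree_function helper that may return None.
    return text.strip() or None


def with_cast(text: str) -> str:
    # cast() is a no-op at runtime: a None here only blows up later,
    # far from the real cause.
    return cast(str, parse(text)).upper()


def with_assert(text: str) -> str:
    result: Optional[str] = parse(text)
    # mypy narrows Optional[str] to str after the assert, and a bad
    # parse now raises AssertionError right at the call site.
    assert result is not None
    return result.upper()
```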

metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py

+8 -10

@@ -1,6 +1,6 @@
 import logging
 from abc import ABC, abstractmethod
-from typing import Any, Dict, List, Optional, Tuple, Union, cast
+from typing import Any, Dict, List, Optional, Tuple, Union

 from lark import Tree

@@ -95,14 +95,12 @@ def get_item_selector_tokens(
         # remove whitespaces and quotes from token
         tokens: List[str] = tree_function.strip_char_from_list(
             tree_function.remove_whitespaces_from_list(
-                tree_function.token_values(
-                    cast(Tree, item_selector), parameters=self.parameters
-                )
+                tree_function.token_values(item_selector, parameters=self.parameters)
             ),
         )
         identifier: List[str] = tree_function.token_values(
-            cast(Tree, identifier_tree)
-        )  # type :ignore
+            identifier_tree, parameters={}
+        )

         # convert tokens to dict
         iterator = iter(tokens)
@@ -238,10 +236,10 @@ def _process_invoke_expression(
     def _process_item_selector_expression(
         self, rh_tree: Tree
     ) -> Tuple[Optional[str], Optional[Dict[str, str]]]:
-        new_identifier, key_vs_value = self.get_item_selector_tokens(  # type: ignore
-            cast(Tree, tree_function.first_expression_func(rh_tree))
-        )
+        first_expression: Optional[Tree] = tree_function.first_expression_func(rh_tree)
+        assert first_expression is not None

+        new_identifier, key_vs_value = self.get_item_selector_tokens(first_expression)
         return new_identifier, key_vs_value

     @staticmethod
@@ -327,7 +325,7 @@ def internal(
             # The first argument can be a single table argument or list of table.
             # For example Table.Combine({t1,t2},....), here first argument is list of table.
             # Table.AddColumn(t1,....), here first argument is single table.
-            for token in cast(List[str], result):
+            for token in result:
                 internal(token, identifier_accessor)

         else:

metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/tree_function.py

+3 -3

@@ -1,6 +1,6 @@
 import logging
 from functools import partial
-from typing import Any, Dict, List, Optional, Union, cast
+from typing import Any, Dict, List, Optional, Union

 from lark import Token, Tree

@@ -58,7 +58,7 @@ def internal(node: Union[Tree, Token]) -> Optional[Tree]:
        if isinstance(node, Token):
            return None

-        for child in cast(Tree, node).children:
+        for child in node.children:
            child_node: Optional[Tree] = internal(child)
            if child_node is not None:
                return child_node
@@ -99,7 +99,7 @@ def internal(node: Union[Tree, Token]) -> None:
            logger.debug(f"Unable to resolve parameter reference to {ref}")
            values.append(ref)
        elif isinstance(node, Token):
-            values.append(cast(Token, node).value)
+            values.append(node.value)
            return
        else:
            for child in node.children:
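
Both `cast()` calls removed here were redundant for the same reason: mypy already narrows a `Union[Tree, Token]` value after an `isinstance()` check. A minimal sketch with stand-in classes (not lark's own `Tree`/`Token`):

```python
from typing import List, Union


class Token:
    value = "token"


class Tree:
    children: List["Tree"] = []


def describe(node: Union[Tree, Token]) -> str:
    if isinstance(node, Token):
        # node is narrowed to Token here; cast(Token, node) adds nothing.
        return node.value
    # ...and narrowed to Tree here; cast(Tree, node) was equally redundant.
    return f"tree with {len(node.children)} children"
```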

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py

+4 -3

@@ -1,7 +1,7 @@
 import logging
 from collections import defaultdict
 from dataclasses import dataclass
-from typing import Dict, List, Optional, Set, cast
+from typing import Dict, List, Optional, Set

 import pydantic
 from pydantic import Field, SecretStr, root_validator, validator
@@ -118,9 +118,10 @@ def validate_legacy_schema_pattern(cls, values: Dict) -> Dict:
         )

         # Always exclude reporting metadata for INFORMATION_SCHEMA schema
-        if schema_pattern is not None and schema_pattern:
+        if schema_pattern:
             logger.debug("Adding deny for INFORMATION_SCHEMA to schema_pattern.")
-            cast(AllowDenyPattern, schema_pattern).deny.append(r".*INFORMATION_SCHEMA$")
+            assert isinstance(schema_pattern, AllowDenyPattern)
+            schema_pattern.deny.append(r".*INFORMATION_SCHEMA$")

         return values
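
Unlike the earlier `assert x is not None` changes, this one swaps `cast()` for `assert isinstance(...)`, which actually validates the type at runtime; the plain truthiness check `if schema_pattern:` also subsumes the old `is not None` test. A minimal sketch with a stand-in `AllowDenyPattern`, not the real DataHub config class:

```python
from typing import List, Optional


class AllowDenyPattern:
    # Stand-in for the real config class.
    def __init__(self) -> None:
        self.deny: List[str] = []


def add_information_schema_deny(schema_pattern: Optional[object]) -> None:
    # None and empty patterns are skipped by the truthiness check alone.
    if schema_pattern:
        # cast() would have accepted a wrong type silently; this assert
        # fails immediately if the validator ever passes something else.
        assert isinstance(schema_pattern, AllowDenyPattern)
        schema_pattern.deny.append(r".*INFORMATION_SCHEMA$")
```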

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py

+2 -2

@@ -132,7 +132,7 @@ def tables_for_database(db_name: Optional[str]) -> str:
        auto_clustering_on AS "AUTO_CLUSTERING_ON"
        FROM {db_clause}information_schema.tables t
        WHERE table_schema != 'INFORMATION_SCHEMA'
-        and table_type in ( 'BASE TABLE', 'EXTERNAL TABLE')
+        and table_type in ( 'BASE TABLE', 'EXTERNAL TABLE', 'HYBRID TABLE')
        order by table_schema, table_name"""

    @staticmethod
@@ -152,7 +152,7 @@ def tables_for_schema(schema_name: str, db_name: Optional[str]) -> str:
        auto_clustering_on AS "AUTO_CLUSTERING_ON"
        FROM {db_clause}information_schema.tables t
        where table_schema='{schema_name}'
-        and table_type in ('BASE TABLE', 'EXTERNAL TABLE')
+        and table_type in ('BASE TABLE', 'EXTERNAL TABLE', 'HYBRID TABLE')
        order by table_schema, table_name"""

    @staticmethod
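
The effect of both query changes is simply to widen the set of Snowflake table types that get ingested. A hypothetical predicate expressing the same filter in Python (illustration only, not code from the source):

```python
# Hybrid tables now pass the filter alongside base and external tables.
INGESTED_TABLE_TYPES = {"BASE TABLE", "EXTERNAL TABLE", "HYBRID TABLE"}


def should_ingest(table_type: str) -> bool:
    return table_type.upper() in INGESTED_TABLE_TYPES
```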

metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py

-2

@@ -5,8 +5,6 @@

 import pydantic
 import sqlalchemy.dialects.mssql
-
-# This import verifies that the dependencies are available.
 from pydantic.fields import Field
 from sqlalchemy import create_engine, inspect
 from sqlalchemy.engine.base import Connection

metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py

+32 -21

@@ -582,6 +582,8 @@ def get_view_lineage(self) -> Iterable[MetadataWorkUnit]:
            generate_operations=False,
        )
        for dataset_name in self._view_definition_cache.keys():
+            # TODO: Ensure that the lineage generated from the view definition
+            # matches the dataset_name.
            view_definition = self._view_definition_cache[dataset_name]
            result = self._run_sql_parser(
                dataset_name,
@@ -1059,6 +1061,20 @@ def loop_views(
                exc=e,
            )

+    def _get_view_definition(self, inspector: Inspector, schema: str, view: str) -> str:
+        try:
+            view_definition = inspector.get_view_definition(view, schema)
+            if view_definition is None:
+                view_definition = ""
+            else:
+                # Some dialects return a TextClause instead of a raw string,
+                # so we need to convert them to a string.
+                view_definition = str(view_definition)
+        except NotImplementedError:
+            view_definition = ""
+
+        return view_definition
+
    def _process_view(
        self,
        dataset_name: str,
@@ -1077,7 +1093,10 @@ def _process_view(
            columns = inspector.get_columns(view, schema)
        except KeyError:
            # For certain types of views, we are unable to fetch the list of columns.
-            self.warn(logger, dataset_name, "unable to get schema for this view")
+            self.report.warning(
+                message="Unable to get schema for a view",
+                context=f"{dataset_name}",
+            )
            schema_metadata = None
        else:
            schema_fields = self.get_schema_fields(dataset_name, columns, inspector)
@@ -1091,19 +1110,12 @@
        if self._save_schema_to_resolver():
            self.schema_resolver.add_schema_metadata(dataset_urn, schema_metadata)
            self.discovered_datasets.add(dataset_name)
+
        description, properties, _ = self.get_table_properties(inspector, schema, view)
-        try:
-            view_definition = inspector.get_view_definition(view, schema)
-            if view_definition is None:
-                view_definition = ""
-            else:
-                # Some dialects return a TextClause instead of a raw string,
-                # so we need to convert them to a string.
-                view_definition = str(view_definition)
-        except NotImplementedError:
-            view_definition = ""
-        properties["view_definition"] = view_definition
        properties["is_view"] = "True"
+
+        view_definition = self._get_view_definition(inspector, schema, view)
+        properties["view_definition"] = view_definition
        if view_definition and self.config.include_view_lineage:
            self._view_definition_cache[dataset_name] = view_definition

@@ -1135,15 +1147,14 @@ def _process_view(
            entityUrn=dataset_urn,
            aspect=SubTypesClass(typeNames=[DatasetSubTypes.VIEW]),
        ).as_workunit()
-        if "view_definition" in properties:
-            view_definition_string = properties["view_definition"]
-            view_properties_aspect = ViewPropertiesClass(
-                materialized=False, viewLanguage="SQL", viewLogic=view_definition_string
-            )
-            yield MetadataChangeProposalWrapper(
-                entityUrn=dataset_urn,
-                aspect=view_properties_aspect,
-            ).as_workunit()
+
+        view_properties_aspect = ViewPropertiesClass(
+            materialized=False, viewLanguage="SQL", viewLogic=view_definition
+        )
+        yield MetadataChangeProposalWrapper(
+            entityUrn=dataset_urn,
+            aspect=view_properties_aspect,
+        ).as_workunit()

        if self.config.domain and self.domain_registry:
            yield from get_domain_wu(
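
The new `_get_view_definition` helper centralizes normalization that previously lived inline in `_process_view`. The `str()` conversion matters because some SQLAlchemy dialects return a `TextClause` rather than a plain string; a small sketch of that behavior:

```python
from sqlalchemy import text

# str() on a TextClause yields the underlying SQL, which is what ends up
# in the view_definition property and the view-lineage cache.
clause = text("SELECT id, name FROM users")
view_definition = str(clause)
assert view_definition == "SELECT id, name FROM users"
```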
