@@ -582,6 +582,8 @@ def get_view_lineage(self) -> Iterable[MetadataWorkUnit]:
582
582
generate_operations = False ,
583
583
)
584
584
for dataset_name in self ._view_definition_cache .keys ():
585
+ # TODO: Ensure that the lineage generated from the view definition
586
+ # matches the dataset_name.
585
587
view_definition = self ._view_definition_cache [dataset_name ]
586
588
result = self ._run_sql_parser (
587
589
dataset_name ,
@@ -1059,6 +1061,20 @@ def loop_views(
1059
1061
exc = e ,
1060
1062
)
1061
1063
1064
+ def _get_view_definition (self , inspector : Inspector , schema : str , view : str ) -> str :
1065
+ try :
1066
+ view_definition = inspector .get_view_definition (view , schema )
1067
+ if view_definition is None :
1068
+ view_definition = ""
1069
+ else :
1070
+ # Some dialects return a TextClause instead of a raw string,
1071
+ # so we need to convert them to a string.
1072
+ view_definition = str (view_definition )
1073
+ except NotImplementedError :
1074
+ view_definition = ""
1075
+
1076
+ return view_definition
1077
+
1062
1078
def _process_view (
1063
1079
self ,
1064
1080
dataset_name : str ,
@@ -1077,7 +1093,10 @@ def _process_view(
1077
1093
columns = inspector .get_columns (view , schema )
1078
1094
except KeyError :
1079
1095
# For certain types of views, we are unable to fetch the list of columns.
1080
- self .warn (logger , dataset_name , "unable to get schema for this view" )
1096
+ self .report .warning (
1097
+ message = "Unable to get schema for a view" ,
1098
+ context = f"{ dataset_name } " ,
1099
+ )
1081
1100
schema_metadata = None
1082
1101
else :
1083
1102
schema_fields = self .get_schema_fields (dataset_name , columns , inspector )
@@ -1091,19 +1110,12 @@ def _process_view(
1091
1110
if self ._save_schema_to_resolver ():
1092
1111
self .schema_resolver .add_schema_metadata (dataset_urn , schema_metadata )
1093
1112
self .discovered_datasets .add (dataset_name )
1113
+
1094
1114
description , properties , _ = self .get_table_properties (inspector , schema , view )
1095
- try :
1096
- view_definition = inspector .get_view_definition (view , schema )
1097
- if view_definition is None :
1098
- view_definition = ""
1099
- else :
1100
- # Some dialects return a TextClause instead of a raw string,
1101
- # so we need to convert them to a string.
1102
- view_definition = str (view_definition )
1103
- except NotImplementedError :
1104
- view_definition = ""
1105
- properties ["view_definition" ] = view_definition
1106
1115
properties ["is_view" ] = "True"
1116
+
1117
+ view_definition = self ._get_view_definition (inspector , schema , view )
1118
+ properties ["view_definition" ] = view_definition
1107
1119
if view_definition and self .config .include_view_lineage :
1108
1120
self ._view_definition_cache [dataset_name ] = view_definition
1109
1121
@@ -1135,15 +1147,14 @@ def _process_view(
1135
1147
entityUrn = dataset_urn ,
1136
1148
aspect = SubTypesClass (typeNames = [DatasetSubTypes .VIEW ]),
1137
1149
).as_workunit ()
1138
- if "view_definition" in properties :
1139
- view_definition_string = properties ["view_definition" ]
1140
- view_properties_aspect = ViewPropertiesClass (
1141
- materialized = False , viewLanguage = "SQL" , viewLogic = view_definition_string
1142
- )
1143
- yield MetadataChangeProposalWrapper (
1144
- entityUrn = dataset_urn ,
1145
- aspect = view_properties_aspect ,
1146
- ).as_workunit ()
1150
+
1151
+ view_properties_aspect = ViewPropertiesClass (
1152
+ materialized = False , viewLanguage = "SQL" , viewLogic = view_definition
1153
+ )
1154
+ yield MetadataChangeProposalWrapper (
1155
+ entityUrn = dataset_urn ,
1156
+ aspect = view_properties_aspect ,
1157
+ ).as_workunit ()
1147
1158
1148
1159
if self .config .domain and self .domain_registry :
1149
1160
yield from get_domain_wu (
0 commit comments