
Commit 4e1ff8d

Merge remote-tracking branch 'oss-datahub/master' into vertex_exp_4
2 parents: 5a7ddc8 + cffc6d4

File tree

19 files changed: +2911 -190 lines


.github/scripts/docker_helpers.sh (+1 -1)

@@ -24,7 +24,7 @@ function get_tag_full {
 }
 
 function get_python_docker_release_v {
-  echo $(echo ${GITHUB_REF} | sed -e "s,refs/heads/${MAIN_BRANCH},1!0.0.0+docker.${SHORT_SHA},g" -e 's,refs/tags/v\(.*\),1!\1+docker,g' -e 's,refs/pull/\([0-9]*\).*,1!0.0.0+docker.pr\1,g')
+  echo $(echo ${GITHUB_REF} | sed -e "s,refs/heads/${MAIN_BRANCH},1!0.0.0+docker.${SHORT_SHA},g" -e 's,refs/tags/v\([0-9a-zA-Z.]*\).*,1\!\1+docker,g' -e 's,refs/pull/\([0-9]*\).*,1!0.0.0+docker.pr\1,g')
 }
 
 function get_unique_tag {
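
The functional change is in the second sed expression: the tag capture group narrows from \(.*\) to \([0-9a-zA-Z.]*\) and the ! in the replacement is escaped. A minimal Python sketch of the intended effect, using re.sub as a stand-in for sed and a hypothetical ref name:

import re

OLD = r"refs/tags/v(.*)"               # previous pattern: capture everything after "v"
NEW = r"refs/tags/v([0-9a-zA-Z.]*).*"  # new pattern: capture only version-like characters

ref = "refs/tags/v0.15.0-rc1"  # hypothetical; the real value comes from ${GITHUB_REF}

print(re.sub(OLD, r"1!\1+docker", ref))  # 1!0.15.0-rc1+docker
print(re.sub(NEW, r"1!\1+docker", ref))  # 1!0.15.0+docker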

docs-website/docusaurus.config.js (+4)

@@ -35,6 +35,10 @@ module.exports = {
     },
     {
       src: "https://app.revenuehero.io/scheduler.min.js"
+    },
+    {
+      src: "https://tag.clearbitscripts.com/v1/pk_2e321cabe30432a5c44c0424781aa35f/tags.js",
+      referrerPolicy: "strict-origin-when-cross-origin"
     }
   ],
   noIndex: isSaas,

metadata-ingestion/examples/ai/dh_ai_client.py (+16 -15)

@@ -14,6 +14,7 @@
     ChangeTypeClass,
     DataProcessInstanceRunResultClass,
     DataProcessRunStatusClass,
+    EdgeClass,
 )
 from datahub.metadata.urns import (
     ContainerUrn,
@@ -255,7 +256,7 @@ def create_model(
         version_props = {
             "version": version_tag,
             "versionSet": str(version_set_urn),
-            "sortId": "AAAAAAAA",
+            "sortId": str(version_tag).zfill(10),
         }
 
         # Add alias if provided
@@ -266,22 +267,10 @@ def create_model(
             models.VersionPropertiesClass, version_props
         )
 
-        # Create version set properties
-        version_set_properties = models.VersionSetPropertiesClass(
-            latest=str(model_urn),
-            versioningScheme="ALPHANUMERIC_GENERATED_BY_DATAHUB",
-        )
-
         mcps = [
             self._create_mcp(
                 str(model_urn), properties, "mlModel", "mlModelProperties"
             ),
-            self._create_mcp(
-                str(version_set_urn),
-                version_set_properties,
-                "versionSet",
-                "versionSetProperties",
-            ),
             self._create_mcp(
                 str(model_urn), version_properties, "mlModel", "versionProperties"
             ),
@@ -429,7 +418,13 @@ def add_input_datasets_to_run(self, run_urn: str, dataset_urns: List[str]) -> None:
            entity_urn=run_urn,
            entity_type="dataProcessInstance",
            aspect_name="dataProcessInstanceInput",
-           aspect=DataProcessInstanceInput(inputs=dataset_urns),
+           aspect=DataProcessInstanceInput(
+               inputs=[],
+               inputEdges=[
+                   EdgeClass(destinationUrn=str(dataset_urn))
+                   for dataset_urn in dataset_urns
+               ],
+           ),
        )
        self._emit_mcps(mcp)
        logger.info(f"Added input datasets to run {run_urn}")
@@ -440,7 +435,13 @@ def add_output_datasets_to_run(self, run_urn: str, dataset_urns: List[str]) -> None:
            entity_urn=run_urn,
            entity_type="dataProcessInstance",
            aspect_name="dataProcessInstanceOutput",
-           aspect=DataProcessInstanceOutput(outputs=dataset_urns),
+           aspect=DataProcessInstanceOutput(
+               outputEdges=[
+                   EdgeClass(destinationUrn=str(dataset_urn))
+                   for dataset_urn in dataset_urns
+               ],
+               outputs=[],
+           ),
        )
        self._emit_mcps(mcp)
        logger.info(f"Added output datasets to run {run_urn}")

metadata-ingestion/examples/ai/dh_ai_client_sample.py (+14 -14)

@@ -96,15 +96,19 @@
     end_timestamp=1628580001000,
 )
 # Create datasets
-input_dataset_urn = client.create_dataset(
-    platform="snowflake",
-    name="iris_input",
-)
+input_dataset_urns = [
+    client.create_dataset(
+        platform="snowflake",
+        name="iris_input",
+    )
+]
 
-output_dataset_urn = client.create_dataset(
-    platform="snowflake",
-    name="iris_ouptut",
-)
+output_dataset_urns = [
+    client.create_dataset(
+        platform="snowflake",
+        name="iris_ouptut",
+    )
+]
 
 # Add run to experiment
 client.add_run_to_experiment(run_urn=run_urn, experiment_urn=experiment_urn)
@@ -125,10 +129,6 @@
 )
 
 # Add input and output datasets to run
-client.add_input_datasets_to_run(
-    run_urn=run_urn, dataset_urns=[str(input_dataset_urn)]
-)
+client.add_input_datasets_to_run(run_urn=run_urn, dataset_urns=input_dataset_urns)
 
-client.add_output_datasets_to_run(
-    run_urn=run_urn, dataset_urns=[str(output_dataset_urn)]
-)
+client.add_output_datasets_to_run(run_urn=run_urn, dataset_urns=output_dataset_urns)

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py (+16 -13)

@@ -403,6 +403,7 @@ def _parse_audit_log_row(
                 res["session_id"],
                 res["query_start_time"],
                 object_modified_by_ddl,
+                res["query_type"],
             )
             if known_ddl_entry:
                 return known_ddl_entry
@@ -537,40 +538,42 @@ def parse_ddl_query(
         session_id: str,
         timestamp: datetime,
         object_modified_by_ddl: dict,
+        query_type: str,
     ) -> Optional[Union[TableRename, TableSwap]]:
         timestamp = timestamp.astimezone(timezone.utc)
-        if object_modified_by_ddl[
-            "operationType"
-        ] == "ALTER" and object_modified_by_ddl["properties"].get("swapTargetName"):
-            urn1 = self.identifiers.gen_dataset_urn(
+        if (
+            object_modified_by_ddl["operationType"] == "ALTER"
+            and query_type == "RENAME_TABLE"
+            and object_modified_by_ddl["properties"].get("objectName")
+        ):
+            original_un = self.identifiers.gen_dataset_urn(
                 self.identifiers.get_dataset_identifier_from_qualified_name(
                     object_modified_by_ddl["objectName"]
                 )
             )
 
-            urn2 = self.identifiers.gen_dataset_urn(
+            new_urn = self.identifiers.gen_dataset_urn(
                 self.identifiers.get_dataset_identifier_from_qualified_name(
-                    object_modified_by_ddl["properties"]["swapTargetName"]["value"]
+                    object_modified_by_ddl["properties"]["objectName"]["value"]
                 )
             )
-
-            return TableSwap(urn1, urn2, query, session_id, timestamp)
+            return TableRename(original_un, new_urn, query, session_id, timestamp)
         elif object_modified_by_ddl[
             "operationType"
-        ] == "RENAME_TABLE" and object_modified_by_ddl["properties"].get("objectName"):
-            original_un = self.identifiers.gen_dataset_urn(
+        ] == "ALTER" and object_modified_by_ddl["properties"].get("swapTargetName"):
+            urn1 = self.identifiers.gen_dataset_urn(
                 self.identifiers.get_dataset_identifier_from_qualified_name(
                     object_modified_by_ddl["objectName"]
                 )
             )
 
-            new_urn = self.identifiers.gen_dataset_urn(
+            urn2 = self.identifiers.gen_dataset_urn(
                 self.identifiers.get_dataset_identifier_from_qualified_name(
-                    object_modified_by_ddl["properties"]["objectName"]["value"]
+                    object_modified_by_ddl["properties"]["swapTargetName"]["value"]
                 )
             )
 
-            return TableRename(original_un, new_urn, query, session_id, timestamp)
+            return TableSwap(urn1, urn2, query, session_id, timestamp)
         else:
            self.report.num_ddl_queries_dropped += 1
            return None
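
Renames are now keyed off the query's query_type (with the payload's operationType reported as ALTER) rather than off the payload alone, while swaps are still detected via the swapTargetName property. A standalone sketch of just the new decision order, with hypothetical payloads shaped like the audit-log field this code reads:

from typing import Optional

def classify_ddl(object_modified_by_ddl: dict, query_type: str) -> Optional[str]:
    # Mirrors the reordered checks: renames keyed on query_type, swaps on swapTargetName.
    props = object_modified_by_ddl.get("properties", {})
    if (
        object_modified_by_ddl.get("operationType") == "ALTER"
        and query_type == "RENAME_TABLE"
        and props.get("objectName")
    ):
        return "rename"
    elif object_modified_by_ddl.get("operationType") == "ALTER" and props.get("swapTargetName"):
        return "swap"
    return None

# Hypothetical payloads; the real dicts come from Snowflake's audit log.
rename_row = {"operationType": "ALTER", "objectName": "DB.S.OLD", "properties": {"objectName": {"value": "DB.S.NEW"}}}
swap_row = {"operationType": "ALTER", "objectName": "DB.S.A", "properties": {"swapTargetName": {"value": "DB.S.B"}}}

print(classify_ddl(rename_row, "RENAME_TABLE"))  # rename
print(classify_ddl(swap_row, "ALTER"))           # swap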

metadata-ingestion/src/datahub/ingestion/source/superset.py (+118 -14)

@@ -23,6 +23,7 @@
     make_dataset_urn,
     make_dataset_urn_with_platform_instance,
     make_domain_urn,
+    make_schema_field_urn,
     make_user_urn,
 )
 from datahub.emitter.mcp_builder import add_domain_to_entity_wu
@@ -72,6 +73,9 @@
     DashboardInfoClass,
     DatasetLineageTypeClass,
     DatasetPropertiesClass,
+    FineGrainedLineageClass,
+    FineGrainedLineageDownstreamTypeClass,
+    FineGrainedLineageUpstreamTypeClass,
     GlobalTagsClass,
     OwnerClass,
     OwnershipClass,
@@ -80,6 +84,10 @@
     UpstreamClass,
     UpstreamLineageClass,
 )
+from datahub.sql_parsing.sqlglot_lineage import (
+    SqlParsingResult,
+    create_lineage_sql_parsed_result,
+)
 from datahub.utilities import config_clean
 from datahub.utilities.lossy_collections import LossyList
 from datahub.utilities.registries.domain_registry import DomainRegistry
@@ -342,7 +350,7 @@ def get_dataset_info(self, dataset_id: int) -> dict:
         )
         if dataset_response.status_code != 200:
             logger.warning(f"Failed to get dataset info: {dataset_response.text}")
-            dataset_response.raise_for_status()
+            return {}
         return dataset_response.json()
 
     def get_datasource_urn_from_id(
@@ -680,6 +688,88 @@ def gen_dataset_urn(self, datahub_dataset_name: str) -> str:
             env=self.config.env,
         )
 
+    def generate_virtual_dataset_lineage(
+        self,
+        parsed_query_object: SqlParsingResult,
+        datasource_urn: str,
+    ) -> UpstreamLineageClass:
+        cll = (
+            parsed_query_object.column_lineage
+            if parsed_query_object.column_lineage is not None
+            else []
+        )
+
+        fine_grained_lineages: List[FineGrainedLineageClass] = []
+
+        for cll_info in cll:
+            downstream = (
+                [make_schema_field_urn(datasource_urn, cll_info.downstream.column)]
+                if cll_info.downstream and cll_info.downstream.column
+                else []
+            )
+            upstreams = [
+                make_schema_field_urn(column_ref.table, column_ref.column)
+                for column_ref in cll_info.upstreams
+            ]
+            fine_grained_lineages.append(
+                FineGrainedLineageClass(
+                    downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
+                    downstreams=downstream,
+                    upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
+                    upstreams=upstreams,
+                )
+            )
+
+        upstream_lineage = UpstreamLineageClass(
+            upstreams=[
+                UpstreamClass(
+                    type=DatasetLineageTypeClass.TRANSFORMED,
+                    dataset=input_table_urn,
+                )
+                for input_table_urn in parsed_query_object.in_tables
+            ],
+            fineGrainedLineages=fine_grained_lineages,
+        )
+        return upstream_lineage
+
+    def generate_physical_dataset_lineage(
+        self,
+        dataset_response: dict,
+        upstream_dataset: str,
+        datasource_urn: str,
+    ) -> UpstreamLineageClass:
+        # To generate column level lineage, we can manually decode the metadata
+        # to produce the ColumnLineageInfo
+        columns = dataset_response.get("result", {}).get("columns", [])
+        fine_grained_lineages: List[FineGrainedLineageClass] = []
+
+        for column in columns:
+            column_name = column.get("column_name", "")
+            if not column_name:
+                continue
+
+            downstream = [make_schema_field_urn(datasource_urn, column_name)]
+            upstreams = [make_schema_field_urn(upstream_dataset, column_name)]
+            fine_grained_lineages.append(
+                FineGrainedLineageClass(
+                    downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
+                    downstreams=downstream,
+                    upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
+                    upstreams=upstreams,
+                )
+            )
+
+        upstream_lineage = UpstreamLineageClass(
+            upstreams=[
+                UpstreamClass(
+                    type=DatasetLineageTypeClass.TRANSFORMED,
+                    dataset=upstream_dataset,
+                )
+            ],
+            fineGrainedLineages=fine_grained_lineages,
+        )
+        return upstream_lineage
+
     def construct_dataset_from_dataset_data(
         self, dataset_data: dict
     ) -> DatasetSnapshot:
@@ -700,6 +790,14 @@ def construct_dataset_from_dataset_data(
         upstream_warehouse_platform = (
             dataset_response.get("result", {}).get("database", {}).get("backend")
         )
+        upstream_warehouse_db_name = (
+            dataset_response.get("result", {}).get("database", {}).get("database_name")
+        )
+
+        # if we have rendered sql, we always use that and defualt back to regular sql
+        sql = dataset_response.get("result", {}).get(
+            "rendered_sql"
+        ) or dataset_response.get("result", {}).get("sql")
 
         # Preset has a way of naming their platforms differently than
         # how datahub names them, so map the platform name to the correct naming
@@ -712,22 +810,28 @@
         if upstream_warehouse_platform in warehouse_naming:
             upstream_warehouse_platform = warehouse_naming[upstream_warehouse_platform]
 
-        # TODO: Categorize physical vs virtual upstream dataset
-        # mark all upstream dataset as physical for now, in the future we would ideally like
-        # to differentiate physical vs virtual upstream datasets
-        tag_urn = f"urn:li:tag:{self.platform}:physical"
         upstream_dataset = self.get_datasource_urn_from_id(
             dataset_response, upstream_warehouse_platform
         )
-        upstream_lineage = UpstreamLineageClass(
-            upstreams=[
-                UpstreamClass(
-                    type=DatasetLineageTypeClass.TRANSFORMED,
-                    dataset=upstream_dataset,
-                    properties={"externalUrl": dataset_url},
-                )
-            ]
-        )
+
+        # Sometimes the field will be null instead of not existing
+        if sql == "null" or not sql:
+            tag_urn = f"urn:li:tag:{self.platform}:physical"
+            upstream_lineage = self.generate_physical_dataset_lineage(
+                dataset_response, upstream_dataset, datasource_urn
+            )
+        else:
+            tag_urn = f"urn:li:tag:{self.platform}:virtual"
+            parsed_query_object = create_lineage_sql_parsed_result(
+                query=sql,
+                default_db=upstream_warehouse_db_name,
+                platform=upstream_warehouse_platform,
+                platform_instance=None,
+                env=self.config.env,
+            )
+            upstream_lineage = self.generate_virtual_dataset_lineage(
+                parsed_query_object, datasource_urn
+            )
 
         dataset_info = DatasetPropertiesClass(
             name=dataset.table_name,
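
The new helpers split lineage by whether the Superset dataset is physical (a direct table reference with no SQL) or virtual (defined by a SQL query parsed with create_lineage_sql_parsed_result). For the physical case the column-level mapping is one-to-one by name; a minimal standalone sketch of that pairing, using hypothetical URNs and a stand-in for DataHub's make_schema_field_urn helper:

def field_urn(dataset_urn: str, column: str) -> str:
    # Stand-in for DataHub's make_schema_field_urn.
    return f"urn:li:schemaField:({dataset_urn},{column})"

# Hypothetical URNs for a Superset dataset and its upstream warehouse table.
datasource_urn = "urn:li:dataset:(urn:li:dataPlatform:superset,examples.my_table,PROD)"
upstream_dataset = "urn:li:dataset:(urn:li:dataPlatform:postgres,public.my_table,PROD)"

for column_name in ["id", "amount"]:
    downstream = field_urn(datasource_urn, column_name)  # field on the Superset dataset
    upstream = field_urn(upstream_dataset, column_name)  # same-named field on the warehouse table
    print(upstream, "->", downstream)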

metadata-ingestion/src/datahub/ingestion/source/vertexai.py (+1 -2)

@@ -805,8 +805,7 @@ def _search_endpoint(self, model: Model) -> List[Endpoint]:
                 endpoint_dict[resource.model].append(endpoint)
             self.endpoints = endpoint_dict
 
-        endpoints = self.endpoints.get(model.resource_name, [])
-        return endpoints
+        return self.endpoints.get(model.resource_name, [])
 
     def _make_ml_model_urn(self, model_version: VersionInfo, model_name: str) -> str:
         urn = builder.make_ml_model_urn(
