
Commit 9e5adc3

Merge branch 'master' into adding-subtypes-for-openapi-source

2 parents a18621e + 3df12dc

15 files changed: +698 −77 lines changed

.github/scripts/docker_helpers.sh (+1 −1)

@@ -24,7 +24,7 @@ function get_tag_full {
 }
 
 function get_python_docker_release_v {
-  echo $(echo ${GITHUB_REF} | sed -e "s,refs/heads/${MAIN_BRANCH},1!0.0.0+docker.${SHORT_SHA},g" -e 's,refs/tags/v\(.*\),1!\1+docker,g' -e 's,refs/pull/\([0-9]*\).*,1!0.0.0+docker.pr\1,g')
+  echo $(echo ${GITHUB_REF} | sed -e "s,refs/heads/${MAIN_BRANCH},1!0.0.0+docker.${SHORT_SHA},g" -e 's,refs/tags/v\([0-9a-zA-Z.]*\).*,1\!\1+docker,g' -e 's,refs/pull/\([0-9]*\).*,1!0.0.0+docker.pr\1,g')
 }
 
 function get_unique_tag {
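
Reviewer note: the tightened capture group means only the leading version characters of a tag ref survive into the PEP 440 release string (the `1!` epoch prefix), and any trailing suffix is now dropped. A minimal Python sketch of the same substitution, using hypothetical tag refs (the script itself does this with sed on ${GITHUB_REF}):

import re

# Hypothetical refs for illustration only.
refs = ["refs/tags/v1.2.3", "refs/tags/v1.2.3-doc1"]

old = re.compile(r"refs/tags/v(.*)")               # kept any suffix in the version
new = re.compile(r"refs/tags/v([0-9a-zA-Z.]*).*")  # keeps leading version chars only

for ref in refs:
    print(old.sub(r"1!\1+docker", ref), "->", new.sub(r"1!\1+docker", ref))
# For v1.2.3-doc1: old yields 1!1.2.3-doc1+docker, new yields 1!1.2.3+docker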

docs-website/docusaurus.config.js (+4)

@@ -35,6 +35,10 @@ module.exports = {
     },
     {
       src: "https://app.revenuehero.io/scheduler.min.js"
+    },
+    {
+      src: "https://tag.clearbitscripts.com/v1/pk_2e321cabe30432a5c44c0424781aa35f/tags.js",
+      referrerPolicy: "strict-origin-when-cross-origin"
     }
   ],
   noIndex: isSaas,

metadata-ingestion/examples/ai/dh_ai_client.py (+16 −15)

@@ -14,6 +14,7 @@
     ChangeTypeClass,
     DataProcessInstanceRunResultClass,
     DataProcessRunStatusClass,
+    EdgeClass,
 )
 from datahub.metadata.urns import (
     ContainerUrn,
@@ -255,7 +256,7 @@ def create_model(
         version_props = {
             "version": version_tag,
             "versionSet": str(version_set_urn),
-            "sortId": "AAAAAAAA",
+            "sortId": str(version_tag).zfill(10),
         }
 
         # Add alias if provided
@@ -266,22 +267,10 @@ def create_model(
             models.VersionPropertiesClass, version_props
         )
 
-        # Create version set properties
-        version_set_properties = models.VersionSetPropertiesClass(
-            latest=str(model_urn),
-            versioningScheme="ALPHANUMERIC_GENERATED_BY_DATAHUB",
-        )
-
         mcps = [
             self._create_mcp(
                 str(model_urn), properties, "mlModel", "mlModelProperties"
             ),
-            self._create_mcp(
-                str(version_set_urn),
-                version_set_properties,
-                "versionSet",
-                "versionSetProperties",
-            ),
             self._create_mcp(
                 str(model_urn), version_properties, "mlModel", "versionProperties"
             ),
@@ -429,7 +418,13 @@ def add_input_datasets_to_run(self, run_urn: str, dataset_urns: List[str]) -> No
             entity_urn=run_urn,
             entity_type="dataProcessInstance",
             aspect_name="dataProcessInstanceInput",
-            aspect=DataProcessInstanceInput(inputs=dataset_urns),
+            aspect=DataProcessInstanceInput(
+                inputs=[],
+                inputEdges=[
+                    EdgeClass(destinationUrn=str(dataset_urn))
+                    for dataset_urn in dataset_urns
+                ],
+            ),
         )
         self._emit_mcps(mcp)
         logger.info(f"Added input datasets to run {run_urn}")
@@ -440,7 +435,13 @@ def add_output_datasets_to_run(self, run_urn: str, dataset_urns: List[str]) -> N
             entity_urn=run_urn,
             entity_type="dataProcessInstance",
             aspect_name="dataProcessInstanceOutput",
-            aspect=DataProcessInstanceOutput(outputs=dataset_urns),
+            aspect=DataProcessInstanceOutput(
+                outputEdges=[
+                    EdgeClass(destinationUrn=str(dataset_urn))
+                    for dataset_urn in dataset_urns
+                ],
+                outputs=[],
+            ),
         )
         self._emit_mcps(mcp)
         logger.info(f"Added output datasets to run {run_urn}")

metadata-ingestion/examples/ai/dh_ai_client_sample.py (+14 −14)

@@ -96,15 +96,19 @@
     end_timestamp=1628580001000,
 )
 # Create datasets
-input_dataset_urn = client.create_dataset(
-    platform="snowflake",
-    name="iris_input",
-)
+input_dataset_urns = [
+    client.create_dataset(
+        platform="snowflake",
+        name="iris_input",
+    )
+]
 
-output_dataset_urn = client.create_dataset(
-    platform="snowflake",
-    name="iris_ouptut",
-)
+output_dataset_urns = [
+    client.create_dataset(
+        platform="snowflake",
+        name="iris_ouptut",
+    )
+]
 
 # Add run to experiment
 client.add_run_to_experiment(run_urn=run_urn, experiment_urn=experiment_urn)
@@ -125,10 +129,6 @@
 )
 
 # Add input and output datasets to run
-client.add_input_datasets_to_run(
-    run_urn=run_urn, dataset_urns=[str(input_dataset_urn)]
-)
+client.add_input_datasets_to_run(run_urn=run_urn, dataset_urns=input_dataset_urns)
 
-client.add_output_datasets_to_run(
-    run_urn=run_urn, dataset_urns=[str(output_dataset_urn)]
-)
+client.add_output_datasets_to_run(run_urn=run_urn, dataset_urns=output_dataset_urns)

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py (+16 −13)

@@ -403,6 +403,7 @@ def _parse_audit_log_row(
             res["session_id"],
             res["query_start_time"],
             object_modified_by_ddl,
+            res["query_type"],
         )
         if known_ddl_entry:
             return known_ddl_entry
@@ -537,40 +538,42 @@ def parse_ddl_query(
         session_id: str,
         timestamp: datetime,
         object_modified_by_ddl: dict,
+        query_type: str,
    ) -> Optional[Union[TableRename, TableSwap]]:
        timestamp = timestamp.astimezone(timezone.utc)
-        if object_modified_by_ddl[
-            "operationType"
-        ] == "ALTER" and object_modified_by_ddl["properties"].get("swapTargetName"):
-            urn1 = self.identifiers.gen_dataset_urn(
+        if (
+            object_modified_by_ddl["operationType"] == "ALTER"
+            and query_type == "RENAME_TABLE"
+            and object_modified_by_ddl["properties"].get("objectName")
+        ):
+            original_un = self.identifiers.gen_dataset_urn(
                 self.identifiers.get_dataset_identifier_from_qualified_name(
                     object_modified_by_ddl["objectName"]
                 )
             )
 
-            urn2 = self.identifiers.gen_dataset_urn(
+            new_urn = self.identifiers.gen_dataset_urn(
                 self.identifiers.get_dataset_identifier_from_qualified_name(
-                    object_modified_by_ddl["properties"]["swapTargetName"]["value"]
+                    object_modified_by_ddl["properties"]["objectName"]["value"]
                 )
             )
-
-            return TableSwap(urn1, urn2, query, session_id, timestamp)
+            return TableRename(original_un, new_urn, query, session_id, timestamp)
        elif object_modified_by_ddl[
            "operationType"
-        ] == "RENAME_TABLE" and object_modified_by_ddl["properties"].get("objectName"):
-            original_un = self.identifiers.gen_dataset_urn(
+        ] == "ALTER" and object_modified_by_ddl["properties"].get("swapTargetName"):
+            urn1 = self.identifiers.gen_dataset_urn(
                 self.identifiers.get_dataset_identifier_from_qualified_name(
                     object_modified_by_ddl["objectName"]
                 )
             )
 
-            new_urn = self.identifiers.gen_dataset_urn(
+            urn2 = self.identifiers.gen_dataset_urn(
                 self.identifiers.get_dataset_identifier_from_qualified_name(
-                    object_modified_by_ddl["properties"]["objectName"]["value"]
+                    object_modified_by_ddl["properties"]["swapTargetName"]["value"]
                 )
             )
 
-            return TableRename(original_un, new_urn, query, session_id, timestamp)
+            return TableSwap(urn1, urn2, query, session_id, timestamp)
        else:
            self.report.num_ddl_queries_dropped += 1
            return None
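
Reviewer note: the branch reorder matters because rename DDL evidently arrives in the audit log with operationType "ALTER" as well, so renames are now distinguished by the row's query_type rather than by a "RENAME_TABLE" operation type that never matched. A condensed sketch of the resulting dispatch, with plain dicts standing in for the audit-log row (the function name here is illustrative, not the module's API):

from typing import Optional

def classify_ddl(object_modified_by_ddl: dict, query_type: str) -> Optional[str]:
    props = object_modified_by_ddl["properties"]
    if (
        object_modified_by_ddl["operationType"] == "ALTER"
        and query_type == "RENAME_TABLE"
        and props.get("objectName")
    ):
        return "rename"  # parse_ddl_query builds a TableRename here
    elif object_modified_by_ddl["operationType"] == "ALTER" and props.get(
        "swapTargetName"
    ):
        return "swap"  # parse_ddl_query builds a TableSwap here
    return None  # anything else is dropped and counted in num_ddl_queries_dropped

row = {"operationType": "ALTER", "properties": {"objectName": {"value": "DB.PUBLIC.T2"}}}
assert classify_ddl(row, "RENAME_TABLE") == "rename"
assert classify_ddl(row, "ALTER") is None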

metadata-ingestion/src/datahub/ingestion/source/vertexai.py (+10 −12)

@@ -358,12 +358,15 @@ def _get_project_container(self) -> ProjectIdKey:
         return ProjectIdKey(project_id=self.config.project_id, platform=self.platform)
 
     def _is_automl_job(self, job: VertexAiResourceNoun) -> bool:
-        return (
-            isinstance(job, AutoMLTabularTrainingJob)
-            or isinstance(job, AutoMLTextTrainingJob)
-            or isinstance(job, AutoMLImageTrainingJob)
-            or isinstance(job, AutoMLVideoTrainingJob)
-            or isinstance(job, AutoMLForecastingTrainingJob)
+        return isinstance(
+            job,
+            (
+                AutoMLTabularTrainingJob,
+                AutoMLTextTrainingJob,
+                AutoMLImageTrainingJob,
+                AutoMLVideoTrainingJob,
+                AutoMLForecastingTrainingJob,
+            ),
         )
 
     def _search_model_version(
@@ -618,12 +621,7 @@ def _search_endpoint(self, model: Model) -> List[Endpoint]:
                 endpoint_dict[resource.model].append(endpoint)
             self.endpoints = endpoint_dict
 
-        endpoints = (
-            self.endpoints[model.resource_name]
-            if model.resource_name in self.endpoints
-            else []
-        )
-        return endpoints
+        return self.endpoints.get(model.resource_name, [])
 
     def _make_ml_model_urn(self, model_version: VersionInfo, model_name: str) -> str:
         urn = builder.make_ml_model_urn(
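
Reviewer note: both hunks are behavior-preserving refactors — isinstance accepts a tuple of types, and dict.get with a default subsumes the explicit membership check. A quick demonstration with stand-in classes (the real code uses the Vertex AI AutoML training-job classes and the endpoint dict):

class A: ...
class B: ...

job = B()
# A tuple second argument is equivalent to chaining isinstance checks with `or`.
assert isinstance(job, (A, B)) == (isinstance(job, A) or isinstance(job, B))

endpoints = {"model-1": ["ep-1"]}
# dict.get(key, default) replaces the conditional membership test; the
# missing-key lookup in the true arm is never evaluated.
assert endpoints.get("model-2", []) == (
    endpoints["model-2"] if "model-2" in endpoints else []
)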

metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py (+119 −1)

@@ -1,3 +1,4 @@
+import datetime
 from typing import Any, Dict
 from unittest.mock import MagicMock, patch
 
@@ -16,18 +17,27 @@
 from datahub.ingestion.source.snowflake.oauth_config import OAuthConfiguration
 from datahub.ingestion.source.snowflake.snowflake_config import (
     DEFAULT_TEMP_TABLES_PATTERNS,
+    SnowflakeIdentifierConfig,
     SnowflakeV2Config,
 )
 from datahub.ingestion.source.snowflake.snowflake_lineage_v2 import UpstreamLineageEdge
+from datahub.ingestion.source.snowflake.snowflake_queries import (
+    SnowflakeQueriesExtractor,
+    SnowflakeQueriesExtractorConfig,
+)
 from datahub.ingestion.source.snowflake.snowflake_query import (
     SnowflakeQuery,
     create_deny_regex_sql_filter,
 )
 from datahub.ingestion.source.snowflake.snowflake_usage_v2 import (
     SnowflakeObjectAccessEntry,
 )
-from datahub.ingestion.source.snowflake.snowflake_utils import SnowsightUrlBuilder
+from datahub.ingestion.source.snowflake.snowflake_utils import (
+    SnowflakeIdentifierBuilder,
+    SnowsightUrlBuilder,
+)
 from datahub.ingestion.source.snowflake.snowflake_v2 import SnowflakeV2Source
+from datahub.sql_parsing.sql_parsing_aggregator import TableRename, TableSwap
 from datahub.testing.doctest import assert_doctest
 from tests.test_helpers import test_connection_helpers
 
@@ -689,3 +699,111 @@ def test_snowflake_query_result_parsing():
         ],
     }
     assert UpstreamLineageEdge.parse_obj(db_row)
+
+
+class TestDDLProcessing:
+    @pytest.fixture
+    def session_id(self):
+        return "14774700483022321"
+
+    @pytest.fixture
+    def timestamp(self):
+        return datetime.datetime(
+            year=2025, month=2, day=3, hour=15, minute=1, second=43
+        ).astimezone(datetime.timezone.utc)
+
+    @pytest.fixture
+    def extractor(self) -> SnowflakeQueriesExtractor:
+        connection = MagicMock()
+        config = SnowflakeQueriesExtractorConfig()
+        structured_report = MagicMock()
+        filters = MagicMock()
+        structured_report.num_ddl_queries_dropped = 0
+        identifier_config = SnowflakeIdentifierConfig()
+        identifiers = SnowflakeIdentifierBuilder(identifier_config, structured_report)
+        return SnowflakeQueriesExtractor(
+            connection, config, structured_report, filters, identifiers
+        )
+
+    def test_ddl_processing_alter_table_rename(self, extractor, session_id, timestamp):
+        query = "ALTER TABLE person_info_loading RENAME TO person_info_final;"
+        object_modified_by_ddl = {
+            "objectDomain": "Table",
+            "objectId": 1789034,
+            "objectName": "DUMMY_DB.PUBLIC.PERSON_INFO_LOADING",
+            "operationType": "ALTER",
+            "properties": {
+                "objectName": {"value": "DUMMY_DB.PUBLIC.PERSON_INFO_FINAL"}
+            },
+        }
+        query_type = "RENAME_TABLE"
+
+        ddl = extractor.parse_ddl_query(
+            query, session_id, timestamp, object_modified_by_ddl, query_type
+        )
+
+        assert ddl == TableRename(
+            original_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info_loading,PROD)",
+            new_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info_final,PROD)",
+            query=query,
+            session_id=session_id,
+            timestamp=timestamp,
+        ), "Processing ALTER ... RENAME should result in a proper TableRename object"
+
+    def test_ddl_processing_alter_table_add_column(
+        self, extractor, session_id, timestamp
+    ):
+        query = "ALTER TABLE person_info ADD year BIGINT"
+        object_modified_by_ddl = {
+            "objectDomain": "Table",
+            "objectId": 2612260,
+            "objectName": "DUMMY_DB.PUBLIC.PERSON_INFO",
+            "operationType": "ALTER",
+            "properties": {
+                "columns": {
+                    "BIGINT": {
+                        "objectId": {"value": 8763407},
+                        "subOperationType": "ADD",
+                    }
+                }
+            },
+        }
+        query_type = "ALTER_TABLE_ADD_COLUMN"
+
+        ddl = extractor.parse_ddl_query(
+            query, session_id, timestamp, object_modified_by_ddl, query_type
+        )
+
+        assert ddl is None, (
+            "For altering columns statement ddl parsing should return None"
+        )
+        assert extractor.report.num_ddl_queries_dropped == 1, (
+            "Dropped ddls should be properly counted"
+        )
+
+    def test_ddl_processing_alter_table_swap(self, extractor, session_id, timestamp):
+        query = "ALTER TABLE person_info SWAP WITH person_info_swap;"
+        object_modified_by_ddl = {
+            "objectDomain": "Table",
+            "objectId": 3776835,
+            "objectName": "DUMMY_DB.PUBLIC.PERSON_INFO",
+            "operationType": "ALTER",
+            "properties": {
+                "swapTargetDomain": {"value": "Table"},
+                "swapTargetId": {"value": 3786260},
+                "swapTargetName": {"value": "DUMMY_DB.PUBLIC.PERSON_INFO_SWAP"},
+            },
+        }
+        query_type = "ALTER"
+
+        ddl = extractor.parse_ddl_query(
+            query, session_id, timestamp, object_modified_by_ddl, query_type
+        )
+
+        assert ddl == TableSwap(
+            urn1="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info,PROD)",
+            urn2="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info_swap,PROD)",
+            query=query,
+            session_id=session_id,
+            timestamp=timestamp,
+        ), "Processing ALTER ... SWAP DDL should result in a proper TableSwap object"

metadata-io/build.gradle (−2)

@@ -31,8 +31,6 @@ dependencies {
 
   implementation externalDependency.guava
   implementation externalDependency.reflections
-  // https://mvnrepository.com/artifact/nl.basjes.parse.useragent/yauaa
-  implementation 'nl.basjes.parse.useragent:yauaa:7.27.0'
 
   api(externalDependency.dgraph4j) {
     exclude group: 'com.google.guava', module: 'guava'
