Skip to content

Commit 6998167

Browse files
feat(mssql): adds subtypes aspect for dataflow and datajobs (#12775)
1 parent 85d3a9d commit 6998167

File tree

7 files changed

+420
-36
lines changed

7 files changed

+420
-36
lines changed

metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py

+7
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,15 @@ class BIContainerSubTypes(StrEnum):
6060
MODE_COLLECTION = "Collection"
6161

6262

63+
class FlowContainerSubTypes(StrEnum):
64+
MSSQL_JOB = "Job"
65+
MSSQL_PROCEDURE_CONTAINER = "Procedures Container"
66+
67+
6368
class JobContainerSubTypes(StrEnum):
6469
NIFI_PROCESS_GROUP = "Process Group"
70+
MSSQL_JOBSTEP = "Job Step"
71+
MSSQL_STORED_PROCEDURE = "Stored Procedure"
6572

6673

6774
class BIAssetSubTypes(StrEnum):

metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py

+29
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,17 @@
1111
DatabaseKey,
1212
SchemaKey,
1313
)
14+
from datahub.ingestion.source.common.subtypes import (
15+
FlowContainerSubTypes,
16+
JobContainerSubTypes,
17+
)
1418
from datahub.metadata.schema_classes import (
1519
ContainerClass,
1620
DataFlowInfoClass,
1721
DataJobInfoClass,
1822
DataJobInputOutputClass,
1923
DataPlatformInstanceClass,
24+
SubTypesClass,
2025
)
2126

2227

@@ -211,6 +216,18 @@ def as_datajob_info_aspect(self) -> DataJobInfoClass:
211216
status=self.status,
212217
)
213218

219+
@property
220+
def as_subtypes_aspect(self) -> SubTypesClass:
221+
assert isinstance(self.entity, (JobStep, StoredProcedure))
222+
type = (
223+
JobContainerSubTypes.MSSQL_JOBSTEP
224+
if isinstance(self.entity, JobStep)
225+
else JobContainerSubTypes.MSSQL_STORED_PROCEDURE
226+
)
227+
return SubTypesClass(
228+
typeNames=[type],
229+
)
230+
214231
@property
215232
def as_maybe_platform_instance_aspect(self) -> Optional[DataPlatformInstanceClass]:
216233
if self.entity.flow.platform_instance:
@@ -276,6 +293,18 @@ def as_dataflow_info_aspect(self) -> DataFlowInfoClass:
276293
externalUrl=self.external_url,
277294
)
278295

296+
@property
297+
def as_subtypes_aspect(self) -> SubTypesClass:
298+
assert isinstance(self.entity, (MSSQLJob, MSSQLProceduresContainer))
299+
type = (
300+
FlowContainerSubTypes.MSSQL_JOB
301+
if isinstance(self.entity, MSSQLJob)
302+
else FlowContainerSubTypes.MSSQL_PROCEDURE_CONTAINER
303+
)
304+
return SubTypesClass(
305+
typeNames=[type],
306+
)
307+
279308
@property
280309
def as_maybe_platform_instance_aspect(self) -> Optional[DataPlatformInstanceClass]:
281310
if self.entity.platform_instance:

metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py

+10-4
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,11 @@ def construct_job_workunits(
638638
aspect=data_job.as_datajob_info_aspect,
639639
).as_workunit()
640640

641+
yield MetadataChangeProposalWrapper(
642+
entityUrn=data_job.urn,
643+
aspect=data_job.as_subtypes_aspect,
644+
).as_workunit()
645+
641646
data_platform_instance_aspect = data_job.as_maybe_platform_instance_aspect
642647
if data_platform_instance_aspect:
643648
yield MetadataChangeProposalWrapper(
@@ -676,8 +681,6 @@ def construct_job_workunits(
676681
),
677682
).as_workunit()
678683

679-
# TODO: Add SubType when it appear
680-
681684
def construct_flow_workunits(
682685
self,
683686
data_flow: MSSQLDataFlow,
@@ -687,6 +690,11 @@ def construct_flow_workunits(
687690
aspect=data_flow.as_dataflow_info_aspect,
688691
).as_workunit()
689692

693+
yield MetadataChangeProposalWrapper(
694+
entityUrn=data_flow.urn,
695+
aspect=data_flow.as_subtypes_aspect,
696+
).as_workunit()
697+
690698
data_platform_instance_aspect = data_flow.as_maybe_platform_instance_aspect
691699
if data_platform_instance_aspect:
692700
yield MetadataChangeProposalWrapper(
@@ -700,8 +708,6 @@ def construct_flow_workunits(
700708
aspect=data_flow.as_container_aspect,
701709
).as_workunit()
702710

703-
# TODO: Add SubType when it appear
704-
705711
def get_inspectors(self) -> Iterable[Inspector]:
706712
# This method can be overridden in the case that you want to dynamically
707713
# run on multiple databases.

metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_to_file.json

+99-9
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,24 @@
104104
"lastRunId": "no-run-id-provided"
105105
}
106106
},
107+
{
108+
"entityType": "dataFlow",
109+
"entityUrn": "urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD)",
110+
"changeType": "UPSERT",
111+
"aspectName": "subTypes",
112+
"aspect": {
113+
"json": {
114+
"typeNames": [
115+
"Job"
116+
]
117+
}
118+
},
119+
"systemMetadata": {
120+
"lastObserved": 1615443388097,
121+
"runId": "mssql-test",
122+
"lastRunId": "no-run-id-provided"
123+
}
124+
},
107125
{
108126
"entityType": "dataJob",
109127
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)",
@@ -112,11 +130,11 @@
112130
"aspect": {
113131
"json": {
114132
"customProperties": {
115-
"job_id": "2fc72675-0c68-4260-ab00-c361b96c8c36",
133+
"job_id": "ae341aad-8ab2-421e-b46b-147afd4b0705",
116134
"job_name": "Weekly Demo Data Backup",
117135
"description": "No description available.",
118-
"date_created": "2025-01-31 08:02:41.167000",
119-
"date_modified": "2025-01-31 08:02:41.360000",
136+
"date_created": "2025-03-04 16:55:50.893000",
137+
"date_modified": "2025-03-04 16:55:51.043000",
120138
"step_id": "1",
121139
"step_name": "Set database to read only",
122140
"subsystem": "TSQL",
@@ -134,6 +152,24 @@
134152
"lastRunId": "no-run-id-provided"
135153
}
136154
},
155+
{
156+
"entityType": "dataJob",
157+
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)",
158+
"changeType": "UPSERT",
159+
"aspectName": "subTypes",
160+
"aspect": {
161+
"json": {
162+
"typeNames": [
163+
"Job Step"
164+
]
165+
}
166+
},
167+
"systemMetadata": {
168+
"lastObserved": 1615443388097,
169+
"runId": "mssql-test",
170+
"lastRunId": "no-run-id-provided"
171+
}
172+
},
137173
{
138174
"entityType": "dataJob",
139175
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)",
@@ -2266,6 +2302,24 @@
22662302
"lastRunId": "no-run-id-provided"
22672303
}
22682304
},
2305+
{
2306+
"entityType": "dataFlow",
2307+
"entityUrn": "urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD)",
2308+
"changeType": "UPSERT",
2309+
"aspectName": "subTypes",
2310+
"aspect": {
2311+
"json": {
2312+
"typeNames": [
2313+
"Procedures Container"
2314+
]
2315+
}
2316+
},
2317+
"systemMetadata": {
2318+
"lastObserved": 1615443388097,
2319+
"runId": "mssql-test",
2320+
"lastRunId": "no-run-id-provided"
2321+
}
2322+
},
22692323
{
22702324
"entityType": "dataJob",
22712325
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",
@@ -2279,8 +2333,8 @@
22792333
"code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n",
22802334
"input parameters": "['@ID']",
22812335
"parameter @ID": "{'type': 'int'}",
2282-
"date_created": "2025-01-31 08:02:40.980000",
2283-
"date_modified": "2025-01-31 08:02:40.980000"
2336+
"date_created": "2025-03-04 16:55:50.720000",
2337+
"date_modified": "2025-03-04 16:55:50.720000"
22842338
},
22852339
"name": "DemoData.Foo.Proc.With.SpecialChar",
22862340
"type": {
@@ -2294,6 +2348,24 @@
22942348
"lastRunId": "no-run-id-provided"
22952349
}
22962350
},
2351+
{
2352+
"entityType": "dataJob",
2353+
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",
2354+
"changeType": "UPSERT",
2355+
"aspectName": "subTypes",
2356+
"aspect": {
2357+
"json": {
2358+
"typeNames": [
2359+
"Stored Procedure"
2360+
]
2361+
}
2362+
},
2363+
"systemMetadata": {
2364+
"lastObserved": 1615443388097,
2365+
"runId": "mssql-test",
2366+
"lastRunId": "no-run-id-provided"
2367+
}
2368+
},
22972369
{
22982370
"entityType": "dataJob",
22992371
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",
@@ -2329,8 +2401,8 @@
23292401
"depending_on_procedure": "{}",
23302402
"code": "CREATE PROCEDURE [Foo].[NewProc]\n AS\n BEGIN\n --insert into items table from salesreason table\n insert into Foo.Items (ID, ItemName)\n SELECT TempID, Name\n FROM Foo.SalesReason;\n\n\n IF OBJECT_ID('Foo.age_dist', 'U') IS NULL\n BEGIN\n -- Create and populate if table doesn't exist\n SELECT Age, COUNT(*) as Count\n INTO Foo.age_dist\n FROM Foo.Persons\n GROUP BY Age\n END\n ELSE\n BEGIN\n -- Update existing table\n TRUNCATE TABLE Foo.age_dist;\n\n INSERT INTO Foo.age_dist (Age, Count)\n SELECT Age, COUNT(*) as Count\n FROM Foo.Persons\n GROUP BY Age\n END\n\n SELECT ID, Age INTO #TEMPTABLE FROM NewData.FooNew.PersonsNew\n \n UPDATE DemoData.Foo.Persons\n SET Age = t.Age\n FROM DemoData.Foo.Persons p\n JOIN #TEMPTABLE t ON p.ID = t.ID\n\n END\n",
23312403
"input parameters": "[]",
2332-
"date_created": "2025-01-31 08:02:40.987000",
2333-
"date_modified": "2025-01-31 08:02:40.987000"
2404+
"date_created": "2025-03-04 16:55:50.727000",
2405+
"date_modified": "2025-03-04 16:55:50.727000"
23342406
},
23352407
"name": "DemoData.Foo.NewProc",
23362408
"type": {
@@ -2344,6 +2416,24 @@
23442416
"lastRunId": "no-run-id-provided"
23452417
}
23462418
},
2419+
{
2420+
"entityType": "dataJob",
2421+
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),NewProc)",
2422+
"changeType": "UPSERT",
2423+
"aspectName": "subTypes",
2424+
"aspect": {
2425+
"json": {
2426+
"typeNames": [
2427+
"Stored Procedure"
2428+
]
2429+
}
2430+
},
2431+
"systemMetadata": {
2432+
"lastObserved": 1615443388097,
2433+
"runId": "mssql-test",
2434+
"lastRunId": "no-run-id-provided"
2435+
}
2436+
},
23472437
{
23482438
"entityType": "dataJob",
23492439
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),NewProc)",
@@ -4969,7 +5059,7 @@
49695059
"actor": "urn:li:corpuser:_ingestion"
49705060
},
49715061
"lastModified": {
4972-
"time": 1738310563767,
5062+
"time": 1741107354163,
49735063
"actor": "urn:li:corpuser:_ingestion"
49745064
}
49755065
}
@@ -5092,7 +5182,7 @@
50925182
"actor": "urn:li:corpuser:_ingestion"
50935183
},
50945184
"lastModified": {
5095-
"time": 1738310563770,
5185+
"time": 1741107354165,
50965186
"actor": "urn:li:corpuser:_ingestion"
50975187
}
50985188
}

0 commit comments

Comments
 (0)