Skip to content

Commit 494c522

Browse files
fix(ingest/mssql): add container dataflow/ datajob entities (#12194)
1 parent 8350a4e commit 494c522

File tree

6 files changed

+795
-36
lines changed

6 files changed

+795
-36
lines changed

metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py

+26
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
make_data_platform_urn,
88
make_dataplatform_instance_urn,
99
)
10+
from datahub.emitter.mcp_builder import DatabaseKey
1011
from datahub.metadata.schema_classes import (
12+
ContainerClass,
1113
DataFlowInfoClass,
1214
DataJobInfoClass,
1315
DataJobInputOutputClass,
@@ -210,6 +212,18 @@ def as_datajob_info_aspect(self) -> DataJobInfoClass:
210212
status=self.status,
211213
)
212214

215+
@property
216+
def as_container_aspect(self) -> ContainerClass:
217+
databaseKey = DatabaseKey(
218+
platform=self.entity.flow.orchestrator,
219+
instance=self.entity.flow.platform_instance
220+
if self.entity.flow.platform_instance
221+
else None,
222+
env=self.entity.flow.env,
223+
database=self.entity.flow.db,
224+
)
225+
return ContainerClass(container=databaseKey.as_urn())
226+
213227
@property
214228
def as_maybe_platform_instance_aspect(self) -> Optional[DataPlatformInstanceClass]:
215229
if self.entity.flow.platform_instance:
@@ -257,6 +271,18 @@ def as_dataflow_info_aspect(self) -> DataFlowInfoClass:
257271
externalUrl=self.external_url,
258272
)
259273

274+
@property
275+
def as_container_aspect(self) -> ContainerClass:
276+
databaseKey = DatabaseKey(
277+
platform=self.entity.orchestrator,
278+
instance=self.entity.platform_instance
279+
if self.entity.platform_instance
280+
else None,
281+
env=self.entity.env,
282+
database=self.entity.db,
283+
)
284+
return ContainerClass(container=databaseKey.as_urn())
285+
260286
@property
261287
def as_maybe_platform_instance_aspect(self) -> Optional[DataPlatformInstanceClass]:
262288
if self.entity.platform_instance:

metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py

+10
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,11 @@ def construct_job_workunits(
639639
aspect=data_job.as_datajob_info_aspect,
640640
).as_workunit()
641641

642+
yield MetadataChangeProposalWrapper(
643+
entityUrn=data_job.urn,
644+
aspect=data_job.as_container_aspect,
645+
).as_workunit()
646+
642647
data_platform_instance_aspect = data_job.as_maybe_platform_instance_aspect
643648
if data_platform_instance_aspect:
644649
yield MetadataChangeProposalWrapper(
@@ -662,6 +667,11 @@ def construct_flow_workunits(
662667
aspect=data_flow.as_dataflow_info_aspect,
663668
).as_workunit()
664669

670+
yield MetadataChangeProposalWrapper(
671+
entityUrn=data_flow.urn,
672+
aspect=data_flow.as_container_aspect,
673+
).as_workunit()
674+
665675
data_platform_instance_aspect = data_flow.as_maybe_platform_instance_aspect
666676
if data_platform_instance_aspect:
667677
yield MetadataChangeProposalWrapper(

metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_to_file.json

+196-11
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,43 @@
105105
"lastRunId": "no-run-id-provided"
106106
}
107107
},
108+
{
109+
"entityType": "dataFlow",
110+
"entityUrn": "urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD)",
111+
"changeType": "UPSERT",
112+
"aspectName": "container",
113+
"aspect": {
114+
"json": {
115+
"container": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
116+
}
117+
},
118+
"systemMetadata": {
119+
"lastObserved": 1615443388097,
120+
"runId": "mssql-test",
121+
"lastRunId": "no-run-id-provided"
122+
}
123+
},
124+
{
125+
"entityType": "dataFlow",
126+
"entityUrn": "urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD)",
127+
"changeType": "UPSERT",
128+
"aspectName": "browsePathsV2",
129+
"aspect": {
130+
"json": {
131+
"path": [
132+
{
133+
"id": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63",
134+
"urn": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
135+
}
136+
]
137+
}
138+
},
139+
"systemMetadata": {
140+
"lastObserved": 1615443388097,
141+
"runId": "mssql-test",
142+
"lastRunId": "no-run-id-provided"
143+
}
144+
},
108145
{
109146
"entityType": "dataJob",
110147
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)",
@@ -113,11 +150,11 @@
113150
"aspect": {
114151
"json": {
115152
"customProperties": {
116-
"job_id": "c2d77890-83ba-435f-879b-1c77fa38dd47",
153+
"job_id": "ab960f9d-30f3-4ced-b558-4f9b6671b6dd",
117154
"job_name": "Weekly Demo Data Backup",
118155
"description": "No description available.",
119-
"date_created": "2024-12-05 16:44:43.910000",
120-
"date_modified": "2024-12-05 16:44:44.043000",
156+
"date_created": "2024-12-20 15:15:24.483000",
157+
"date_modified": "2024-12-20 15:15:24.653000",
121158
"step_id": "1",
122159
"step_name": "Set database to read only",
123160
"subsystem": "TSQL",
@@ -136,6 +173,22 @@
136173
"lastRunId": "no-run-id-provided"
137174
}
138175
},
176+
{
177+
"entityType": "dataJob",
178+
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)",
179+
"changeType": "UPSERT",
180+
"aspectName": "container",
181+
"aspect": {
182+
"json": {
183+
"container": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
184+
}
185+
},
186+
"systemMetadata": {
187+
"lastObserved": 1615443388097,
188+
"runId": "mssql-test",
189+
"lastRunId": "no-run-id-provided"
190+
}
191+
},
139192
{
140193
"entityType": "dataJob",
141194
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)",
@@ -154,6 +207,27 @@
154207
"lastRunId": "no-run-id-provided"
155208
}
156209
},
210+
{
211+
"entityType": "dataJob",
212+
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)",
213+
"changeType": "UPSERT",
214+
"aspectName": "browsePathsV2",
215+
"aspect": {
216+
"json": {
217+
"path": [
218+
{
219+
"id": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63",
220+
"urn": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
221+
}
222+
]
223+
}
224+
},
225+
"systemMetadata": {
226+
"lastObserved": 1615443388097,
227+
"runId": "mssql-test",
228+
"lastRunId": "no-run-id-provided"
229+
}
230+
},
157231
{
158232
"entityType": "container",
159233
"entityUrn": "urn:li:container:7da983a1581c33cce8a106587b150f02",
@@ -2103,8 +2177,8 @@
21032177
{
21042178
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
21052179
"customProperties": {
2106-
"view_definition": "CREATE VIEW Foo.PersonsView AS SELECT * FROM Foo.Persons;\n",
2107-
"is_view": "True"
2180+
"is_view": "True",
2181+
"view_definition": "CREATE VIEW Foo.PersonsView AS SELECT * FROM Foo.Persons;\n"
21082182
},
21092183
"name": "PersonsView",
21102184
"tags": []
@@ -2269,6 +2343,43 @@
22692343
"lastRunId": "no-run-id-provided"
22702344
}
22712345
},
2346+
{
2347+
"entityType": "dataFlow",
2348+
"entityUrn": "urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD)",
2349+
"changeType": "UPSERT",
2350+
"aspectName": "container",
2351+
"aspect": {
2352+
"json": {
2353+
"container": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
2354+
}
2355+
},
2356+
"systemMetadata": {
2357+
"lastObserved": 1615443388097,
2358+
"runId": "mssql-test",
2359+
"lastRunId": "no-run-id-provided"
2360+
}
2361+
},
2362+
{
2363+
"entityType": "dataFlow",
2364+
"entityUrn": "urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD)",
2365+
"changeType": "UPSERT",
2366+
"aspectName": "browsePathsV2",
2367+
"aspect": {
2368+
"json": {
2369+
"path": [
2370+
{
2371+
"id": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63",
2372+
"urn": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
2373+
}
2374+
]
2375+
}
2376+
},
2377+
"systemMetadata": {
2378+
"lastObserved": 1615443388097,
2379+
"runId": "mssql-test",
2380+
"lastRunId": "no-run-id-provided"
2381+
}
2382+
},
22722383
{
22732384
"entityType": "dataJob",
22742385
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",
@@ -2282,8 +2393,8 @@
22822393
"code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n",
22832394
"input parameters": "['@ID']",
22842395
"parameter @ID": "{'type': 'int'}",
2285-
"date_created": "2024-12-05 16:44:43.800000",
2286-
"date_modified": "2024-12-05 16:44:43.800000"
2396+
"date_created": "2024-12-20 15:15:24.290000",
2397+
"date_modified": "2024-12-20 15:15:24.290000"
22872398
},
22882399
"externalUrl": "",
22892400
"name": "DemoData.Foo.Proc.With.SpecialChar",
@@ -2298,6 +2409,43 @@
22982409
"lastRunId": "no-run-id-provided"
22992410
}
23002411
},
2412+
{
2413+
"entityType": "dataJob",
2414+
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",
2415+
"changeType": "UPSERT",
2416+
"aspectName": "container",
2417+
"aspect": {
2418+
"json": {
2419+
"container": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
2420+
}
2421+
},
2422+
"systemMetadata": {
2423+
"lastObserved": 1615443388097,
2424+
"runId": "mssql-test",
2425+
"lastRunId": "no-run-id-provided"
2426+
}
2427+
},
2428+
{
2429+
"entityType": "dataJob",
2430+
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",
2431+
"changeType": "UPSERT",
2432+
"aspectName": "browsePathsV2",
2433+
"aspect": {
2434+
"json": {
2435+
"path": [
2436+
{
2437+
"id": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63",
2438+
"urn": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
2439+
}
2440+
]
2441+
}
2442+
},
2443+
"systemMetadata": {
2444+
"lastObserved": 1615443388097,
2445+
"runId": "mssql-test",
2446+
"lastRunId": "no-run-id-provided"
2447+
}
2448+
},
23012449
{
23022450
"entityType": "dataJob",
23032451
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),NewProc)",
@@ -2310,8 +2458,8 @@
23102458
"depending_on_procedure": "{}",
23112459
"code": "CREATE PROCEDURE [Foo].[NewProc]\n AS\n BEGIN\n --insert into items table from salesreason table\n insert into Foo.Items (ID, ItemName)\n SELECT TempID, Name\n FROM Foo.SalesReason;\n\n\n IF OBJECT_ID('Foo.age_dist', 'U') IS NULL\n BEGIN\n -- Create and populate if table doesn't exist\n SELECT Age, COUNT(*) as Count\n INTO Foo.age_dist\n FROM Foo.Persons\n GROUP BY Age\n END\n ELSE\n BEGIN\n -- Update existing table\n TRUNCATE TABLE Foo.age_dist;\n\n INSERT INTO Foo.age_dist (Age, Count)\n SELECT Age, COUNT(*) as Count\n FROM Foo.Persons\n GROUP BY Age\n END\n\n SELECT ID, Age INTO #TEMPTABLE FROM NewData.FooNew.PersonsNew\n \n UPDATE DemoData.Foo.Persons\n SET Age = t.Age\n FROM DemoData.Foo.Persons p\n JOIN #TEMPTABLE t ON p.ID = t.ID\n\n END\n",
23122460
"input parameters": "[]",
2313-
"date_created": "2024-12-05 16:44:43.803000",
2314-
"date_modified": "2024-12-05 16:44:43.803000"
2461+
"date_created": "2024-12-20 15:15:24.300000",
2462+
"date_modified": "2024-12-20 15:15:24.300000"
23152463
},
23162464
"externalUrl": "",
23172465
"name": "DemoData.Foo.NewProc",
@@ -2326,6 +2474,43 @@
23262474
"lastRunId": "no-run-id-provided"
23272475
}
23282476
},
2477+
{
2478+
"entityType": "dataJob",
2479+
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),NewProc)",
2480+
"changeType": "UPSERT",
2481+
"aspectName": "container",
2482+
"aspect": {
2483+
"json": {
2484+
"container": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
2485+
}
2486+
},
2487+
"systemMetadata": {
2488+
"lastObserved": 1615443388097,
2489+
"runId": "mssql-test",
2490+
"lastRunId": "no-run-id-provided"
2491+
}
2492+
},
2493+
{
2494+
"entityType": "dataJob",
2495+
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),NewProc)",
2496+
"changeType": "UPSERT",
2497+
"aspectName": "browsePathsV2",
2498+
"aspect": {
2499+
"json": {
2500+
"path": [
2501+
{
2502+
"id": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63",
2503+
"urn": "urn:li:container:a327c3b1f5aadd4524158aeb5121be63"
2504+
}
2505+
]
2506+
}
2507+
},
2508+
"systemMetadata": {
2509+
"lastObserved": 1615443388097,
2510+
"runId": "mssql-test",
2511+
"lastRunId": "no-run-id-provided"
2512+
}
2513+
},
23292514
{
23302515
"entityType": "container",
23312516
"entityUrn": "urn:li:container:250ce23f940485303fa5e5d4f5194975",
@@ -4427,8 +4612,8 @@
44274612
{
44284613
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
44294614
"customProperties": {
4430-
"view_definition": "CREATE VIEW FooNew.View1 AS\nSELECT LastName, FirstName\nFROM FooNew.PersonsNew\nWHERE Age > 18\n",
4431-
"is_view": "True"
4615+
"is_view": "True",
4616+
"view_definition": "CREATE VIEW FooNew.View1 AS\nSELECT LastName, FirstName\nFROM FooNew.PersonsNew\nWHERE Age > 18\n"
44324617
},
44334618
"name": "View1",
44344619
"tags": []

0 commit comments

Comments
 (0)