1
1
import json
2
2
import pathlib
3
+ from typing import Any , Dict , Union
3
4
4
5
import pytest
5
6
from freezegun .api import freeze_time
15
16
)
16
17
from datahub .ingestion .sink .file import write_metadata_file
17
18
from datahub .metadata .schema_classes import (
19
+ AuditStampClass ,
18
20
DatasetLineageTypeClass ,
21
+ EdgeClass ,
19
22
FineGrainedLineageClass ,
20
23
FineGrainedLineageDownstreamTypeClass ,
21
24
FineGrainedLineageUpstreamTypeClass ,
@@ -182,8 +185,66 @@ def test_basic_dashboard_patch_builder():
182
185
]
183
186
184
187
188
+ @pytest .mark .parametrize (
189
+ "created_on,last_modified,expected_actor" ,
190
+ [
191
+ (1586847600000 , 1586847600000 , "urn:li:corpuser:datahub" ),
192
+ (None , None , "urn:li:corpuser:datahub" ),
193
+ (1586847600000 , None , "urn:li:corpuser:datahub" ),
194
+ (None , 1586847600000 , "urn:li:corpuser:datahub" ),
195
+ ],
196
+ ids = ["both_timestamps" , "no_timestamps" , "only_created" , "only_modified" ],
197
+ )
185
198
@freeze_time ("2020-04-14 07:00:00" )
186
- def test_datajob_patch_builder ():
199
+ def test_datajob_patch_builder (created_on , last_modified , expected_actor ):
200
+ def make_edge_or_urn (urn : str ) -> Union [EdgeClass , str ]:
201
+ if created_on or last_modified :
202
+ return EdgeClass (
203
+ destinationUrn = str (urn ),
204
+ created = (
205
+ AuditStampClass (
206
+ time = created_on ,
207
+ actor = expected_actor ,
208
+ )
209
+ if created_on
210
+ else None
211
+ ),
212
+ lastModified = (
213
+ AuditStampClass (
214
+ time = last_modified ,
215
+ actor = expected_actor ,
216
+ )
217
+ if last_modified
218
+ else None
219
+ ),
220
+ )
221
+ return urn
222
+
223
+ def get_edge_expectation (urn : str ) -> Dict [str , Any ]:
224
+ if created_on or last_modified :
225
+ expected = {
226
+ "destinationUrn" : str (urn ),
227
+ "created" : (
228
+ AuditStampClass (
229
+ time = created_on ,
230
+ actor = expected_actor ,
231
+ ).to_obj ()
232
+ if created_on
233
+ else None
234
+ ),
235
+ "lastModified" : (
236
+ AuditStampClass (
237
+ time = last_modified ,
238
+ actor = expected_actor ,
239
+ ).to_obj ()
240
+ if last_modified
241
+ else None
242
+ ),
243
+ }
244
+ # filter out None values
245
+ return {k : v for k , v in expected .items () if v is not None }
246
+ return {"destinationUrn" : str (urn )}
247
+
187
248
flow_urn = make_data_flow_urn (
188
249
orchestrator = "nifi" , flow_id = "252C34e5af19-0192-1000-b248-b1abee565b5d"
189
250
)
@@ -193,13 +254,19 @@ def test_datajob_patch_builder():
193
254
patcher = DataJobPatchBuilder (job_urn )
194
255
195
256
patcher .add_output_dataset (
196
- "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)"
257
+ make_edge_or_urn (
258
+ "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)"
259
+ )
197
260
)
198
261
patcher .add_output_dataset (
199
- "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)"
262
+ make_edge_or_urn (
263
+ "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)"
264
+ )
200
265
)
201
266
patcher .add_output_dataset (
202
- "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)"
267
+ make_edge_or_urn (
268
+ "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)"
269
+ )
203
270
)
204
271
205
272
assert patcher .build () == [
@@ -214,23 +281,23 @@ def test_datajob_patch_builder():
214
281
{
215
282
"op" : "add" ,
216
283
"path" : "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder1,DEV)" ,
217
- "value" : {
218
- "destinationUrn" : " urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)",
219
- } ,
284
+ "value" : get_edge_expectation (
285
+ "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)"
286
+ ) ,
220
287
},
221
288
{
222
289
"op" : "add" ,
223
290
"path" : "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder3,DEV)" ,
224
- "value" : {
225
- "destinationUrn" : " urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)",
226
- } ,
291
+ "value" : get_edge_expectation (
292
+ "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)"
293
+ ) ,
227
294
},
228
295
{
229
296
"op" : "add" ,
230
297
"path" : "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder2,DEV)" ,
231
- "value" : {
232
- "destinationUrn" : " urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)",
233
- } ,
298
+ "value" : get_edge_expectation (
299
+ "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)"
300
+ ) ,
234
301
},
235
302
]
236
303
).encode ("utf-8" ),
0 commit comments