34
34
import com .linkedin .metadata .aspect .patch .builder .GlobalTagsPatchBuilder ;
35
35
import com .linkedin .metadata .aspect .patch .builder .UpstreamLineagePatchBuilder ;
36
36
import com .linkedin .metadata .key .DatasetKey ;
37
+ import com .linkedin .mxe .MetadataChangeProposal ;
37
38
import datahub .event .EventFormatter ;
38
39
import datahub .event .MetadataChangeProposalWrapper ;
39
40
import io .datahubproject .openlineage .config .DatahubOpenlineageConfig ;
56
57
import lombok .Setter ;
57
58
import lombok .ToString ;
58
59
import lombok .extern .slf4j .Slf4j ;
59
- import com .linkedin .mxe .MetadataChangeProposal ;
60
60
import org .apache .commons .lang3 .StringUtils ;
61
61
import org .apache .commons .lang3 .tuple .Pair ;
62
62
63
-
64
63
@ EqualsAndHashCode
65
64
@ Getter
66
65
@ Setter
@@ -102,7 +101,8 @@ public static MetadataChangeProposalWrapper materializeDataset(DatasetUrn datase
102
101
.setPlatform (new DataPlatformUrn (datasetUrn .getPlatformEntity ().getPlatformNameEntity ()));
103
102
104
103
return MetadataChangeProposalWrapper .create (
105
- b -> b .entityType (DATASET_ENTITY_TYPE ).entityUrn (datasetUrn ).upsert ().aspect (datasetAspect ));
104
+ b ->
105
+ b .entityType (DATASET_ENTITY_TYPE ).entityUrn (datasetUrn ).upsert ().aspect (datasetAspect ));
106
106
}
107
107
108
108
public List <MetadataChangeProposal > toMcps (DatahubOpenlineageConfig config ) throws IOException {
@@ -113,7 +113,6 @@ public List<MetadataChangeProposal> toMcps(DatahubOpenlineageConfig config) thro
113
113
addAspectToMcps (flowUrn , DATA_FLOW_ENTITY_TYPE , dataFlowInfo , mcps );
114
114
generateStatus (flowUrn , DATA_FLOW_ENTITY_TYPE , mcps );
115
115
116
-
117
116
// Generate and add PlatformInstance Aspect
118
117
if (flowPlatformInstance != null ) {
119
118
addAspectToMcps (flowUrn , DATA_FLOW_ENTITY_TYPE , flowPlatformInstance , mcps );
@@ -154,7 +153,7 @@ public List<MetadataChangeProposal> toMcps(DatahubOpenlineageConfig config) thro
154
153
UrnArray inputUrnArray = inputsTuple .getLeft ();
155
154
EdgeArray inputEdges = inputsTuple .getRight ();
156
155
157
- Pair <UrnArray , EdgeArray > outputTuple = processDownstreams (config , mcps );
156
+ Pair <UrnArray , EdgeArray > outputTuple = processDownstreams (config , mcps );
158
157
UrnArray outputUrnArray = outputTuple .getLeft ();
159
158
EdgeArray outputEdges = outputTuple .getRight ();
160
159
@@ -168,11 +167,16 @@ public List<MetadataChangeProposal> toMcps(DatahubOpenlineageConfig config) thro
168
167
return mcps ;
169
168
}
170
169
171
- private void generateDataJobInputOutputMcp (EdgeArray inputEdges , EdgeArray outputEdges , DatahubOpenlineageConfig config , List <MetadataChangeProposal > mcps ) {
170
+ private void generateDataJobInputOutputMcp (
171
+ EdgeArray inputEdges ,
172
+ EdgeArray outputEdges ,
173
+ DatahubOpenlineageConfig config ,
174
+ List <MetadataChangeProposal > mcps ) {
172
175
DataJobInputOutput dataJobInputOutput = new DataJobInputOutput ();
173
176
log .info ("Adding DataJob edges to {}" , jobUrn );
174
177
if (config .isUsePatch ()) {
175
- DataJobInputOutputPatchBuilder dataJobInputOutputPatchBuilder = new DataJobInputOutputPatchBuilder ().urn (jobUrn );
178
+ DataJobInputOutputPatchBuilder dataJobInputOutputPatchBuilder =
179
+ new DataJobInputOutputPatchBuilder ().urn (jobUrn );
176
180
for (DatahubDataset dataset : inSet ) {
177
181
dataJobInputOutputPatchBuilder .addInputDatasetEdge (dataset .getUrn ());
178
182
}
@@ -183,7 +187,9 @@ private void generateDataJobInputOutputMcp(EdgeArray inputEdges, EdgeArray outpu
183
187
dataJobInputOutputPatchBuilder .addInputDatajobEdge (parentJob );
184
188
}
185
189
MetadataChangeProposal dataJobInputOutputMcp = dataJobInputOutputPatchBuilder .build ();
186
- log .info ("dataJobInputOutputMcp: {}" , dataJobInputOutputMcp .getAspect ().getValue ().asString (Charset .defaultCharset ()));
190
+ log .info (
191
+ "dataJobInputOutputMcp: {}" ,
192
+ dataJobInputOutputMcp .getAspect ().getValue ().asString (Charset .defaultCharset ()));
187
193
mcps .add (dataJobInputOutputPatchBuilder .build ());
188
194
189
195
} else {
@@ -194,32 +200,41 @@ private void generateDataJobInputOutputMcp(EdgeArray inputEdges, EdgeArray outpu
194
200
DataJobUrnArray parentDataJobUrnArray = new DataJobUrnArray ();
195
201
parentDataJobUrnArray .addAll (parentJobs );
196
202
197
- log .info ("Adding input data jobs {} Number of jobs: {}" , jobUrn , parentDataJobUrnArray .size ());
203
+ log .info (
204
+ "Adding input data jobs {} Number of jobs: {}" , jobUrn , parentDataJobUrnArray .size ());
198
205
dataJobInputOutput .setInputDatajobs (parentDataJobUrnArray );
199
206
addAspectToMcps (jobUrn , DATAJOB_ENTITY_TYPE , dataJobInputOutput , mcps );
200
207
}
201
208
}
202
209
203
- private void generateDataProcessInstanceMcp (UrnArray inputUrnArray , UrnArray outputUrnArray , List <MetadataChangeProposal > mcps ) {
210
+ private void generateDataProcessInstanceMcp (
211
+ UrnArray inputUrnArray , UrnArray outputUrnArray , List <MetadataChangeProposal > mcps ) {
204
212
DataProcessInstanceInput dataProcessInstanceInput = new DataProcessInstanceInput ();
205
213
dataProcessInstanceInput .setInputs (inputUrnArray );
206
214
207
215
DataProcessInstanceOutput dataProcessInstanceOutput = new DataProcessInstanceOutput ();
208
216
dataProcessInstanceOutput .setOutputs (outputUrnArray );
209
217
210
- addAspectToMcps (dataProcessInstanceUrn , DATA_PROCESS_INSTANCE_ENTITY_TYPE , dataProcessInstanceInput , mcps );
211
- addAspectToMcps (dataProcessInstanceUrn , DATA_PROCESS_INSTANCE_ENTITY_TYPE , dataProcessInstanceOutput , mcps );
218
+ addAspectToMcps (
219
+ dataProcessInstanceUrn , DATA_PROCESS_INSTANCE_ENTITY_TYPE , dataProcessInstanceInput , mcps );
220
+ addAspectToMcps (
221
+ dataProcessInstanceUrn , DATA_PROCESS_INSTANCE_ENTITY_TYPE , dataProcessInstanceOutput , mcps );
212
222
213
223
if (dataProcessInstanceProperties != null ) {
214
224
log .info ("Adding dataProcessInstanceProperties to {}" , jobUrn );
215
- addAspectToMcps (dataProcessInstanceUrn , DATA_PROCESS_INSTANCE_ENTITY_TYPE , dataProcessInstanceProperties , mcps );
225
+ addAspectToMcps (
226
+ dataProcessInstanceUrn ,
227
+ DATA_PROCESS_INSTANCE_ENTITY_TYPE ,
228
+ dataProcessInstanceProperties ,
229
+ mcps );
216
230
}
217
231
218
232
generateDataProcessInstanceRunEvent (mcps );
219
233
generateDataProcessInstanceRelationship (mcps );
220
234
}
221
235
222
- private Pair <UrnArray , EdgeArray > processDownstreams (DatahubOpenlineageConfig config , List <MetadataChangeProposal > mcps ) {
236
+ private Pair <UrnArray , EdgeArray > processDownstreams (
237
+ DatahubOpenlineageConfig config , List <MetadataChangeProposal > mcps ) {
223
238
UrnArray outputUrnArray = new UrnArray ();
224
239
EdgeArray outputEdges = new EdgeArray ();
225
240
@@ -242,40 +257,53 @@ private Pair<UrnArray, EdgeArray> processDownstreams(DatahubOpenlineageConfig co
242
257
outputEdges .add (edge );
243
258
244
259
if ((dataset .getSchemaMetadata () != null ) && (config .isIncludeSchemaMetadata ())) {
245
- addAspectToMcps (dataset .getUrn (), DATASET_ENTITY_TYPE , dataset .getSchemaMetadata (), mcps );
260
+ addAspectToMcps (
261
+ dataset .getUrn (), DATASET_ENTITY_TYPE , dataset .getSchemaMetadata (), mcps );
246
262
}
247
263
248
264
if (dataset .getLineage () != null ) {
249
265
if (config .isUsePatch ()) {
250
- UpstreamLineagePatchBuilder upstreamLineagePatchBuilder = new UpstreamLineagePatchBuilder ().urn (dataset .getUrn ());
266
+ UpstreamLineagePatchBuilder upstreamLineagePatchBuilder =
267
+ new UpstreamLineagePatchBuilder ().urn (dataset .getUrn ());
251
268
for (Upstream upstream : dataset .getLineage ().getUpstreams ()) {
252
269
upstreamLineagePatchBuilder .addUpstream (upstream .getDataset (), upstream .getType ());
253
270
}
254
271
255
272
log .info ("Adding FineGrainedLineage to {}" , dataset .getUrn ());
256
- for (FineGrainedLineage fineGrainedLineage : Objects . requireNonNull (
257
- dataset .getLineage ().getFineGrainedLineages ())) {
273
+ for (FineGrainedLineage fineGrainedLineage :
274
+ Objects . requireNonNull ( dataset .getLineage ().getFineGrainedLineages ())) {
258
275
for (Urn upstream : Objects .requireNonNull (fineGrainedLineage .getUpstreams ())) {
259
276
upstreamLineagePatchBuilder .addFineGrainedUpstreamField (
260
- upstream , fineGrainedLineage .getConfidenceScore (), StringUtils .defaultIfEmpty (fineGrainedLineage .getTransformOperation (), "TRANSFORM" ), fineGrainedLineage .getUpstreamType ());
277
+ upstream ,
278
+ fineGrainedLineage .getConfidenceScore (),
279
+ StringUtils .defaultIfEmpty (
280
+ fineGrainedLineage .getTransformOperation (), "TRANSFORM" ),
281
+ fineGrainedLineage .getUpstreamType ());
261
282
}
262
283
for (Urn downstream : Objects .requireNonNull (fineGrainedLineage .getDownstreams ())) {
263
284
upstreamLineagePatchBuilder .addFineGrainedDownstreamField (
264
- downstream , fineGrainedLineage .getConfidenceScore (), StringUtils .defaultIfEmpty (fineGrainedLineage .getTransformOperation (), "TRANSFORM" ), fineGrainedLineage .getDownstreamType ());
285
+ downstream ,
286
+ fineGrainedLineage .getConfidenceScore (),
287
+ StringUtils .defaultIfEmpty (
288
+ fineGrainedLineage .getTransformOperation (), "TRANSFORM" ),
289
+ fineGrainedLineage .getDownstreamType ());
265
290
}
266
291
}
267
292
MetadataChangeProposal mcp = upstreamLineagePatchBuilder .build ();
268
- log .info ("upstreamLineagePatch: {}" , mcp .getAspect ().getValue ().asString (Charset .defaultCharset ()));
293
+ log .info (
294
+ "upstreamLineagePatch: {}" ,
295
+ mcp .getAspect ().getValue ().asString (Charset .defaultCharset ()));
269
296
mcps .add (mcp );
270
- }else {
297
+ } else {
271
298
addAspectToMcps (dataset .getUrn (), DATASET_ENTITY_TYPE , dataset .getLineage (), mcps );
272
299
}
273
300
}
274
301
});
275
302
return Pair .of (outputUrnArray , outputEdges );
276
303
}
277
304
278
- private Pair <UrnArray , EdgeArray > processUpstreams (DatahubOpenlineageConfig config , List <MetadataChangeProposal > mcps ) {
305
+ private Pair <UrnArray , EdgeArray > processUpstreams (
306
+ DatahubOpenlineageConfig config , List <MetadataChangeProposal > mcps ) {
279
307
UrnArray inputUrnArray = new UrnArray ();
280
308
EdgeArray inputEdges = new EdgeArray ();
281
309
@@ -298,7 +326,8 @@ private Pair<UrnArray, EdgeArray> processUpstreams(DatahubOpenlineageConfig conf
298
326
}
299
327
300
328
if (dataset .getSchemaMetadata () != null && config .isIncludeSchemaMetadata ()) {
301
- addAspectToMcps (dataset .getUrn (), DATASET_ENTITY_TYPE , dataset .getSchemaMetadata (), mcps );
329
+ addAspectToMcps (
330
+ dataset .getUrn (), DATASET_ENTITY_TYPE , dataset .getSchemaMetadata (), mcps );
302
331
}
303
332
304
333
if (dataset .getLineage () != null ) {
@@ -308,11 +337,16 @@ private Pair<UrnArray, EdgeArray> processUpstreams(DatahubOpenlineageConfig conf
308
337
return Pair .of (inputUrnArray , inputEdges );
309
338
}
310
339
311
- private void generateFlowDomainsAspect (List <MetadataChangeProposal > mcps , StringMap customProperties ) {
340
+ private void generateFlowDomainsAspect (
341
+ List <MetadataChangeProposal > mcps , StringMap customProperties ) {
312
342
if (flowDomains != null ) {
313
343
MetadataChangeProposalWrapper domains =
314
344
MetadataChangeProposalWrapper .create (
315
- b -> b .entityType (DATAFLOW_ENTITY_TYPE ).entityUrn (flowUrn ).upsert ().aspect (flowDomains ));
345
+ b ->
346
+ b .entityType (DATAFLOW_ENTITY_TYPE )
347
+ .entityUrn (flowUrn )
348
+ .upsert ()
349
+ .aspect (flowDomains ));
316
350
try {
317
351
mcps .add (eventFormatter .convert (domains ));
318
352
} catch (IOException e ) {
@@ -321,7 +355,11 @@ private void generateFlowDomainsAspect(List<MetadataChangeProposal> mcps, String
321
355
}
322
356
}
323
357
324
- private void generateFlowGlobalTagsAspect (Urn flowUrn , GlobalTags flowGlobalTags , DatahubOpenlineageConfig config , List <MetadataChangeProposal > mcps ) {
358
+ private void generateFlowGlobalTagsAspect (
359
+ Urn flowUrn ,
360
+ GlobalTags flowGlobalTags ,
361
+ DatahubOpenlineageConfig config ,
362
+ List <MetadataChangeProposal > mcps ) {
325
363
if (flowGlobalTags != null ) {
326
364
if (config .isUsePatch ()) {
327
365
GlobalTagsPatchBuilder globalTagsPatchBuilder = new GlobalTagsPatchBuilder ().urn (flowUrn );
@@ -341,9 +379,11 @@ private void generateStatus(Urn entityUrn, String entityType, List<MetadataChang
341
379
addAspectToMcps (entityUrn , entityType , statusInfo , mcps );
342
380
}
343
381
344
- private void addAspectToMcps (Urn entityUrn , String entityType , DataTemplate aspect , List <MetadataChangeProposal > mcps ) {
345
- MetadataChangeProposalWrapper mcpw = MetadataChangeProposalWrapper .create (
346
- b -> b .entityType (entityType ).entityUrn (entityUrn ).upsert ().aspect (aspect ));
382
+ private void addAspectToMcps (
383
+ Urn entityUrn , String entityType , DataTemplate aspect , List <MetadataChangeProposal > mcps ) {
384
+ MetadataChangeProposalWrapper mcpw =
385
+ MetadataChangeProposalWrapper .create (
386
+ b -> b .entityType (entityType ).entityUrn (entityUrn ).upsert ().aspect (aspect ));
347
387
try {
348
388
mcps .add (eventFormatter .convert (mcpw ));
349
389
} catch (IOException e ) {
@@ -355,13 +395,14 @@ private void generateDataProcessInstanceRelationship(List<MetadataChangeProposal
355
395
if (dataProcessInstanceRelationships != null ) {
356
396
log .info ("Adding dataProcessInstanceRelationships to {}" , jobUrn );
357
397
try {
358
- mcps .add (eventFormatter .convert (
359
- MetadataChangeProposalWrapper .create (
360
- b ->
361
- b .entityType (DATA_PROCESS_INSTANCE_ENTITY_TYPE )
362
- .entityUrn (dataProcessInstanceUrn )
363
- .upsert ()
364
- .aspect (dataProcessInstanceRelationships ))));
398
+ mcps .add (
399
+ eventFormatter .convert (
400
+ MetadataChangeProposalWrapper .create (
401
+ b ->
402
+ b .entityType (DATA_PROCESS_INSTANCE_ENTITY_TYPE )
403
+ .entityUrn (dataProcessInstanceUrn )
404
+ .upsert ()
405
+ .aspect (dataProcessInstanceRelationships ))));
365
406
} catch (IOException e ) {
366
407
throw new RuntimeException (e );
367
408
}
@@ -372,13 +413,14 @@ private void generateDataProcessInstanceRunEvent(List<MetadataChangeProposal> mc
372
413
if (dataProcessInstanceRunEvent != null ) {
373
414
log .info ("Adding dataProcessInstanceRunEvent to {}" , jobUrn );
374
415
try {
375
- mcps .add (eventFormatter .convert (
376
- MetadataChangeProposalWrapper .create (
377
- b ->
378
- b .entityType (DATA_PROCESS_INSTANCE_ENTITY_TYPE )
379
- .entityUrn (dataProcessInstanceUrn )
380
- .upsert ()
381
- .aspect (dataProcessInstanceRunEvent ))));
416
+ mcps .add (
417
+ eventFormatter .convert (
418
+ MetadataChangeProposalWrapper .create (
419
+ b ->
420
+ b .entityType (DATA_PROCESS_INSTANCE_ENTITY_TYPE )
421
+ .entityUrn (dataProcessInstanceUrn )
422
+ .upsert ()
423
+ .aspect (dataProcessInstanceRunEvent ))));
382
424
} catch (IOException e ) {
383
425
throw new RuntimeException (e );
384
426
}
0 commit comments