3
3
import static com .linkedin .metadata .Constants .DEFAULT_OWNERSHIP_TYPE_URN ;
4
4
import static com .linkedin .metadata .Constants .OWNERSHIP_ASPECT_NAME ;
5
5
6
- import com .linkedin .common .AuditStamp ;
7
6
import com .linkedin .common .Owner ;
8
7
import com .linkedin .common .Ownership ;
9
8
import com .linkedin .common .UrnArray ;
10
9
import com .linkedin .common .UrnArrayMap ;
11
10
import com .linkedin .common .urn .Urn ;
12
11
import com .linkedin .data .template .RecordTemplate ;
13
- import com .linkedin .events .metadata .ChangeType ;
12
+ import com .linkedin .metadata .aspect .AspectRetriever ;
13
+ import com .linkedin .metadata .aspect .batch .ChangeMCP ;
14
14
import com .linkedin .metadata .aspect .plugins .config .AspectPluginConfig ;
15
15
import com .linkedin .metadata .aspect .plugins .hooks .MutationHook ;
16
- import com .linkedin .metadata .aspect .plugins .validation .AspectRetriever ;
17
- import com .linkedin .metadata .models .AspectSpec ;
18
- import com .linkedin .metadata .models .EntitySpec ;
19
- import com .linkedin .mxe .SystemMetadata ;
20
16
import com .linkedin .util .Pair ;
17
+ import java .util .Collection ;
18
+ import java .util .Collections ;
19
+ import java .util .LinkedList ;
20
+ import java .util .List ;
21
21
import java .util .Map ;
22
22
import java .util .Set ;
23
23
import java .util .stream .Collectors ;
24
+ import java .util .stream .Stream ;
24
25
import javax .annotation .Nonnull ;
25
26
import javax .annotation .Nullable ;
26
27
@@ -31,42 +32,73 @@ public OwnerTypeMap(AspectPluginConfig aspectPluginConfig) {
31
32
}
32
33
33
34
@ Override
34
- protected void mutate (
35
- @ Nonnull ChangeType changeType ,
36
- @ Nonnull EntitySpec entitySpec ,
37
- @ Nonnull AspectSpec aspectSpec ,
38
- @ Nullable RecordTemplate oldAspectValue ,
39
- @ Nullable RecordTemplate newAspectValue ,
40
- @ Nullable SystemMetadata oldSystemMetadata ,
41
- @ Nullable SystemMetadata newSystemMetadata ,
42
- @ Nonnull AuditStamp auditStamp ,
43
- @ Nonnull AspectRetriever aspectRetriever ) {
44
- if (OWNERSHIP_ASPECT_NAME .equals (aspectSpec .getName ()) && newAspectValue != null ) {
45
- Ownership ownership = new Ownership (newAspectValue .data ());
46
- if (!ownership .getOwners ().isEmpty ()) {
35
+ protected Stream <Pair <ChangeMCP , Boolean >> writeMutation (
36
+ @ Nonnull Collection <ChangeMCP > changeMCPS , @ Nonnull AspectRetriever aspectRetriever ) {
37
+
38
+ List <Pair <ChangeMCP , Boolean >> results = new LinkedList <>();
39
+
40
+ for (ChangeMCP item : changeMCPS ) {
41
+ if (OWNERSHIP_ASPECT_NAME .equals (item .getAspectName ()) && item .getRecordTemplate () != null ) {
42
+ final Map <Urn , Set <Owner >> oldTypeOwner =
43
+ groupByOwnerType (item .getPreviousRecordTemplate ());
44
+ final Map <Urn , Set <Owner >> newTypeOwner = groupByOwnerType (item .getRecordTemplate ());
45
+
46
+ Set <Urn > removedTypes =
47
+ oldTypeOwner .keySet ().stream ()
48
+ .filter (typeUrn -> !newTypeOwner .containsKey (typeUrn ))
49
+ .collect (Collectors .toSet ());
50
+
51
+ Set <Urn > updatedTypes = newTypeOwner .keySet ();
52
+
53
+ Map <String , UrnArray > typeOwners =
54
+ Stream .concat (removedTypes .stream (), updatedTypes .stream ())
55
+ .map (
56
+ typeUrn -> {
57
+ final String typeFieldName = encodeFieldName (typeUrn .toString ());
58
+ if (removedTypes .contains (typeUrn )) {
59
+ // removed
60
+ return Pair .of (typeFieldName , new UrnArray ());
61
+ }
62
+ // updated
63
+ return Pair .of (
64
+ typeFieldName ,
65
+ new UrnArray (
66
+ newTypeOwner .getOrDefault (typeUrn , Collections .emptySet ()).stream ()
67
+ .map (Owner ::getOwner )
68
+ .collect (Collectors .toSet ())));
69
+ })
70
+ .collect (Collectors .toMap (Pair ::getFirst , Pair ::getSecond ));
47
71
48
- Map <Urn , Set <Owner >> ownerTypes =
49
- ownership .getOwners ().stream ()
50
- .collect (Collectors .groupingBy (Owner ::getOwner , Collectors .toSet ()));
72
+ if (!typeOwners .isEmpty ()) {
73
+ item .getAspect (Ownership .class ).setOwnerTypes (new UrnArrayMap (typeOwners ));
74
+ results .add (Pair .of (item , true ));
75
+ continue ;
76
+ }
77
+ }
78
+
79
+ // no op
80
+ results .add (Pair .of (item , false ));
81
+ }
51
82
52
- ownership . setOwnerTypes (
53
- new UrnArrayMap (
54
- ownerTypes . entrySet (). stream ()
55
- . map (
56
- entry ->
57
- Pair . of (
58
- encodeFieldName ( entry . getKey (). toString ()),
59
- new UrnArray (
60
- entry . getValue ().stream ()
61
- . map (
62
- owner ->
63
- owner . getTypeUrn () != null
64
- ? owner .getTypeUrn ()
65
- : DEFAULT_OWNERSHIP_TYPE_URN )
66
- . collect ( Collectors . toSet ()))))
67
- . collect ( Collectors .toMap ( Pair :: getKey , Pair :: getValue ) )));
83
+ return results . stream ();
84
+ }
85
+
86
+ private static Map < Urn , Set < Owner >> groupByOwnerType (
87
+ @ Nullable RecordTemplate ownershipRecordTemplate ) {
88
+ if ( ownershipRecordTemplate != null ) {
89
+ Ownership ownership = new Ownership ( ownershipRecordTemplate . data ());
90
+ if (! ownership . getOwners (). isEmpty ()) {
91
+ return ownership . getOwners ().stream ()
92
+ . collect (
93
+ Collectors . groupingBy (
94
+ owner ->
95
+ owner .getTypeUrn () != null
96
+ ? owner . getTypeUrn ( )
97
+ : DEFAULT_OWNERSHIP_TYPE_URN ,
98
+ Collectors .toSet ( )));
68
99
}
69
100
}
101
+ return Collections .emptyMap ();
70
102
}
71
103
72
104
public static String encodeFieldName (String value ) {
0 commit comments