3
3
import static com .linkedin .metadata .Constants .DEFAULT_OWNERSHIP_TYPE_URN ;
4
4
import static com .linkedin .metadata .Constants .OWNERSHIP_ASPECT_NAME ;
5
5
6
- import com .linkedin .common .AuditStamp ;
7
6
import com .linkedin .common .Owner ;
8
7
import com .linkedin .common .Ownership ;
9
8
import com .linkedin .common .UrnArray ;
10
9
import com .linkedin .common .UrnArrayMap ;
11
10
import com .linkedin .common .urn .Urn ;
12
11
import com .linkedin .data .template .RecordTemplate ;
13
- import com .linkedin .events .metadata .ChangeType ;
12
+ import com .linkedin .metadata .aspect .AspectRetriever ;
13
+ import com .linkedin .metadata .aspect .batch .ChangeMCP ;
14
14
import com .linkedin .metadata .aspect .plugins .config .AspectPluginConfig ;
15
15
import com .linkedin .metadata .aspect .plugins .hooks .MutationHook ;
16
- import com .linkedin .metadata .aspect .plugins .validation .AspectRetriever ;
17
- import com .linkedin .metadata .models .AspectSpec ;
18
- import com .linkedin .metadata .models .EntitySpec ;
19
- import com .linkedin .mxe .SystemMetadata ;
20
16
import com .linkedin .util .Pair ;
17
+ import java .util .Collection ;
18
+ import java .util .Collections ;
19
+ import java .util .LinkedList ;
20
+ import java .util .List ;
21
21
import java .util .Map ;
22
22
import java .util .Set ;
23
23
import java .util .stream .Collectors ;
24
+ import java .util .stream .Stream ;
24
25
import javax .annotation .Nonnull ;
25
26
import javax .annotation .Nullable ;
26
27
@@ -31,42 +32,70 @@ public OwnerTypeMap(AspectPluginConfig aspectPluginConfig) {
31
32
}
32
33
33
34
@ Override
34
- protected void mutate (
35
- @ Nonnull ChangeType changeType ,
36
- @ Nonnull EntitySpec entitySpec ,
37
- @ Nonnull AspectSpec aspectSpec ,
38
- @ Nullable RecordTemplate oldAspectValue ,
39
- @ Nullable RecordTemplate newAspectValue ,
40
- @ Nullable SystemMetadata oldSystemMetadata ,
41
- @ Nullable SystemMetadata newSystemMetadata ,
42
- @ Nonnull AuditStamp auditStamp ,
43
- @ Nonnull AspectRetriever aspectRetriever ) {
44
- if (OWNERSHIP_ASPECT_NAME .equals (aspectSpec .getName ()) && newAspectValue != null ) {
45
- Ownership ownership = new Ownership (newAspectValue .data ());
46
- if (!ownership .getOwners ().isEmpty ()) {
35
+ protected Stream <Pair <ChangeMCP , Boolean >> writeMutation (
36
+ @ Nonnull Collection <ChangeMCP > changeMCPS , @ Nonnull AspectRetriever aspectRetriever ) {
37
+
38
+ List <Pair <ChangeMCP , Boolean >> results = new LinkedList <>();
39
+
40
+ for (ChangeMCP item : changeMCPS ) {
41
+ if (OWNERSHIP_ASPECT_NAME .equals (item .getAspectName ()) && item .getRecordTemplate () != null ) {
42
+ final Map <Urn , Set <Owner >> oldOwnerTypes = groupByOwner (item .getPreviousRecordTemplate ());
43
+ final Map <Urn , Set <Owner >> newOwnerTypes = groupByOwner (item .getRecordTemplate ());
44
+
45
+ Set <Urn > removed =
46
+ oldOwnerTypes .keySet ().stream ()
47
+ .filter (owner -> !newOwnerTypes .containsKey (owner ))
48
+ .collect (Collectors .toSet ());
49
+
50
+ Set <Urn > updated = newOwnerTypes .keySet ();
51
+
52
+ Map <String , UrnArray > ownerTypes =
53
+ Stream .concat (removed .stream (), updated .stream ())
54
+ .map (
55
+ ownerUrn -> {
56
+ final String ownerFieldName = encodeFieldName (ownerUrn .toString ());
57
+ if (removed .contains (ownerUrn )) {
58
+ // removed
59
+ return Pair .of (ownerFieldName , new UrnArray ());
60
+ }
61
+ // updated
62
+ return Pair .of (
63
+ ownerFieldName ,
64
+ new UrnArray (
65
+ newOwnerTypes .getOrDefault (ownerUrn , Collections .emptySet ()).stream ()
66
+ .map (
67
+ owner ->
68
+ owner .getTypeUrn () != null
69
+ ? owner .getTypeUrn ()
70
+ : DEFAULT_OWNERSHIP_TYPE_URN )
71
+ .collect (Collectors .toSet ())));
72
+ })
73
+ .collect (Collectors .toMap (Pair ::getFirst , Pair ::getSecond ));
47
74
48
- Map <Urn , Set <Owner >> ownerTypes =
49
- ownership .getOwners ().stream ()
50
- .collect (Collectors .groupingBy (Owner ::getOwner , Collectors .toSet ()));
75
+ if (!ownerTypes .isEmpty ()) {
76
+ item .getAspect (Ownership .class ).setOwnerTypes (new UrnArrayMap (ownerTypes ));
77
+ results .add (Pair .of (item , true ));
78
+ continue ;
79
+ }
80
+ }
81
+
82
+ // no op
83
+ results .add (Pair .of (item , false ));
84
+ }
51
85
52
- ownership .setOwnerTypes (
53
- new UrnArrayMap (
54
- ownerTypes .entrySet ().stream ()
55
- .map (
56
- entry ->
57
- Pair .of (
58
- encodeFieldName (entry .getKey ().toString ()),
59
- new UrnArray (
60
- entry .getValue ().stream ()
61
- .map (
62
- owner ->
63
- owner .getTypeUrn () != null
64
- ? owner .getTypeUrn ()
65
- : DEFAULT_OWNERSHIP_TYPE_URN )
66
- .collect (Collectors .toSet ()))))
67
- .collect (Collectors .toMap (Pair ::getKey , Pair ::getValue ))));
86
+ return results .stream ();
87
+ }
88
+
89
+ private static Map <Urn , Set <Owner >> groupByOwner (
90
+ @ Nullable RecordTemplate ownershipRecordTemplate ) {
91
+ if (ownershipRecordTemplate != null ) {
92
+ Ownership ownership = new Ownership (ownershipRecordTemplate .data ());
93
+ if (!ownership .getOwners ().isEmpty ()) {
94
+ return ownership .getOwners ().stream ()
95
+ .collect (Collectors .groupingBy (Owner ::getOwner , Collectors .toSet ()));
68
96
}
69
97
}
98
+ return Collections .emptyMap ();
70
99
}
71
100
72
101
public static String encodeFieldName (String value ) {
0 commit comments