|
1 | 1 | package com.linkedin.metadata.entity.ebean.batch;
|
2 | 2 |
|
3 | 3 | import com.linkedin.common.AuditStamp;
|
| 4 | +import com.linkedin.common.urn.Urn; |
4 | 5 | import com.linkedin.data.template.RecordTemplate;
|
5 | 6 | import com.linkedin.events.metadata.ChangeType;
|
6 | 7 | import com.linkedin.metadata.aspect.AspectRetriever;
|
|
15 | 16 | import com.linkedin.metadata.models.EntitySpec;
|
16 | 17 | import com.linkedin.mxe.MetadataChangeProposal;
|
17 | 18 | import com.linkedin.util.Pair;
|
| 19 | +import java.util.ArrayList; |
18 | 20 | import java.util.Collection;
|
| 21 | +import java.util.HashMap; |
19 | 22 | import java.util.LinkedList;
|
20 | 23 | import java.util.List;
|
21 | 24 | import java.util.Map;
|
|
29 | 32 | import lombok.extern.slf4j.Slf4j;
|
30 | 33 |
|
31 | 34 | @Slf4j
|
32 |
| -@Getter |
33 | 35 | @Builder(toBuilder = true)
|
34 | 36 | public class AspectsBatchImpl implements AspectsBatch {
|
35 | 37 |
|
36 | 38 | @Nonnull private final Collection<? extends BatchItem> items;
|
37 |
| - @Nonnull private final RetrieverContext retrieverContext; |
| 39 | + @Nonnull private final Collection<? extends BatchItem> nonRepeatedItems; |
| 40 | + @Getter @Nonnull private final RetrieverContext retrieverContext; |
| 41 | + |
| 42 | + @Override |
| 43 | + @Nonnull |
| 44 | + public Collection<? extends BatchItem> getItems() { |
| 45 | + return nonRepeatedItems; |
| 46 | + } |
| 47 | + |
| 48 | + @Override |
| 49 | + public Collection<? extends BatchItem> getInitialItems() { |
| 50 | + return items; |
| 51 | + } |
38 | 52 |
|
39 | 53 | /**
|
40 | 54 | * Convert patches to upserts, apply hooks at the aspect and batch level.
|
@@ -207,14 +221,32 @@ public AspectsBatchImplBuilder mcps(
|
207 | 221 | return this;
|
208 | 222 | }
|
209 | 223 |
|
| 224 | + private static <T extends BatchItem> List<T> filterRepeats(Collection<T> items) { |
| 225 | + List<T> result = new ArrayList<>(); |
| 226 | + Map<Pair<Urn, String>, T> last = new HashMap<>(); |
| 227 | + |
| 228 | + for (T item : items) { |
| 229 | + Pair<Urn, String> urnAspect = Pair.of(item.getUrn(), item.getAspectName()); |
| 230 | + // Check if this item is a duplicate of the previous |
| 231 | + if (!last.containsKey(urnAspect) || !item.isDatabaseDuplicateOf(last.get(urnAspect))) { |
| 232 | + result.add(item); |
| 233 | + } |
| 234 | + last.put(urnAspect, item); |
| 235 | + } |
| 236 | + |
| 237 | + return result; |
| 238 | + } |
| 239 | + |
210 | 240 | public AspectsBatchImpl build() {
|
| 241 | + this.nonRepeatedItems = filterRepeats(this.items); |
| 242 | + |
211 | 243 | ValidationExceptionCollection exceptions =
|
212 |
| - AspectsBatch.validateProposed(this.items, this.retrieverContext); |
| 244 | + AspectsBatch.validateProposed(this.nonRepeatedItems, this.retrieverContext); |
213 | 245 | if (!exceptions.isEmpty()) {
|
214 | 246 | throw new IllegalArgumentException("Failed to validate MCP due to: " + exceptions);
|
215 | 247 | }
|
216 | 248 |
|
217 |
| - return new AspectsBatchImpl(this.items, this.retrieverContext); |
| 249 | + return new AspectsBatchImpl(this.items, this.nonRepeatedItems, this.retrieverContext); |
218 | 250 | }
|
219 | 251 | }
|
220 | 252 |
|
|
0 commit comments