Skip to content

Commit 5576b3c

Browse files
authored
feat(versioning): Support entity versioning ingestion (#12755)
1 parent 17de393 commit 5576b3c

File tree

14 files changed

+1011
-145
lines changed

14 files changed

+1011
-145
lines changed

datahub-web-react/src/app/ingest/secret/SecretsList.tsx

+1-1
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ export const SecretsList = () => {
174174
);
175175
setTimeout(() => {
176176
refetch();
177-
}, 2000);
177+
}, 3000);
178178
})
179179
.catch((e) => {
180180
message.destroy();

metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -968,8 +968,9 @@ private IngestAspectsResult ingestAspectsToLocalDB(
968968
// lock)
969969

970970
// Initial database state from database
971-
Map<String, Map<String, SystemAspect>> batchAspects =
971+
final Map<String, Map<String, SystemAspect>> batchAspects =
972972
aspectDao.getLatestAspects(opContext, urnAspects, true);
973+
final Map<String, Map<String, SystemAspect>> updatedLatestAspects;
973974

974975
// read #2 (potentially)
975976
final Map<String, Map<String, Long>> nextVersions =
@@ -989,7 +990,6 @@ private IngestAspectsResult ingestAspectsToLocalDB(
989990
// These items are new items from side effects
990991
Map<String, Set<String>> sideEffects = updatedItems.getFirst();
991992

992-
final Map<String, Map<String, SystemAspect>> updatedLatestAspects;
993993
final Map<String, Map<String, Long>> updatedNextVersions;
994994

995995
Map<String, Map<String, SystemAspect>> newLatestAspects =
@@ -1024,6 +1024,7 @@ private IngestAspectsResult ingestAspectsToLocalDB(
10241024
.collect(Collectors.toList());
10251025
} else {
10261026
changeMCPs = updatedItems.getSecond();
1027+
updatedLatestAspects = batchAspects;
10271028
}
10281029

10291030
// No changes, return
@@ -1080,7 +1081,7 @@ private IngestAspectsResult ingestAspectsToLocalDB(
10801081
Latest aspect after possible in-memory mutation
10811082
*/
10821083
final SystemAspect latestAspect =
1083-
batchAspects
1084+
updatedLatestAspects
10841085
.getOrDefault(writeItem.getUrn().toString(), Map.of())
10851086
.get(writeItem.getAspectName());
10861087

@@ -1145,8 +1146,9 @@ This condition is specifically for an older conditional write ingestAspectIfNotP
11451146
// Only consider retention when there was a previous version
11461147
.filter(
11471148
result ->
1148-
batchAspects.containsKey(result.getUrn().toString())
1149-
&& batchAspects
1149+
updatedLatestAspects.containsKey(
1150+
result.getUrn().toString())
1151+
&& updatedLatestAspects
11501152
.get(result.getUrn().toString())
11511153
.containsKey(
11521154
result.getRequest().getAspectName()))

metadata-io/src/main/java/com/linkedin/metadata/entity/versioning/EntityVersioningServiceImpl.java

+12-49
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,8 @@ public List<IngestResult> linkLatestVersion(
7676
Urn versionSet,
7777
Urn newLatestVersion,
7878
VersionPropertiesInput inputProperties) {
79-
List<IngestResult> ingestResults = new ArrayList<>();
8079
AspectRetriever aspectRetriever = opContext.getAspectRetriever();
8180
String sortId;
82-
Long versionSetConstraint;
8381
Long versionPropertiesConstraint = -1L;
8482
VersionSetKey versionSetKey =
8583
(VersionSetKey)
@@ -93,36 +91,26 @@ public List<IngestResult> linkLatestVersion(
9391
+ newLatestVersion.getEntityType());
9492
}
9593
if (!aspectRetriever.entityExists(ImmutableSet.of(versionSet)).get(versionSet)) {
96-
MetadataChangeProposal versionSetKeyProposal = new MetadataChangeProposal();
97-
versionSetKeyProposal.setEntityUrn(versionSet);
98-
versionSetKeyProposal.setEntityType(VERSION_SET_ENTITY_NAME);
99-
versionSetKeyProposal.setAspectName(VERSION_SET_KEY_ASPECT_NAME);
100-
versionSetKeyProposal.setAspect(GenericRecordUtils.serializeAspect(versionSetKey));
101-
versionSetKeyProposal.setChangeType(ChangeType.CREATE_ENTITY);
102-
ingestResults.add(
103-
entityService.ingestProposal(
104-
opContext, versionSetKeyProposal, opContext.getAuditStamp(), false));
105-
10694
sortId = INITIAL_VERSION_SORT_ID;
107-
versionSetConstraint = -1L;
10895
} else {
10996
SystemAspect versionSetPropertiesAspect =
11097
aspectRetriever.getLatestSystemAspect(versionSet, VERSION_SET_PROPERTIES_ASPECT_NAME);
11198
VersionSetProperties versionSetProperties =
11299
RecordUtils.toRecordTemplate(
113100
VersionSetProperties.class, versionSetPropertiesAspect.getRecordTemplate().data());
114-
versionSetConstraint =
115-
versionSetPropertiesAspect
116-
.getSystemMetadataVersion()
117-
.orElse(versionSetPropertiesAspect.getVersion());
101+
102+
if (versionSetProperties.getVersioningScheme()
103+
!= VersioningScheme.ALPHANUMERIC_GENERATED_BY_DATAHUB) {
104+
throw new IllegalArgumentException(
105+
"Only versioning scheme supported is ALPHANUMERIC_GENERATED_BY_DATAHUB");
106+
}
107+
118108
SystemAspect latestVersion =
119109
aspectRetriever.getLatestSystemAspect(
120110
versionSetProperties.getLatest(), VERSION_PROPERTIES_ASPECT_NAME);
121111
VersionProperties latestVersionProperties =
122112
RecordUtils.toRecordTemplate(
123113
VersionProperties.class, latestVersion.getRecordTemplate().data());
124-
// When more impls for versioning scheme are set up, this will need to be resolved to the
125-
// correct scheme generation strategy
126114
sortId = AlphanumericSortIdGenerator.increment(latestVersionProperties.getSortId());
127115
}
128116

@@ -154,9 +142,9 @@ public List<IngestResult> linkLatestVersion(
154142
.setComment(inputProperties.getComment(), SetMode.IGNORE_NULL)
155143
.setVersion(versionTag)
156144
.setMetadataCreatedTimestamp(opContext.getAuditStamp())
157-
.setSortId(sortId);
145+
.setSortId(sortId)
146+
.setVersioningScheme(VersioningScheme.ALPHANUMERIC_GENERATED_BY_DATAHUB);
158147
if (inputProperties.getSourceCreationTimestamp() != null) {
159-
160148
AuditStamp sourceCreatedAuditStamp =
161149
new AuditStamp().setTime(inputProperties.getSourceCreationTimestamp());
162150
Urn actor = null;
@@ -182,36 +170,11 @@ public List<IngestResult> linkLatestVersion(
182170
headerMap.put(HTTP_HEADER_IF_VERSION_MATCH, versionPropertiesConstraint.toString());
183171
versionPropertiesProposal.setChangeType(ChangeType.UPSERT);
184172
versionPropertiesProposal.setHeaders(headerMap);
185-
ingestResults.add(
186-
entityService.ingestProposal(
187-
opContext, versionPropertiesProposal, opContext.getAuditStamp(), false));
188173

189-
// Might want to refactor this to a Patch w/ Create if not exists logic if more properties get
190-
// added
191-
// to Version Set Properties
192-
VersionSetProperties versionSetProperties =
193-
new VersionSetProperties()
194-
.setVersioningScheme(
195-
VersioningScheme
196-
.ALPHANUMERIC_GENERATED_BY_DATAHUB) // Only one available, will need to add to
197-
// input properties once more are added.
198-
.setLatest(newLatestVersion);
199-
MetadataChangeProposal versionSetPropertiesProposal = new MetadataChangeProposal();
200-
versionSetPropertiesProposal.setEntityUrn(versionSet);
201-
versionSetPropertiesProposal.setEntityType(VERSION_SET_ENTITY_NAME);
202-
versionSetPropertiesProposal.setAspectName(VERSION_SET_PROPERTIES_ASPECT_NAME);
203-
versionSetPropertiesProposal.setAspect(
204-
GenericRecordUtils.serializeAspect(versionSetProperties));
205-
versionSetPropertiesProposal.setChangeType(ChangeType.UPSERT);
206-
StringMap versionSetHeaderMap = new StringMap();
207-
versionSetHeaderMap.put(HTTP_HEADER_IF_VERSION_MATCH, versionSetConstraint.toString());
208-
versionSetPropertiesProposal.setHeaders(versionSetHeaderMap);
209-
versionSetPropertiesProposal.setSystemMetadata(systemMetadata);
210-
ingestResults.add(
174+
IngestResult result =
211175
entityService.ingestProposal(
212-
opContext, versionSetPropertiesProposal, opContext.getAuditStamp(), false));
213-
214-
return ingestResults;
176+
opContext, versionPropertiesProposal, opContext.getAuditStamp(), false);
177+
return result != null ? List.of(result) : List.of();
215178
}
216179

217180
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
package com.linkedin.metadata.entity.versioning.sideeffects;
2+
3+
import static com.linkedin.metadata.Constants.*;
4+
5+
import com.datahub.util.RecordUtils;
6+
import com.linkedin.common.VersionProperties;
7+
import com.linkedin.common.urn.Urn;
8+
import com.linkedin.data.template.RecordTemplate;
9+
import com.linkedin.entity.Aspect;
10+
import com.linkedin.events.metadata.ChangeType;
11+
import com.linkedin.metadata.aspect.RetrieverContext;
12+
import com.linkedin.metadata.aspect.batch.ChangeMCP;
13+
import com.linkedin.metadata.aspect.batch.MCLItem;
14+
import com.linkedin.metadata.aspect.batch.MCPItem;
15+
import com.linkedin.metadata.aspect.patch.GenericJsonPatch;
16+
import com.linkedin.metadata.aspect.patch.PatchOperationType;
17+
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
18+
import com.linkedin.metadata.aspect.plugins.hooks.MCPSideEffect;
19+
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
20+
import com.linkedin.metadata.entity.ebean.batch.PatchItemImpl;
21+
import com.linkedin.metadata.models.AspectSpec;
22+
import com.linkedin.metadata.models.EntitySpec;
23+
import com.linkedin.metadata.utils.EntityKeyUtils;
24+
import com.linkedin.versionset.VersionSetProperties;
25+
import java.util.Collection;
26+
import java.util.List;
27+
import java.util.stream.Stream;
28+
import javax.annotation.Nonnull;
29+
import javax.annotation.Nullable;
30+
import lombok.Getter;
31+
import lombok.Setter;
32+
import lombok.experimental.Accessors;
33+
import lombok.extern.slf4j.Slf4j;
34+
35+
/**
36+
* Side effect that updates the isLatest property for the referenced versioned entity's Version
37+
* Properties aspect.
38+
*/
39+
@Slf4j
40+
@Getter
41+
@Setter
42+
@Accessors(chain = true)
43+
public class VersionPropertiesSideEffect extends MCPSideEffect {
44+
@Nonnull private AspectPluginConfig config;
45+
46+
@Override
47+
protected Stream<ChangeMCP> applyMCPSideEffect(
48+
Collection<ChangeMCP> changeMCPS, @Nonnull RetrieverContext retrieverContext) {
49+
return changeMCPS.stream().flatMap(item -> processMCP(item, retrieverContext));
50+
}
51+
52+
@Override
53+
protected Stream<MCPItem> postMCPSideEffect(
54+
Collection<MCLItem> mclItems, @Nonnull RetrieverContext retrieverContext) {
55+
return Stream.of();
56+
}
57+
58+
private static Stream<ChangeMCP> processMCP(
59+
ChangeMCP changeMCP, @Nonnull RetrieverContext retrieverContext) {
60+
Urn entityUrn = changeMCP.getUrn();
61+
62+
if (!VERSION_PROPERTIES_ASPECT_NAME.equals(changeMCP.getAspectName())) {
63+
return Stream.empty();
64+
}
65+
66+
VersionProperties versionProperties = changeMCP.getAspect(VersionProperties.class);
67+
if (versionProperties == null) {
68+
log.error("Unable to process version properties for urn: {}", changeMCP.getUrn());
69+
return Stream.empty();
70+
}
71+
72+
Urn versionSetUrn = versionProperties.getVersionSet();
73+
Aspect versionSetPropertiesAspect =
74+
retrieverContext
75+
.getAspectRetriever()
76+
.getLatestAspectObject(versionSetUrn, VERSION_SET_PROPERTIES_ASPECT_NAME);
77+
if (versionSetPropertiesAspect == null) {
78+
return createVersionSet(versionProperties, changeMCP, retrieverContext);
79+
}
80+
81+
// Version set exists -- only update if there is a new latest
82+
VersionSetProperties versionSetProperties =
83+
RecordUtils.toRecordTemplate(VersionSetProperties.class, versionSetPropertiesAspect.data());
84+
Urn prevLatest = versionSetProperties.getLatest();
85+
if (prevLatest.equals(entityUrn)) {
86+
return Stream.empty();
87+
}
88+
89+
VersionProperties prevLatestVersionProperties = null;
90+
Aspect prevLatestVersionPropertiesAspect =
91+
retrieverContext
92+
.getAspectRetriever()
93+
.getLatestAspectObject(prevLatest, VERSION_PROPERTIES_ASPECT_NAME);
94+
if (prevLatestVersionPropertiesAspect != null) {
95+
prevLatestVersionProperties =
96+
RecordUtils.toRecordTemplate(
97+
VersionProperties.class, prevLatestVersionPropertiesAspect.data());
98+
if (versionProperties.getSortId().compareTo(prevLatestVersionProperties.getSortId()) <= 0) {
99+
return Stream.empty();
100+
}
101+
}
102+
103+
// New version properties is the new latest
104+
return updateVersionSetLatest(
105+
versionProperties, prevLatestVersionProperties, prevLatest, changeMCP, retrieverContext);
106+
}
107+
108+
private static Stream<ChangeMCP> createVersionSet(
109+
@Nonnull VersionProperties versionProperties,
110+
ChangeMCP changeMCP,
111+
@Nonnull RetrieverContext retrieverContext) {
112+
versionProperties.setIsLatest(true);
113+
114+
Urn entityUrn = changeMCP.getUrn();
115+
Urn versionSetUrn = versionProperties.getVersionSet();
116+
117+
AspectSpec keyAspectSpec =
118+
retrieverContext
119+
.getAspectRetriever()
120+
.getEntityRegistry()
121+
.getEntitySpec(VERSION_SET_ENTITY_NAME)
122+
.getKeyAspectSpec();
123+
RecordTemplate versionSetKey =
124+
EntityKeyUtils.convertUrnToEntityKey(versionSetUrn, keyAspectSpec);
125+
ChangeMCP createVersionSetKey =
126+
ChangeItemImpl.builder()
127+
.urn(versionSetUrn)
128+
.aspectName(VERSION_SET_KEY_ASPECT_NAME)
129+
.changeType(ChangeType.UPSERT)
130+
.recordTemplate(versionSetKey)
131+
.auditStamp(changeMCP.getAuditStamp())
132+
.systemMetadata(changeMCP.getSystemMetadata())
133+
.build(retrieverContext.getAspectRetriever());
134+
135+
VersionSetProperties versionSetPropertiesWithNewLatest =
136+
new VersionSetProperties()
137+
.setVersioningScheme(versionProperties.getVersioningScheme())
138+
.setLatest(entityUrn);
139+
ChangeMCP createVersionSetProperties =
140+
ChangeItemImpl.builder()
141+
.urn(versionSetUrn)
142+
.aspectName(VERSION_SET_PROPERTIES_ASPECT_NAME)
143+
.changeType(ChangeType.UPSERT)
144+
.recordTemplate(versionSetPropertiesWithNewLatest)
145+
.auditStamp(changeMCP.getAuditStamp())
146+
.systemMetadata(changeMCP.getSystemMetadata())
147+
.build(retrieverContext.getAspectRetriever());
148+
149+
return Stream.of(createVersionSetKey, createVersionSetProperties);
150+
}
151+
152+
private static Stream<ChangeMCP> updateVersionSetLatest(
153+
@Nonnull VersionProperties versionProperties,
154+
@Nullable VersionProperties prevLatestVersionProperties,
155+
@Nonnull Urn prevLatest,
156+
ChangeMCP changeMCP,
157+
@Nonnull RetrieverContext retrieverContext) {
158+
versionProperties.setIsLatest(true);
159+
160+
Urn entityUrn = changeMCP.getUrn();
161+
Urn versionSetUrn = versionProperties.getVersionSet();
162+
163+
VersionSetProperties versionSetPropertiesWithNewLatest =
164+
new VersionSetProperties()
165+
.setVersioningScheme(versionProperties.getVersioningScheme())
166+
.setLatest(entityUrn);
167+
ChangeMCP updateVersionSetProperties =
168+
ChangeItemImpl.builder()
169+
.urn(versionSetUrn)
170+
.aspectName(VERSION_SET_PROPERTIES_ASPECT_NAME)
171+
.changeType(ChangeType.UPSERT)
172+
.recordTemplate(versionSetPropertiesWithNewLatest)
173+
.auditStamp(changeMCP.getAuditStamp())
174+
.systemMetadata(changeMCP.getSystemMetadata())
175+
.build(retrieverContext.getAspectRetriever());
176+
177+
if (prevLatestVersionProperties == null) {
178+
return Stream.of(updateVersionSetProperties);
179+
}
180+
181+
EntitySpec entitySpec =
182+
retrieverContext
183+
.getAspectRetriever()
184+
.getEntityRegistry()
185+
.getEntitySpec(prevLatest.getEntityType());
186+
GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp();
187+
patchOp.setOp(PatchOperationType.ADD.getValue());
188+
patchOp.setPath("/isLatest");
189+
patchOp.setValue(false);
190+
ChangeMCP updateOldLatestVersionProperties =
191+
PatchItemImpl.builder()
192+
.urn(prevLatest)
193+
.entitySpec(entitySpec)
194+
.aspectName(VERSION_PROPERTIES_ASPECT_NAME)
195+
.aspectSpec(entitySpec.getAspectSpec(VERSION_PROPERTIES_ASPECT_NAME))
196+
.patch(GenericJsonPatch.builder().patch(List.of(patchOp)).build().getJsonPatch())
197+
.auditStamp(changeMCP.getAuditStamp())
198+
.systemMetadata(changeMCP.getSystemMetadata())
199+
.build(retrieverContext.getAspectRetriever().getEntityRegistry())
200+
.applyPatch(prevLatestVersionProperties, retrieverContext.getAspectRetriever());
201+
202+
return Stream.of(updateVersionSetProperties, updateOldLatestVersionProperties);
203+
}
204+
}

0 commit comments

Comments
 (0)