Skip to content

Commit 3757a44

Browse files
feat(metadata-io): enable rollback transaction support (#12509)
1 parent 1e0f993 commit 3757a44

File tree

16 files changed

+260
-285
lines changed

16 files changed

+260
-285
lines changed

docs/how/updating-datahub.md

+5-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,11 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
3131
## 0.15.0
3232

3333
- OpenAPI Update: PIT Keep Alive parameter added to scroll endpoints. NOTE: This parameter requires the `pointInTimeCreationEnabled` feature flag to be enabled and the `elasticSearch.implementation` configuration to be `elasticsearch`. This feature is not supported for OpenSearch at this time and the parameter will not be respected without both of these set.
34-
- OpenAPI Update 2: Previously there was an incorrectly marked parameter named `sort` on the generic list entities endpoint for v3. This parameter is deprecated and only supports a single string value while the documentation indicates it supports a list of strings. This documentation error has been fixed and the correct field, `sortCriteria`, is now documented which supports a list of strings.
34+
- OpenAPI Update 2: Previously there was an incorrectly marked parameter named `sort` on the generic list entities endpoint for v3. This parameter is deprecated and only supports a single string value while the documentation indicates it supports a list of strings. This documentation error has been fixed and the correct field, `sortCriteria`, is now documented which supports a list of strings.
35+
36+
### Known Issues
37+
38+
- Persistence Exception: No Rows Updated may occur if a transaction does not change any aspect's data.
3539

3640
### Breaking Changes
3741

docs/managed-datahub/release-notes/v_0_3_7.md

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ If you are using an older CLI/SDK version, then please upgrade it. This applies
1313

1414
## Known Issues
1515

16+
- Persistence Exception: No Rows Updated may occur if a transaction does not change any aspect's data.
17+
1618
### v0.3.7.8
1719
* Notes Feature
1820
* Adding a Note to an entity will result in that note showing up in the Settings > Home Page list of announcements as well as the profile page of the entity.

docs/managed-datahub/release-notes/v_0_3_8.md

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Recommended CLI/SDK
1212
## Known Issues
1313

1414
- Async APIs - DataHub's asynchronous APIs perform only basic schema validation when receiving MCP requests, similar to direct production to MCP Kafka topics. While requests must conform to the MCP schema to be accepted, actual processing happens later in the pipeline. Any processing failures that occur after the initial acceptance are captured in the Failed MCP topic, but these failures are not immediately surfaced to the API caller since they happen asynchronously.
15+
- Persistence Exception: No Rows Updated may occur if a transaction does not change any aspect's data.
1516

1617
## Release Changelog
1718
---

metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.sql.Timestamp;
1111
import java.util.List;
1212
import java.util.Map;
13+
import java.util.Optional;
1314
import java.util.Set;
1415
import java.util.function.Function;
1516
import java.util.stream.Stream;
@@ -169,15 +170,16 @@ default Map<String, Long> getNextVersions(
169170
void setWritable(boolean canWrite);
170171

171172
@Nonnull
172-
<T> T runInTransactionWithRetry(
173-
@Nonnull final Function<TransactionContext, T> block, final int maxTransactionRetry);
173+
<T> Optional<T> runInTransactionWithRetry(
174+
@Nonnull final Function<TransactionContext, TransactionResult<T>> block,
175+
final int maxTransactionRetry);
174176

175177
@Nonnull
176-
default <T> List<T> runInTransactionWithRetry(
177-
@Nonnull final Function<TransactionContext, T> block,
178+
default <T> Optional<T> runInTransactionWithRetry(
179+
@Nonnull final Function<TransactionContext, TransactionResult<T>> block,
178180
AspectsBatch batch,
179181
final int maxTransactionRetry) {
180-
return List.of(runInTransactionWithRetry(block, maxTransactionRetry));
182+
return runInTransactionWithRetry(block, maxTransactionRetry);
181183
}
182184

183185
default void incrementWriteMetrics(String aspectName, long count, long bytes) {

metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java

+169-157
Large diffs are not rendered by default.

metadata-io/src/main/java/com/linkedin/metadata/entity/TransactionContext.java

+6
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,10 @@ public void flush() {
7272
tx.flush();
7373
}
7474
}
75+
76+
public void rollback() {
77+
if (tx != null) {
78+
tx.rollback();
79+
}
80+
}
7581
}

metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.linkedin.metadata.entity.EntityAspectIdentifier;
3131
import com.linkedin.metadata.entity.ListResult;
3232
import com.linkedin.metadata.entity.TransactionContext;
33+
import com.linkedin.metadata.entity.TransactionResult;
3334
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
3435
import com.linkedin.metadata.entity.ebean.PartitionedStream;
3536
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
@@ -44,6 +45,7 @@
4445
import java.util.List;
4546
import java.util.Map;
4647
import java.util.Objects;
48+
import java.util.Optional;
4749
import java.util.Set;
4850
import java.util.function.Function;
4951
import java.util.stream.Collectors;
@@ -294,17 +296,17 @@ public ListResult<String> listAspectMetadata(
294296
aspectMetadatas, listResultMetadata, start, pageNumber, pageSize, totalCount);
295297
}
296298

297-
@Override
298299
@Nonnull
299-
public <T> T runInTransactionWithRetry(
300-
@Nonnull final Function<TransactionContext, T> block, final int maxTransactionRetry) {
300+
@Override
301+
public <T> Optional<T> runInTransactionWithRetry(
302+
@Nonnull Function<TransactionContext, TransactionResult<T>> block, int maxTransactionRetry) {
301303
validateConnection();
302304
TransactionContext txContext = TransactionContext.empty(maxTransactionRetry);
303305
do {
304306
try {
305307
// TODO: Try to bend this code to make use of Cassandra batches. This method is called from
306308
// single-urn operations, so perf should not suffer much
307-
return block.apply(txContext);
309+
return block.apply(txContext).getResults();
308310
} catch (DriverException exception) {
309311
txContext.addException(exception);
310312
}

metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java

+17-85
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@
55
import com.codahale.metrics.MetricRegistry;
66
import com.datahub.util.exception.ModelConversionException;
77
import com.datahub.util.exception.RetryLimitReached;
8-
import com.google.common.cache.CacheBuilder;
9-
import com.google.common.cache.CacheLoader;
10-
import com.google.common.cache.LoadingCache;
118
import com.linkedin.common.AuditStamp;
129
import com.linkedin.common.urn.Urn;
1310
import com.linkedin.metadata.aspect.RetrieverContext;
@@ -20,6 +17,7 @@
2017
import com.linkedin.metadata.entity.EntityAspectIdentifier;
2118
import com.linkedin.metadata.entity.ListResult;
2219
import com.linkedin.metadata.entity.TransactionContext;
20+
import com.linkedin.metadata.entity.TransactionResult;
2321
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
2422
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
2523
import com.linkedin.metadata.models.AspectSpec;
@@ -53,14 +51,10 @@
5351
import java.util.Collection;
5452
import java.util.Collections;
5553
import java.util.HashMap;
56-
import java.util.LinkedList;
5754
import java.util.List;
5855
import java.util.Map;
56+
import java.util.Optional;
5957
import java.util.Set;
60-
import java.util.concurrent.ExecutionException;
61-
import java.util.concurrent.TimeUnit;
62-
import java.util.concurrent.locks.Lock;
63-
import java.util.concurrent.locks.ReentrantLock;
6458
import java.util.function.Function;
6559
import java.util.stream.Collectors;
6660
import java.util.stream.Stream;
@@ -89,12 +83,6 @@ public class EbeanAspectDao implements AspectDao, AspectMigrationsDao {
8983
// more testing.
9084
private int _queryKeysCount = 375; // 0 means no pagination on keys
9185

92-
/**
93-
* Used to control write concurrency when an entity key aspect is present. If a batch contains an
94-
* entity key aspect, only allow a single execution per URN
95-
*/
96-
private final LoadingCache<String, Lock> locks;
97-
9886
private final String batchGetMethod;
9987

10088
public EbeanAspectDao(@Nonnull final Database server, EbeanConfiguration ebeanConfiguration) {
@@ -103,21 +91,6 @@ public EbeanAspectDao(@Nonnull final Database server, EbeanConfiguration ebeanCo
10391
ebeanConfiguration.getBatchGetMethod() != null
10492
? ebeanConfiguration.getBatchGetMethod()
10593
: "IN";
106-
if (ebeanConfiguration.getLocking().isEnabled()) {
107-
this.locks =
108-
CacheBuilder.newBuilder()
109-
.maximumSize(ebeanConfiguration.getLocking().getMaximumLocks())
110-
.expireAfterWrite(
111-
ebeanConfiguration.getLocking().getDurationSeconds(), TimeUnit.SECONDS)
112-
.build(
113-
new CacheLoader<>() {
114-
public Lock load(String key) {
115-
return new ReentrantLock(true);
116-
}
117-
});
118-
} else {
119-
this.locks = null;
120-
}
12194
}
12295

12396
@Override
@@ -764,83 +737,42 @@ public ListResult<String> listLatestAspectMetadata(
764737

765738
@Override
766739
@Nonnull
767-
public <T> T runInTransactionWithRetry(
768-
@Nonnull final Function<TransactionContext, T> block, final int maxTransactionRetry) {
769-
return runInTransactionWithRetry(block, null, maxTransactionRetry).get(0);
740+
public <T> Optional<T> runInTransactionWithRetry(
741+
@Nonnull final Function<TransactionContext, TransactionResult<T>> block,
742+
final int maxTransactionRetry) {
743+
return runInTransactionWithRetry(block, null, maxTransactionRetry);
770744
}
771745

772746
@Override
773747
@Nonnull
774-
public <T> List<T> runInTransactionWithRetry(
775-
@Nonnull final Function<TransactionContext, T> block,
748+
public <T> Optional<T> runInTransactionWithRetry(
749+
@Nonnull final Function<TransactionContext, TransactionResult<T>> block,
776750
@Nullable AspectsBatch batch,
777751
final int maxTransactionRetry) {
778752

779-
LinkedList<T> result = new LinkedList<>();
780-
781-
if (locks != null && batch != null) {
782-
Set<Urn> urnsWithKeyAspects =
783-
batch.getMCPItems().stream()
784-
.filter(i -> i.getEntitySpec().getKeyAspectSpec().equals(i.getAspectSpec()))
785-
.map(MCPItem::getUrn)
786-
.collect(Collectors.toSet());
787-
788-
if (!urnsWithKeyAspects.isEmpty()) {
789-
790-
// Split into batches by urn with key aspect, remaining aspects in the pair's second
791-
Pair<List<AspectsBatch>, AspectsBatch> splitBatches =
792-
splitByUrn(batch, urnsWithKeyAspects, batch.getRetrieverContext());
793-
794-
// Run non-key aspect `other` batch per normal
795-
if (!splitBatches.getSecond().getItems().isEmpty()) {
796-
result.add(
797-
runInTransactionWithRetryUnlocked(
798-
block, splitBatches.getSecond(), maxTransactionRetry));
799-
}
800-
801-
// For each key aspect batch
802-
for (AspectsBatch splitBatch : splitBatches.getFirst()) {
803-
try {
804-
Lock lock =
805-
locks.get(splitBatch.getMCPItems().stream().findFirst().get().getUrn().toString());
806-
lock.lock();
807-
try {
808-
result.add(runInTransactionWithRetryUnlocked(block, splitBatch, maxTransactionRetry));
809-
} finally {
810-
lock.unlock();
811-
}
812-
} catch (ExecutionException e) {
813-
throw new RuntimeException(e);
814-
}
815-
}
816-
} else {
817-
// No key aspects found, run per normal
818-
result.add(runInTransactionWithRetryUnlocked(block, batch, maxTransactionRetry));
819-
}
820-
} else {
821-
// locks disabled or null batch
822-
result.add(runInTransactionWithRetryUnlocked(block, batch, maxTransactionRetry));
823-
}
824-
825-
return result;
753+
return runInTransactionWithRetryUnlocked(block, batch, maxTransactionRetry).getResults();
826754
}
827755

828756
@Nonnull
829-
public <T> T runInTransactionWithRetryUnlocked(
830-
@Nonnull final Function<TransactionContext, T> block,
757+
public <T> TransactionResult<T> runInTransactionWithRetryUnlocked(
758+
@Nonnull final Function<TransactionContext, TransactionResult<T>> block,
831759
@Nullable AspectsBatch batch,
832760
final int maxTransactionRetry) {
833761

834762
validateConnection();
835763
TransactionContext transactionContext = TransactionContext.empty(maxTransactionRetry);
836764

837-
T result = null;
765+
TransactionResult<T> result = null;
838766
do {
839767
try (Transaction transaction =
840768
_server.beginTransaction(TxScope.requiresNew().setIsolation(TX_ISOLATION))) {
841769
transaction.setBatchMode(true);
842770
result = block.apply(transactionContext.tx(transaction));
843-
transaction.commit();
771+
if (result.isCommitOrRollback()) {
772+
transaction.commit();
773+
} else {
774+
transaction.rollback();
775+
}
844776
break;
845777
} catch (PersistenceException exception) {
846778
if (exception instanceof DuplicateKeyException) {

metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityServiceTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.sql.Timestamp;
4242
import java.util.List;
4343
import java.util.Map;
44+
import java.util.Optional;
4445
import org.mockito.Mockito;
4546
import org.testng.annotations.Test;
4647

@@ -129,7 +130,7 @@ public void testDeleteUniqueRefGeneratesValidMCP() {
129130
1);
130131

131132
Mockito.when(_aspectDao.runInTransactionWithRetry(Mockito.any(), Mockito.anyInt()))
132-
.thenReturn(result);
133+
.thenReturn(Optional.of(result));
133134

134135
final DeleteReferencesResponse response =
135136
_deleteEntityService.deleteReferencesTo(opContext, container, false);

metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/EbeanAspectDaoTest.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.linkedin.metadata.aspect.batch.AspectsBatch;
1313
import com.linkedin.metadata.config.EbeanConfiguration;
1414
import com.linkedin.metadata.entity.EntityAspectIdentifier;
15+
import com.linkedin.metadata.entity.TransactionResult;
1516
import io.ebean.Database;
1617
import io.ebean.test.LoggedSql;
1718
import java.util.List;
@@ -38,7 +39,7 @@ public void testGetNextVersionForUpdate() {
3839
(txContext) -> {
3940
testDao.getNextVersions(
4041
Map.of("urn:li:corpuser:testGetNextVersionForUpdate", Set.of("status")));
41-
return "";
42+
return TransactionResult.commit("");
4243
},
4344
mock(AspectsBatch.class),
4445
0);
@@ -61,7 +62,7 @@ public void testGetLatestAspectsForUpdate() throws JsonProcessingException {
6162
(txContext) -> {
6263
testDao.getLatestAspects(
6364
Map.of("urn:li:corpuser:testGetLatestAspectsForUpdate", Set.of("status")), true);
64-
return "";
65+
return TransactionResult.commit("");
6566
},
6667
mock(AspectsBatch.class),
6768
0);
@@ -94,7 +95,7 @@ public void testbatchGetForUpdate() throws JsonProcessingException {
9495
DATA_PLATFORM_INSTANCE_ASPECT_NAME,
9596
ASPECT_LATEST_VERSION)),
9697
true);
97-
return "";
98+
return TransactionResult.commit("");
9899
},
99100
mock(AspectsBatch.class),
100101
0);

metadata-service/configuration/src/main/java/com/linkedin/metadata/config/EbeanConfiguration.java

+1-20
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,7 @@ public class EbeanConfiguration {
2222
private long waitTimeoutMillis;
2323
private boolean autoCreateDdl;
2424
private boolean postgresUseIamAuth;
25-
private LockingConfiguration locking;
2625
private String batchGetMethod;
2726

28-
public static final EbeanConfiguration testDefault =
29-
EbeanConfiguration.builder().locking(LockingConfiguration.testDefault).build();
30-
31-
@Data
32-
@Builder
33-
@AllArgsConstructor
34-
@NoArgsConstructor
35-
public static class LockingConfiguration {
36-
private boolean enabled;
37-
private long durationSeconds;
38-
private long maximumLocks;
39-
40-
public static final LockingConfiguration testDefault =
41-
LockingConfiguration.builder()
42-
.enabled(true)
43-
.durationSeconds(60)
44-
.maximumLocks(10000)
45-
.build();
46-
}
27+
public static final EbeanConfiguration testDefault = EbeanConfiguration.builder().build();
4728
}

metadata-service/configuration/src/main/resources/application.yaml

-4
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,6 @@ ebean:
172172
autoCreateDdl: ${EBEAN_AUTOCREATE:false}
173173
postgresUseIamAuth: ${EBEAN_POSTGRES_USE_AWS_IAM_AUTH:false}
174174
batchGetMethod: ${EBEAN_BATCH_GET_METHOD:IN} # Alternative UNION
175-
locking:
176-
enabled: ${EBEAN_LOCKING_ENABLED:false}
177-
durationSeconds: ${EBEAN_LOCKING_DURATION_SECONDS:60}
178-
maximumLocks: ${EBEAN_LOCKING_MAXIMUM_LOCKS:20000}
179175

180176
# Only required if entityService.impl is cassandra
181177
cassandra:

metadata-service/openapi-servlet/src/test/java/entities/EntitiesControllerTest.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.linkedin.metadata.entity.AspectDao;
1717
import com.linkedin.metadata.entity.IngestAspectsResult;
1818
import com.linkedin.metadata.entity.TransactionContext;
19+
import com.linkedin.metadata.entity.TransactionResult;
1920
import com.linkedin.metadata.entity.UpdateAspectResult;
2021
import com.linkedin.metadata.event.EventProducer;
2122
import io.datahubproject.metadata.context.OperationContext;
@@ -71,14 +72,16 @@ public void setup()
7172
OperationContext opContext = TestOperationContexts.systemContextNoSearchAuthorization();
7273
AspectDao aspectDao = Mockito.mock(AspectDao.class);
7374
when(aspectDao.runInTransactionWithRetry(
74-
ArgumentMatchers.<Function<TransactionContext, List<UpdateAspectResult>>>any(),
75+
ArgumentMatchers
76+
.<Function<TransactionContext, TransactionResult<List<UpdateAspectResult>>>>any(),
7577
any(AspectsBatch.class),
7678
anyInt()))
7779
.thenAnswer(
7880
i ->
79-
List.of(
80-
((Function<TransactionContext, IngestAspectsResult>) i.getArgument(0))
81-
.apply(TransactionContext.empty(Mockito.mock(Transaction.class), 0))));
81+
((Function<TransactionContext, TransactionResult<IngestAspectsResult>>)
82+
i.getArgument(0))
83+
.apply(TransactionContext.empty(Mockito.mock(Transaction.class), 0))
84+
.getResults());
8285

8386
EventProducer mockEntityEventProducer = Mockito.mock(EventProducer.class);
8487
PreProcessHooks preProcessHooks = new PreProcessHooks();

0 commit comments

Comments
 (0)