Skip to content

Commit 48f3cc5

Browse files
fix(pgsql): Postgres doesn't support UNION select with FOR UPDATE (#12169)
1 parent 4392d72 commit 48f3cc5

File tree

3 files changed

+85
-4
lines changed

3 files changed

+85
-4
lines changed

metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java

+83-4
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,14 @@ public class EbeanAspectDao implements AspectDao, AspectMigrationsDao {
9393
*/
9494
private final LoadingCache<String, Lock> locks;
9595

96+
private final String batchGetMethod;
97+
9698
public EbeanAspectDao(@Nonnull final Database server, EbeanConfiguration ebeanConfiguration) {
9799
_server = server;
100+
this.batchGetMethod =
101+
ebeanConfiguration.getBatchGetMethod() != null
102+
? ebeanConfiguration.getBatchGetMethod()
103+
: "IN";
98104
if (ebeanConfiguration.getLocking().isEnabled()) {
99105
this.locks =
100106
CacheBuilder.newBuilder()
@@ -371,23 +377,37 @@ private List<EbeanAspectV2> batchGet(
371377

372378
final int totalPageCount = QueryUtils.getTotalPageCount(keys.size(), keysCount);
373379
final List<EbeanAspectV2> finalResult =
374-
batchGetUnion(new ArrayList<>(keys), keysCount, position, forUpdate);
380+
batchGetSelectString(new ArrayList<>(keys), keysCount, position, forUpdate);
375381

376382
while (QueryUtils.hasMore(position, keysCount, totalPageCount)) {
377383
position += keysCount;
378384
final List<EbeanAspectV2> oneStatementResult =
379-
batchGetUnion(new ArrayList<>(keys), keysCount, position, forUpdate);
385+
batchGetSelectString(new ArrayList<>(keys), keysCount, position, forUpdate);
380386
finalResult.addAll(oneStatementResult);
381387
}
382388

383389
return finalResult;
384390
}
385391

392+
@Nonnull
393+
private List<EbeanAspectV2> batchGetSelectString(
394+
@Nonnull final List<EbeanAspectV2.PrimaryKey> keys,
395+
final int keysCount,
396+
final int position,
397+
boolean forUpdate) {
398+
399+
if (batchGetMethod.equals("IN")) {
400+
return batchGetIn(keys, keysCount, position, forUpdate);
401+
}
402+
403+
return batchGetUnion(keys, keysCount, position, forUpdate);
404+
}
405+
386406
/**
387407
* Builds a single SELECT statement for batch get, which selects one entity, and then can be
388408
* UNION'd with other SELECT statements.
389409
*/
390-
private String batchGetSelect(
410+
private String batchGetSelectString(
391411
final int selectId,
392412
@Nonnull final String urn,
393413
@Nonnull final String aspect,
@@ -434,7 +454,7 @@ private List<EbeanAspectV2> batchGetUnion(
434454
final Map<String, Object> params = new HashMap<>();
435455
for (int index = position; index < end; index++) {
436456
sb.append(
437-
batchGetSelect(
457+
batchGetSelectString(
438458
index - position,
439459
keys.get(index).getUrn(),
440460
keys.get(index).getAspect(),
@@ -467,6 +487,65 @@ private List<EbeanAspectV2> batchGetUnion(
467487
return query.findList();
468488
}
469489

490+
@Nonnull
491+
private List<EbeanAspectV2> batchGetIn(
492+
@Nonnull final List<EbeanAspectV2.PrimaryKey> keys,
493+
final int keysCount,
494+
final int position,
495+
boolean forUpdate) {
496+
validateConnection();
497+
498+
// Build a single SELECT with IN clause using composite key comparison
499+
// Query will look like:
500+
// SELECT * FROM metadata_aspect WHERE (urn, aspect, version) IN
501+
// (('urn0', 'aspect0', 0), ('urn1', 'aspect1', 1))
502+
final StringBuilder sb = new StringBuilder();
503+
sb.append(
504+
"SELECT urn, aspect, version, metadata, systemMetadata, createdOn, createdBy, createdFor ");
505+
sb.append("FROM metadata_aspect_v2 WHERE (urn, aspect, version) IN (");
506+
507+
final int end = Math.min(keys.size(), position + keysCount);
508+
final Map<String, Object> params = new HashMap<>();
509+
510+
for (int index = position; index < end; index++) {
511+
int paramIndex = index - position;
512+
String urnParam = "urn" + paramIndex;
513+
String aspectParam = "aspect" + paramIndex;
514+
String versionParam = "version" + paramIndex;
515+
516+
params.put(urnParam, keys.get(index).getUrn());
517+
params.put(aspectParam, keys.get(index).getAspect());
518+
params.put(versionParam, keys.get(index).getVersion());
519+
520+
sb.append("(:" + urnParam + ", :" + aspectParam + ", :" + versionParam + ")");
521+
522+
if (index != end - 1) {
523+
sb.append(",");
524+
}
525+
}
526+
527+
sb.append(")");
528+
529+
if (forUpdate) {
530+
sb.append(" FOR UPDATE");
531+
}
532+
533+
final RawSql rawSql =
534+
RawSqlBuilder.parse(sb.toString())
535+
.columnMapping(EbeanAspectV2.URN_COLUMN, "key.urn")
536+
.columnMapping(EbeanAspectV2.ASPECT_COLUMN, "key.aspect")
537+
.columnMapping(EbeanAspectV2.VERSION_COLUMN, "key.version")
538+
.create();
539+
540+
final Query<EbeanAspectV2> query = _server.find(EbeanAspectV2.class).setRawSql(rawSql);
541+
542+
for (Map.Entry<String, Object> param : params.entrySet()) {
543+
query.setParameter(param.getKey(), param.getValue());
544+
}
545+
546+
return query.findList();
547+
}
548+
470549
@Override
471550
@Nonnull
472551
public ListResult<String> listUrns(

metadata-service/configuration/src/main/java/com/linkedin/metadata/config/EbeanConfiguration.java

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public class EbeanConfiguration {
2323
private boolean autoCreateDdl;
2424
private boolean postgresUseIamAuth;
2525
private LockingConfiguration locking;
26+
private String batchGetMethod;
2627

2728
public static final EbeanConfiguration testDefault =
2829
EbeanConfiguration.builder().locking(LockingConfiguration.testDefault).build();

metadata-service/configuration/src/main/resources/application.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ ebean:
164164
waitTimeoutMillis: ${EBEAN_WAIT_TIMEOUT_MILLIS:1000}
165165
autoCreateDdl: ${EBEAN_AUTOCREATE:false}
166166
postgresUseIamAuth: ${EBEAN_POSTGRES_USE_AWS_IAM_AUTH:false}
167+
batchGetMethod: ${EBEAN_BATCH_GET_METHOD:IN} # Alternative UNION
167168
locking:
168169
enabled: ${EBEAN_LOCKING_ENABLED:false}
169170
durationSeconds: ${EBEAN_LOCKING_DURATION_SECONDS:60}

0 commit comments

Comments
 (0)