Skip to content

Commit 927fdf9

Browse files
committed
feat(restore-indices): SQL read-only mode
1 parent 7e749ff commit 927fdf9

File tree

23 files changed

+120
-41
lines changed

23 files changed

+120
-41
lines changed

datahub-upgrade/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ task runRestoreIndices(type: Exec) {
137137
"-jar",
138138
"-Dkafka.schemaRegistry.url=http://localhost:8080/schema-registry/api",
139139
"-Dserver.port=8083",
140-
bootJar.getArchiveFile().get(), "-u", "RestoreIndices", "-a", "batchSize=100"
140+
bootJar.getArchiveFile().get(), "-u", "RestoreIndices", "-a", "batchSize=100", "-a", "readOnly=true"
141141
}
142142

143143
task runRestoreIndicesUrn(type: Exec) {

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/RestoreIndices.java

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class RestoreIndices implements Upgrade {
2626
public static final String URN_ARG_NAME = "urn";
2727
public static final String URN_LIKE_ARG_NAME = "urnLike";
2828
public static final String URN_BASED_PAGINATION_ARG_NAME = "urnBasedPagination";
29+
public static final String READ_ONLY_ARG_NAME = "readOnly";
2930

3031
public static final String STARTING_OFFSET_ARG_NAME = "startingOffset";
3132
public static final String LAST_URN_ARG_NAME = "lastUrn";

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java

+9
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ private RestoreIndicesArgs getArgs(UpgradeContext context) {
101101
result.batchDelayMs = getBatchDelayMs(context.parsedArgs());
102102
result.start = getStartingOffset(context.parsedArgs());
103103
result.urnBasedPagination = getUrnBasedPagination(context.parsedArgs());
104+
result.readOnly = getReadOnly(context.parsedArgs());
104105
if (containsKey(context.parsedArgs(), RestoreIndices.ASPECT_NAME_ARG_NAME)) {
105106
result.aspectName = context.parsedArgs().get(RestoreIndices.ASPECT_NAME_ARG_NAME).get();
106107
context.report().addLine(String.format("aspect is %s", result.aspectName));
@@ -315,6 +316,14 @@ private long getBatchDelayMs(final Map<String, Optional<String>> parsedArgs) {
315316
return resolvedBatchDelayMs;
316317
}
317318

319+
private boolean getReadOnly(final Map<String, Optional<String>> parsedArgs) {
320+
Boolean readOnly = null;
321+
if (containsKey(parsedArgs, RestoreIndices.READ_ONLY_ARG_NAME)) {
322+
readOnly = Boolean.parseBoolean(parsedArgs.get(RestoreIndices.READ_ONLY_ARG_NAME).get());
323+
}
324+
return readOnly != null ? readOnly : false;
325+
}
326+
318327
private int getThreadCount(final Map<String, Optional<String>> parsedArgs) {
319328
return getInt(parsedArgs, DEFAULT_THREADS, RestoreIndices.NUM_THREADS_ARG_NAME);
320329
}

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/AbstractMCLStep.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,9 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
113113
List<Pair<Future<?>, SystemAspect>> futures;
114114
futures =
115115
EntityUtils.toSystemAspectFromEbeanAspects(
116-
opContext.getRetrieverContext(), batch.collect(Collectors.toList()))
116+
opContext.getRetrieverContext(),
117+
batch.collect(Collectors.toList()),
118+
false)
117119
.stream()
118120
.map(
119121
systemAspect -> {

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/schemafield/GenerateSchemaFieldsFromSchemaMetadataStep.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,8 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
175175
ebeanAspectV2 ->
176176
EntityUtils.toSystemAspectFromEbeanAspects(
177177
opContext.getRetrieverContext(),
178-
Set.of(ebeanAspectV2))
178+
Set.of(ebeanAspectV2),
179+
true)
179180
.stream())
180181
.map(
181182
systemAspect ->

docs/how/restore-indices.md

+10-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,14 @@ The following configurations are available:
3939
* `aspectNames`: Comma-separated list of aspects to restore (e.g., "ownership,status")
4040
* `urnLike`: SQL LIKE pattern to filter URNs (e.g., "urn:li:dataset%")
4141

42+
43+
### Elasticsearch Only
44+
45+
* `readOnly`: Enables SQL read-only mode (default: `false`). When `true`, the job does not create missing default aspects in SQL.
46+
47+
By default, the restore indices process creates any missing default aspects in SQL. While this may be
48+
desired in some situations, this behavior must be disabled (`readOnly=true`) when using a read-only SQL replica.
49+
4250
### Nuclear option
4351
* `clean`: This option wipes out the current indices by running deletes of all the documents to guarantee a consistent state with SQL. This is generally not recommended unless there is significant data corruption on the instance.
4452

@@ -264,7 +272,8 @@ Remember to reset this after restoration completes!
264272

265273
#### SQL/Primary Storage
266274

267-
Consider using a read replica as the source of the job's data.
275+
Consider using a read replica as the source of the job's data. If you configure a read-only replica,
276+
you must also provide the parameter `readOnly=true`.
268277

269278
#### Kafka & Consumers
270279

metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java

+5
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ public class KafkaEventProducer extends EventProducer {
3939
private final TopicConvention _topicConvention;
4040
private final KafkaHealthChecker _kafkaHealthChecker;
4141

42+
@Override
43+
public void flush() {
44+
_producer.flush();
45+
}
46+
4247
/**
4348
* Constructor.
4449
*

metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java

+34-25
Original file line numberDiff line numberDiff line change
@@ -1674,9 +1674,10 @@ public List<RestoreIndicesResult> restoreIndices(
16741674

16751675
List<SystemAspect> systemAspects =
16761676
EntityUtils.toSystemAspectFromEbeanAspects(
1677-
opContext.getRetrieverContext(), batch.collect(Collectors.toList()));
1677+
opContext.getRetrieverContext(), batch.collect(Collectors.toList()), false);
16781678

1679-
RestoreIndicesResult result = restoreIndices(opContext, systemAspects, logger);
1679+
RestoreIndicesResult result =
1680+
restoreIndices(opContext, systemAspects, logger, args.readOnly());
16801681
result.timeSqlQueryMs = timeSqlQueryMs;
16811682

16821683
logger.accept("Batch completed.");
@@ -1698,7 +1699,8 @@ public List<RestoreIndicesResult> restoreIndices(
16981699
@Nonnull OperationContext opContext,
16991700
@Nonnull Set<Urn> urns,
17001701
@Nullable Set<String> inputAspectNames,
1701-
@Nullable Integer inputBatchSize)
1702+
@Nullable Integer inputBatchSize,
1703+
boolean readOnly)
17021704
throws RemoteInvocationException, URISyntaxException {
17031705
int batchSize = inputBatchSize != null ? inputBatchSize : 100;
17041706

@@ -1723,7 +1725,7 @@ public List<RestoreIndicesResult> restoreIndices(
17231725
false);
17241726
long timeSqlQueryMs = System.currentTimeMillis() - startTime;
17251727

1726-
RestoreIndicesResult result = restoreIndices(opContext, systemAspects, s -> {});
1728+
RestoreIndicesResult result = restoreIndices(opContext, systemAspects, s -> {}, readOnly);
17271729
result.timeSqlQueryMs = timeSqlQueryMs;
17281730
results.add(result);
17291731
}
@@ -1742,7 +1744,8 @@ public List<RestoreIndicesResult> restoreIndices(
17421744
private RestoreIndicesResult restoreIndices(
17431745
@Nonnull OperationContext opContext,
17441746
List<SystemAspect> systemAspects,
1745-
@Nonnull Consumer<String> logger) {
1747+
@Nonnull Consumer<String> logger,
1748+
boolean readOnly) {
17461749
RestoreIndicesResult result = new RestoreIndicesResult();
17471750
long startTime = System.currentTimeMillis();
17481751
int ignored = 0;
@@ -1844,26 +1847,29 @@ private RestoreIndicesResult restoreIndices(
18441847
.getFirst());
18451848

18461849
// 6. Ensure default aspects are in existence in SQL
1847-
List<MCPItem> keyAspect =
1848-
List.of(
1849-
ChangeItemImpl.builder()
1850-
.urn(urn)
1851-
.aspectName(entitySpec.getKeyAspectName())
1852-
.changeType(ChangeType.UPSERT)
1853-
.entitySpec(entitySpec)
1854-
.aspectSpec(entitySpec.getKeyAspectSpec())
1855-
.auditStamp(auditStamp)
1856-
.systemMetadata(latestSystemMetadata)
1857-
.recordTemplate(EntityApiUtils.buildKeyAspect(opContext.getEntityRegistry(), urn))
1858-
.build(opContext.getAspectRetriever()));
1859-
Stream<IngestResult> defaultAspectsResult =
1860-
ingestProposalSync(
1861-
opContext,
1862-
AspectsBatchImpl.builder()
1863-
.retrieverContext(opContext.getRetrieverContext())
1864-
.items(keyAspect)
1865-
.build());
1866-
defaultAspectsCreated += defaultAspectsResult.count();
1850+
if (!readOnly) {
1851+
List<MCPItem> keyAspect =
1852+
List.of(
1853+
ChangeItemImpl.builder()
1854+
.urn(urn)
1855+
.aspectName(entitySpec.getKeyAspectName())
1856+
.changeType(ChangeType.UPSERT)
1857+
.entitySpec(entitySpec)
1858+
.aspectSpec(entitySpec.getKeyAspectSpec())
1859+
.auditStamp(auditStamp)
1860+
.systemMetadata(latestSystemMetadata)
1861+
.recordTemplate(
1862+
EntityApiUtils.buildKeyAspect(opContext.getEntityRegistry(), urn))
1863+
.build(opContext.getAspectRetriever()));
1864+
Stream<IngestResult> defaultAspectsResult =
1865+
ingestProposalSync(
1866+
opContext,
1867+
AspectsBatchImpl.builder()
1868+
.retrieverContext(opContext.getRetrieverContext())
1869+
.items(keyAspect)
1870+
.build());
1871+
defaultAspectsCreated += defaultAspectsResult.count();
1872+
}
18671873

18681874
result.sendMessageMs += System.currentTimeMillis() - startTime;
18691875

@@ -1883,6 +1889,9 @@ private RestoreIndicesResult restoreIndices(
18831889
result.ignored = ignored;
18841890
result.rowsMigrated = rowsMigrated;
18851891
result.defaultAspectsCreated = defaultAspectsCreated;
1892+
1893+
producer.flush();
1894+
18861895
return result;
18871896
}
18881897

metadata-io/src/main/java/com/linkedin/metadata/entity/EntityUtils.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -217,11 +217,13 @@ public static Map<String, Map<String, SystemAspect>> toSystemAspects(
217217

218218
@Nonnull
219219
public static List<SystemAspect> toSystemAspectFromEbeanAspects(
220-
@Nonnull RetrieverContext retrieverContext, @Nonnull Collection<EbeanAspectV2> rawAspects) {
220+
@Nonnull RetrieverContext retrieverContext,
221+
@Nonnull Collection<EbeanAspectV2> rawAspects,
222+
boolean forUpdate) {
221223
return toSystemAspects(
222224
retrieverContext,
223225
rawAspects.stream().map(EbeanAspectV2::toEntityAspect).collect(Collectors.toList()),
224-
true);
226+
forUpdate);
225227
}
226228

227229
/**

metadata-io/src/main/java/com/linkedin/metadata/event/EventProducer.java

+3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
@Slf4j
2525
public abstract class EventProducer {
2626

27+
/** Flushes the producer. */
28+
public abstract void flush();
29+
2730
/**
2831
* Produces a {@link com.linkedin.mxe.MetadataChangeLog} from a new & previous aspect.
2932
*

metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/operations/elastic/ElasticsearchController.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,8 @@ public ResponseEntity<List<RestoreIndicesResult>> restoreIndices(
490490
@RequestParam(required = false, name = "limit", defaultValue = "0") @Nullable Integer limit,
491491
@RequestParam(required = false, name = "gePitEpochMs", defaultValue = "0") @Nullable
492492
Long gePitEpochMs,
493-
@RequestParam(required = false, name = "lePitEpochMs") @Nullable Long lePitEpochMs) {
493+
@RequestParam(required = false, name = "lePitEpochMs") @Nullable Long lePitEpochMs,
494+
@RequestParam(value = "readOnly", defaultValue = "false") Boolean readOnly) {
494495

495496
Authentication authentication = AuthenticationContext.getAuthentication();
496497
OperationContext opContext =
@@ -519,7 +520,8 @@ public ResponseEntity<List<RestoreIndicesResult>> restoreIndices(
519520
.batchSize(batchSize)
520521
.limit(limit)
521522
.gePitEpochMs(gePitEpochMs)
522-
.lePitEpochMs(lePitEpochMs);
523+
.lePitEpochMs(lePitEpochMs)
524+
.readOnly(readOnly);
523525

524526
return ResponseEntity.of(Optional.of(entityService.restoreIndices(opContext, args, log::info)));
525527
}
@@ -532,6 +534,7 @@ public ResponseEntity<List<RestoreIndicesResult>> restoreIndices(
532534
@RequestParam(required = false, name = "aspectNames") @Nullable Set<String> aspectNames,
533535
@RequestParam(required = false, name = "batchSize", defaultValue = "100") @Nullable
534536
Integer batchSize,
537+
@RequestParam(value = "readOnly", defaultValue = "false") Boolean readOnly,
535538
@RequestBody @Nonnull Set<String> urns)
536539
throws RemoteInvocationException, URISyntaxException {
537540

@@ -556,6 +559,7 @@ public ResponseEntity<List<RestoreIndicesResult>> restoreIndices(
556559
opContext,
557560
urns.stream().map(UrnUtils::getUrn).collect(Collectors.toSet()),
558561
aspectNames,
559-
batchSize)));
562+
batchSize,
563+
readOnly)));
560564
}
561565
}

metadata-service/restli-api/src/main/idl/com.linkedin.entity.aspects.restspec.json

+4
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@
134134
"name" : "lePitEpochMs",
135135
"type" : "long",
136136
"optional" : true
137+
}, {
138+
"name" : "readOnly",
139+
"type" : "boolean",
140+
"optional" : true
137141
} ],
138142
"returns" : "string"
139143
} ],

metadata-service/restli-api/src/main/idl/com.linkedin.operations.operations.restspec.json

+4
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@
6767
"name" : "lePitEpochMs",
6868
"type" : "long",
6969
"optional" : true
70+
}, {
71+
"name" : "readOnly",
72+
"type" : "boolean",
73+
"optional" : true
7074
} ],
7175
"returns" : "string"
7276
}, {

metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json

+5
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,7 @@
361361
"type" : "string",
362362
"doc" : "Title of the chart",
363363
"Searchable" : {
364+
"boostScore" : 10.0,
364365
"enableAutocomplete" : true,
365366
"fieldNameAliases" : [ "_entityName" ],
366367
"fieldType" : "WORD_GRAM"
@@ -4425,6 +4426,10 @@
44254426
"name" : "lePitEpochMs",
44264427
"type" : "long",
44274428
"optional" : true
4429+
}, {
4430+
"name" : "readOnly",
4431+
"type" : "boolean",
4432+
"optional" : true
44284433
} ],
44294434
"returns" : "string"
44304435
} ],

metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json

+1
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
"type" : "string",
9595
"doc" : "Title of the chart",
9696
"Searchable" : {
97+
"boostScore" : 10.0,
9798
"enableAutocomplete" : true,
9899
"fieldNameAliases" : [ "_entityName" ],
99100
"fieldType" : "WORD_GRAM"

metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json

+1
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
"type" : "string",
9595
"doc" : "Title of the chart",
9696
"Searchable" : {
97+
"boostScore" : 10.0,
9798
"enableAutocomplete" : true,
9899
"fieldNameAliases" : [ "_entityName" ],
99100
"fieldType" : "WORD_GRAM"

metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json

+5
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
"type" : "string",
9595
"doc" : "Title of the chart",
9696
"Searchable" : {
97+
"boostScore" : 10.0,
9798
"enableAutocomplete" : true,
9899
"fieldNameAliases" : [ "_entityName" ],
99100
"fieldType" : "WORD_GRAM"
@@ -4042,6 +4043,10 @@
40424043
"name" : "lePitEpochMs",
40434044
"type" : "long",
40444045
"optional" : true
4046+
}, {
4047+
"name" : "readOnly",
4048+
"type" : "boolean",
4049+
"optional" : true
40454050
} ],
40464051
"returns" : "string"
40474052
}, {

metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json

+1
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
"type" : "string",
9595
"doc" : "Title of the chart",
9696
"Searchable" : {
97+
"boostScore" : 10.0,
9798
"enableAutocomplete" : true,
9899
"fieldNameAliases" : [ "_entityName" ],
99100
"fieldType" : "WORD_GRAM"

metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,8 @@ public Task<String> restoreIndices(
373373
@ActionParam("batchSize") @Optional @Nullable Integer batchSize,
374374
@ActionParam("limit") @Optional @Nullable Integer limit,
375375
@ActionParam("gePitEpochMs") @Optional @Nullable Long gePitEpochMs,
376-
@ActionParam("lePitEpochMs") @Optional @Nullable Long lePitEpochMs) {
376+
@ActionParam("lePitEpochMs") @Optional @Nullable Long lePitEpochMs,
377+
@ActionParam("readOnly") @Optional @Nullable Boolean readOnly) {
377378
return RestliUtils.toTask(systemOperationContext,
378379
() -> {
379380

@@ -390,7 +391,7 @@ public Task<String> restoreIndices(
390391
}
391392

392393
return Utils.restoreIndices(systemOperationContext, getContext(),
393-
aspectName, urn, urnLike, start, batchSize, limit, gePitEpochMs, lePitEpochMs, _authorizer, _entityService);
394+
aspectName, urn, urnLike, start, batchSize, limit, gePitEpochMs, lePitEpochMs, _authorizer, _entityService, readOnly != null ? readOnly : false);
394395
},
395396
MetricRegistry.name(this.getClass(), "restoreIndices"));
396397
}

metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/operations/OperationsResource.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,11 @@ public Task<String> restoreIndices(
103103
@ActionParam("batchSize") @Optional @Nullable Integer batchSize,
104104
@ActionParam("limit") @Optional @Nullable Integer limit,
105105
@ActionParam("gePitEpochMs") @Optional @Nullable Long gePitEpochMs,
106-
@ActionParam("lePitEpochMs") @Optional @Nullable Long lePitEpochMs) {
106+
@ActionParam("lePitEpochMs") @Optional @Nullable Long lePitEpochMs,
107+
@ActionParam("readOnly") @Optional @Nullable Boolean readOnly) {
107108
return RestliUtils.toTask(systemOperationContext,
108109
() -> Utils.restoreIndices(systemOperationContext, getContext(),
109-
aspectName, urn, urnLike, start, batchSize, limit, gePitEpochMs, lePitEpochMs, _authorizer, _entityService),
110+
aspectName, urn, urnLike, start, batchSize, limit, gePitEpochMs, lePitEpochMs, _authorizer, _entityService, readOnly != null ? readOnly : false),
110111
MetricRegistry.name(this.getClass(), "restoreIndices"));
111112
}
112113

metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/operations/Utils.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ public static String restoreIndices(
4444
@Nullable Long gePitEpochMs,
4545
@Nullable Long lePitEpochMs,
4646
@Nonnull Authorizer authorizer,
47-
@Nonnull EntityService<?> entityService) {
47+
@Nonnull EntityService<?> entityService,
48+
boolean readOnly) {
4849

4950
EntitySpec resourceSpec = null;
5051
if (StringUtils.isNotBlank(urn)) {
@@ -73,7 +74,8 @@ public static String restoreIndices(
7374
.batchSize(batchSize)
7475
.limit(limit)
7576
.gePitEpochMs(gePitEpochMs)
76-
.lePitEpochMs(lePitEpochMs);
77+
.lePitEpochMs(lePitEpochMs)
78+
.readOnly(readOnly);
7779
Map<String, Object> result = new HashMap<>();
7880
result.put("args", args);
7981
result.put("result", entityService

0 commit comments

Comments
 (0)