Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(restore-indices): createDefaultAspects argument #12859

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datahub-upgrade/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ task runRestoreIndices(type: Exec) {
"-jar",
"-Dkafka.schemaRegistry.url=http://localhost:8080/schema-registry/api",
"-Dserver.port=8083",
bootJar.getArchiveFile().get(), "-u", "RestoreIndices", "-a", "batchSize=100"
bootJar.getArchiveFile().get(), "-u", "RestoreIndices", "-a", "batchSize=100", "-a", "readOnly=true"
}

task runRestoreIndicesUrn(type: Exec) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class RestoreIndices implements Upgrade {
public static final String URN_ARG_NAME = "urn";
public static final String URN_LIKE_ARG_NAME = "urnLike";
public static final String URN_BASED_PAGINATION_ARG_NAME = "urnBasedPagination";
public static final String READ_ONLY_ARG_NAME = "readOnly";

public static final String STARTING_OFFSET_ARG_NAME = "startingOffset";
public static final String LAST_URN_ARG_NAME = "lastUrn";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ private RestoreIndicesArgs getArgs(UpgradeContext context) {
result.batchDelayMs = getBatchDelayMs(context.parsedArgs());
result.start = getStartingOffset(context.parsedArgs());
result.urnBasedPagination = getUrnBasedPagination(context.parsedArgs());
result.readOnly = getReadOnly(context.parsedArgs());
if (containsKey(context.parsedArgs(), RestoreIndices.ASPECT_NAME_ARG_NAME)) {
result.aspectName = context.parsedArgs().get(RestoreIndices.ASPECT_NAME_ARG_NAME).get();
context.report().addLine(String.format("aspect is %s", result.aspectName));
Expand Down Expand Up @@ -315,6 +316,14 @@ private long getBatchDelayMs(final Map<String, Optional<String>> parsedArgs) {
return resolvedBatchDelayMs;
}

/**
 * Resolves the {@code readOnly} upgrade argument.
 *
 * @param parsedArgs parsed upgrade arguments keyed by argument name
 * @return the argument value as interpreted by {@link Boolean#parseBoolean(String)}, or {@code
 *     false} when {@code containsKey} reports the argument as absent
 */
private boolean getReadOnly(final Map<String, Optional<String>> parsedArgs) {
  if (!containsKey(parsedArgs, RestoreIndices.READ_ONLY_ARG_NAME)) {
    // Argument not provided: fall back to the default of false.
    return false;
  }
  final String rawValue = parsedArgs.get(RestoreIndices.READ_ONLY_ARG_NAME).get();
  return Boolean.parseBoolean(rawValue);
}

/**
 * Resolves the thread-count upgrade argument.
 *
 * <p>Delegates to {@code getInt}, passing {@code DEFAULT_THREADS} and the {@code
 * RestoreIndices.NUM_THREADS_ARG_NAME} key. NOTE(review): {@code DEFAULT_THREADS} is presumably
 * the fallback used when the argument is absent — confirm against {@code getInt}.
 *
 * @param parsedArgs parsed upgrade arguments keyed by argument name
 * @return the value produced by {@code getInt} for the thread-count argument
 */
private int getThreadCount(final Map<String, Optional<String>> parsedArgs) {
  final int threadCount =
      getInt(parsedArgs, DEFAULT_THREADS, RestoreIndices.NUM_THREADS_ARG_NAME);
  return threadCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
List<Pair<Future<?>, SystemAspect>> futures;
futures =
EntityUtils.toSystemAspectFromEbeanAspects(
opContext.getRetrieverContext(), batch.collect(Collectors.toList()))
opContext.getRetrieverContext(),
batch.collect(Collectors.toList()),
false)
.stream()
.map(
systemAspect -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
ebeanAspectV2 ->
EntityUtils.toSystemAspectFromEbeanAspects(
opContext.getRetrieverContext(),
Set.of(ebeanAspectV2))
Set.of(ebeanAspectV2),
true)
.stream())
.map(
systemAspect ->
Expand Down
11 changes: 10 additions & 1 deletion docs/how/restore-indices.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ The following configurations are available:
* `aspectNames`: Comma-separated list of aspects to restore (e.g., "ownership,status")
* `urnLike`: SQL LIKE pattern to filter URNs (e.g., "urn:li:dataset%")


### Elasticsearch Only

* `readOnly`: Enable SQL read-only mode

By default, the restore indices process creates missing default aspects in SQL. While this may be
desired in some situations, it must be disabled by setting `readOnly=true` when using a read-only SQL replica.

### Nuclear option
* `clean`: This option wipes out the current indices by running deletes of all the documents to guarantee a consistent state with SQL. This is generally not recommended unless there is significant data corruption on the instance.

Expand Down Expand Up @@ -264,7 +272,8 @@ Remember to reset this after restoration completes!

#### SQL/Primary Storage

Consider using a read replica as the source of the job's data.
Consider using a read replica as the source of the job's data. If you configure a read-only replica,
you must also provide the parameter `readOnly=true`.

#### Kafka & Consumers

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public class KafkaEventProducer extends EventProducer {
private final TopicConvention _topicConvention;
private final KafkaHealthChecker _kafkaHealthChecker;

/**
 * Flushes the underlying {@code _producer} by delegating to its {@code flush()} method.
 *
 * <p>NOTE(review): presumably this blocks until buffered records have been sent, as Kafka
 * producers do — confirm against the declared type of {@code _producer}.
 */
@Override
public void flush() {
_producer.flush();
}

/**
* Constructor.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1674,9 +1674,10 @@ public List<RestoreIndicesResult> restoreIndices(

List<SystemAspect> systemAspects =
EntityUtils.toSystemAspectFromEbeanAspects(
opContext.getRetrieverContext(), batch.collect(Collectors.toList()));
opContext.getRetrieverContext(), batch.collect(Collectors.toList()), false);

RestoreIndicesResult result = restoreIndices(opContext, systemAspects, logger);
RestoreIndicesResult result =
restoreIndices(opContext, systemAspects, logger, args.readOnly());
result.timeSqlQueryMs = timeSqlQueryMs;

logger.accept("Batch completed.");
Expand All @@ -1698,7 +1699,8 @@ public List<RestoreIndicesResult> restoreIndices(
@Nonnull OperationContext opContext,
@Nonnull Set<Urn> urns,
@Nullable Set<String> inputAspectNames,
@Nullable Integer inputBatchSize)
@Nullable Integer inputBatchSize,
boolean readOnly)
throws RemoteInvocationException, URISyntaxException {
int batchSize = inputBatchSize != null ? inputBatchSize : 100;

Expand All @@ -1723,7 +1725,7 @@ public List<RestoreIndicesResult> restoreIndices(
false);
long timeSqlQueryMs = System.currentTimeMillis() - startTime;

RestoreIndicesResult result = restoreIndices(opContext, systemAspects, s -> {});
RestoreIndicesResult result = restoreIndices(opContext, systemAspects, s -> {}, readOnly);
result.timeSqlQueryMs = timeSqlQueryMs;
results.add(result);
}
Expand All @@ -1742,7 +1744,8 @@ public List<RestoreIndicesResult> restoreIndices(
private RestoreIndicesResult restoreIndices(
@Nonnull OperationContext opContext,
List<SystemAspect> systemAspects,
@Nonnull Consumer<String> logger) {
@Nonnull Consumer<String> logger,
boolean readOnly) {
RestoreIndicesResult result = new RestoreIndicesResult();
long startTime = System.currentTimeMillis();
int ignored = 0;
Expand Down Expand Up @@ -1844,26 +1847,29 @@ private RestoreIndicesResult restoreIndices(
.getFirst());

// 6. Ensure default aspects are in existence in SQL
List<MCPItem> keyAspect =
List.of(
ChangeItemImpl.builder()
.urn(urn)
.aspectName(entitySpec.getKeyAspectName())
.changeType(ChangeType.UPSERT)
.entitySpec(entitySpec)
.aspectSpec(entitySpec.getKeyAspectSpec())
.auditStamp(auditStamp)
.systemMetadata(latestSystemMetadata)
.recordTemplate(EntityApiUtils.buildKeyAspect(opContext.getEntityRegistry(), urn))
.build(opContext.getAspectRetriever()));
Stream<IngestResult> defaultAspectsResult =
ingestProposalSync(
opContext,
AspectsBatchImpl.builder()
.retrieverContext(opContext.getRetrieverContext())
.items(keyAspect)
.build());
defaultAspectsCreated += defaultAspectsResult.count();
if (!readOnly) {
List<MCPItem> keyAspect =
List.of(
ChangeItemImpl.builder()
.urn(urn)
.aspectName(entitySpec.getKeyAspectName())
.changeType(ChangeType.UPSERT)
.entitySpec(entitySpec)
.aspectSpec(entitySpec.getKeyAspectSpec())
.auditStamp(auditStamp)
.systemMetadata(latestSystemMetadata)
.recordTemplate(
EntityApiUtils.buildKeyAspect(opContext.getEntityRegistry(), urn))
.build(opContext.getAspectRetriever()));
Stream<IngestResult> defaultAspectsResult =
ingestProposalSync(
opContext,
AspectsBatchImpl.builder()
.retrieverContext(opContext.getRetrieverContext())
.items(keyAspect)
.build());
defaultAspectsCreated += defaultAspectsResult.count();
}

result.sendMessageMs += System.currentTimeMillis() - startTime;

Expand All @@ -1883,6 +1889,9 @@ private RestoreIndicesResult restoreIndices(
result.ignored = ignored;
result.rowsMigrated = rowsMigrated;
result.defaultAspectsCreated = defaultAspectsCreated;

producer.flush();

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,13 @@ public static Map<String, Map<String, SystemAspect>> toSystemAspects(

@Nonnull
public static List<SystemAspect> toSystemAspectFromEbeanAspects(
@Nonnull RetrieverContext retrieverContext, @Nonnull Collection<EbeanAspectV2> rawAspects) {
@Nonnull RetrieverContext retrieverContext,
@Nonnull Collection<EbeanAspectV2> rawAspects,
boolean forUpdate) {
return toSystemAspects(
retrieverContext,
rawAspects.stream().map(EbeanAspectV2::toEntityAspect).collect(Collectors.toList()),
true);
forUpdate);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
@Slf4j
public abstract class EventProducer {

/**
 * Flushes this producer.
 *
 * <p>NOTE(review): the exact guarantee is defined by each implementation; presumably any
 * buffered events are handed off before this method returns — confirm per implementation.
 */
public abstract void flush();

/**
* Produces a {@link com.linkedin.mxe.MetadataChangeLog} from a new & previous aspect.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,8 @@ public ResponseEntity<List<RestoreIndicesResult>> restoreIndices(
@RequestParam(required = false, name = "limit", defaultValue = "0") @Nullable Integer limit,
@RequestParam(required = false, name = "gePitEpochMs", defaultValue = "0") @Nullable
Long gePitEpochMs,
@RequestParam(required = false, name = "lePitEpochMs") @Nullable Long lePitEpochMs) {
@RequestParam(required = false, name = "lePitEpochMs") @Nullable Long lePitEpochMs,
@RequestParam(value = "readOnly", defaultValue = "false") Boolean readOnly) {

Authentication authentication = AuthenticationContext.getAuthentication();
OperationContext opContext =
Expand Down Expand Up @@ -519,7 +520,8 @@ public ResponseEntity<List<RestoreIndicesResult>> restoreIndices(
.batchSize(batchSize)
.limit(limit)
.gePitEpochMs(gePitEpochMs)
.lePitEpochMs(lePitEpochMs);
.lePitEpochMs(lePitEpochMs)
.readOnly(readOnly);

return ResponseEntity.of(Optional.of(entityService.restoreIndices(opContext, args, log::info)));
}
Expand All @@ -532,6 +534,7 @@ public ResponseEntity<List<RestoreIndicesResult>> restoreIndices(
@RequestParam(required = false, name = "aspectNames") @Nullable Set<String> aspectNames,
@RequestParam(required = false, name = "batchSize", defaultValue = "100") @Nullable
Integer batchSize,
@RequestParam(value = "readOnly", defaultValue = "false") Boolean readOnly,
@RequestBody @Nonnull Set<String> urns)
throws RemoteInvocationException, URISyntaxException {

Expand All @@ -556,6 +559,7 @@ public ResponseEntity<List<RestoreIndicesResult>> restoreIndices(
opContext,
urns.stream().map(UrnUtils::getUrn).collect(Collectors.toSet()),
aspectNames,
batchSize)));
batchSize,
readOnly)));
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.datahubproject.openapi.operations.elastic;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anySet;
Expand Down Expand Up @@ -308,7 +309,8 @@ public void testRestoreIndices() throws Exception {
when(restoreResult.getLastAspect()).thenReturn("");
restoreResults.add(restoreResult);

when(mockEntityService.restoreIndices(any(OperationContext.class), any(), any(), anyInt()))
when(mockEntityService.restoreIndices(
any(OperationContext.class), any(), any(), anyInt(), anyBoolean()))
.thenReturn(restoreResults);

// Prepare request body
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@
"name" : "lePitEpochMs",
"type" : "long",
"optional" : true
}, {
"name" : "readOnly",
"type" : "boolean",
"optional" : true
} ],
"returns" : "string"
} ],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@
"name" : "lePitEpochMs",
"type" : "long",
"optional" : true
}, {
"name" : "readOnly",
"type" : "boolean",
"optional" : true
} ],
"returns" : "string"
}, {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@
"type" : "string",
"doc" : "Title of the chart",
"Searchable" : {
"boostScore" : 10.0,
"enableAutocomplete" : true,
"fieldNameAliases" : [ "_entityName" ],
"fieldType" : "WORD_GRAM"
Expand Down Expand Up @@ -4425,6 +4426,10 @@
"name" : "lePitEpochMs",
"type" : "long",
"optional" : true
}, {
"name" : "readOnly",
"type" : "boolean",
"optional" : true
} ],
"returns" : "string"
} ],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
"type" : "string",
"doc" : "Title of the chart",
"Searchable" : {
"boostScore" : 10.0,
"enableAutocomplete" : true,
"fieldNameAliases" : [ "_entityName" ],
"fieldType" : "WORD_GRAM"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
"type" : "string",
"doc" : "Title of the chart",
"Searchable" : {
"boostScore" : 10.0,
"enableAutocomplete" : true,
"fieldNameAliases" : [ "_entityName" ],
"fieldType" : "WORD_GRAM"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
"type" : "string",
"doc" : "Title of the chart",
"Searchable" : {
"boostScore" : 10.0,
"enableAutocomplete" : true,
"fieldNameAliases" : [ "_entityName" ],
"fieldType" : "WORD_GRAM"
Expand Down Expand Up @@ -4042,6 +4043,10 @@
"name" : "lePitEpochMs",
"type" : "long",
"optional" : true
}, {
"name" : "readOnly",
"type" : "boolean",
"optional" : true
} ],
"returns" : "string"
}, {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
"type" : "string",
"doc" : "Title of the chart",
"Searchable" : {
"boostScore" : 10.0,
"enableAutocomplete" : true,
"fieldNameAliases" : [ "_entityName" ],
"fieldType" : "WORD_GRAM"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,8 @@ public Task<String> restoreIndices(
@ActionParam("batchSize") @Optional @Nullable Integer batchSize,
@ActionParam("limit") @Optional @Nullable Integer limit,
@ActionParam("gePitEpochMs") @Optional @Nullable Long gePitEpochMs,
@ActionParam("lePitEpochMs") @Optional @Nullable Long lePitEpochMs) {
@ActionParam("lePitEpochMs") @Optional @Nullable Long lePitEpochMs,
@ActionParam("readOnly") @Optional @Nullable Boolean readOnly) {
return RestliUtils.toTask(systemOperationContext,
() -> {

Expand All @@ -390,7 +391,7 @@ public Task<String> restoreIndices(
}

return Utils.restoreIndices(systemOperationContext, getContext(),
aspectName, urn, urnLike, start, batchSize, limit, gePitEpochMs, lePitEpochMs, _authorizer, _entityService);
aspectName, urn, urnLike, start, batchSize, limit, gePitEpochMs, lePitEpochMs, _authorizer, _entityService, readOnly != null ? readOnly : false);
},
MetricRegistry.name(this.getClass(), "restoreIndices"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ public Task<String> restoreIndices(
@ActionParam("batchSize") @Optional @Nullable Integer batchSize,
@ActionParam("limit") @Optional @Nullable Integer limit,
@ActionParam("gePitEpochMs") @Optional @Nullable Long gePitEpochMs,
@ActionParam("lePitEpochMs") @Optional @Nullable Long lePitEpochMs) {
@ActionParam("lePitEpochMs") @Optional @Nullable Long lePitEpochMs,
@ActionParam("readOnly") @Optional @Nullable Boolean readOnly) {
return RestliUtils.toTask(systemOperationContext,
() -> Utils.restoreIndices(systemOperationContext, getContext(),
aspectName, urn, urnLike, start, batchSize, limit, gePitEpochMs, lePitEpochMs, _authorizer, _entityService),
aspectName, urn, urnLike, start, batchSize, limit, gePitEpochMs, lePitEpochMs, _authorizer, _entityService, readOnly != null ? readOnly : false),
MetricRegistry.name(this.getClass(), "restoreIndices"));
}

Expand Down
Loading
Loading