Commit 1166998

Browse files
[Pull-based Ingestion] Add support for dynamically updating ingestion error handling strategy with minor fixes (#17565)
* Fix global checkpoint for p2p segrep in ingestion mode
* Support updating ingestion error strategy
* Handle race condition on calling flush before poller is initialized

Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 6c0a95b commit 1166998
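
Because this commit marks index.ingestion_source.error_strategy as a dynamic setting, the strategy can now be switched on a live index through the regular update-settings API, as the new RemoteStoreKafkaIT test further down exercises. A minimal sketch of that call, assuming it runs inside an OpenSearchIntegTestCase (so client() and org.opensearch.common.settings.Settings are available) and using a hypothetical index name:

// Minimal sketch (not part of the diff): switch the ingestion error strategy at runtime.
// Assumes an OpenSearchIntegTestCase context where client() is available; the index name
// "my-ingestion-index" is hypothetical.
client().admin()
    .indices()
    .prepareUpdateSettings("my-ingestion-index")
    .setSettings(Settings.builder().put("ingestion_source.error_strategy", "drop"))
    .get();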

File tree

13 files changed: +216 -61 lines changed


plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

+12 -18

@@ -19,6 +19,7 @@
 import org.opensearch.indices.pollingingest.PollingIngestStats;
 import org.opensearch.plugins.PluginInfo;
 import org.opensearch.test.OpenSearchIntegTestCase;
+import org.opensearch.transport.client.Requests;
 import org.junit.Assert;
 
 import java.util.List;
@@ -56,27 +57,14 @@ public void testPluginsAreInstalled() {
     public void testKafkaIngestion() {
         produceData("1", "name1", "24");
         produceData("2", "name2", "20");
-
-        createIndex(
-            "test",
-            Settings.builder()
-                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
-                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
-                .put("ingestion_source.type", "kafka")
-                .put("ingestion_source.pointer.init.reset", "earliest")
-                .put("ingestion_source.param.topic", "test")
-                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
-                .put("index.replication.type", "SEGMENT")
-                .build(),
-            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
-        );
+        createIndexWithDefaultSettings(1, 0);
 
         RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
         await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
-            refresh("test");
-            SearchResponse response = client().prepareSearch("test").setQuery(query).get();
+            refresh(indexName);
+            SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
             assertThat(response.getHits().getTotalHits().value(), is(1L));
-            PollingIngestStats stats = client().admin().indices().prepareStats("test").get().getIndex("test").getShards()[0]
+            PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
                 .getPollingIngestStats();
             assertNotNull(stats);
             assertThat(stats.getMessageProcessorStats().getTotalProcessedCount(), is(2L));
@@ -135,10 +123,16 @@ public void testKafkaIngestion_RewindByOffset() {
         );
 
         RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
-        await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+        await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> {
            refresh("test_rewind_by_offset");
            SearchResponse response = client().prepareSearch("test_rewind_by_offset").setQuery(query).get();
            assertThat(response.getHits().getTotalHits().value(), is(1L));
        });
     }
+
+    public void testCloseIndex() throws Exception {
+        createIndexWithDefaultSettings(1, 0);
+        ensureGreen(indexName);
+        client().admin().indices().close(Requests.closeIndexRequest(indexName)).get();
+    }
 }

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

+31

@@ -15,6 +15,8 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.opensearch.action.search.SearchResponse;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.settings.Settings;
 import org.opensearch.plugins.Plugin;
 import org.opensearch.test.OpenSearchIntegTestCase;
 import org.junit.After;
@@ -25,6 +27,7 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Properties;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import org.testcontainers.containers.KafkaContainer;
@@ -108,4 +111,32 @@ protected void waitForSearchableDocs(long docCount, List<String> nodes) throws E
             }
         }, 1, TimeUnit.MINUTES);
     }
+
+    protected void waitForState(Callable<Boolean> checkState) throws Exception {
+        assertBusy(() -> {
+            if (checkState.call() == false) {
+                fail("Provided state requirements not met");
+            }
+        }, 1, TimeUnit.MINUTES);
+    }
+
+    protected String getSettings(String indexName, String setting) {
+        return client().admin().indices().prepareGetSettings(indexName).get().getSetting(indexName, setting);
+    }
+
+    protected void createIndexWithDefaultSettings(int numShards, int numReplicas) {
+        createIndex(
+            indexName,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas)
+                .put("ingestion_source.type", "kafka")
+                .put("ingestion_source.pointer.init.reset", "earliest")
+                .put("ingestion_source.param.topic", topicName)
+                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
+                .put("index.replication.type", "SEGMENT")
+                .build(),
+            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
+        );
+    }
 }

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

+52 -14

@@ -16,6 +16,7 @@
 import org.opensearch.index.query.RangeQueryBuilder;
 import org.opensearch.test.InternalTestCluster;
 import org.opensearch.test.OpenSearchIntegTestCase;
+import org.opensearch.transport.client.Requests;
 
 import java.nio.file.Path;
 import java.util.Arrays;
@@ -46,20 +47,7 @@ public void testSegmentReplicationWithRemoteStore() throws Exception {
 
         internalCluster().startClusterManagerOnlyNode();
         final String nodeA = internalCluster().startDataOnlyNode();
-
-        createIndex(
-            indexName,
-            Settings.builder()
-                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
-                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
-                .put("ingestion_source.type", "kafka")
-                .put("ingestion_source.pointer.init.reset", "earliest")
-                .put("ingestion_source.param.topic", topicName)
-                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
-                .put("index.replication.type", "SEGMENT")
-                .build(),
-            mapping
-        );
+        createIndexWithDefaultSettings(1, 1);
 
         ensureYellowAndNoInitializingShards(indexName);
         final String nodeB = internalCluster().startDataOnlyNode();
@@ -117,6 +105,56 @@ public void testSegmentReplicationWithRemoteStore() throws Exception {
         waitForSearchableDocs(6, Arrays.asList(nodeB, nodeC));
     }
 
+    public void testCloseIndex() throws Exception {
+        produceData("1", "name1", "24");
+        produceData("2", "name2", "20");
+        internalCluster().startClusterManagerOnlyNode();
+        final String nodeA = internalCluster().startDataOnlyNode();
+        final String nodeB = internalCluster().startDataOnlyNode();
+
+        createIndexWithDefaultSettings(1, 1);
+        ensureGreen(indexName);
+        waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB));
+        client().admin().indices().close(Requests.closeIndexRequest(indexName)).get();
+    }
+
+    public void testErrorStrategy() throws Exception {
+        produceData("1", "name1", "25");
+        // malformed message
+        produceData("2", "", "");
+        produceData("3", "name3", "25");
+
+        internalCluster().startClusterManagerOnlyNode();
+        final String node = internalCluster().startDataOnlyNode();
+
+        createIndex(
+            indexName,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put("ingestion_source.type", "kafka")
+                .put("ingestion_source.error_strategy", "block")
+                .put("ingestion_source.pointer.init.reset", "earliest")
+                .put("ingestion_source.param.topic", topicName)
+                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
+                .put("index.replication.type", "SEGMENT")
+                .build(),
+            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
+        );
+
+        ensureGreen(indexName);
+        waitForState(() -> "block".equalsIgnoreCase(getSettings(indexName, "index.ingestion_source.error_strategy")));
+        waitForSearchableDocs(1, Arrays.asList(node));
+
+        client().admin()
+            .indices()
+            .prepareUpdateSettings(indexName)
+            .setSettings(Settings.builder().put("ingestion_source.error_strategy", "drop"))
+            .get();
+        waitForState(() -> "drop".equalsIgnoreCase(getSettings(indexName, "index.ingestion_source.error_strategy")));
+        waitForSearchableDocs(2, Arrays.asList(node));
+    }
+
     private void verifyRemoteStoreEnabled(String node) {
         GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
         String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

+5 -1

@@ -771,13 +771,17 @@ public Iterator<Setting<?>> settings() {
         Property.Final
     );
 
+    /**
+     * Defines the error strategy for pull-based ingestion.
+     */
     public static final String SETTING_INGESTION_SOURCE_ERROR_STRATEGY = "index.ingestion_source.error_strategy";
     public static final Setting<IngestionErrorStrategy.ErrorStrategy> INGESTION_SOURCE_ERROR_STRATEGY_SETTING = new Setting<>(
         SETTING_INGESTION_SOURCE_ERROR_STRATEGY,
         IngestionErrorStrategy.ErrorStrategy.DROP.name(),
         IngestionErrorStrategy.ErrorStrategy::parseFromString,
         (errorStrategy) -> {},
-        Property.IndexScope
+        Property.IndexScope,
+        Property.Dynamic
     );
 
     public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

+26 -3

@@ -57,7 +57,7 @@ public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory inges
         super(engineConfig);
         this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);
         this.documentMapperForType = engineConfig.getDocumentMapperForTypeSupplier().get();
-
+        registerDynamicIndexSettingsHandlers();
     }
 
     /**
@@ -215,8 +215,14 @@ protected void commitIndexWriter(final IndexWriter writer, final String translog
             commitData.put(HISTORY_UUID_KEY, historyUUID);
             commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
 
-            // ingestion engine needs to record batch start pointer
-            commitData.put(StreamPoller.BATCH_START, streamPoller.getBatchStartPointer().asString());
+            /*
+             * Ingestion engine needs to record batch start pointer.
+             * Batch start pointer can be null at index creation time, if flush is called before the stream
+             * poller has been completely initialized.
+             */
+            if (streamPoller.getBatchStartPointer() != null) {
+                commitData.put(StreamPoller.BATCH_START, streamPoller.getBatchStartPointer().asString());
+            }
             final String currentForceMergeUUID = forceMergeUUID;
             if (currentForceMergeUUID != null) {
                 commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);
@@ -304,4 +310,21 @@ protected Map<String, String> commitDataAsMap() {
     public PollingIngestStats pollingIngestStats() {
         return streamPoller.getStats();
     }
+
+    private void registerDynamicIndexSettingsHandlers() {
+        engineConfig.getIndexSettings()
+            .getScopedSettings()
+            .addSettingsUpdateConsumer(IndexMetadata.INGESTION_SOURCE_ERROR_STRATEGY_SETTING, this::updateErrorHandlingStrategy);
+    }
+
+    /**
+     * Handler for updating ingestion error strategy in the stream poller on dynamic index settings update.
+     */
+    private void updateErrorHandlingStrategy(IngestionErrorStrategy.ErrorStrategy errorStrategy) {
+        IngestionErrorStrategy updatedIngestionErrorStrategy = IngestionErrorStrategy.create(
+            errorStrategy,
+            engineConfig.getIndexSettings().getIndexMetadata().getIngestionSource().getType()
+        );
+        streamPoller.updateErrorStrategy(updatedIngestionErrorStrategy);
+    }
 }
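
The registerDynamicIndexSettingsHandlers() hook above relies on OpenSearch's scoped-settings mechanism. For context, a condensed sketch of that general pattern (the consumer lambda is a hypothetical stand-in, not part of this commit) looks like:

// General pattern (sketch only): register a consumer on the index's scoped settings so it
// is invoked whenever the dynamic setting is changed via the update-settings API.
// The lambda body is a hypothetical stand-in for updateErrorHandlingStrategy.
indexSettings.getScopedSettings()
    .addSettingsUpdateConsumer(
        IndexMetadata.INGESTION_SOURCE_ERROR_STRATEGY_SETTING,
        newStrategy -> logger.info("ingestion error strategy updated to {}", newStrategy)
    );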

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+15 -1

@@ -238,6 +238,7 @@
 import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
 import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
 import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
+import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
 import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_MIGRATING_SEEDED;
 import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_MIGRATING_UNSEEDED;
@@ -451,7 +452,7 @@ public IndexShard(
             aId,
             indexSettings,
             primaryTerm,
-            UNASSIGNED_SEQ_NO,
+            getInitialGlobalCheckpointForShard(indexSettings),
             globalCheckpointListeners::globalCheckpointUpdated,
             threadPool::absoluteTimeInMillis,
             (retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, aId, getPendingPrimaryTerm(), retentionLeases, listener),
@@ -499,6 +500,19 @@ public boolean shouldCache(Query query) {
         this.segmentReplicationStatsProvider = segmentReplicationStatsProvider;
     }
 
+    /**
+     * By default, UNASSIGNED_SEQ_NO is used as the initial global checkpoint for new shard initialization. Ingestion
+     * source does not track sequence numbers explicitly and hence defaults to NO_OPS_PERFORMED for compatibility.
+     */
+    private long getInitialGlobalCheckpointForShard(IndexSettings indexSettings) {
+        if (indexSettings.getIndexMetadata().useIngestionSource()) {
+            return NO_OPS_PERFORMED;
+        }
+
+        return UNASSIGNED_SEQ_NO;
+    }
+
     public ThreadPool getThreadPool() {
         return this.threadPool;
     }

server/src/main/java/org/opensearch/indices/pollingingest/BlockIngestionErrorStrategy.java

+2 -2

@@ -30,7 +30,7 @@ public void handleError(Throwable e, ErrorStage stage) {
     }
 
     @Override
-    public boolean shouldPauseIngestion(Throwable e, ErrorStage stage) {
-        return true;
+    public boolean shouldIgnoreError(Throwable e, ErrorStage stage) {
+        return false;
     }
 }

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

+16 -6

@@ -42,6 +42,7 @@ public class DefaultStreamPoller implements StreamPoller {
     private volatile boolean started;
     private volatile boolean closed;
     private volatile boolean paused;
+    private volatile IngestionErrorStrategy errorStrategy;
 
     private IngestionShardConsumer consumer;
 
@@ -67,8 +68,6 @@ public class DefaultStreamPoller implements StreamPoller {
     @Nullable
     private IngestionShardPointer maxPersistedPointer;
 
-    private IngestionErrorStrategy errorStrategy;
-
     public DefaultStreamPoller(
         IngestionShardPointer startPointer,
         Set<IngestionShardPointer> persistedPointers,
@@ -231,14 +230,14 @@ protected void startPoll() {
                 logger.error("Error in polling the shard {}: {}", consumer.getShardId(), e);
                 errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.POLLING);
 
-                if (errorStrategy.shouldPauseIngestion(e, IngestionErrorStrategy.ErrorStage.POLLING)) {
-                    // Blocking error encountered. Pause poller to stop processing remaining updates.
-                    pause();
-                } else {
+                if (errorStrategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.POLLING)) {
                     // Advance the batch start pointer to ignore the error and continue from next record
                     batchStartPointer = lastSuccessfulPointer == null
                         ? consumer.nextPointer(batchStartPointer)
                         : consumer.nextPointer(lastSuccessfulPointer);
+                } else {
+                    // Blocking error encountered. Pause poller to stop processing remaining updates.
+                    pause();
                 }
             }
         }
@@ -332,4 +331,15 @@ public PollingIngestStats getStats() {
     public State getState() {
         return state;
     }
+
+    @Override
+    public IngestionErrorStrategy getErrorStrategy() {
+        return this.errorStrategy;
+    }
+
+    @Override
+    public void updateErrorStrategy(IngestionErrorStrategy errorStrategy) {
+        this.errorStrategy = errorStrategy;
+        processorRunnable.setErrorStrategy(errorStrategy);
+    }
 }

server/src/main/java/org/opensearch/indices/pollingingest/DropIngestionErrorStrategy.java

+2 -2

@@ -30,8 +30,8 @@ public void handleError(Throwable e, ErrorStage stage) {
     }
 
     @Override
-    public boolean shouldPauseIngestion(Throwable e, ErrorStage stage) {
-        return false;
+    public boolean shouldIgnoreError(Throwable e, ErrorStage stage) {
+        return true;
     }
 
 }

server/src/main/java/org/opensearch/indices/pollingingest/IngestionErrorStrategy.java

+2 -2

@@ -25,9 +25,9 @@ public interface IngestionErrorStrategy {
     void handleError(Throwable e, ErrorStage stage);
 
     /**
-     * Indicates if ingestion must be paused, blocking further writes.
+     * Indicates if the error should be ignored.
      */
-    boolean shouldPauseIngestion(Throwable e, ErrorStage stage);
+    boolean shouldIgnoreError(Throwable e, ErrorStage stage);
 
     static IngestionErrorStrategy create(ErrorStrategy errorStrategy, String ingestionSource) {
         switch (errorStrategy) {
