Skip to content

Commit b16d99b

Browse files
committed
Ignore dynamically added streams with mismatching region, rather than propagating exception
1 parent 16e8404 commit b16d99b

File tree

2 files changed

+39
-3
lines changed

2 files changed

+39
-3
lines changed

amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ public class Scheduler implements Runnable {
123123
private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count";
124124
private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count";
125125
private static final String NON_EXISTING_STREAM_DELETE_COUNT = "NonExistingStreamDelete.Count";
126+
private static final String IGNORED_STREAMS_COUNT = "IgnoredStreams.Count";
126127

127128
private final SchedulerLog slog = new SchedulerLog();
128129

@@ -507,8 +508,14 @@ Set<StreamIdentifier> checkAndSyncStreamShardsAndLeases()
507508
log.info("Found new stream to process: {}. Syncing shards of that stream.", streamConfig);
508509
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(streamConfig);
509510
shardSyncTaskManager.submitShardSyncTask();
510-
currentStreamConfigMap.put(streamIdentifier, streamConfig);
511-
streamsSynced.add(streamIdentifier);
511+
try {
512+
currentStreamConfigMap.put(streamIdentifier, streamConfig);
513+
streamsSynced.add(streamIdentifier);
514+
} catch (Exception e) {
515+
log.error("Failed to add stream {} to application. This stream will not be processed.",
516+
streamConfig.streamIdentifier(), e);
517+
MetricsUtil.addCount(metricsScope, IGNORED_STREAMS_COUNT, 1, MetricsLevel.DETAILED);
518+
}
512519
} else {
513520
log.debug("{} is already being processed - skipping shard sync.", streamIdentifier);
514521
}

amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java

+30-1
Original file line numberDiff line numberDiff line change
@@ -1154,7 +1154,7 @@ public void testOrphanStreamConfigIsPopulatedWithArn() {
11541154
}
11551155

11561156
@Test
1157-
public void testMismatchingArnRegionAndKinesisClientRegionThrowsException() {
1157+
public void testMismatchingArnRegionAndKinesisClientRegionOnSchedulerConstructionThrowsException() {
11581158
final Region streamArnRegion = Region.US_WEST_1;
11591159
Assert.assertNotEquals(streamArnRegion, kinesisClient.serviceClientConfiguration().region());
11601160

@@ -1169,6 +1169,35 @@ public void testMismatchingArnRegionAndKinesisClientRegionThrowsException() {
11691169
leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig));
11701170
}
11711171

1172+
@Test
1173+
public void testDynamicallyAddedStreamsWithRegionMismatchingKinesisClientRegionAreIgnored() throws Exception {
1174+
final Region mismatchingStreamRegion = Region.US_WEST_1;
1175+
final Region kinesisClientRegion = kinesisClient.serviceClientConfiguration().region();
1176+
Assert.assertNotEquals(mismatchingStreamRegion, kinesisClientRegion);
1177+
1178+
final StreamIdentifier streamWithMismatchingRegion = StreamIdentifier.multiStreamInstance(
1179+
Arn.fromString(constructStreamArnStr(mismatchingStreamRegion, TEST_ACCOUNT, "stream-1")), TEST_EPOCH);
1180+
1181+
final StreamIdentifier streamWithMatchingRegion = StreamIdentifier.multiStreamInstance(
1182+
Arn.fromString(constructStreamArnStr(kinesisClientRegion, TEST_ACCOUNT, "stream-2")), TEST_EPOCH);
1183+
1184+
when(multiStreamTracker.streamConfigList()).thenReturn(
1185+
Collections.emptyList(), // returned on scheduler construction
1186+
Arrays.asList( // returned on stream sync
1187+
new StreamConfig(streamWithMismatchingRegion, TEST_INITIAL_POSITION),
1188+
new StreamConfig(streamWithMatchingRegion, TEST_INITIAL_POSITION)));
1189+
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
1190+
.retrievalFactory(retrievalFactory);
1191+
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig,
1192+
leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig));
1193+
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
1194+
1195+
final Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
1196+
final Set<StreamIdentifier> currentStreamConfigMapKeys = scheduler.currentStreamConfigMap().keySet();
1197+
assertFalse(Sets.union(currentStreamConfigMapKeys, syncedStreams).contains(streamWithMismatchingRegion));
1198+
assertTrue(Sets.union(currentStreamConfigMapKeys, syncedStreams).contains(streamWithMatchingRegion));
1199+
}
1200+
11721201
private static String constructStreamIdentifierSer(long accountId, String streamName) {
11731202
return String.join(":", String.valueOf(accountId), streamName, String.valueOf(TEST_EPOCH));
11741203
}

0 commit comments

Comments
 (0)