From a348a35d42af578437bd6bb5accf5c233cfc3bcc Mon Sep 17 00:00:00 2001 From: Furqaan Ali Date: Fri, 3 May 2024 01:27:02 -0700 Subject: [PATCH] Ignore dynamically added streams with mismatching region, rather than propagating exception --- .../amazon/kinesis/coordinator/Scheduler.java | 10 +++++- .../kinesis/coordinator/SchedulerTest.java | 31 ++++++++++++++++++- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index e4e63078b..5cbaa9c9e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -123,6 +123,7 @@ public class Scheduler implements Runnable { private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count"; private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count"; private static final String NON_EXISTING_STREAM_DELETE_COUNT = "NonExistingStreamDelete.Count"; + private static final String IGNORED_STREAMS_COUNT = "IgnoredStreams.Count"; private final SchedulerLog slog = new SchedulerLog(); @@ -504,10 +505,17 @@ Set checkAndSyncStreamShardsAndLeases() for (StreamIdentifier streamIdentifier : newStreamConfigMap.keySet()) { if (!currentStreamConfigMap.containsKey(streamIdentifier)) { final StreamConfig streamConfig = newStreamConfigMap.get(streamIdentifier); + try { + currentStreamConfigMap.put(streamIdentifier, streamConfig); + } catch (IllegalArgumentException e) { + log.error("Failed to add stream {} to application. This stream will not be processed.", + streamConfig.streamIdentifier(), e); + MetricsUtil.addCount(metricsScope, IGNORED_STREAMS_COUNT, 1, MetricsLevel.DETAILED); + continue; + } log.info("Found new stream to process: {}. Syncing shards of that stream.", streamConfig); ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(streamConfig); shardSyncTaskManager.submitShardSyncTask(); - currentStreamConfigMap.put(streamIdentifier, streamConfig); streamsSynced.add(streamIdentifier); } else { log.debug("{} is already being processed - skipping shard sync.", streamIdentifier); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index f29c23410..1aa344324 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -1154,7 +1154,7 @@ public void testOrphanStreamConfigIsPopulatedWithArn() { } @Test - public void testMismatchingArnRegionAndKinesisClientRegionThrowsException() { + public void testMismatchingArnRegionAndKinesisClientRegionOnSchedulerConstructionThrowsException() { final Region streamArnRegion = Region.US_WEST_1; Assert.assertNotEquals(streamArnRegion, kinesisClient.serviceClientConfiguration().region()); @@ -1169,6 +1169,35 @@ public void testMismatchingArnRegionAndKinesisClientRegionThrowsException() { leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig)); } + @Test + public void testDynamicallyAddedStreamsWithRegionMismatchingKinesisClientRegionAreIgnored() throws Exception { + final Region mismatchingStreamRegion = Region.US_WEST_1; + final Region kinesisClientRegion = kinesisClient.serviceClientConfiguration().region(); + Assert.assertNotEquals(mismatchingStreamRegion, kinesisClientRegion); + + final StreamIdentifier streamWithMismatchingRegion = StreamIdentifier.multiStreamInstance( + Arn.fromString(constructStreamArnStr(mismatchingStreamRegion, TEST_ACCOUNT, "stream-1")), TEST_EPOCH); + + final StreamIdentifier streamWithMatchingRegion = StreamIdentifier.multiStreamInstance( + Arn.fromString(constructStreamArnStr(kinesisClientRegion, TEST_ACCOUNT, "stream-2")), TEST_EPOCH); + + when(multiStreamTracker.streamConfigList()).thenReturn( + Collections.emptyList(), // returned on scheduler construction + Arrays.asList( // returned on stream sync + new StreamConfig(streamWithMismatchingRegion, TEST_INITIAL_POSITION), + new StreamConfig(streamWithMatchingRegion, TEST_INITIAL_POSITION))); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, + leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig)); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + + final Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); + final Set currentStreamConfigMapKeys = scheduler.currentStreamConfigMap().keySet(); + assertFalse(Sets.union(currentStreamConfigMapKeys, syncedStreams).contains(streamWithMismatchingRegion)); + assertTrue(Sets.union(currentStreamConfigMapKeys, syncedStreams).contains(streamWithMatchingRegion)); + } + private static String constructStreamIdentifierSer(long accountId, String streamName) { return String.join(":", String.valueOf(accountId), streamName, String.valueOf(TEST_EPOCH)); }