From 3b5e85895fce58a26d138a24532de8cd2a1c49db Mon Sep 17 00:00:00 2001 From: Furqaan Ali Date: Wed, 3 Apr 2024 03:38:34 -0700 Subject: [PATCH] Embed StreamConfig within ShardInfo --- .../kinesis/common/StreamIdentifier.java | 13 +- .../amazon/kinesis/coordinator/Scheduler.java | 65 +++--- .../kinesis/leases/LeaseCleanupManager.java | 9 +- .../kinesis/leases/LeaseCoordinator.java | 8 +- .../amazon/kinesis/leases/ShardInfo.java | 45 ++-- .../dynamodb/DynamoDBLeaseCoordinator.java | 32 --- .../exceptions/LeasePendingDeletion.java | 2 - .../kinesis/lifecycle/ConsumerStates.java | 4 +- .../kinesis/lifecycle/InitializeTask.java | 4 +- .../amazon/kinesis/lifecycle/ProcessTask.java | 7 +- .../kinesis/lifecycle/ShardConsumer.java | 2 +- .../lifecycle/ShardConsumerArgument.java | 6 - .../kinesis/lifecycle/ShutdownTask.java | 10 +- .../kinesis/retrieval/RetrievalFactory.java | 22 +- .../fanout/FanOutRetrievalFactory.java | 30 +-- .../SynchronousBlockingRetrievalFactory.java | 45 +--- ...ynchronousPrefetchingRetrievalFactory.java | 93 -------- ...dShardRecordProcessorCheckpointerTest.java | 12 +- .../kinesis/coordinator/SchedulerTest.java | 205 ++++++++++++++---- .../leases/LeaseCleanupManagerTest.java | 16 +- ...rentsFirstShardPrioritizationUnitTest.java | 11 +- .../amazon/kinesis/leases/ShardInfoTest.java | 60 +++-- .../lifecycle/BlockOnParentShardTaskTest.java | 56 +++-- .../kinesis/lifecycle/ConsumerStatesTest.java | 9 +- .../kinesis/lifecycle/ProcessTaskTest.java | 11 +- .../ShardConsumerSubscriberTest.java | 12 +- .../kinesis/lifecycle/ShardConsumerTest.java | 12 +- .../kinesis/lifecycle/ShutdownTaskTest.java | 15 +- .../retrieval/fanout/FanOutConfigTest.java | 30 ++- 29 files changed, 461 insertions(+), 385 deletions(-) delete mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java index 82cef04ba..889bc67af 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java @@ -67,8 +67,7 @@ public class StreamIdentifier { * or {@link #streamName} in single-stream mode. */ public String serialize() { - if (!streamCreationEpochOptional.isPresent()) { - // creation epoch is expected to be empty in single-stream mode + if (!isMultiStreamInstance()) { return streamName; } @@ -85,6 +84,16 @@ public String toString() { return serialize(); } + /** + * Determine whether this {@link StreamIdentifier} is a multi-stream instance. + * + * @return true if this is a multi-stream instance, false otherwise. + */ + public boolean isMultiStreamInstance() { + // creation epoch is expected to be present if and only if in multi-stream mode + return streamCreationEpochOptional.isPresent(); + } + /** * Create a multi stream instance for StreamIdentifier from serialized stream identifier * of format {@link #STREAM_IDENTIFIER_PATTERN} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index bb389ce94..c5d65f1ba 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -68,7 +68,6 @@ import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardPrioritization; import software.amazon.kinesis.leases.ShardSyncTaskManager; -import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer; import software.amazon.kinesis.leases.exceptions.DependencyException; @@ -812,7 +811,7 @@ Callable createWorkerShutdownCallable() { for (Lease lease : leases) { ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease, notificationCompleteLatch, shutdownCompleteLatch); - ShardInfo shardInfo = DynamoDBLeaseCoordinator.convertLeaseToAssignment(lease); + final ShardInfo shardInfo = constructShardInfoFromLease(lease); ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); if (consumer != null) { consumer.gracefulShutdown(shutdownNotification); @@ -895,7 +894,9 @@ private void finalShutdown() { } private List getShardInfoForAssignments() { - List assignedStreamShards = leaseCoordinator.getCurrentAssignments(); + final List assignedStreamShards = leaseCoordinator.getAssignments().stream() + .map(this::constructShardInfoFromLease) + .collect(Collectors.toList()); List prioritizedShards = shardPrioritization.prioritize(assignedStreamShards); if ((prioritizedShards != null) && (!prioritizedShards.isEmpty())) { @@ -952,26 +953,20 @@ protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo, @NonNull final LeaseCleanupManager leaseCleanupManager) { ShardRecordProcessorCheckpointer checkpointer = coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo, checkpoint); - // The only case where streamName is not available will be when multistreamtracker not set. In this case, - // get the default stream name for the single stream application. - final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt()); - - // Irrespective of single stream app or multi stream app, streamConfig should always be available. - // If we have a shardInfo, that is not present in currentStreamConfigMap for whatever reason, then return default stream config - // to gracefully complete the reading. - StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier); - if (streamConfig == null) { - streamConfig = streamTracker.createStreamConfig(streamIdentifier); + final StreamConfig streamConfig = shardInfo.streamConfig(); + if (!currentStreamConfigMap.containsKey(streamConfig.streamIdentifier())) { log.info("Created orphan {}", streamConfig); } - Validate.notNull(streamConfig, "StreamConfig should not be null"); + /* + * NOTE: RecordsPublisher#createGetRecordsCache(ShardInfo, StreamConfig, MetricsFactory) is deprecated. + * RecordsPublisher#createGetRecordsCache(ShardInfo, MetricsFactory) will be called directly in the future. + */ RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, streamConfig, metricsFactory); ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo, - streamConfig.streamIdentifier(), leaseCoordinator, executorService, cache, - shardRecordProcessorFactory.shardRecordProcessor(streamIdentifier), + shardRecordProcessorFactory.shardRecordProcessor(streamConfig.streamIdentifier()), checkpoint, checkpointer, parentShardPollIntervalMillis, @@ -981,7 +976,6 @@ protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo, maxListShardsRetryAttempts, processorConfig.callProcessRecordsEvenForEmptyRecordList(), shardConsumerDispatchPollIntervalMillis, - streamConfig.initialPositionInStreamExtended(), cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, shardDetectorProvider.apply(streamConfig), @@ -1039,18 +1033,6 @@ private void logExecutorState() { executorStateEvent.accept(diagnosticEventHandler); } - private StreamIdentifier getStreamIdentifier(Optional streamIdentifierString) { - final StreamIdentifier streamIdentifier; - if (streamIdentifierString.isPresent()) { - streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierString.get()); - } else { - Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode"); - streamIdentifier = this.currentStreamConfigMap.values().iterator().next().streamIdentifier(); - } - Validate.notNull(streamIdentifier, "Stream identifier should not be empty"); - return streamIdentifier; - } - /** * Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at * INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on @@ -1090,6 +1072,31 @@ private void resetInfoLogging() { } } + private ShardInfo constructShardInfoFromLease(final Lease lease) { + final boolean isMultiStreamLease = lease instanceof MultiStreamLease; + + final Optional streamIdentifierSerialization = isMultiStreamLease ? + Optional.of(((MultiStreamLease) lease).streamIdentifier()) : Optional.empty(); + final StreamConfig streamConfig = getOrCreateStreamConfig(streamIdentifierSerialization); + + final String shardId = isMultiStreamLease ? ((MultiStreamLease) lease).shardId() : lease.leaseKey(); + return new ShardInfo( + shardId, lease.concurrencyToken().toString(), lease.parentShardIds(), lease.checkpoint(), streamConfig); + } + + private StreamConfig getOrCreateStreamConfig(final Optional streamIdentifierSerialization) { + if (!streamIdentifierSerialization.isPresent()) { + Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode"); + final StreamConfig streamConfig = currentStreamConfigMap.values().iterator().next(); + Validate.notNull(streamConfig, "StreamConfig should not be null"); + return streamConfig; + } + + final StreamIdentifier streamIdentifier = + StreamIdentifier.multiStreamInstance(streamIdentifierSerialization.get()); + return currentStreamConfigMap.getOrDefault(streamIdentifier, streamTracker.createStreamConfig(streamIdentifier)); + } + @Deprecated public Future requestShutdown() { return null; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java index 861626b63..44b2e91e3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java @@ -117,8 +117,8 @@ public void shutdown() { public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) { final Lease lease = leasePendingDeletion.lease(); if (lease == null) { - log.warn("Cannot enqueue {} for {} as instance doesn't hold the lease for that shard.", - leasePendingDeletion.shardInfo(), leasePendingDeletion.streamIdentifier()); + log.warn("Cannot enqueue {} as instance doesn't hold the lease for that shard.", + leasePendingDeletion.shardInfo()); } else { log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey()); if (!deletionQueue.add(leasePendingDeletion)) { @@ -166,7 +166,7 @@ public LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion InterruptedException, DependencyException, ProvisionedThroughputException, InvalidStateException { final Lease lease = leasePendingDeletion.lease(); final ShardInfo shardInfo = leasePendingDeletion.shardInfo(); - final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier(); + final StreamIdentifier streamIdentifier = shardInfo.streamConfig().streamIdentifier(); final AWSExceptionManager exceptionManager = createExceptionManager(); @@ -328,7 +328,8 @@ void cleanupLeases() { while (!deletionQueue.isEmpty()) { final LeasePendingDeletion leasePendingDeletion = deletionQueue.poll(); final String leaseKey = leasePendingDeletion.lease().leaseKey(); - final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier(); + final StreamIdentifier streamIdentifier = + leasePendingDeletion.shardInfo().streamConfig().streamIdentifier(); boolean deletionSucceeded = false; try { final LeaseCleanupResult leaseCleanupResult = cleanupLease(leasePendingDeletion, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java index 6437f3390..73f83018b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java @@ -125,8 +125,14 @@ boolean updateLease(Lease lease, UUID concurrencyToken, String operation, String /** * @return Current shard/lease assignments + * @deprecated This method is deprecated and will be removed in future versions. + * {@link LeaseCoordinator} implementations should not be required to construct and return + * {@link ShardInfo} objects. {@link #getAssignments()} can be used to return the currently held leases. */ - List getCurrentAssignments(); + @Deprecated + default List getCurrentAssignments() { + throw new UnsupportedOperationException("This method is deprecated and should not be used."); + } /** * Default implementation returns an empty list and concrete implementation is expected to return all leases diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java index aff3f6f01..5e048fecc 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java @@ -18,8 +18,8 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; -import java.util.Optional; +import lombok.AccessLevel; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; @@ -27,6 +27,7 @@ import lombok.NonNull; import lombok.ToString; import lombok.experimental.Accessors; +import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; /** @@ -34,15 +35,20 @@ */ @Getter @Accessors(fluent = true) -@ToString +@ToString(exclude = {"isMultiStreamMode", "streamIdentifierStr"}) public class ShardInfo { - private final Optional streamIdentifierSerOpt; private final String shardId; private final String concurrencyToken; // Sorted list of parent shardIds. private final List parentShardIds; private final ExtendedSequenceNumber checkpoint; + private final StreamConfig streamConfig; + + @Getter(AccessLevel.NONE) + private final boolean isMultiStreamMode; + @Getter(AccessLevel.NONE) + private final String streamIdentifierStr; /** * Creates a new ShardInfo object. The checkpoint is not part of the equality, but is used for debugging output. @@ -55,28 +61,14 @@ public class ShardInfo { * Parent shards of the shard identified by Kinesis shardId * @param checkpoint * the latest checkpoint from lease - */ - public ShardInfo(@NonNull final String shardId, - final String concurrencyToken, - final Collection parentShardIds, - final ExtendedSequenceNumber checkpoint) { - this(shardId, concurrencyToken, parentShardIds, checkpoint, null); - } - - /** - * Creates a new ShardInfo object that has an option to pass a serialized streamIdentifier. - * The checkpoint is not part of the equality, but is used for debugging output. - * @param shardId - * @param concurrencyToken - * @param parentShardIds - * @param checkpoint - * @param streamIdentifierSer + * @param streamConfig + * The {@link StreamConfig} instance for the stream that the shard belongs to */ public ShardInfo(@NonNull final String shardId, final String concurrencyToken, final Collection parentShardIds, final ExtendedSequenceNumber checkpoint, - final String streamIdentifierSer) { + @NonNull final StreamConfig streamConfig) { this.shardId = shardId; this.concurrencyToken = concurrencyToken; this.parentShardIds = new LinkedList<>(); @@ -87,7 +79,9 @@ public ShardInfo(@NonNull final String shardId, // This makes it easy to check for equality in ShardInfo.equals method. Collections.sort(this.parentShardIds); this.checkpoint = checkpoint; - this.streamIdentifierSerOpt = Optional.ofNullable(streamIdentifierSer); + this.streamConfig = streamConfig; + this.isMultiStreamMode = streamConfig.streamIdentifier().isMultiStreamInstance(); + this.streamIdentifierStr = streamConfig.streamIdentifier().serialize(); } /** @@ -114,7 +108,8 @@ public boolean isCompleted() { @Override public int hashCode() { return new HashCodeBuilder() - .append(concurrencyToken).append(parentShardIds).append(shardId).append(streamIdentifierSerOpt.orElse("")).toHashCode(); + .append(concurrencyToken).append(parentShardIds).append(shardId).append(streamIdentifierStr) + .toHashCode(); } /** @@ -139,7 +134,7 @@ public boolean equals(Object obj) { ShardInfo other = (ShardInfo) obj; return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken) .append(parentShardIds, other.parentShardIds).append(shardId, other.shardId) - .append(streamIdentifierSerOpt.orElse(""), other.streamIdentifierSerOpt.orElse("")).isEquals(); + .append(streamIdentifierStr, other.streamIdentifierStr).isEquals(); } @@ -159,8 +154,8 @@ public static String getLeaseKey(ShardInfo shardInfo) { * @return lease key */ public static String getLeaseKey(ShardInfo shardInfo, String shardIdOverride) { - return shardInfo.streamIdentifierSerOpt().isPresent() ? - MultiStreamLease.getLeaseKey(shardInfo.streamIdentifierSerOpt().get(), shardIdOverride) : + return shardInfo.isMultiStreamMode ? + MultiStreamLease.getLeaseKey(shardInfo.streamIdentifierStr, shardIdOverride) : shardIdOverride; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index da6d8e075..825b6243d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -17,7 +17,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; @@ -29,7 +28,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.Lease; @@ -37,8 +35,6 @@ import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRenewer; import software.amazon.kinesis.leases.LeaseTaker; -import software.amazon.kinesis.leases.MultiStreamLease; -import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.LeasingException; @@ -366,34 +362,6 @@ private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSiz new LinkedTransferQueue<>(), LEASE_RENEWAL_THREAD_FACTORY); } - @Override - public List getCurrentAssignments() { - Collection leases = getAssignments(); - return convertLeasesToAssignments(leases); - } - - private static List convertLeasesToAssignments(final Collection leases) { - if (leases == null) { - return Collections.emptyList(); - } - return leases.stream().map(DynamoDBLeaseCoordinator::convertLeaseToAssignment).collect(Collectors.toList()); - } - - /** - * Utility method to convert the basic lease or multistream lease to ShardInfo - * @param lease - * @return ShardInfo - */ - public static ShardInfo convertLeaseToAssignment(final Lease lease) { - if (lease instanceof MultiStreamLease) { - return new ShardInfo(((MultiStreamLease) lease).shardId(), lease.concurrencyToken().toString(), lease.parentShardIds(), - lease.checkpoint(), ((MultiStreamLease) lease).streamIdentifier()); - } else { - return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(), - lease.checkpoint()); - } - } - /** * {@inheritDoc} * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java index ab81d7cea..e1ebfae96 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java @@ -18,7 +18,6 @@ import lombok.EqualsAndHashCode; import lombok.Value; import lombok.experimental.Accessors; -import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; @@ -36,7 +35,6 @@ @Value public class LeasePendingDeletion { - StreamIdentifier streamIdentifier; Lease lease; ShardInfo shardInfo; ShardDetector shardDetector; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java index 058b3009b..d818e9e40 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java @@ -188,7 +188,7 @@ public ConsumerTask createTask(ShardConsumerArgument argument, ShardConsumer con return new InitializeTask(argument.shardInfo(), argument.shardRecordProcessor(), argument.checkpoint(), - argument.recordProcessorCheckpointer(), argument.initialPositionInStream(), + argument.recordProcessorCheckpointer(), argument.recordsPublisher(), argument.taskBackoffTimeMillis(), argument.metricsFactory()); @@ -479,7 +479,6 @@ public ConsumerTask createTask(ShardConsumerArgument argument, ShardConsumer con argument.recordProcessorCheckpointer(), consumer.shutdownReason(), consumer.shutdownNotification(), - argument.initialPositionInStream(), argument.cleanupLeasesOfCompletedShards(), argument.ignoreUnexpectedChildShards(), argument.leaseCoordinator(), @@ -488,7 +487,6 @@ public ConsumerTask createTask(ShardConsumerArgument argument, ShardConsumer con argument.hierarchicalShardSyncer(), argument.metricsFactory(), input == null ? null : input.childShards(), - argument.streamIdentifier(), argument.leaseCleanupManager()); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java index 7816c1e14..d4a2e95b6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java @@ -51,8 +51,6 @@ public class InitializeTask implements ConsumerTask { @NonNull private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer; @NonNull - private final InitialPositionInStreamExtended initialPositionInStream; - @NonNull private final RecordsPublisher cache; // Back off for this interval if we encounter a problem (exception) @@ -78,6 +76,8 @@ public TaskResult call() { final String leaseKey = ShardInfo.getLeaseKey(shardInfo); Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(leaseKey); ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.checkpoint(); + final InitialPositionInStreamExtended initialPositionInStream = + shardInfo.streamConfig().initialPositionInStreamExtended(); log.debug("[{}]: Checkpoint: {} -- Initial Position: {}", leaseKey, initialCheckpoint, initialPositionInStream); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index fb398cdab..fcf1fd5e9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -23,7 +23,6 @@ import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; -import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; @@ -120,8 +119,7 @@ public TaskResult call() { */ final MetricsScope appScope = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_TRACKER_OPERATION); final MetricsScope shardScope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); - shardInfo.streamIdentifierSerOpt() - .ifPresent(streamId -> MetricsUtil.addStreamId(shardScope, StreamIdentifier.multiStreamInstance(streamId))); + MetricsUtil.addStreamId(shardScope, shardInfo.streamConfig().streamIdentifier()); MetricsUtil.addShardId(shardScope, shardInfo.shardId()); long startTimeMillis = System.currentTimeMillis(); boolean success = false; @@ -218,8 +216,7 @@ private void callProcessRecords(ProcessRecordsInput input, List MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(streamId))); + MetricsUtil.addStreamId(scope, shardInfo.streamConfig().streamIdentifier()); MetricsUtil.addShardId(scope, shardInfo.shardId()); final long startTime = System.currentTimeMillis(); try { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index 4162ea812..6eb6d76ad 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -133,7 +133,7 @@ public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executor this.recordsPublisher = recordsPublisher; this.executorService = executorService; this.shardInfo = shardInfo; - this.streamIdentifier = shardInfo.streamIdentifierSerOpt().orElse("single_stream_mode"); + this.streamIdentifier = shardInfo.streamConfig().streamIdentifier().serialize(); this.shardConsumerArgument = shardConsumerArgument; this.logWarningForTaskAfterMillis = logWarningForTaskAfterMillis; this.taskExecutionListener = taskExecutionListener; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java index 0518b830d..ec322d3a7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java @@ -20,8 +20,6 @@ import lombok.experimental.Accessors; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; -import software.amazon.kinesis.common.InitialPositionInStreamExtended; -import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.LeaseCleanupManager; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.ShardDetector; @@ -43,8 +41,6 @@ public class ShardConsumerArgument { @NonNull private final ShardInfo shardInfo; @NonNull - private final StreamIdentifier streamIdentifier; - @NonNull private final LeaseCoordinator leaseCoordinator; @NonNull private final ExecutorService executorService; @@ -63,8 +59,6 @@ public class ShardConsumerArgument { private final int maxListShardsRetryAttempts; private final boolean shouldCallProcessRecordsEvenForEmptyRecordList; private final long idleTimeInMilliseconds; - @NonNull - private final InitialPositionInStreamExtended initialPositionInStream; private final boolean cleanupLeasesOfCompletedShards; private final boolean ignoreUnexpectedChildShards; @NonNull diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index d8c9d3791..ca1ed16d4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -27,8 +27,6 @@ import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; -import software.amazon.kinesis.common.InitialPositionInStreamExtended; -import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.Lease; @@ -88,8 +86,6 @@ public class ShutdownTask implements ConsumerTask { @NonNull private final ShutdownReason reason; private final ShutdownNotification shutdownNotification; - @NonNull - private final InitialPositionInStreamExtended initialPositionInStream; private final boolean cleanupLeasesOfCompletedShards; private final boolean ignoreUnexpectedChildShards; @NonNull @@ -106,8 +102,6 @@ public class ShutdownTask implements ConsumerTask { private final List childShards; @NonNull - private final StreamIdentifier streamIdentifier; - @NonNull private final LeaseCleanupManager leaseCleanupManager; /* @@ -198,8 +192,8 @@ private void takeShardEndAction(Lease currentShardLease, createLeasesForChildShardsIfNotExist(scope); updateLeaseWithChildShards(currentShardLease); } - final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, - shardInfo, shardDetector); + final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion( + currentShardLease, shardInfo, shardDetector); if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { boolean isSuccess = false; try { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalFactory.java index 5703e1af7..8edd6e4ae 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalFactory.java @@ -23,11 +23,31 @@ * */ public interface RetrievalFactory { - GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(ShardInfo shardInfo, MetricsFactory metricsFactory); + /** + * @deprecated This method was only used by specific implementations of {@link RetrievalFactory} and should not be + * required to be implemented; will be removed in future versions. + */ @Deprecated + default GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(ShardInfo shardInfo, MetricsFactory metricsFactory) { + throw new UnsupportedOperationException("This method is deprecated and should not be used."); + } + + /** + * Creates a {@link RecordsPublisher} instance to retrieve records for the specified shard. + * + * @param shardInfo The {@link ShardInfo} representing the shard for which records are to be retrieved. + * @param metricsFactory The {@link MetricsFactory} for recording metrics. + * @return A {@link RecordsPublisher} instance for retrieving records from the shard. + */ RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory); + /** + * @deprecated {@link ShardInfo} now includes a reference to {@link StreamConfig}. + * Please use {@link #createGetRecordsCache(ShardInfo, MetricsFactory)} instead. + * This method will be removed in future versions. + */ + @Deprecated default RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, StreamConfig streamConfig, MetricsFactory metricsFactory) { return createGetRecordsCache(shardInfo, metricsFactory); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java index bcfb1081d..039d15335 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java @@ -19,17 +19,14 @@ import lombok.RequiredArgsConstructor; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.annotations.KinesisClientInternalApi; -import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.metrics.MetricsFactory; -import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalFactory; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.function.Function; @RequiredArgsConstructor @@ -41,36 +38,25 @@ public class FanOutRetrievalFactory implements RetrievalFactory { private final String defaultConsumerArn; private final Function consumerArnCreator; - private Map implicitConsumerArnTracker = new HashMap<>(); - - @Override - public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo, - final MetricsFactory metricsFactory) { - return null; - } + private final Map implicitConsumerArnTracker = new HashMap<>(); + /** + + {@inheritDoc} + */ @Override public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, - final StreamConfig streamConfig, final MetricsFactory metricsFactory) { - final Optional streamIdentifierStr = shardInfo.streamIdentifierSerOpt(); - if (streamIdentifierStr.isPresent()) { - final StreamIdentifier streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get()); + final StreamIdentifier streamIdentifier = shardInfo.streamConfig().streamIdentifier(); + if (streamIdentifier.isMultiStreamInstance()) { return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), - getOrCreateConsumerArn(streamIdentifier, streamConfig.consumerArn()), - streamIdentifierStr.get()); + getOrCreateConsumerArn(streamIdentifier, shardInfo.streamConfig().consumerArn()), + streamIdentifier.serialize()); } else { - final StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance(defaultStreamName); return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(), getOrCreateConsumerArn(streamIdentifier, defaultConsumerArn)); } } - @Override - public RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory) { - throw new UnsupportedOperationException("FanoutRetrievalFactory needs StreamConfig Info"); - } - private String getOrCreateConsumerArn(StreamIdentifier streamIdentifier, String consumerArn) { return consumerArn != null ? consumerArn : implicitConsumerArnTracker .computeIfAbsent(streamIdentifier, sId -> consumerArnCreator.apply(sId.streamName())); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java index 071763fc4..21cf9ac71 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java @@ -21,11 +21,9 @@ import lombok.NonNull; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.annotations.KinesisClientInternalApi; -import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.retrieval.DataFetcherProviderConfig; -import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig; import software.amazon.kinesis.retrieval.RecordsFetcherFactory; import software.amazon.kinesis.retrieval.RecordsPublisher; @@ -50,20 +48,6 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory { private final Function dataFetcherProvider; - @Deprecated - public SynchronousBlockingRetrievalFactory(String streamName, - KinesisAsyncClient kinesisClient, - RecordsFetcherFactory recordsFetcherFactory, - int maxRecords, - Duration kinesisRequestTimeout) { - this(streamName, - kinesisClient, - recordsFetcherFactory, - maxRecords, - kinesisRequestTimeout, - defaultDataFetcherProvider(kinesisClient)); - } - public SynchronousBlockingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, RecordsFetcherFactory recordsFetcherFactory, @@ -79,28 +63,19 @@ public SynchronousBlockingRetrievalFactory(String streamName, defaultDataFetcherProvider(kinesisClient) : dataFetcherProvider; } - @Deprecated - public SynchronousBlockingRetrievalFactory(String streamName, - KinesisAsyncClient kinesisClient, - RecordsFetcherFactory recordsFetcherFactory, - int maxRecords) { - this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, PollingConfig.DEFAULT_REQUEST_TIMEOUT); - } - private static Function defaultDataFetcherProvider( KinesisAsyncClient kinesisClient) { return dataFetcherProviderConfig -> new KinesisDataFetcher(kinesisClient, dataFetcherProviderConfig); } + /** + * {@inheritDoc} + */ @Override - public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, - @NonNull final MetricsFactory metricsFactory) { - final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ? - StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) : - StreamIdentifier.singleStreamInstance(streamName); - + public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, + @NonNull final MetricsFactory metricsFactory) { final DataFetcherProviderConfig kinesisDataFetcherProviderConfig = new KinesisDataFetcherProviderConfig( - streamIdentifier, + shardInfo.streamConfig().streamIdentifier(), shardInfo.shardId(), metricsFactory, maxRecords, @@ -108,13 +83,7 @@ public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull fi final DataFetcher dataFetcher = this.dataFetcherProvider.apply(kinesisDataFetcherProviderConfig); - return new SynchronousGetRecordsRetrievalStrategy(dataFetcher); - } - - @Override - public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, - @NonNull final MetricsFactory metricsFactory) { - return recordsFetcherFactory.createRecordsFetcher(createGetRecordsRetrievalStrategy(shardInfo, metricsFactory), + return recordsFetcherFactory.createRecordsFetcher(new SynchronousGetRecordsRetrievalStrategy(dataFetcher), shardInfo.shardId(), metricsFactory, maxRecords); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java deleted file mode 100644 index efa11e701..000000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright 2019 Amazon.com, Inc. or its affiliates. - * Licensed under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package software.amazon.kinesis.retrieval.polling; - -import java.time.Duration; -import java.util.concurrent.ExecutorService; -import lombok.NonNull; -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import software.amazon.kinesis.annotations.KinesisClientInternalApi; -import software.amazon.kinesis.common.StreamIdentifier; -import software.amazon.kinesis.leases.ShardInfo; -import software.amazon.kinesis.metrics.MetricsFactory; -import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy; -import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig; -import software.amazon.kinesis.retrieval.RecordsFetcherFactory; -import software.amazon.kinesis.retrieval.RecordsPublisher; -import software.amazon.kinesis.retrieval.RetrievalFactory; - -/** - * - */ -@KinesisClientInternalApi -public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory { - @NonNull - private final String streamName; - @NonNull - private final KinesisAsyncClient kinesisClient; - @NonNull - private final RecordsFetcherFactory recordsFetcherFactory; - private final int maxRecords; - @NonNull - private final ExecutorService executorService; - private final long idleMillisBetweenCalls; - private final Duration maxFutureWait; - - @Deprecated - public SynchronousPrefetchingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, - RecordsFetcherFactory recordsFetcherFactory, int maxRecords, ExecutorService executorService, - long idleMillisBetweenCalls) { - this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, executorService, idleMillisBetweenCalls, - PollingConfig.DEFAULT_REQUEST_TIMEOUT); - } - - public SynchronousPrefetchingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient, - RecordsFetcherFactory recordsFetcherFactory, int maxRecords, ExecutorService executorService, - long idleMillisBetweenCalls, Duration maxFutureWait) { - this.streamName = streamName; - this.kinesisClient = kinesisClient; - this.recordsFetcherFactory = recordsFetcherFactory; - this.maxRecords = maxRecords; - this.executorService = executorService; - this.idleMillisBetweenCalls = idleMillisBetweenCalls; - this.maxFutureWait = maxFutureWait; - } - - @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo, - @NonNull final MetricsFactory metricsFactory) { - final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ? - StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) : - StreamIdentifier.singleStreamInstance(streamName); - - return new SynchronousGetRecordsRetrievalStrategy( - new KinesisDataFetcher(kinesisClient, new KinesisDataFetcherProviderConfig( - streamIdentifier, - shardInfo.shardId(), - metricsFactory, - maxRecords, - maxFutureWait - ))); - } - - @Override - public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo, - @NonNull final MetricsFactory metricsFactory) { - return new PrefetchRecordsPublisher(recordsFetcherFactory.maxPendingProcessRecordsInput(), - recordsFetcherFactory.maxByteSize(), recordsFetcherFactory.maxRecordsCount(), maxRecords, - createGetRecordsRetrievalStrategy(shardInfo, metricsFactory), executorService, idleMillisBetweenCalls, - metricsFactory, "Prefetching", shardInfo.shardId()); - } -} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/ShardShardRecordProcessorCheckpointerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/ShardShardRecordProcessorCheckpointerTest.java index 98ce1dc58..74ee4f687 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/ShardShardRecordProcessorCheckpointerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/ShardShardRecordProcessorCheckpointerTest.java @@ -30,6 +30,10 @@ import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.PreparedCheckpointer; @@ -40,6 +44,11 @@ */ @RunWith(MockitoJUnitRunner.class) public class ShardShardRecordProcessorCheckpointerTest { + private static final StreamIdentifier TEST_STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName"); + private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION_IN_STREAM_EXTENDED = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + private static final StreamConfig TEST_STREAM_CONFIG = + new StreamConfig(TEST_STREAM_IDENTIFIER, TEST_INITIAL_POSITION_IN_STREAM_EXTENDED); private String startingSequenceNumber = "13"; private ExtendedSequenceNumber startingExtendedSequenceNumber = new ExtendedSequenceNumber(startingSequenceNumber); private String testConcurrencyToken = "testToken"; @@ -57,7 +66,8 @@ public void setup() throws Exception { checkpoint.setCheckpoint(shardId, startingExtendedSequenceNumber, testConcurrencyToken); assertThat(this.startingExtendedSequenceNumber, equalTo(checkpoint.getCheckpoint(shardId))); - shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); + shardInfo = new ShardInfo( + shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG); } /** diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 9671bb78e..20faa671a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -17,6 +17,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; @@ -39,6 +40,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -47,11 +49,14 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.RejectedExecutionException; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; +import com.amazonaws.util.CollectionUtils; import com.google.common.base.Joiner; import com.google.common.collect.Sets; import io.reactivex.rxjava3.plugins.RxJavaPlugins; @@ -61,11 +66,13 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Spy; import org.mockito.runners.MockitoJUnitRunner; import org.mockito.stubbing.OngoingStubbing; +import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @@ -78,6 +85,7 @@ import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.KinesisClientLibException; import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException; +import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCleanupManager; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.LeaseCoordinator; @@ -111,6 +119,7 @@ import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessor; +import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.retrieval.RetrievalFactory; @@ -129,6 +138,10 @@ public class SchedulerTest { private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1000L; private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; + private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + private static final String TEST_SHARD_ID = "shardId-000000000001"; + private static final UUID TEST_CONCURRENCY_TOKEN = UUID.randomUUID(); private Scheduler scheduler; private ShardRecordProcessorFactory shardRecordProcessorFactory; @@ -139,6 +152,7 @@ public class SchedulerTest { private MetricsConfig metricsConfig; private ProcessorConfig processorConfig; private RetrievalConfig retrievalConfig; + private StreamConfig streamConfig; @Mock private KinesisAsyncClient kinesisClient; @@ -196,6 +210,7 @@ public void setup() { scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig); + streamConfig = scheduler.currentStreamConfigMap().values().iterator().next(); } /** @@ -212,9 +227,9 @@ public void testGetStageName() { @Test public final void testCreateOrGetShardConsumer() { - final String shardId = "shardId-000000000000"; final String concurrencyToken = "concurrencyToken"; - final ShardInfo shardInfo = new ShardInfo(shardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); + final ShardInfo shardInfo = + new ShardInfo(TEST_SHARD_ID, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON, streamConfig); final ShardConsumer shardConsumer1 = scheduler.createOrGetShardConsumer(shardInfo, shardRecordProcessorFactory, leaseCleanupManager); assertNotNull(shardConsumer1); final ShardConsumer shardConsumer2 = scheduler.createOrGetShardConsumer(shardInfo, shardRecordProcessorFactory, leaseCleanupManager); @@ -223,8 +238,8 @@ public final void testCreateOrGetShardConsumer() { assertSame(shardConsumer1, shardConsumer2); final String anotherConcurrencyToken = "anotherConcurrencyToken"; - final ShardInfo shardInfo2 = new ShardInfo(shardId, anotherConcurrencyToken, null, - ExtendedSequenceNumber.TRIM_HORIZON); + final ShardInfo shardInfo2 = new ShardInfo( + TEST_SHARD_ID, anotherConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON, streamConfig); final ShardConsumer shardConsumer3 = scheduler.createOrGetShardConsumer(shardInfo2, shardRecordProcessorFactory, leaseCleanupManager); assertNotNull(shardConsumer3); @@ -234,33 +249,29 @@ public final void testCreateOrGetShardConsumer() { // TODO: figure out the behavior of the test. @Test public void testWorkerLoopWithCheckpoint() throws Exception { - final String shardId = "shardId-000000000000"; - final String concurrencyToken = "concurrencyToken"; final ExtendedSequenceNumber firstSequenceNumber = ExtendedSequenceNumber.TRIM_HORIZON; final ExtendedSequenceNumber secondSequenceNumber = new ExtendedSequenceNumber("1000"); final ExtendedSequenceNumber finalSequenceNumber = new ExtendedSequenceNumber("2000"); - - final List initialShardInfo = Collections.singletonList( - new ShardInfo(shardId, concurrencyToken, null, firstSequenceNumber)); - final List firstShardInfo = Collections.singletonList( - new ShardInfo(shardId, concurrencyToken, null, secondSequenceNumber)); - final List secondShardInfo = Collections.singletonList( - new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber)); - final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null, null); + when(checkpoint.getCheckpointObject(eq(TEST_SHARD_ID))).thenReturn(firstCheckpoint); - when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo); - when(checkpoint.getCheckpointObject(eq(shardId))).thenReturn(firstCheckpoint); + when(leaseCoordinator.getAssignments()).thenReturn( + Stream.of(firstSequenceNumber, secondSequenceNumber, finalSequenceNumber) + .map(SchedulerTest::constructLease) + .collect(Collectors.toList())); Scheduler schedulerSpy = spy(scheduler); schedulerSpy.runProcessLoop(); schedulerSpy.runProcessLoop(); schedulerSpy.runProcessLoop(); - verify(schedulerSpy).buildConsumer(same(initialShardInfo.get(0)), eq(shardRecordProcessorFactory), eq(leaseCleanupManager)); - verify(schedulerSpy, never()).buildConsumer(same(firstShardInfo.get(0)), eq(shardRecordProcessorFactory), eq(leaseCleanupManager)); - verify(schedulerSpy, never()).buildConsumer(same(secondShardInfo.get(0)), eq(shardRecordProcessorFactory), eq(leaseCleanupManager)); - verify(checkpoint).getCheckpointObject(eq(shardId)); + final ShardInfo expectedShardInfo = + new ShardInfo(TEST_SHARD_ID, TEST_CONCURRENCY_TOKEN.toString(), null, null, streamConfig); + verify(schedulerSpy).buildConsumer( + eq(expectedShardInfo), eq(shardRecordProcessorFactory), eq(leaseCleanupManager)); + // consumer is only built a total of once for all the three returned assignments + verify(schedulerSpy, times(1)).buildConsumer(any(), any(), any()); + verify(checkpoint).getCheckpointObject(eq(TEST_SHARD_ID)); } @Test @@ -270,10 +281,12 @@ public final void testCleanupShardConsumers() { final String concurrencyToken = "concurrencyToken"; final String anotherConcurrencyToken = "anotherConcurrencyToken"; - final ShardInfo shardInfo0 = new ShardInfo(shard0, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - final ShardInfo shardInfo0WithAnotherConcurrencyToken = new ShardInfo(shard0, anotherConcurrencyToken, null, - ExtendedSequenceNumber.TRIM_HORIZON); - final ShardInfo shardInfo1 = new ShardInfo(shard1, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); + final ShardInfo shardInfo0 = + new ShardInfo(shard0, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON, streamConfig); + final ShardInfo shardInfo0WithAnotherConcurrencyToken = + new ShardInfo(shard0, anotherConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON, streamConfig); + final ShardInfo shardInfo1 = + new ShardInfo(shard1, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON, streamConfig); final ShardConsumer shardConsumer0 = scheduler.createOrGetShardConsumer(shardInfo0, shardRecordProcessorFactory, leaseCleanupManager); final ShardConsumer shardConsumer0WithAnotherConcurrencyToken = @@ -362,25 +375,23 @@ public final void testMultiStreamInitializationWithFailures() { @Test public final void testMultiStreamConsumersAreBuiltOncePerAccountStreamShard() throws KinesisClientLibException { - final String shardId = "shardId-000000000000"; - final String concurrencyToken = "concurrencyToken"; final ExtendedSequenceNumber firstSequenceNumber = ExtendedSequenceNumber.TRIM_HORIZON; final ExtendedSequenceNumber secondSequenceNumber = new ExtendedSequenceNumber("1000"); final ExtendedSequenceNumber finalSequenceNumber = new ExtendedSequenceNumber("2000"); - final List initialShardInfo = multiStreamTracker.streamConfigList().stream() - .map(sc -> new ShardInfo(shardId, concurrencyToken, null, firstSequenceNumber, - sc.streamIdentifier().serialize())).collect(Collectors.toList()); - final List firstShardInfo = multiStreamTracker.streamConfigList().stream() - .map(sc -> new ShardInfo(shardId, concurrencyToken, null, secondSequenceNumber, - sc.streamIdentifier().serialize())).collect(Collectors.toList()); - final List secondShardInfo = multiStreamTracker.streamConfigList().stream() - .map(sc -> new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber, - sc.streamIdentifier().serialize())).collect(Collectors.toList()); + final List initialLeases = multiStreamTracker.streamConfigList().stream() + .map(sc -> constructMultiStreamLease(sc.streamIdentifier().serialize(), firstSequenceNumber)) + .collect(Collectors.toList()); + final List firstLeases = multiStreamTracker.streamConfigList().stream() + .map(sc -> constructMultiStreamLease(sc.streamIdentifier().serialize(), secondSequenceNumber)) + .collect(Collectors.toList()); + final List secondLeases = multiStreamTracker.streamConfigList().stream() + .map(sc -> constructMultiStreamLease(sc.streamIdentifier().serialize(), finalSequenceNumber)) + .collect(Collectors.toList()); final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null, null); - when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo); + when(leaseCoordinator.getAssignments()).thenReturn(initialLeases, firstLeases, secondLeases); when(checkpoint.getCheckpointObject(anyString())).thenReturn(firstCheckpoint); retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) .retrievalFactory(retrievalFactory); @@ -391,12 +402,14 @@ public final void testMultiStreamConsumersAreBuiltOncePerAccountStreamShard() th schedulerSpy.runProcessLoop(); schedulerSpy.runProcessLoop(); - initialShardInfo.forEach( - shardInfo -> verify(schedulerSpy).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), same(leaseCleanupManager))); - firstShardInfo.forEach( - shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), eq(leaseCleanupManager))); - secondShardInfo.forEach( - shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), eq(leaseCleanupManager))); + final List expectedShardInfos = multiStreamTracker.streamConfigList().stream() + .map(sc -> new ShardInfo(TEST_SHARD_ID, TEST_CONCURRENCY_TOKEN.toString(), null, null, sc)) + .collect(Collectors.toList()); + + expectedShardInfos.forEach(shardInfo -> verify(schedulerSpy).buildConsumer( + eq(shardInfo), eq(shardRecordProcessorFactory), same(leaseCleanupManager))); + // consumer is only built once for each of the unique shards assigned to it + verify(schedulerSpy, times(multiStreamTracker.streamConfigList().size())).buildConsumer(any(), any(), any()); } @Test @@ -1080,6 +1093,114 @@ private void mockListLeases(List configs) throws ProvisionedThroug .shardId("some_random_shard_id")).collect(Collectors.toList())); } + @Test + public void testShardInfoConstructionFromSingleStreamLease() { + final List parentShardIds = Arrays.asList("shardId-000000000000", "shardId-000000000001"); + final ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("1234"); + + final Lease lease = constructLease(parentShardIds, checkpoint); + when(leaseCoordinator.getAssignments()).thenReturn(Collections.singletonList(lease)); + + final Scheduler schedulerSpy = spy(scheduler); + schedulerSpy.runProcessLoop(); + + final ArgumentCaptor shardInfoArgumentCaptor = ArgumentCaptor.forClass(ShardInfo.class); + verify(schedulerSpy).buildConsumer(shardInfoArgumentCaptor.capture(), any(), any()); + + final ShardInfo expectedShardInfo = new ShardInfo( + TEST_SHARD_ID, TEST_CONCURRENCY_TOKEN.toString(), parentShardIds, checkpoint, streamConfig); + final ShardInfo actualShardInfo = shardInfoArgumentCaptor.getValue(); + assertEquals(expectedShardInfo, actualShardInfo); + // checkpoint is not included in ShardInfo equality check + assertEquals(expectedShardInfo.checkpoint(), actualShardInfo.checkpoint()); + } + + @Test + public void testShardInfoConstructionFromMultiStreamLeases() { + final StreamConfig streamConfigWithSerialization = new StreamConfig(StreamIdentifier.multiStreamInstance( + "123456789012:streamBySerialization:1111111111"), TEST_INITIAL_POSITION); + final StreamConfig streamConfigWithArn = new StreamConfig(StreamIdentifier.multiStreamInstance( + Arn.fromString("arn:aws:kinesis:us-east-1:123456789012:stream/streamByArn"), 2222222222L), + TEST_INITIAL_POSITION); + final StreamConfig streamConfigForOrphanStream = new StreamConfig(StreamIdentifier.multiStreamInstance( + "123456789012:streamNotProvidedInStreamConfigList:3333333333"), TEST_INITIAL_POSITION); + + final StreamTracker streamTracker = new MultiStreamTracker() { + @Override + public List streamConfigList() { + return Arrays.asList(streamConfigWithSerialization, streamConfigWithArn); + } + @Override + public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() { + return new FormerStreamsLeasesDeletionStrategy.NoLeaseDeletionStrategy(); + } + }; + + retrievalConfig = + new RetrievalConfig(kinesisClient, streamTracker, applicationName).retrievalFactory(retrievalFactory); + scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig); + when(leaseCoordinator.getAssignments()).thenReturn( + Stream.of(streamConfigWithSerialization, streamConfigWithArn, streamConfigForOrphanStream) + .map(streamConfig -> constructMultiStreamLease(streamConfig.streamIdentifier().serialize())) + .collect(Collectors.toList())); + + final Scheduler schedulerSpy = spy(scheduler); + schedulerSpy.runProcessLoop(); + + final ArgumentCaptor shardInfoArgumentCaptor = ArgumentCaptor.forClass(ShardInfo.class); + verify(schedulerSpy, times(3)).buildConsumer( + shardInfoArgumentCaptor.capture(), + any(ShardRecordProcessorFactory.class), + any(LeaseCleanupManager.class)); + + // shardInfo should be constructed with a reference to the original streamConfig from streamConfigList + assertSame(streamConfigWithSerialization, shardInfoArgumentCaptor.getAllValues().get(0).streamConfig()); + assertSame(streamConfigWithArn, shardInfoArgumentCaptor.getAllValues().get(1).streamConfig()); + + // for a streamConfig that is not present in the streamConfigList/currentStreamConfigMap, + // shardInfo is constructed with a newly created StreamConfig instance + final StreamConfig actualStreamConfigForOrphanStream = + shardInfoArgumentCaptor.getAllValues().get(2).streamConfig(); + assertEquals(streamConfigForOrphanStream.streamIdentifier(), + actualStreamConfigForOrphanStream.streamIdentifier()); + assertNotSame(streamConfigForOrphanStream, actualStreamConfigForOrphanStream); + assertNotEquals(streamConfigForOrphanStream.initialPositionInStreamExtended(), + actualStreamConfigForOrphanStream.initialPositionInStreamExtended()); + assertEquals(multiStreamTracker.orphanedStreamInitialPositionInStream(), + actualStreamConfigForOrphanStream.initialPositionInStreamExtended()); + } + + private static Lease constructMultiStreamLease(String streamIdentifier) { + return constructMultiStreamLease(streamIdentifier, ExtendedSequenceNumber.TRIM_HORIZON); + } + + private static Lease constructMultiStreamLease(String streamIdentifier, ExtendedSequenceNumber checkpoint) { + final MultiStreamLease lease = new MultiStreamLease(); + lease.streamIdentifier(streamIdentifier); + lease.shardId(TEST_SHARD_ID); + return updateLease(lease, String.join(":", streamIdentifier, TEST_SHARD_ID), null, checkpoint); + } + + private static Lease constructLease(ExtendedSequenceNumber checkpoint) { + return constructLease(null, checkpoint); + } + + private static Lease constructLease(Collection parentShardIds, ExtendedSequenceNumber checkpoint) { + return updateLease(new Lease(), TEST_SHARD_ID, parentShardIds, checkpoint); + } + + private static Lease updateLease(Lease leaseToUpdate, String leaseKey, Collection parentShardIds, + ExtendedSequenceNumber checkpoint) { + leaseToUpdate.leaseKey(leaseKey); + leaseToUpdate.concurrencyToken(TEST_CONCURRENCY_TOKEN); + if (!CollectionUtils.isNullOrEmpty(parentShardIds)) { + leaseToUpdate.parentShardIds(parentShardIds); + } + leaseToUpdate.checkpoint(checkpoint); + return leaseToUpdate; + } + /*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception { final int numberOfRecordsPerShard = 10; final String kinesisShardPrefix = "kinesis-0-"; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java index 9a731f802..d0ce2841b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java @@ -23,6 +23,9 @@ import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.kinesis.model.ChildShard; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion; import software.amazon.kinesis.metrics.MetricsFactory; @@ -45,10 +48,13 @@ @RunWith(MockitoJUnitRunner.class) public class LeaseCleanupManagerTest { + private static final StreamIdentifier TEST_STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName"); + private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION_IN_STREAM_EXTENDED = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + private static final StreamConfig TEST_STREAM_CONFIG = + new StreamConfig(TEST_STREAM_IDENTIFIER, TEST_INITIAL_POSITION_IN_STREAM_EXTENDED); private static final ShardInfo SHARD_INFO = new ShardInfo("shardId", "concurrencyToken", - Collections.emptySet(), ExtendedSequenceNumber.LATEST); - - private static final StreamIdentifier STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName"); + Collections.emptySet(), ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG); private final long leaseCleanupIntervalMillis = Duration.ofSeconds(1).toMillis(); private final long completedLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis(); @@ -175,7 +181,7 @@ private void testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenc @Test public final void testLeaseNotDeletedWhenParentsStillPresent() throws Exception { final ShardInfo shardInfo = new ShardInfo("shardId-0", "concurrencyToken", Collections.singleton("parent"), - ExtendedSequenceNumber.LATEST); + ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG); verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 0); } @@ -296,6 +302,6 @@ private List childShardsForMerge() { } private LeasePendingDeletion createLeasePendingDeletion(final Lease lease, final ShardInfo shardInfo) { - return new LeasePendingDeletion(STREAM_IDENTIFIER, lease, shardInfo, shardDetector); + return new LeasePendingDeletion(lease, shardInfo, shardDetector); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ParentsFirstShardPrioritizationUnitTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ParentsFirstShardPrioritizationUnitTest.java index 5147ba796..daa797045 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ParentsFirstShardPrioritizationUnitTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ParentsFirstShardPrioritizationUnitTest.java @@ -25,9 +25,18 @@ import org.junit.Test; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; public class ParentsFirstShardPrioritizationUnitTest { + private static final StreamIdentifier TEST_STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName"); + private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION_IN_STREAM_EXTENDED = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + private static final StreamConfig TEST_STREAM_CONFIG = + new StreamConfig(TEST_STREAM_IDENTIFIER, TEST_INITIAL_POSITION_IN_STREAM_EXTENDED); @Test(expected = IllegalArgumentException.class) public void testMaxDepthNegativeShouldFail() { @@ -193,7 +202,7 @@ ShardInfoBuilder withCheckpoint(ExtendedSequenceNumber checkpoint) { } ShardInfo build() { - return new ShardInfo(shardId, concurrencyToken, parentShardIds, checkpoint); + return new ShardInfo(shardId, concurrencyToken, parentShardIds, checkpoint, TEST_STREAM_CONFIG); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardInfoTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardInfoTest.java index 4ccafe523..f96f6048f 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardInfoTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardInfoTest.java @@ -17,6 +17,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -29,9 +30,18 @@ import org.junit.Before; import org.junit.Test; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; public class ShardInfoTest { + private static final StreamIdentifier TEST_STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName"); + private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION_IN_STREAM_EXTENDED = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + private static final StreamConfig TEST_STREAM_CONFIG = + new StreamConfig(TEST_STREAM_IDENTIFIER, TEST_INITIAL_POSITION_IN_STREAM_EXTENDED); private static final String CONCURRENCY_TOKEN = UUID.randomUUID().toString(); private static final String SHARD_ID = "shardId-test"; private final Set parentShardIds = new HashSet<>(); @@ -43,12 +53,14 @@ public void setUpPacboyShardInfo() { parentShardIds.add("shard-1"); parentShardIds.add("shard-2"); - testShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST); + testShardInfo = new ShardInfo( + SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG); } @Test public void testPacboyShardInfoEqualsWithSameArgs() { - ShardInfo equalShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST); + final ShardInfo equalShardInfo = new ShardInfo( + SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG); assertTrue("Equal should return true for arguments all the same", testShardInfo.equals(equalShardInfo)); } @@ -59,11 +71,15 @@ public void testPacboyShardInfoEqualsWithNull() { @Test public void testPacboyShardInfoEqualsForfToken() { - ShardInfo diffShardInfo = new ShardInfo(SHARD_ID, UUID.randomUUID().toString(), parentShardIds, ExtendedSequenceNumber.LATEST); - assertFalse("Equal should return false with different concurrency token", - diffShardInfo.equals(testShardInfo)); - diffShardInfo = new ShardInfo(SHARD_ID, null, parentShardIds, ExtendedSequenceNumber.LATEST); - assertFalse("Equal should return false for null concurrency token", diffShardInfo.equals(testShardInfo)); + final ShardInfo shardInfoWithDifferentConcurrencyToken = new ShardInfo(SHARD_ID, UUID.randomUUID().toString(), + parentShardIds, ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG); + assertNotEquals("Equal should return false with different concurrency token", + shardInfoWithDifferentConcurrencyToken, testShardInfo); + + final ShardInfo shardInfoWithNullConcurrencyToken = + new ShardInfo(SHARD_ID, null, parentShardIds, ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG); + assertNotEquals("Equal should return false for null concurrency token", + shardInfoWithNullConcurrencyToken, testShardInfo); } @Test @@ -72,7 +88,8 @@ public void testPacboyShardInfoEqualsForDifferentlyOrderedParentIds() { differentlyOrderedParentShardIds.add("shard-2"); differentlyOrderedParentShardIds.add("shard-1"); ShardInfo shardInfoWithDifferentlyOrderedParentShardIds = - new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, differentlyOrderedParentShardIds, ExtendedSequenceNumber.LATEST); + new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, differentlyOrderedParentShardIds, + ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG); assertTrue("Equal should return true even with parent shard Ids reordered", shardInfoWithDifferentlyOrderedParentShardIds.equals(testShardInfo)); } @@ -82,20 +99,24 @@ public void testPacboyShardInfoEqualsForParentIds() { Set diffParentIds = new HashSet<>(); diffParentIds.add("shard-3"); diffParentIds.add("shard-4"); - ShardInfo diffShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, diffParentIds, ExtendedSequenceNumber.LATEST); - assertFalse("Equal should return false with different parent shard Ids", - diffShardInfo.equals(testShardInfo)); - diffShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, null, ExtendedSequenceNumber.LATEST); - assertFalse("Equal should return false with null parent shard Ids", diffShardInfo.equals(testShardInfo)); + final ShardInfo shardInfoWithDifferentParents = new ShardInfo( + SHARD_ID, CONCURRENCY_TOKEN, diffParentIds, ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG); + assertNotEquals("Equal should return false with different parent shard Ids", + shardInfoWithDifferentParents, testShardInfo); + final ShardInfo shardInfoWithNullParents = new ShardInfo( + SHARD_ID, CONCURRENCY_TOKEN, null, ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG); + assertNotEquals("Equal should return false with null parent shard Ids", + shardInfoWithNullParents, testShardInfo); } @Test public void testShardInfoCheckpointEqualsHashCode() { - ShardInfo baseInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, - ExtendedSequenceNumber.TRIM_HORIZON); - ShardInfo differentCheckpoint = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, - new ExtendedSequenceNumber("1234")); - ShardInfo nullCheckpoint = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, null); + final ShardInfo baseInfo = new ShardInfo( + SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG); + final ShardInfo differentCheckpoint = new ShardInfo( + SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, new ExtendedSequenceNumber("1234"), TEST_STREAM_CONFIG); + final ShardInfo nullCheckpoint = new ShardInfo( + SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, null, TEST_STREAM_CONFIG); assertThat("Checkpoint should not be included in equality.", baseInfo.equals(differentCheckpoint), is(true)); assertThat("Checkpoint should not be included in equality.", baseInfo.equals(nullCheckpoint), is(true)); @@ -108,7 +129,8 @@ public void testShardInfoCheckpointEqualsHashCode() { @Test public void testPacboyShardInfoSameHashCode() { - ShardInfo equalShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST); + final ShardInfo equalShardInfo = new ShardInfo( + SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG); assertTrue("Shard info objects should have same hashCode for the same arguments", equalShardInfo.hashCode() == testShardInfo.hashCode()); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java index 61473833f..e58df5bd6 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java @@ -26,6 +26,10 @@ import org.junit.Before; import org.junit.Test; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardInfo; @@ -38,16 +42,28 @@ * */ public class BlockOnParentShardTaskTest { + private static final String TEST_STREAM_NAME = "stream"; + private static final String TEST_ACCOUNT_ID = "123456789012"; + private static final long TEST_CREATION_EPOCH = 1234567890L; + private static final String TEST_STREAM_ID_SERIALIZATION = + String.join(":", TEST_ACCOUNT_ID, TEST_STREAM_NAME, String.valueOf(TEST_CREATION_EPOCH)); + private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION_IN_STREAM_EXTENDED = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + private static final StreamConfig TEST_STREAM_CONFIG = new StreamConfig( + StreamIdentifier.singleStreamInstance(TEST_STREAM_NAME), TEST_INITIAL_POSITION_IN_STREAM_EXTENDED); + private static final StreamConfig TEST_MULTI_STREAM_CONFIG = new StreamConfig( + StreamIdentifier.multiStreamInstance(TEST_STREAM_ID_SERIALIZATION), + TEST_INITIAL_POSITION_IN_STREAM_EXTENDED); private final long backoffTimeInMillis = 50L; private final String shardId = "shardId-97"; - private final String streamId = "123:stream:146"; private final String concurrencyToken = "testToken"; private final List emptyParentShardIds = new ArrayList<>(); private ShardInfo shardInfo; @Before public void setup() { - shardInfo = new ShardInfo(shardId, concurrencyToken, emptyParentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); + shardInfo = new ShardInfo(shardId, concurrencyToken, emptyParentShardIds, + ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG); } /** @@ -94,14 +110,16 @@ public final void testCallShouldNotThrowBlockedOnParentWhenParentsHaveFinished() // test single parent parentShardIds.add(parent1ShardId); - shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); + shardInfo = new ShardInfo( + shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG); task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis); result = task.call(); assertNull(result.getException()); // test two parents parentShardIds.add(parent2ShardId); - shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); + shardInfo = new ShardInfo( + shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG); task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis); result = task.call(); assertNull(result.getException()); @@ -118,8 +136,8 @@ public final void testCallShouldNotThrowBlockedOnParentWhenParentsHaveFinishedMu throws DependencyException, InvalidStateException, ProvisionedThroughputException { ShardInfo shardInfo = null; BlockOnParentShardTask task = null; - String parent1LeaseKey = streamId + ":" + "shardId-1"; - String parent2LeaseKey = streamId + ":" + "shardId-2"; + final String parent1LeaseKey = TEST_STREAM_ID_SERIALIZATION + ":" + "shardId-1"; + final String parent2LeaseKey = TEST_STREAM_ID_SERIALIZATION + ":" + "shardId-2"; String parent1ShardId = "shardId-1"; String parent2ShardId = "shardId-2"; List parentShardIds = new ArrayList<>(); @@ -136,15 +154,16 @@ public final void testCallShouldNotThrowBlockedOnParentWhenParentsHaveFinishedMu // test single parent parentShardIds.add(parent1ShardId); - shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, - streamId); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, + ExtendedSequenceNumber.TRIM_HORIZON, TEST_MULTI_STREAM_CONFIG); task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis); result = task.call(); assertNull(result.getException()); // test two parents parentShardIds.add(parent2ShardId); - shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, streamId); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, + ExtendedSequenceNumber.TRIM_HORIZON, TEST_MULTI_STREAM_CONFIG); task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis); result = task.call(); assertNull(result.getException()); @@ -178,14 +197,16 @@ public final void testCallWhenParentsHaveNotFinished() // test single parent parentShardIds.add(parent1ShardId); - shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); + shardInfo = new ShardInfo( + shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG); task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis); result = task.call(); assertNotNull(result.getException()); // test two parents parentShardIds.add(parent2ShardId); - shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); + shardInfo = new ShardInfo( + shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG); task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis); result = task.call(); assertNotNull(result.getException()); @@ -203,8 +224,8 @@ public final void testCallWhenParentsHaveNotFinishedMultiStream() ShardInfo shardInfo = null; BlockOnParentShardTask task = null; - String parent1LeaseKey = streamId + ":" + "shardId-1"; - String parent2LeaseKey = streamId + ":" + "shardId-2"; + String parent1LeaseKey = TEST_STREAM_ID_SERIALIZATION + ":" + "shardId-1"; + String parent2LeaseKey = TEST_STREAM_ID_SERIALIZATION + ":" + "shardId-2"; String parent1ShardId = "shardId-1"; String parent2ShardId = "shardId-2"; List parentShardIds = new ArrayList<>(); @@ -222,14 +243,16 @@ public final void testCallWhenParentsHaveNotFinishedMultiStream() // test single parent parentShardIds.add(parent1ShardId); - shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, streamId); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, + ExtendedSequenceNumber.TRIM_HORIZON, TEST_MULTI_STREAM_CONFIG); task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis); result = task.call(); assertNotNull(result.getException()); // test two parents parentShardIds.add(parent2ShardId); - shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, streamId); + shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, + ExtendedSequenceNumber.TRIM_HORIZON, TEST_MULTI_STREAM_CONFIG); task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis); result = task.call(); assertNotNull(result.getException()); @@ -249,7 +272,8 @@ public final void testCallBeforeAndAfterAParentFinishes() String parentShardId = "shardId-1"; List parentShardIds = new ArrayList<>(); parentShardIds.add(parentShardId); - ShardInfo shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON); + final ShardInfo shardInfo = new ShardInfo( + shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG); TaskResult result = null; Lease parentLease = new Lease(); LeaseRefresher leaseRefresher = mock(LeaseRefresher.class); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index 6551f9496..9e23f4d5e 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -41,6 +41,7 @@ import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.LeaseCleanupManager; import software.amazon.kinesis.leases.LeaseCoordinator; @@ -113,15 +114,15 @@ public class ConsumerStatesTest { @Before public void setup() { - argument = new ShardConsumerArgument(shardInfo, StreamIdentifier.singleStreamInstance(STREAM_NAME), - leaseCoordinator, executorService, recordsPublisher, + argument = new ShardConsumerArgument(shardInfo, leaseCoordinator, executorService, recordsPublisher, shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, - INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory, leaseCleanupManager, schemaRegistryDecoder); when(shardInfo.shardId()).thenReturn("shardId-000000000000"); - when(shardInfo.streamIdentifierSerOpt()).thenReturn(Optional.of(StreamIdentifier.singleStreamInstance(STREAM_NAME).serialize())); + when(shardInfo.streamConfig()).thenReturn(new StreamConfig( + StreamIdentifier.singleStreamInstance(STREAM_NAME), INITIAL_POSITION_IN_STREAM)); consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, argument, taskExecutionListener, 0)); when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java index 1e89d8cc5..576d6d065 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java @@ -69,6 +69,10 @@ import software.amazon.awssdk.services.kinesis.model.HashKeyRange; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; +import software.amazon.kinesis.common.InitialPositionInStream; +import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; @@ -85,6 +89,11 @@ @RunWith(MockitoJUnitRunner.class) public class ProcessTaskTest { + private static final StreamIdentifier TEST_STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName"); + private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION_IN_STREAM_EXTENDED = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + private static final StreamConfig TEST_STREAM_CONFIG = + new StreamConfig(TEST_STREAM_IDENTIFIER, TEST_INITIAL_POSITION_IN_STREAM_EXTENDED); private static final long IDLE_TIME_IN_MILLISECONDS = 100L; private static final Schema SCHEMA_REGISTRY_SCHEMA = new Schema("{}", "AVRO", "demoSchema"); private static final byte[] SCHEMA_REGISTRY_PAYLOAD = new byte[] {01, 05, 03, 05}; @@ -119,7 +128,7 @@ public class ProcessTaskTest { public void setUpProcessTask() { when(checkpointer.checkpointer()).thenReturn(mock(Checkpointer.class)); - shardInfo = new ShardInfo(shardId, null, null, null); + shardInfo = new ShardInfo(shardId, null, null, null, TEST_STREAM_CONFIG); } private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java index 4299c1632..348931c2f 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java @@ -60,8 +60,11 @@ import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.RequestDetails; +import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.retrieval.KinesisClientRecord; @@ -73,6 +76,11 @@ @Slf4j @RunWith(MockitoJUnitRunner.class) public class ShardConsumerSubscriberTest { + private static final StreamIdentifier TEST_STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName"); + private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION_IN_STREAM_EXTENDED = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + private static final StreamConfig TEST_STREAM_CONFIG = + new StreamConfig(TEST_STREAM_IDENTIFIER, TEST_INITIAL_POSITION_IN_STREAM_EXTENDED); private final Object processedNotifier = new Object(); @@ -104,8 +112,8 @@ public void before() { .setNameFormat("test-" + testName.getMethodName() + "-%04d").setDaemon(true).build()); recordsPublisher = new TestPublisher(); - ShardInfo shardInfo = new ShardInfo("shard-001", "", Collections.emptyList(), - ExtendedSequenceNumber.TRIM_HORIZON); + final ShardInfo shardInfo = new ShardInfo( + "shard-001", "", Collections.emptyList(), ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG); when(shardConsumer.shardInfo()).thenReturn(shardInfo); processRecordsInput = ProcessRecordsInput.builder().records(Collections.emptyList()) diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index 62fd13ef5..97444c85c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -71,8 +71,11 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.RequestDetails; +import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput; @@ -88,7 +91,11 @@ @RunWith(MockitoJUnitRunner.class) @Slf4j public class ShardConsumerTest { - + private static final StreamIdentifier TEST_STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName"); + private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION_IN_STREAM_EXTENDED = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + private static final StreamConfig TEST_STREAM_CONFIG = + new StreamConfig(TEST_STREAM_IDENTIFIER, TEST_INITIAL_POSITION_IN_STREAM_EXTENDED); private final String shardId = "shardId-0-0"; private final String concurrencyToken = "TestToken"; private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); @@ -148,7 +155,8 @@ public class ShardConsumerTest { @Before public void before() { - shardInfo = new ShardInfo(shardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); + shardInfo = new ShardInfo( + shardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG); ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("test-" + testName.getMethodName() + "-%04d") .setDaemon(true).build(); executorService = new ThreadPoolExecutor(4, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index b79ffc036..8b82e314d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -49,6 +49,7 @@ import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException; import software.amazon.kinesis.leases.HierarchicalShardSyncer; @@ -86,13 +87,15 @@ public class ShutdownTaskTest { private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); private static final StreamIdentifier STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName"); + private static final StreamConfig TEST_STREAM_CONFIG = + new StreamConfig(STREAM_IDENTIFIER, INITIAL_POSITION_TRIM_HORIZON); /** * Shard id for the default-provided {@link ShardInfo} and {@link Lease}. */ private static final String SHARD_ID = "shardId-0"; private static final ShardInfo SHARD_INFO = new ShardInfo(SHARD_ID, "concurrencyToken", - Collections.emptySet(), ExtendedSequenceNumber.LATEST); + Collections.emptySet(), ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG); private ShutdownTask task; @@ -274,7 +277,7 @@ public final void testMergeChildWhereBothParentsHaveLeases() throws Exception { public final void testCallWhenShardNotFound() throws Exception { final Lease lease = setupLease("shardId-4", Collections.emptyList()); final ShardInfo shardInfo = new ShardInfo(lease.leaseKey(), "concurrencyToken", Collections.emptySet(), - ExtendedSequenceNumber.LATEST); + ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG); final TaskResult result = createShutdownTask(SHARD_END, Collections.emptyList(), shardInfo).call(); @@ -394,16 +397,16 @@ private ShutdownTask createShutdownTask(final ShutdownReason reason, final List< private ShutdownTask createShutdownTask(final ShutdownReason reason, final List childShards, final ShardInfo shardInfo) { return new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, - reason, null, INITIAL_POSITION_TRIM_HORIZON, false, false, + reason, null, false, false, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer, - NULL_METRICS_FACTORY, childShards, STREAM_IDENTIFIER, leaseCleanupManager); + NULL_METRICS_FACTORY, childShards, leaseCleanupManager); } private ShutdownTask createShutdownTaskWithNotification(final ShutdownReason reason, final List childShards) { return new ShutdownTask(SHARD_INFO, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, - reason, shutdownNotification, INITIAL_POSITION_TRIM_HORIZON, false, false, + reason, shutdownNotification, false, false, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer, - NULL_METRICS_FACTORY, childShards, STREAM_IDENTIFIER, leaseCleanupManager); + NULL_METRICS_FACTORY, childShards, leaseCleanupManager); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java index 584540875..0b80d2005 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java @@ -37,13 +37,12 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.retrieval.RetrievalFactory; -import java.util.Optional; - @RunWith(MockitoJUnitRunner.class) public class FanOutConfigTest { @@ -51,6 +50,12 @@ public class FanOutConfigTest { private static final String TEST_APPLICATION_NAME = "TestApplication"; private static final String TEST_STREAM_NAME = "TestStream"; private static final String TEST_CONSUMER_NAME = "TestConsumerName"; + private static final String TEST_ACCOUNT_ID = "123456789012"; + private static final long TEST_CREATION_EPOCH = 1234567890L; + private static final String TEST_STREAM_IDENTIFIER_SERIALIZATION = + String.join(":", TEST_ACCOUNT_ID, TEST_STREAM_NAME, String.valueOf(TEST_CREATION_EPOCH)); + private static final StreamIdentifier TEST_STREAM_IDENTIFIER = + StreamIdentifier.multiStreamInstance(TEST_STREAM_IDENTIFIER_SERIALIZATION); @Mock private FanOutConsumerRegistration consumerRegistration; @@ -85,23 +90,23 @@ public void testNoRegisterIfConsumerArnSet() { @Test public void testRegisterCalledWhenConsumerArnUnset() throws Exception { - getRecordsCache(null); + getRecordsCache(); verify(consumerRegistration).getOrCreateStreamConsumerArn(); } @Test public void testRegisterNotCalledWhenConsumerArnSetInMultiStreamMode() throws Exception { - when(streamConfig.consumerArn()).thenReturn("consumerArn"); + when(streamConfig.consumerArn()).thenReturn(TEST_CONSUMER_ARN); - getRecordsCache("123456789012:stream:12345"); + getRecordsCache(); verify(consumerRegistration, never()).getOrCreateStreamConsumerArn(); } @Test public void testRegisterCalledWhenConsumerArnNotSetInMultiStreamMode() throws Exception { - getRecordsCache("123456789012:stream:12345"); + getRecordsCache(); verify(consumerRegistration).getOrCreateStreamConsumerArn(); } @@ -112,7 +117,7 @@ public void testDependencyExceptionInConsumerCreation() throws Exception { when(consumerRegistration.getOrCreateStreamConsumerArn()).thenThrow(de); try { - getRecordsCache(null); + getRecordsCache(); Assert.fail("should throw"); } catch (RuntimeException e) { verify(consumerRegistration).getOrCreateStreamConsumerArn(); @@ -122,7 +127,7 @@ public void testDependencyExceptionInConsumerCreation() throws Exception { @Test public void testCreationWithApplicationName() { - getRecordsCache(null); + getRecordsCache(); assertEquals(TEST_STREAM_NAME, config.streamName()); assertEquals(TEST_APPLICATION_NAME, config.applicationName()); @@ -134,7 +139,7 @@ public void testCreationWithConsumerName() { // unset common parameters .applicationName(null); - getRecordsCache(null); + getRecordsCache(); assertEquals(TEST_STREAM_NAME, config.streamName()); assertEquals(TEST_CONSUMER_NAME, config.consumerName()); @@ -144,7 +149,7 @@ public void testCreationWithConsumerName() { public void testCreationWithBothConsumerApplication() { config = config.consumerName(TEST_CONSUMER_NAME); - getRecordsCache(null); + getRecordsCache(); assertEquals(TEST_STREAM_NAME, config.streamName()); assertEquals(TEST_CONSUMER_NAME, config.consumerName()); @@ -197,9 +202,10 @@ private void testInvalidState(final String streamName, final String consumerArn) } } - private void getRecordsCache(final String streamIdentifer) { + private void getRecordsCache() { final ShardInfo shardInfo = mock(ShardInfo.class); - when(shardInfo.streamIdentifierSerOpt()).thenReturn(Optional.ofNullable(streamIdentifer)); + when(shardInfo.streamConfig()).thenReturn(streamConfig); + when(streamConfig.streamIdentifier()).thenReturn(TEST_STREAM_IDENTIFIER); final RetrievalFactory factory = config.retrievalFactory(); factory.createGetRecordsCache(shardInfo, streamConfig, mock(MetricsFactory.class));