discoverNewLeases() throws ProvisionedThroughputException, InvalidStateException, DependencyException;
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
index 2d4e041c0..ef750f461 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java
@@ -16,7 +16,9 @@
package software.amazon.kinesis.leases;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
@@ -25,6 +27,7 @@
import java.util.function.Function;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.Builder;
import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
@@ -34,14 +37,17 @@
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.Tag;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.kinesis.common.DdbTableConfig;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.LeaseCleanupConfig;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory;
+import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
+import software.amazon.kinesis.worker.metric.WorkerMetric;
/**
* Used by the KCL to configure lease management.
@@ -209,6 +215,9 @@ public class LeaseManagementConfig {
private BillingMode billingMode = BillingMode.PAY_PER_REQUEST;
+ private WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig =
+ new WorkerUtilizationAwareAssignmentConfig();
+
/**
* Whether to enable deletion protection on the DynamoDB lease table created by KCL. This does not update
* already existing tables.
@@ -276,14 +285,17 @@ public LeaseManagementConfig(
}
public LeaseManagementConfig(
- String tableName,
- DynamoDbAsyncClient dynamoDBClient,
- KinesisAsyncClient kinesisClient,
- String workerIdentifier) {
+ final String tableName,
+ final String applicationName,
+ final DynamoDbAsyncClient dynamoDBClient,
+ final KinesisAsyncClient kinesisClient,
+ final String workerIdentifier) {
this.tableName = tableName;
this.dynamoDBClient = dynamoDBClient;
this.kinesisClient = kinesisClient;
this.workerIdentifier = workerIdentifier;
+ this.workerUtilizationAwareAssignmentConfig.workerMetricsTableConfig =
+ new WorkerMetricsTableConfig(applicationName);
}
/**
@@ -350,10 +362,18 @@ static class LeaseManagementThreadPool extends ThreadPoolExecutor {
*/
private TableCreatorCallback tableCreatorCallback = TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK;
+ /**
+ * @deprecated never used and will be removed in future releases
+ */
+ @Deprecated
private HierarchicalShardSyncer hierarchicalShardSyncer;
private LeaseManagementFactory leaseManagementFactory;
+ /**
+ * @deprecated never used and will be removed in future releases
+ */
+ @Deprecated
public HierarchicalShardSyncer hierarchicalShardSyncer() {
if (hierarchicalShardSyncer == null) {
hierarchicalShardSyncer = new HierarchicalShardSyncer();
@@ -361,39 +381,63 @@ public HierarchicalShardSyncer hierarchicalShardSyncer() {
return hierarchicalShardSyncer;
}
+ /**
+ * Configuration class for controlling the graceful handoff of leases.
+ * This configuration allows tuning of the shutdown behavior during lease transfers.
+ *
+ * It provides settings to control the timeout period for waiting on the record processor
+ * to shut down and an option to enable or disable graceful lease handoff.
+ *
+ */
+ @Data
+ @Builder
+ @Accessors(fluent = true)
+ public static class GracefulLeaseHandoffConfig {
+ /**
+ * The minimum amount of time (in milliseconds) to wait for the current shard's RecordProcessor
+ * to gracefully shut down before forcefully transferring the lease to the next owner.
+ *
+ * If each call to {@code processRecords} is expected to run longer than the default value,
+ * it makes sense to set this to a higher value to ensure the RecordProcessor has enough
+ * time to complete its processing.
+ *
+ *
+ * Default value is 30,000 milliseconds (30 seconds).
+ *
+ */
+ @Builder.Default
+ private long gracefulLeaseHandoffTimeoutMillis = 30_000L;
+ /**
+ * Flag to enable or disable the graceful lease handoff mechanism.
+ *
+ * When set to {@code true}, the KCL will attempt to gracefully transfer leases by
+ * allowing the shard's RecordProcessor sufficient time to complete processing before
+ * handing off the lease to another worker. When {@code false}, the lease will be
+ * handed off without waiting for the RecordProcessor to shut down gracefully. Note
+ * that checkpointing is expected to be implemented inside {@code shutdownRequested}
+ * for this feature to work end to end.
+ *
+ *
+ * Default value is {@code true}.
+ *
+ */
+ @Builder.Default
+ private boolean isGracefulLeaseHandoffEnabled = true;
+ }
+
+ private GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig =
+ GracefulLeaseHandoffConfig.builder().build();
+
+ /**
+ * @deprecated This is no longer invoked, but {@code leaseManagementFactory(LeaseSerializer, boolean)}
+ * is invoked instead. Please remove implementation for this method as future
+ * releases will remove this API.
+ */
@Deprecated
public LeaseManagementFactory leaseManagementFactory() {
if (leaseManagementFactory == null) {
Validate.notEmpty(streamName(), "Stream name is empty");
- leaseManagementFactory = new DynamoDBLeaseManagementFactory(
- kinesisClient(),
- streamName(),
- dynamoDBClient(),
- tableName(),
- workerIdentifier(),
- executorService(),
- initialPositionInStream(),
- failoverTimeMillis(),
- epsilonMillis(),
- maxLeasesForWorker(),
- maxLeasesToStealAtOneTime(),
- maxLeaseRenewalThreads(),
- cleanupLeasesUponShardCompletion(),
- ignoreUnexpectedChildShards(),
- shardSyncIntervalMillis(),
- consistentReads(),
- listShardsBackoffTimeInMillis(),
- maxListShardsRetryAttempts(),
- maxCacheMissesBeforeReload(),
- listShardsCacheAllowedAgeInSeconds(),
- cacheMissWarningModulus(),
- initialLeaseTableReadCapacity(),
- initialLeaseTableWriteCapacity(),
- hierarchicalShardSyncer(),
- tableCreatorCallback(),
- dynamoDbRequestTimeout(),
- billingMode(),
- tags());
+ leaseManagementFactory(new DynamoDBLeaseSerializer(), false);
}
return leaseManagementFactory;
}
@@ -430,7 +474,6 @@ public LeaseManagementFactory leaseManagementFactory(
cacheMissWarningModulus(),
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
- hierarchicalShardSyncer(),
tableCreatorCallback(),
dynamoDbRequestTimeout(),
billingMode(),
@@ -440,7 +483,9 @@ public LeaseManagementFactory leaseManagementFactory(
leaseSerializer,
customShardDetectorProvider(),
isMultiStreamingMode,
- leaseCleanupConfig());
+ leaseCleanupConfig(),
+ workerUtilizationAwareAssignmentConfig(),
+ gracefulLeaseHandoffConfig);
}
return leaseManagementFactory;
}
@@ -454,4 +499,90 @@ public LeaseManagementConfig leaseManagementFactory(final LeaseManagementFactory
this.leaseManagementFactory = leaseManagementFactory;
return this;
}
+
+ @Data
+ @Accessors(fluent = true)
+ public static class WorkerUtilizationAwareAssignmentConfig {
+ /**
+ * This defines the frequency of capturing worker metric stats in memory. Default is 1s
+ */
+ private long inMemoryWorkerMetricsCaptureFrequencyMillis =
+ Duration.ofSeconds(1L).toMillis();
+ /**
+ * This defines the frequency of reporting worker metric stats to storage. Default is 30s
+ */
+ private long workerMetricsReporterFreqInMillis = Duration.ofSeconds(30).toMillis();
+ /**
+ * These are the no. of metrics that are persisted in storage in WorkerMetricStats ddb table.
+ */
+ private int noOfPersistedMetricsPerWorkerMetrics = 10;
+ /**
+ * Option to disable workerMetrics to use in lease balancing.
+ */
+ private boolean disableWorkerMetrics = false;
+ /**
+ * List of workerMetrics for the application.
+ */
+ private List workerMetricList = new ArrayList<>();
+ /**
+ * Max throughput per host KBps, default is unlimited.
+ */
+ private double maxThroughputPerHostKBps = Double.MAX_VALUE;
+ /**
+ * Percentage of value to achieve critical dampening during this case
+ */
+ private int dampeningPercentage = 60;
+ /**
+ * Percentage value used to trigger reBalance. If fleet has workers which are have metrics value more or less
+ * than 10% of fleet level average then reBalance is triggered.
+ * Leases are taken from workers with metrics value more than fleet level average. The load to take from these
+ * workers is determined by evaluating how far they are with respect to fleet level average.
+ */
+ private int reBalanceThresholdPercentage = 10;
+
+ /**
+ * The allowThroughputOvershoot flag determines whether leases should still be taken even if
+ * it causes the total assigned throughput to exceed the desired throughput to take for re-balance.
+ * Enabling this flag provides more flexibility for the LeaseAssignmentManager to explore additional
+ * assignment possibilities, which can lead to faster throughput convergence.
+ */
+ private boolean allowThroughputOvershoot = true;
+
+ /**
+ * Duration after which workerMetricStats entry from WorkerMetricStats table will be cleaned up. When an entry's
+ * lastUpdateTime is older than staleWorkerMetricsEntryCleanupDuration from current time, entry will be removed
+ * from the table.
+ */
+ private Duration staleWorkerMetricsEntryCleanupDuration = Duration.ofDays(1);
+
+ /**
+ * configuration to configure how to create the WorkerMetricStats table, such as table name,
+ * billing mode, provisioned capacity. If no table name is specified, the table name will
+ * default to applicationName-WorkerMetricStats. If no billing more is chosen, default is
+ * On-Demand.
+ */
+ private WorkerMetricsTableConfig workerMetricsTableConfig;
+
+ /**
+ * Frequency to perform worker variance balancing. This value is used with respect to the LAM frequency,
+ * that is every third (as default) iteration of LAM the worker variance balancing will be performed.
+ * Setting it to 1 will make varianceBalancing run on every iteration of LAM and 2 on every 2nd iteration
+ * and so on.
+ * NOTE: LAM frequency = failoverTimeMillis
+ */
+ private int varianceBalancingFrequency = 3;
+
+ /**
+ * Alpha value used for calculating exponential moving average of worker's metricStats. Selecting
+ * higher alpha value gives more weightage to recent value and thus low smoothing effect on computed average
+ * and selecting smaller alpha values gives more weightage to past value and high smoothing effect.
+ */
+ private double workerMetricsEMAAlpha = 0.5;
+ }
+
+ public static class WorkerMetricsTableConfig extends DdbTableConfig {
+ public WorkerMetricsTableConfig(final String applicationName) {
+ super(applicationName, "WorkerMetricStats");
+ }
+ }
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java
index 9ed77a537..788034d1e 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java
@@ -15,9 +15,12 @@
package software.amazon.kinesis.leases;
+import java.util.concurrent.ConcurrentMap;
+
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
+import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.metrics.MetricsFactory;
/**
@@ -26,10 +29,27 @@
public interface LeaseManagementFactory {
LeaseCoordinator createLeaseCoordinator(MetricsFactory metricsFactory);
- ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory);
+ default LeaseCoordinator createLeaseCoordinator(
+ MetricsFactory metricsFactory, ConcurrentMap shardInfoShardConsumerMap) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+ /**
+ * @deprecated This method is never invoked, please remove implementation of this method
+ * as it will be removed in future releases.
+ */
+ @Deprecated
+ default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory) {
+ throw new UnsupportedOperationException("Deprecated");
+ }
+
+ /**
+ * @deprecated This method is never invoked, please remove implementation of this method
+ * as it will be removed in future releases.
+ */
+ @Deprecated
default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Deprecated");
}
default ShardSyncTaskManager createShardSyncTaskManager(
@@ -41,10 +61,17 @@ default ShardSyncTaskManager createShardSyncTaskManager(
DynamoDBLeaseRefresher createLeaseRefresher();
- ShardDetector createShardDetector();
+ /**
+ * @deprecated This method is never invoked, please remove implementation of this method
+ * as it will be removed in future releases.
+ */
+ @Deprecated
+ default ShardDetector createShardDetector() {
+ throw new UnsupportedOperationException("Deprecated");
+ }
default ShardDetector createShardDetector(StreamConfig streamConfig) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Not implemented");
}
LeaseCleanupManager createLeaseCleanupManager(MetricsFactory metricsFactory);
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java
index c38d442a8..fc71621d1 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java
@@ -15,6 +15,9 @@
package software.amazon.kinesis.leases;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.exceptions.DependencyException;
@@ -75,6 +78,37 @@ boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity)
*/
boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException;
+ /**
+ * Creates the LeaseOwnerToLeaseKey index on the lease table if it doesn't exist and returns the status of index.
+ *
+ * @return indexStatus status of the index.
+ * @throws DependencyException if storage's describe API fails in an unexpected way
+ */
+ default String createLeaseOwnerToLeaseKeyIndexIfNotExists() throws DependencyException {
+ return null;
+ }
+
+ /**
+ * Blocks until the index exists by polling storage till either the index is ACTIVE or else timeout has
+ * happened.
+ *
+ * @param secondsBetweenPolls time to wait between polls in seconds
+ * @param timeoutSeconds total time to wait in seconds
+ *
+ * @return true if index on the table exists and is ACTIVE, false if timeout was reached
+ */
+ default boolean waitUntilLeaseOwnerToLeaseKeyIndexExists(
+ final long secondsBetweenPolls, final long timeoutSeconds) {
+ return false;
+ }
+
+ /**
+ * Check if leaseOwner GSI is ACTIVE
+ * @return true if index is active, false otherwise
+ * @throws DependencyException if storage's describe API fails in an unexpected way
+ */
+ boolean isLeaseOwnerToLeaseKeyIndexActive() throws DependencyException;
+
/**
* List all leases for a given stream synchronously.
*
@@ -87,6 +121,24 @@ boolean createLeaseTableIfNotExists(Long readCapacity, Long writeCapacity)
List listLeasesForStream(StreamIdentifier streamIdentifier)
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
+ /**
+ * List all leases for a given workerIdentifier synchronously.
+ * Default implementation calls listLeases() and filters the results.
+ *
+ * @throws DependencyException if DynamoDB scan fails in an unexpected way
+ * @throws InvalidStateException if lease table does not exist
+ * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity
+ *
+ * @return list of leases
+ */
+ default List listLeaseKeysForWorker(final String workerIdentifier)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+ return listLeases().stream()
+ .filter(lease -> lease.leaseOwner().equals(workerIdentifier))
+ .map(Lease::leaseKey)
+ .collect(Collectors.toList());
+ }
+
/**
* List all objects in table synchronously.
*
@@ -98,6 +150,23 @@ List listLeasesForStream(StreamIdentifier streamIdentifier)
*/
List listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException;
+ /**
+ * List all leases from the storage parallely and deserialize into Lease objects. Returns the list of leaseKey
+ * that failed deserialize separately.
+ *
+ * @param threadPool threadpool to use for parallel scan
+ * @param parallelismFactor no. of parallel scans
+ * @return Pair of List of leases from the storage and List of items failed to deserialize
+ * @throws DependencyException if DynamoDB scan fails in an unexpected way
+ * @throws InvalidStateException if lease table does not exist
+ * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity
+ */
+ default Map.Entry, List> listLeasesParallely(
+ final ExecutorService threadPool, final int parallelismFactor)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+ throw new UnsupportedOperationException("listLeasesParallely is not implemented");
+ }
+
/**
* Create a new lease. Conditional on a lease not already existing with this shardId.
*
@@ -154,6 +223,47 @@ boolean createLeaseIfNotExists(Lease lease)
boolean takeLease(Lease lease, String owner)
throws DependencyException, InvalidStateException, ProvisionedThroughputException;
+ /**
+ * Assigns given lease to newOwner owner by incrementing its leaseCounter and setting its owner field. Conditional
+ * on the leaseOwner in DynamoDB matching the leaseOwner of the input lease. Mutates the leaseCounter and owner of
+ * the passed-in lease object after updating DynamoDB.
+ *
+ * @param lease the lease to be assigned
+ * @param newOwner the new owner
+ *
+ * @return true if lease was successfully assigned, false otherwise
+ *
+ * @throws InvalidStateException if lease table does not exist
+ * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
+ * @throws DependencyException if DynamoDB update fails in an unexpected way
+ */
+ default boolean assignLease(final Lease lease, final String newOwner)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+
+ throw new UnsupportedOperationException("assignLease is not implemented");
+ }
+
+ /**
+ * Initiates a graceful handoff of the given lease to the specified new owner, allowing the current owner
+ * to complete its processing before transferring ownership.
+ *
+ * This method updates the lease with the new owner information but ensures that the current owner
+ * is given time to gracefully finish its work (e.g., processing records) before the lease is reassigned.
+ *
+ *
+ * @param lease the lease to be assigned
+ * @param newOwner the new owner
+ * @return true if a graceful handoff was successfully initiated
+ * @throws InvalidStateException if lease table does not exist
+ * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
+ * @throws DependencyException if DynamoDB update fails in an unexpected way
+ */
+ default boolean initiateGracefulLeaseHandoff(final Lease lease, final String newOwner)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+
+ throw new UnsupportedOperationException("assignLeaseWithWait is not implemented");
+ }
+
/**
* Evict the current owner of lease by setting owner to null. Conditional on the owner in DynamoDB matching the owner of
* the input. Mutates the lease counter and owner of the passed-in lease object after updating the record in DynamoDB.
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java
index 5d7bea63d..3c4692a92 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseSerializer.java
@@ -15,6 +15,7 @@
package software.amazon.kinesis.leases;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
@@ -100,6 +101,15 @@ default Map getDynamoExistentExpectation(String
*/
Map getDynamoTakeLeaseUpdate(Lease lease, String newOwner);
+ /**
+ * @param lease lease that needs to be assigned
+ * @param newOwner newLeaseOwner
+ * @return the attribute value map that takes a lease for a new owner
+ */
+ default Map getDynamoAssignLeaseUpdate(Lease lease, String newOwner) {
+ throw new UnsupportedOperationException("getDynamoAssignLeaseUpdate is not implemented");
+ }
+
/**
* @param lease
* @return the attribute value map that voids a lease
@@ -127,8 +137,22 @@ default Map getDynamoUpdateLeaseUpdate(Lease lease
*/
Collection getKeySchema();
+ default Collection getWorkerIdToLeaseKeyIndexKeySchema() {
+ return Collections.EMPTY_LIST;
+ }
+
+ default Collection getWorkerIdToLeaseKeyIndexAttributeDefinitions() {
+ return Collections.EMPTY_LIST;
+ }
+
/**
* @return attribute definitions for creating a DynamoDB table to store leases
*/
Collection getAttributeDefinitions();
+
+ /**
+ * @param lease
+ * @return the attribute value map that includes lease throughput
+ */
+ Map getDynamoLeaseThroughputKbpsUpdate(Lease lease);
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseStatsRecorder.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseStatsRecorder.java
new file mode 100644
index 000000000..dcb5d6de4
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseStatsRecorder.java
@@ -0,0 +1,158 @@
+package software.amazon.kinesis.leases;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import software.amazon.awssdk.annotations.ThreadSafe;
+import software.amazon.kinesis.annotations.KinesisClientInternalApi;
+import software.amazon.kinesis.utils.ExponentialMovingAverage;
+
+import static java.util.Objects.isNull;
+
+/**
+ * This class records the stats for the leases.
+ * The stats are recorded in a thread safe queue, and the throughput is calculated by summing up the bytes and dividing
+ * by interval in seconds.
+ * This class is thread safe and backed by thread safe data structures.
+ */
+@RequiredArgsConstructor
+@KinesisClientInternalApi
+@ThreadSafe
+public class LeaseStatsRecorder {
+
+ /**
+ * This default alpha is chosen based on the testing so far between simple average and moving average with 0.5.
+ * In the future, if one value does not fit all use cases, inject this via config.
+ */
+ private static final double DEFAULT_ALPHA = 0.5;
+
+ public static final int BYTES_PER_KB = 1024;
+
+ private final Long renewerFrequencyInMillis;
+ private final Map> leaseStatsMap = new ConcurrentHashMap<>();
+ private final Map leaseKeyToExponentialMovingAverageMap =
+ new ConcurrentHashMap<>();
+ private final Callable timeProviderInMillis;
+
+ /**
+ * This method provides happens-before semantics (i.e., the action (access or removal) from a thread happens
+ * before the action from subsequent thread) for the stats recording in multithreaded environment.
+ */
+ public void recordStats(@NonNull final LeaseStats leaseStats) {
+ final Queue leaseStatsQueue =
+ leaseStatsMap.computeIfAbsent(leaseStats.getLeaseKey(), lease -> new ConcurrentLinkedQueue<>());
+ leaseStatsQueue.add(leaseStats);
+ }
+
+ /**
+ * Calculates the throughput in KBps for the given leaseKey.
+ * Method first clears the items that are older than {@link #renewerFrequencyInMillis} from the queue and then
+ * calculates the throughput per second during {@link #renewerFrequencyInMillis} interval and then returns the
+ * ExponentialMovingAverage of the throughput. If method is called in quick succession with or without new stats
+ * the result can be different as ExponentialMovingAverage decays old values on every new call.
+ * This method is thread safe.
+ * @param leaseKey leaseKey for which stats are required
+ * @return throughput in Kbps, returns null if there is no stats available for the leaseKey.
+ */
+ public Double getThroughputKBps(final String leaseKey) {
+ final Queue leaseStatsQueue = leaseStatsMap.get(leaseKey);
+
+ if (isNull(leaseStatsQueue)) {
+ // This means there is no entry for this leaseKey yet
+ return null;
+ }
+
+ filterExpiredEntries(leaseStatsQueue);
+
+ // Convert bytes into KB and divide by interval in second to get throughput per second.
+ final ExponentialMovingAverage exponentialMovingAverage = leaseKeyToExponentialMovingAverageMap.computeIfAbsent(
+ leaseKey, leaseId -> new ExponentialMovingAverage(DEFAULT_ALPHA));
+
+ // Specifically dividing by 1000.0 rather than using Duration class to get seconds, because Duration class
+ // implementation rounds off to seconds and precision is lost.
+ final double frequency = renewerFrequencyInMillis / 1000.0;
+ final double throughput = readQueue(leaseStatsQueue).stream()
+ .mapToDouble(LeaseStats::getBytes)
+ .sum()
+ / BYTES_PER_KB
+ / frequency;
+ exponentialMovingAverage.add(throughput);
+ return exponentialMovingAverage.getValue();
+ }
+
+ /**
+ * Gets the currentTimeMillis and then iterates over the queue to get the stats with creation time less than
+ * currentTimeMillis.
+ * This is specifically done to avoid potential race between with high-frequency put thread blocking get thread.
+ */
+ private Queue readQueue(final Queue leaseStatsQueue) {
+ final long currentTimeMillis = getCurrenTimeInMillis();
+ final Queue response = new LinkedList<>();
+ for (LeaseStats leaseStats : leaseStatsQueue) {
+ if (leaseStats.creationTimeMillis > currentTimeMillis) {
+ break;
+ }
+ response.add(leaseStats);
+ }
+ return response;
+ }
+
+ private long getCurrenTimeInMillis() {
+ try {
+ return timeProviderInMillis.call();
+ } catch (final Exception e) {
+ // Fallback to using the System.currentTimeMillis if failed.
+ return System.currentTimeMillis();
+ }
+ }
+
+ private void filterExpiredEntries(final Queue leaseStatsQueue) {
+ final long currentTime = getCurrenTimeInMillis();
+ while (!leaseStatsQueue.isEmpty()) {
+ final LeaseStats leaseStats = leaseStatsQueue.peek();
+ if (isNull(leaseStats) || currentTime - leaseStats.getCreationTimeMillis() < renewerFrequencyInMillis) {
+ break;
+ }
+ leaseStatsQueue.poll();
+ }
+ }
+
+ /**
+ * Clear the in-memory stats for the lease when a lease is reassigned (due to shut down or lease stealing)
+ * @param leaseKey leaseKey, for which stats are supposed to be clear.
+ */
+ public void dropLeaseStats(final String leaseKey) {
+ leaseStatsMap.remove(leaseKey);
+ leaseKeyToExponentialMovingAverageMap.remove(leaseKey);
+ }
+
+ @Builder
+ @Getter
+ @ToString
+ @KinesisClientInternalApi
+ public static final class LeaseStats {
+ /**
+ * Lease key for which this leaseStats object is created.
+ */
+ private final String leaseKey;
+ /**
+ * Bytes that are processed for a lease
+ */
+ private final long bytes;
+ /**
+ * Wall time in epoch millis at which this leaseStats object was created. This time is used to determine the
+ * expiry of the lease stats.
+ */
+ @Builder.Default
+ private final long creationTimeMillis = System.currentTimeMillis();
+ }
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java
index add8cf4f6..9b63883bd 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java
@@ -71,7 +71,7 @@ public class ShardSyncTaskManager {
/**
* Constructor.
*
- * NOTE: This constructor is deprecated and will be removed in a future release.
+ * @deprecated This constructor is deprecated and will be removed in a future release.
*
* @param shardDetector
* @param leaseRefresher
@@ -92,18 +92,16 @@ public ShardSyncTaskManager(
long shardSyncIdleTimeMillis,
ExecutorService executorService,
MetricsFactory metricsFactory) {
- this.shardDetector = shardDetector;
- this.leaseRefresher = leaseRefresher;
- this.initialPositionInStream = initialPositionInStream;
- this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
- this.garbageCollectLeases = true;
- this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
- this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
- this.executorService = executorService;
- this.hierarchicalShardSyncer = new HierarchicalShardSyncer();
- this.metricsFactory = metricsFactory;
- this.shardSyncRequestPending = new AtomicBoolean(false);
- this.lock = new ReentrantLock();
+ this(
+ shardDetector,
+ leaseRefresher,
+ initialPositionInStream,
+ cleanupLeasesUponShardCompletion,
+ ignoreUnexpectedChildShards,
+ shardSyncIdleTimeMillis,
+ executorService,
+ new HierarchicalShardSyncer(),
+ metricsFactory);
}
/**
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
index bef76ef05..7eb4c4f1a 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
@@ -19,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
@@ -30,13 +31,17 @@
import java.util.stream.Collectors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
+import software.amazon.kinesis.coordinator.MigrationAdaptiveLeaseAssignmentModeProvider;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
+import software.amazon.kinesis.leases.LeaseDiscoverer;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseRenewer;
+import software.amazon.kinesis.leases.LeaseStatsRecorder;
import software.amazon.kinesis.leases.LeaseTaker;
import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardInfo;
@@ -44,6 +49,8 @@
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
+import software.amazon.kinesis.lifecycle.LeaseGracefulShutdownHandler;
+import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
@@ -70,115 +77,34 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
.setNameFormat("LeaseRenewer-%04d")
.setDaemon(true)
.build();
+ private static final ThreadFactory LEASE_DISCOVERY_THREAD_FACTORY = new ThreadFactoryBuilder()
+ .setNameFormat("LeaseDiscovery-%04d")
+ .setDaemon(true)
+ .build();
private final LeaseRenewer leaseRenewer;
private final LeaseTaker leaseTaker;
+ private final LeaseDiscoverer leaseDiscoverer;
private final long renewerIntervalMillis;
private final long takerIntervalMillis;
+ private final long leaseDiscovererIntervalMillis;
private final ExecutorService leaseRenewalThreadpool;
+ private final ExecutorService leaseDiscoveryThreadPool;
private final LeaseRefresher leaseRefresher;
+ private final LeaseStatsRecorder leaseStatsRecorder;
+ private final LeaseGracefulShutdownHandler leaseGracefulShutdownHandler;
private long initialLeaseTableReadCapacity;
private long initialLeaseTableWriteCapacity;
protected final MetricsFactory metricsFactory;
private final Object shutdownLock = new Object();
-
+ private final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig;
private ScheduledExecutorService leaseCoordinatorThreadPool;
+ private ScheduledFuture> leaseDiscoveryFuture;
private ScheduledFuture> takerFuture;
private volatile boolean running = false;
- /**
- * Constructor.
- *
- * NOTE: This constructor is deprecated and will be removed in a future release.
- *
- * @param leaseRefresher
- * LeaseRefresher instance to use
- * @param workerIdentifier
- * Identifies the worker (e.g. useful to track lease ownership)
- * @param leaseDurationMillis
- * Duration of a lease
- * @param epsilonMillis
- * Allow for some variance when calculating lease expirations
- * @param maxLeasesForWorker
- * Max leases this Worker can handle at a time
- * @param maxLeasesToStealAtOneTime
- * Steal up to these many leases at a time (for load balancing)
- * @param metricsFactory
- * Used to publish metrics about lease operations
- */
- @Deprecated
- public DynamoDBLeaseCoordinator(
- final LeaseRefresher leaseRefresher,
- final String workerIdentifier,
- final long leaseDurationMillis,
- final long epsilonMillis,
- final int maxLeasesForWorker,
- final int maxLeasesToStealAtOneTime,
- final int maxLeaseRenewerThreadCount,
- final MetricsFactory metricsFactory) {
- this(
- leaseRefresher,
- workerIdentifier,
- leaseDurationMillis,
- epsilonMillis,
- maxLeasesForWorker,
- maxLeasesToStealAtOneTime,
- maxLeaseRenewerThreadCount,
- TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
- TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY,
- metricsFactory);
- }
-
- /**
- * Constructor.
- *
- * @param leaseRefresher
- * LeaseRefresher instance to use
- * @param workerIdentifier
- * Identifies the worker (e.g. useful to track lease ownership)
- * @param leaseDurationMillis
- * Duration of a lease
- * @param epsilonMillis
- * Allow for some variance when calculating lease expirations
- * @param maxLeasesForWorker
- * Max leases this Worker can handle at a time
- * @param maxLeasesToStealAtOneTime
- * Steal up to these many leases at a time (for load balancing)
- * @param initialLeaseTableReadCapacity
- * Initial dynamodb lease table read iops if creating the lease table
- * @param initialLeaseTableWriteCapacity
- * Initial dynamodb lease table write iops if creating the lease table
- * @param metricsFactory
- * Used to publish metrics about lease operations
- */
- @Deprecated
- public DynamoDBLeaseCoordinator(
- final LeaseRefresher leaseRefresher,
- final String workerIdentifier,
- final long leaseDurationMillis,
- final long epsilonMillis,
- final int maxLeasesForWorker,
- final int maxLeasesToStealAtOneTime,
- final int maxLeaseRenewerThreadCount,
- final long initialLeaseTableReadCapacity,
- final long initialLeaseTableWriteCapacity,
- final MetricsFactory metricsFactory) {
- this(
- leaseRefresher,
- workerIdentifier,
- leaseDurationMillis,
- LeaseManagementConfig.DEFAULT_ENABLE_PRIORITY_LEASE_ASSIGNMENT,
- epsilonMillis,
- maxLeasesForWorker,
- maxLeasesToStealAtOneTime,
- maxLeaseRenewerThreadCount,
- TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
- TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY,
- metricsFactory);
- }
-
/**
* Constructor.
*
@@ -214,17 +140,35 @@ public DynamoDBLeaseCoordinator(
final int maxLeaseRenewerThreadCount,
final long initialLeaseTableReadCapacity,
final long initialLeaseTableWriteCapacity,
- final MetricsFactory metricsFactory) {
+ final MetricsFactory metricsFactory,
+ final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig,
+ final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig,
+ final ConcurrentMap shardInfoShardConsumerMap) {
this.leaseRefresher = leaseRefresher;
- this.leaseRenewalThreadpool = getLeaseRenewalExecutorService(maxLeaseRenewerThreadCount);
+ this.leaseRenewalThreadpool = createExecutorService(maxLeaseRenewerThreadCount, LEASE_RENEWAL_THREAD_FACTORY);
this.leaseTaker = new DynamoDBLeaseTaker(leaseRefresher, workerIdentifier, leaseDurationMillis, metricsFactory)
.withMaxLeasesForWorker(maxLeasesForWorker)
.withMaxLeasesToStealAtOneTime(maxLeasesToStealAtOneTime)
.withEnablePriorityLeaseAssignment(enablePriorityLeaseAssignment);
- this.leaseRenewer = new DynamoDBLeaseRenewer(
- leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory);
this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis);
this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;
+ // Should run once every leaseDurationMillis to identify new leases before expiry.
+ this.leaseDiscovererIntervalMillis = leaseDurationMillis - epsilonMillis;
+ this.leaseStatsRecorder = new LeaseStatsRecorder(renewerIntervalMillis, System::currentTimeMillis);
+ this.leaseGracefulShutdownHandler = LeaseGracefulShutdownHandler.create(
+ gracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis(), shardInfoShardConsumerMap, this);
+ this.leaseRenewer = new DynamoDBLeaseRenewer(
+ leaseRefresher,
+ workerIdentifier,
+ leaseDurationMillis,
+ leaseRenewalThreadpool,
+ metricsFactory,
+ leaseStatsRecorder,
+ leaseGracefulShutdownHandler::enqueueShutdown);
+ this.leaseDiscoveryThreadPool =
+ createExecutorService(maxLeaseRenewerThreadCount, LEASE_DISCOVERY_THREAD_FACTORY);
+ this.leaseDiscoverer = new DynamoDBLeaseDiscoverer(
+ this.leaseRefresher, this.leaseRenewer, metricsFactory, workerIdentifier, leaseDiscoveryThreadPool);
if (initialLeaseTableReadCapacity <= 0) {
throw new IllegalArgumentException("readCapacity should be >= 1");
}
@@ -234,6 +178,7 @@ public DynamoDBLeaseCoordinator(
}
this.initialLeaseTableWriteCapacity = initialLeaseTableWriteCapacity;
this.metricsFactory = metricsFactory;
+ this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig;
log.info(
"With failover time {} ms and epsilon {} ms, LeaseCoordinator will renew leases every {} ms, take"
@@ -246,11 +191,49 @@ public DynamoDBLeaseCoordinator(
maxLeasesToStealAtOneTime);
}
+ @RequiredArgsConstructor
+ private class LeaseDiscoveryRunnable implements Runnable {
+ private final MigrationAdaptiveLeaseAssignmentModeProvider leaseAssignmentModeProvider;
+
+ @Override
+ public void run() {
+ try {
+ // LeaseDiscoverer is run in WORKER_UTILIZATION_AWARE_ASSIGNMENT mode only
+ synchronized (shutdownLock) {
+ if (!leaseAssignmentModeProvider
+ .getLeaseAssignmentMode()
+ .equals(
+ MigrationAdaptiveLeaseAssignmentModeProvider.LeaseAssignmentMode
+ .WORKER_UTILIZATION_AWARE_ASSIGNMENT)) {
+ return;
+ }
+ if (running) {
+ leaseRenewer.addLeasesToRenew(leaseDiscoverer.discoverNewLeases());
+ }
+ }
+ } catch (Exception e) {
+ log.error("Failed to execute lease discovery", e);
+ }
+ }
+ }
+
+ @RequiredArgsConstructor
private class TakerRunnable implements Runnable {
+ private final MigrationAdaptiveLeaseAssignmentModeProvider leaseAssignmentModeProvider;
@Override
public void run() {
try {
+ // LeaseTaker is run in DEFAULT_LEASE_COUNT_BASED_ASSIGNMENT mode only
+ synchronized (shutdownLock) {
+ if (!leaseAssignmentModeProvider
+ .getLeaseAssignmentMode()
+ .equals(
+ MigrationAdaptiveLeaseAssignmentModeProvider.LeaseAssignmentMode
+ .DEFAULT_LEASE_COUNT_BASED_ASSIGNMENT)) {
+ return;
+ }
+ }
runLeaseTaker();
} catch (LeasingException e) {
log.error("LeasingException encountered in lease taking thread", e);
@@ -290,18 +273,35 @@ public void initialize() throws ProvisionedThroughputException, DependencyExcept
}
@Override
- public void start() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
+ public void start(final MigrationAdaptiveLeaseAssignmentModeProvider leaseAssignmentModeProvider)
+ throws DependencyException, InvalidStateException, ProvisionedThroughputException {
leaseRenewer.initialize();
+ // At max, we need 3 threads - lease renewer, lease taker, lease discoverer - to run without contention.
+ leaseCoordinatorThreadPool = Executors.newScheduledThreadPool(3, LEASE_COORDINATOR_THREAD_FACTORY);
+
+ // During migration to KCLv3.x from KCLv2.x, lease assignment mode can change dynamically, so
+ // both lease assignment algorithms will be started but only one will execute based on
+ // leaseAssignmentModeProvider.getLeaseAssignmentMode(). However for new applications starting in
+ // KCLv3.x or applications successfully migrated to KCLv3.x, lease assignment mode will not
+ // change dynamically and will always be WORKER_UTILIZATION_AWARE_ASSIGNMENT, therefore
+ // don't initialize KCLv2.x lease assignment algorithm components that are not needed.
+ if (leaseAssignmentModeProvider.dynamicModeChangeSupportNeeded()) {
+ // Taker runs with fixed DELAY because we want it to run slower in the event of performance degradation.
+ takerFuture = leaseCoordinatorThreadPool.scheduleWithFixedDelay(
+ new TakerRunnable(leaseAssignmentModeProvider), 0L, takerIntervalMillis, TimeUnit.MILLISECONDS);
+ }
- // 2 because we know we'll have at most 2 concurrent tasks at a time.
- leaseCoordinatorThreadPool = Executors.newScheduledThreadPool(2, LEASE_COORDINATOR_THREAD_FACTORY);
+ leaseDiscoveryFuture = leaseCoordinatorThreadPool.scheduleAtFixedRate(
+ new LeaseDiscoveryRunnable(leaseAssignmentModeProvider),
+ 0L,
+ leaseDiscovererIntervalMillis,
+ TimeUnit.MILLISECONDS);
- // Taker runs with fixed DELAY because we want it to run slower in the event of performance degredation.
- takerFuture = leaseCoordinatorThreadPool.scheduleWithFixedDelay(
- new TakerRunnable(), 0L, takerIntervalMillis, TimeUnit.MILLISECONDS);
- // Renewer runs at fixed INTERVAL because we want it to run at the same rate in the event of degredation.
+ // Renewer runs at fixed INTERVAL because we want it to run at the same rate in the event of degradation.
leaseCoordinatorThreadPool.scheduleAtFixedRate(
new RenewerRunnable(), 0L, renewerIntervalMillis, TimeUnit.MILLISECONDS);
+
+ leaseGracefulShutdownHandler.start();
running = true;
}
@@ -383,6 +383,8 @@ public void stop() {
}
leaseRenewalThreadpool.shutdownNow();
+ leaseCoordinatorThreadPool.shutdownNow();
+ leaseGracefulShutdownHandler.stop();
synchronized (shutdownLock) {
leaseRenewer.clearCurrentlyHeldLeases();
running = false;
@@ -393,6 +395,10 @@ public void stop() {
public void stopLeaseTaker() {
if (takerFuture != null) {
takerFuture.cancel(false);
+ leaseDiscoveryFuture.cancel(false);
+ // the method is called in worker graceful shutdown. We want to stop any further lease shutdown
+ // so we don't interrupt worker shutdown.
+ leaseGracefulShutdownHandler.stop();
}
}
@@ -418,20 +424,15 @@ public boolean updateLease(
}
/**
- * Returns executor service that should be used for lease renewal.
+ * Returns executor service for given ThreadFactory.
* @param maximumPoolSize Maximum allowed thread pool size
- * @return Executor service that should be used for lease renewal.
+ * @return Executor service
*/
- private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSize) {
+ private static ExecutorService createExecutorService(final int maximumPoolSize, final ThreadFactory threadFactory) {
int coreLeaseCount = Math.max(maximumPoolSize / 4, 2);
return new ThreadPoolExecutor(
- coreLeaseCount,
- maximumPoolSize,
- 60,
- TimeUnit.SECONDS,
- new LinkedTransferQueue<>(),
- LEASE_RENEWAL_THREAD_FACTORY);
+ coreLeaseCount, maximumPoolSize, 60, TimeUnit.SECONDS, new LinkedTransferQueue<>(), threadFactory);
}
@Override
@@ -472,6 +473,8 @@ public static ShardInfo convertLeaseToAssignment(final Lease lease) {
* {@inheritDoc}
*
* NOTE: This method is deprecated. Please set the initial capacity through the constructor.
+ *
+ * This is a method of the public lease coordinator interface.
*/
@Override
@Deprecated
@@ -487,6 +490,8 @@ public DynamoDBLeaseCoordinator initialLeaseTableReadCapacity(long readCapacity)
* {@inheritDoc}
*
* NOTE: This method is deprecated. Please set the initial capacity through the constructor.
+ *
+ * This is a method of the public lease coordinator interface.
*/
@Override
@Deprecated
@@ -497,4 +502,9 @@ public DynamoDBLeaseCoordinator initialLeaseTableWriteCapacity(long writeCapacit
initialLeaseTableWriteCapacity = writeCapacity;
return this;
}
+
+ @Override
+ public LeaseStatsRecorder leaseStatsRecorder() {
+ return leaseStatsRecorder;
+ }
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseDiscoverer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseDiscoverer.java
new file mode 100644
index 000000000..ce5605ee3
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseDiscoverer.java
@@ -0,0 +1,120 @@
+package software.amazon.kinesis.leases.dynamodb;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import software.amazon.kinesis.leases.Lease;
+import software.amazon.kinesis.leases.LeaseDiscoverer;
+import software.amazon.kinesis.leases.LeaseRefresher;
+import software.amazon.kinesis.leases.LeaseRenewer;
+import software.amazon.kinesis.leases.exceptions.DependencyException;
+import software.amazon.kinesis.leases.exceptions.InvalidStateException;
+import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
+import software.amazon.kinesis.metrics.MetricsFactory;
+import software.amazon.kinesis.metrics.MetricsLevel;
+import software.amazon.kinesis.metrics.MetricsScope;
+import software.amazon.kinesis.metrics.MetricsUtil;
+
+import static java.util.Objects.isNull;
+
+/**
+ * An implementation of {@link LeaseDiscoverer}, it uses {@link LeaseRefresher} to query
+ * {@link DynamoDBLeaseRefresher#LEASE_OWNER_TO_LEASE_KEY_INDEX_NAME } and find the leases assigned
+ * to current worker and then filter and returns the leases that have not started processing (looks at
+ * {@link LeaseRenewer#getCurrentlyHeldLeases()} to find out which leases are currently held leases).
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class DynamoDBLeaseDiscoverer implements LeaseDiscoverer {
+
+ private final LeaseRefresher leaseRefresher;
+ private final LeaseRenewer leaseRenewer;
+ private final MetricsFactory metricsFactory;
+ private final String workerIdentifier;
+ private final ExecutorService executorService;
+
+ @Override
+ public List discoverNewLeases()
+ throws ProvisionedThroughputException, InvalidStateException, DependencyException {
+ final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, "LeaseDiscovery");
+ long startTime = System.currentTimeMillis();
+ boolean success = false;
+ try {
+ final Set currentHeldLeaseKeys =
+ leaseRenewer.getCurrentlyHeldLeases().keySet();
+
+ final long listLeaseKeysForWorkerStartTime = System.currentTimeMillis();
+ final List leaseKeys = leaseRefresher.listLeaseKeysForWorker(workerIdentifier);
+ MetricsUtil.addLatency(
+ metricsScope, "ListLeaseKeysForWorker", listLeaseKeysForWorkerStartTime, MetricsLevel.DETAILED);
+
+ final List newLeaseKeys = leaseKeys.stream()
+ .filter(leaseKey -> !currentHeldLeaseKeys.contains(leaseKey))
+ .collect(Collectors.toList());
+
+ final long fetchNewLeasesStartTime = System.currentTimeMillis();
+ final List> completableFutures = newLeaseKeys.stream()
+ .map(leaseKey ->
+ CompletableFuture.supplyAsync(() -> fetchLease(leaseKey, metricsScope), executorService))
+ .collect(Collectors.toList());
+
+ final List newLeases = completableFutures.stream()
+ .map(CompletableFuture::join)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+
+ log.info(
+ "New leases assigned to worker : {}, count : {}, leases : {}",
+ workerIdentifier,
+ newLeases.size(),
+ newLeases.stream().map(Lease::leaseKey).collect(Collectors.toList()));
+
+ MetricsUtil.addLatency(metricsScope, "FetchNewLeases", fetchNewLeasesStartTime, MetricsLevel.DETAILED);
+
+ success = true;
+ MetricsUtil.addCount(metricsScope, "NewLeasesDiscovered", newLeases.size(), MetricsLevel.DETAILED);
+ return newLeases;
+ } finally {
+ MetricsUtil.addWorkerIdentifier(metricsScope, workerIdentifier);
+ MetricsUtil.addSuccessAndLatency(metricsScope, success, startTime, MetricsLevel.SUMMARY);
+ MetricsUtil.endScope(metricsScope);
+ }
+ }
+
+ private Lease fetchLease(final String leaseKey, final MetricsScope metricsScope) {
+ try {
+ final Lease lease = leaseRefresher.getLease(leaseKey);
+ if (isNull(lease)) {
+ return null;
+ }
+ // GSI is eventually consistent thus, validate that the fetched lease is indeed assigned to this
+ // worker, if not just pass in this run.
+ if (!lease.leaseOwner().equals(workerIdentifier)) {
+ MetricsUtil.addCount(metricsScope, "OwnerMismatch", 1, MetricsLevel.DETAILED);
+ return null;
+ }
+ // if checkpointOwner is not null, it means that the lease is still pending shutdown for the last owner.
+ // Don't add the lease to the in-memory map yet.
+ if (lease.checkpointOwner() != null) {
+ return null;
+ }
+ // when a new lease is discovered, set the lastCounterIncrementNanos to current time as the time
+ // when it has become visible, on next renewer interval this will be updated by LeaseRenewer to
+ // correct time.
+ lease.lastCounterIncrementNanos(System.nanoTime());
+ return lease;
+ } catch (final Exception e) {
+ // if getLease on some lease key fail, continue and fetch other leases, the one failed will
+ // be fetched in the next iteration or will be reassigned if stayed idle for long.
+ MetricsUtil.addCount(metricsScope, "GetLease:Error", 1, MetricsLevel.SUMMARY);
+ log.error("GetLease failed for leaseKey : {}", leaseKey, e);
+ return null;
+ }
+ }
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
index e5435bfc7..7d902afd3 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
@@ -17,22 +17,24 @@
import java.time.Duration;
import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import lombok.Data;
import lombok.NonNull;
-import software.amazon.awssdk.core.util.DefaultSdkAutoConstructList;
+import lombok.extern.slf4j.Slf4j;
+import org.jetbrains.annotations.NotNull;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.Tag;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
-import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.common.DdbTableConfig;
import software.amazon.kinesis.common.LeaseCleanupConfig;
import software.amazon.kinesis.common.StreamConfig;
-import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.KinesisShardDetector;
@@ -42,12 +44,15 @@
import software.amazon.kinesis.leases.LeaseManagementFactory;
import software.amazon.kinesis.leases.LeaseSerializer;
import software.amazon.kinesis.leases.ShardDetector;
+import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
+import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.metrics.MetricsFactory;
/**
*
*/
+@Slf4j
@Data
@KinesisClientInternalApi
public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
@@ -68,849 +73,41 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final ExecutorService executorService;
@NonNull
- private final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer;
-
- @NonNull
- private final LeaseSerializer leaseSerializer;
-
- @NonNull
- private StreamConfig streamConfig;
-
- private Function