Skip to content

Commit 7b5ebae

Browse files
authored
Calculate scan segment by table size (#1443)
* Fixes DDB usage spike issue * Removed unnecessary exception handling * made max total segment size 30 * Cached total scan segment * Reuse listLeasesParallely api to dynamically calculate total segment * Added unit tests and made getParallelScanTotalSegments synchronous * Simplified getParallelScanTotalSegments method * Fall back to previously calculated totalScan * fixed formatting
1 parent e856e7c commit 7b5ebae

File tree

4 files changed

+216
-11
lines changed

4 files changed

+216
-11
lines changed

amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.java

+2-7
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,6 @@ public final class LeaseAssignmentManager {
8686
*/
8787
private static final int DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER = 2;
8888

89-
/**
90-
* Default parallelism factor for scaling lease table.
91-
*/
92-
private static final int DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR = 10;
93-
9489
private static final String FORCE_LEADER_RELEASE_METRIC_NAME = "ForceLeaderRelease";
9590

9691
/**
@@ -689,8 +684,8 @@ private CompletableFuture<List<WorkerMetricStats>> loadWorkerMetricStats() {
689684
}
690685

691686
private CompletableFuture<Map.Entry<List<Lease>, List<String>>> loadLeaseListAsync() {
692-
return CompletableFuture.supplyAsync(() -> loadWithRetry(() -> leaseRefresher.listLeasesParallely(
693-
LEASE_ASSIGNMENT_CALL_THREAD_POOL, DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR)));
687+
return CompletableFuture.supplyAsync(() ->
688+
loadWithRetry(() -> leaseRefresher.listLeasesParallely(LEASE_ASSIGNMENT_CALL_THREAD_POOL, 0)));
694689
}
695690

696691
private <T> T loadWithRetry(final Callable<T> loadFunction) {

amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,9 @@ default List<String> listLeaseKeysForWorker(final String workerIdentifier)
154154
* List all leases from the storage parallely and deserialize into Lease objects. Returns the list of leaseKey
155155
* that failed to deserialize separately.
156156
*
157-
* @param threadPool threadpool to use for parallel scan
158-
* @param parallelismFactor no. of parallel scans
157+
* @param threadPool thread pool to use for parallel scan
158+
* @param parallelismFactor no. of parallel scans.
159+
* If parallelismFactor is 0 then parallelismFactor will be calculated based on table size
159160
* @return Pair of List of leases from the storage and List of items failed to deserialize
160161
* @throws DependencyException if DynamoDB scan fails in an unexpected way
161162
* @throws InvalidStateException if lease table does not exist

amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java

+60-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package software.amazon.kinesis.leases.dynamodb;
1616

1717
import java.time.Duration;
18+
import java.time.Instant;
1819
import java.util.AbstractMap;
1920
import java.util.ArrayList;
2021
import java.util.Collection;
@@ -122,6 +123,20 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
122123
private static final String LEASE_OWNER_INDEX_QUERY_CONDITIONAL_EXPRESSION =
123124
String.format("%s = %s", LEASE_OWNER_KEY, DDB_LEASE_OWNER);
124125

126+
/**
127+
* Default parallelism factor for scanning lease table.
128+
*/
129+
private static final int DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR = 10;
130+
131+
private static final long NUMBER_OF_BYTES_PER_GB = 1024 * 1024 * 1024;
132+
private static final double GB_PER_SEGMENT = 0.2;
133+
private static final int MIN_SCAN_SEGMENTS = 1;
134+
private static final int MAX_SCAN_SEGMENTS = 30;
135+
136+
private Integer cachedTotalSegments;
137+
private Instant expirationTimeForTotalSegmentsCache;
138+
private static final Duration CACHE_DURATION_FOR_TOTAL_SEGMENTS = Duration.ofHours(2);
139+
125140
private static DdbTableConfig createDdbTableConfigFromBillingMode(final BillingMode billingMode) {
126141
final DdbTableConfig tableConfig = new DdbTableConfig();
127142
tableConfig.billingMode(billingMode);
@@ -553,9 +568,17 @@ public Map.Entry<List<Lease>, List<String>> listLeasesParallely(
553568
final List<String> leaseItemFailedDeserialize = new ArrayList<>();
554569
final List<Lease> response = new ArrayList<>();
555570
final List<Future<List<Map<String, AttributeValue>>>> futures = new ArrayList<>();
556-
for (int i = 0; i < parallelScanTotalSegment; ++i) {
571+
572+
final int totalSegments;
573+
if (parallelScanTotalSegment > 0) {
574+
totalSegments = parallelScanTotalSegment;
575+
} else {
576+
totalSegments = getParallelScanTotalSegments();
577+
}
578+
579+
for (int i = 0; i < totalSegments; ++i) {
557580
final int segmentNumber = i;
558-
futures.add(parallelScanExecutorService.submit(() -> scanSegment(segmentNumber, parallelScanTotalSegment)));
581+
futures.add(parallelScanExecutorService.submit(() -> scanSegment(segmentNumber, totalSegments)));
559582
}
560583
try {
561584
for (final Future<List<Map<String, AttributeValue>>> future : futures) {
@@ -586,6 +609,41 @@ public Map.Entry<List<Lease>, List<String>> listLeasesParallely(
586609
return new AbstractMap.SimpleEntry<>(response, leaseItemFailedDeserialize);
587610
}
588611

612+
/**
613+
* Calculates the optimal number of parallel scan segments for a DynamoDB table based on its size.
614+
* The calculation follows these rules:
615+
* - Each segment handles 0.2GB (214,748,364 bytes) of data
616+
* - For empty tables or tables smaller than 0.2GB, uses 1 segment
617+
* - Number of segments scales linearly with table size, capped at 30 segments
618+
*
619+
* @return The number of segments to use for parallel scan, between 1 and 30 inclusive
620+
*/
621+
private synchronized int getParallelScanTotalSegments() throws DependencyException {
622+
if (isTotalSegmentsCacheValid()) {
623+
return cachedTotalSegments;
624+
}
625+
626+
int parallelScanTotalSegments =
627+
cachedTotalSegments == null ? DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR : cachedTotalSegments;
628+
final DescribeTableResponse describeTableResponse = describeLeaseTable();
629+
if (describeTableResponse == null) {
630+
log.info("DescribeTable returned null so using default totalSegments : {}", parallelScanTotalSegments);
631+
} else {
632+
final double tableSizeGB = (double) describeTableResponse.table().tableSizeBytes() / NUMBER_OF_BYTES_PER_GB;
633+
parallelScanTotalSegments = Math.min(
634+
Math.max((int) Math.ceil(tableSizeGB / GB_PER_SEGMENT), MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS);
635+
636+
log.info("TotalSegments for Lease table parallel scan : {}", parallelScanTotalSegments);
637+
}
638+
cachedTotalSegments = parallelScanTotalSegments;
639+
expirationTimeForTotalSegmentsCache = Instant.now().plus(CACHE_DURATION_FOR_TOTAL_SEGMENTS);
640+
return parallelScanTotalSegments;
641+
}
642+
643+
private boolean isTotalSegmentsCacheValid() {
644+
return cachedTotalSegments != null && Instant.now().isBefore(expirationTimeForTotalSegmentsCache);
645+
}
646+
589647
private List<Map<String, AttributeValue>> scanSegment(final int segment, final int parallelScanTotalSegment)
590648
throws DependencyException {
591649

amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherTest.java

+151
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import com.amazonaws.services.dynamodbv2.local.embedded.DynamoDBEmbedded;
1212
import com.google.common.collect.ImmutableMap;
1313
import org.junit.jupiter.api.Test;
14+
import org.junit.jupiter.params.ParameterizedTest;
15+
import org.junit.jupiter.params.provider.CsvSource;
1416
import org.mockito.Mockito;
1517
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
1618
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@@ -22,6 +24,8 @@
2224
import software.amazon.awssdk.services.dynamodb.model.IndexStatus;
2325
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
2426
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
27+
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
28+
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
2529
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
2630
import software.amazon.awssdk.services.dynamodb.model.TableStatus;
2731
import software.amazon.awssdk.services.dynamodb.model.UpdateContinuousBackupsRequest;
@@ -393,6 +397,153 @@ void listLeasesParallely_leaseWithFailingDeserialization_assertCorrectResponse()
393397
assertEquals("badLeaseKey", response.getValue().get(0));
394398
}
395399

400+
@Test
401+
public void listLeasesParallely_UseCachedTotalSegment()
402+
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
403+
DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class);
404+
final long oneGBInBytes = 1073741824L;
405+
406+
when(mockDdbClient.describeTable(any(DescribeTableRequest.class)))
407+
.thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder()
408+
.table(TableDescription.builder()
409+
.tableName(TEST_LEASE_TABLE)
410+
.tableStatus(TableStatus.ACTIVE)
411+
.tableSizeBytes(oneGBInBytes)
412+
.build())
413+
.build()));
414+
when(mockDdbClient.scan(any(ScanRequest.class)))
415+
.thenReturn(CompletableFuture.completedFuture(
416+
ScanResponse.builder().items(new ArrayList<>()).build()));
417+
418+
final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher(
419+
TEST_LEASE_TABLE,
420+
mockDdbClient,
421+
new DynamoDBLeaseSerializer(),
422+
true,
423+
NOOP_TABLE_CREATOR_CALLBACK,
424+
Duration.ofSeconds(10),
425+
new DdbTableConfig(),
426+
true,
427+
true,
428+
new ArrayList<>());
429+
430+
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0);
431+
verify(mockDdbClient, times(5)).scan(any(ScanRequest.class));
432+
433+
// call a second time to verify that the cached value is used
434+
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0);
435+
436+
// verify that describeTable is called only once even though listLeasesParallely is called twice
437+
verify(mockDdbClient, times(1)).describeTable(any(DescribeTableRequest.class));
438+
verify(mockDdbClient, times(10)).scan(any(ScanRequest.class));
439+
}
440+
441+
@Test
442+
public void listLeasesParallely_DescribeTableNotCalledWhenSegmentGreaterThanZero()
443+
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
444+
DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class);
445+
446+
when(mockDdbClient.describeTable(any(DescribeTableRequest.class)))
447+
.thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder()
448+
.table(TableDescription.builder()
449+
.tableName(TEST_LEASE_TABLE)
450+
.tableStatus(TableStatus.ACTIVE)
451+
.tableSizeBytes(1000L)
452+
.build())
453+
.build()));
454+
when(mockDdbClient.scan(any(ScanRequest.class)))
455+
.thenReturn(CompletableFuture.completedFuture(
456+
ScanResponse.builder().items(new ArrayList<>()).build()));
457+
458+
final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher(
459+
TEST_LEASE_TABLE,
460+
mockDdbClient,
461+
new DynamoDBLeaseSerializer(),
462+
true,
463+
NOOP_TABLE_CREATOR_CALLBACK,
464+
Duration.ofSeconds(10),
465+
new DdbTableConfig(),
466+
true,
467+
true,
468+
new ArrayList<>());
469+
470+
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 2);
471+
verify(mockDdbClient, times(0)).describeTable(any(DescribeTableRequest.class));
472+
}
473+
474+
@Test
475+
public void listLeasesParallely_TotalSegmentIsDefaultWhenDescribeTableThrowsException()
476+
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
477+
DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class);
478+
479+
when(mockDdbClient.describeTable(any(DescribeTableRequest.class)))
480+
.thenThrow(ResourceNotFoundException.builder()
481+
.message("Mock table does not exist scenario")
482+
.build());
483+
484+
when(mockDdbClient.scan(any(ScanRequest.class)))
485+
.thenReturn(CompletableFuture.completedFuture(
486+
ScanResponse.builder().items(new ArrayList<>()).build()));
487+
488+
final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher(
489+
TEST_LEASE_TABLE,
490+
mockDdbClient,
491+
new DynamoDBLeaseSerializer(),
492+
true,
493+
NOOP_TABLE_CREATOR_CALLBACK,
494+
Duration.ofSeconds(10),
495+
new DdbTableConfig(),
496+
true,
497+
true,
498+
new ArrayList<>());
499+
500+
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0);
501+
verify(mockDdbClient, times(10)).scan(any(ScanRequest.class));
502+
}
503+
504+
@ParameterizedTest
505+
@CsvSource({
506+
"0, 1", // 0
507+
"1024, 1", // 1KB
508+
"104857600, 1", // 100MB
509+
"214748364, 1", // 0.2GB
510+
"322122547, 2", // 0.3GB
511+
"1073741824, 5", // 1GB
512+
"2147483648, 10", // 2GB
513+
"5368709120, 25", // 5GB
514+
})
515+
public void listLeasesParallely_TotalSegmentForDifferentTableSize(long tableSizeBytes, int totalSegments)
516+
throws ProvisionedThroughputException, DependencyException, InvalidStateException {
517+
DynamoDbAsyncClient mockDdbClient = mock(DynamoDbAsyncClient.class);
518+
519+
when(mockDdbClient.describeTable(any(DescribeTableRequest.class)))
520+
.thenReturn(CompletableFuture.completedFuture(DescribeTableResponse.builder()
521+
.table(TableDescription.builder()
522+
.tableName(TEST_LEASE_TABLE)
523+
.tableStatus(TableStatus.ACTIVE)
524+
.tableSizeBytes(tableSizeBytes)
525+
.build())
526+
.build()));
527+
when(mockDdbClient.scan(any(ScanRequest.class)))
528+
.thenReturn(CompletableFuture.completedFuture(
529+
ScanResponse.builder().items(new ArrayList<>()).build()));
530+
531+
final LeaseRefresher leaseRefresher = new DynamoDBLeaseRefresher(
532+
TEST_LEASE_TABLE,
533+
mockDdbClient,
534+
new DynamoDBLeaseSerializer(),
535+
true,
536+
NOOP_TABLE_CREATOR_CALLBACK,
537+
Duration.ofSeconds(10),
538+
new DdbTableConfig(),
539+
true,
540+
true,
541+
new ArrayList<>());
542+
543+
leaseRefresher.listLeasesParallely(Executors.newFixedThreadPool(2), 0);
544+
verify(mockDdbClient, times(totalSegments)).scan(any(ScanRequest.class));
545+
}
546+
396547
@Test
397548
void initiateGracefulLeaseHandoff_sanity() throws Exception {
398549
DynamoDBLeaseRefresher leaseRefresher = createLeaseRefresher(new DdbTableConfig(), dynamoDbAsyncClient);

0 commit comments

Comments
 (0)