package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.utils.Constants.DISABLE_BACKGROUND_FLUSH;
- import static net.snowflake.ingest.utils.Constants.MAX_BLOB_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.Constants.MAX_THREAD_COUNT;
import static net.snowflake.ingest.utils.Constants.THREAD_SHUTDOWN_TIMEOUT_IN_SEC;
import static net.snowflake.ingest.utils.Utils.getStackTrace;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.Objects;
- import java.util.Optional;
- import java.util.Set;
+ import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -377,180 +369,32 @@ private void createWorkers() {
        Runtime.getRuntime().availableProcessors());
  }

-   /**
-    * Distribute the flush tasks by iterating through all the channels in the channel cache and kick
-    * off a build blob work when certain size has reached or we have reached the end
-    *
-    * @param tablesToFlush list of tables to flush
-    */
+   private Iterator<ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>> getChannelsToFlush(Set<String> tablesToFlush) {
+     return this.channelCache.entrySet().stream()
+         .filter(e -> tablesToFlush.contains(e.getKey()))
+         .map(Map.Entry::getValue)
+         .iterator();
+   }
+
  void distributeFlushTasks(Set<String> tablesToFlush) {
-     Iterator<
-             Map.Entry<
-                 String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
-         itr =
-             this.channelCache.entrySet().stream()
-                 .filter(e -> tablesToFlush.contains(e.getKey()))
-                 .iterator();
    List<Pair<BlobData<T>, CompletableFuture<BlobMetadata>>> blobs = new ArrayList<>();
-     List<ChannelData<T>> leftoverChannelsDataPerTable = new ArrayList<>();

    // The API states that the number of available processors reported can change and therefore, we
    // should poll it occasionally.
    numProcessors = Runtime.getRuntime().availableProcessors();
-     while (itr.hasNext() || !leftoverChannelsDataPerTable.isEmpty()) {
-       List<List<ChannelData<T>>> blobData = new ArrayList<>();
-       float totalBufferSizeInBytes = 0F;
-
-       // Distribute work at table level, split the blob if reaching the blob size limit or the
-       // channel has different encryption key ids
-       while (itr.hasNext() || !leftoverChannelsDataPerTable.isEmpty()) {
-         List<ChannelData<T>> channelsDataPerTable = Collections.synchronizedList(new ArrayList<>());
-         if (!leftoverChannelsDataPerTable.isEmpty()) {
-           channelsDataPerTable.addAll(leftoverChannelsDataPerTable);
-           leftoverChannelsDataPerTable.clear();
-         } else if (blobData.size()
-             >= this.owningClient.getParameterProvider().getMaxChunksInBlob()) {
-           // Create a new blob if the current one already contains max allowed number of chunks
-           logger.logInfo(
-               "Max allowed number of chunks in the current blob reached. chunkCount={}"
-                   + " maxChunkCount={}",
-               blobData.size(),
-               this.owningClient.getParameterProvider().getMaxChunksInBlob());
-           break;
-         } else {
-           ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>> table =
-               itr.next().getValue();
-           // Use parallel stream since getData could be the performance bottleneck when we have a
-           // high number of channels
-           table.values().parallelStream()
-               .forEach(
-                   channel -> {
-                     if (channel.isValid()) {
-                       ChannelData<T> data = channel.getData();
-                       if (data != null) {
-                         channelsDataPerTable.add(data);
-                       }
-                     }
-                   });
-         }
-
-         if (!channelsDataPerTable.isEmpty()) {
-           int idx = 0;
-           float totalBufferSizePerTableInBytes = 0F;
-           while (idx < channelsDataPerTable.size()) {
-             ChannelData<T> channelData = channelsDataPerTable.get(idx);
-             // Stop processing the rest of channels when needed
-             if (idx > 0
-                 && shouldStopProcessing(
-                     totalBufferSizeInBytes,
-                     totalBufferSizePerTableInBytes,
-                     channelData,
-                     channelsDataPerTable.get(idx - 1))) {
-               leftoverChannelsDataPerTable.addAll(
-                   channelsDataPerTable.subList(idx, channelsDataPerTable.size()));
-               logger.logInfo(
-                   "Creation of another blob is needed because of blob/chunk size limit or"
-                       + " different encryption ids or different schema, client={}, table={},"
-                       + " blobSize={}, chunkSize={}, nextChannelSize={}, encryptionId1={},"
-                       + " encryptionId2={}, schema1={}, schema2={}",
-                   this.owningClient.getName(),
-                   channelData.getChannelContext().getTableName(),
-                   totalBufferSizeInBytes,
-                   totalBufferSizePerTableInBytes,
-                   channelData.getBufferSize(),
-                   channelData.getChannelContext().getEncryptionKeyId(),
-                   channelsDataPerTable.get(idx - 1).getChannelContext().getEncryptionKeyId(),
-                   channelData.getColumnEps().keySet(),
-                   channelsDataPerTable.get(idx - 1).getColumnEps().keySet());
-               break;
-             }
-             totalBufferSizeInBytes += channelData.getBufferSize();
-             totalBufferSizePerTableInBytes += channelData.getBufferSize();
-             idx++;
-           }
-           // Add processed channels to the current blob, stop if we need to create a new blob
-           blobData.add(channelsDataPerTable.subList(0, idx));
-           if (idx != channelsDataPerTable.size()) {
-             break;
-           }
-         }
-       }
-
-       if (blobData.isEmpty()) {
-         continue;
-       }

-       // Kick off a build job
+     List<List<List<ChannelData<T>>>> allBlobData = buildBlobData(getChannelsToFlush(tablesToFlush));

+     for (List<List<ChannelData<T>>> blob : allBlobData) {
      // Get the fully qualified table name from the first channel in the blob.
      // This only matters when the client is in Iceberg mode. In Iceberg mode,
      // all channels in the blob belong to the same table.
-       String fullyQualifiedTableName =
-           blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName();
-
+       final String fullyQualifiedTableName =
+           blob.get(0).get(0).getChannelContext().getFullyQualifiedTableName();
      final BlobPath blobPath = this.storageManager.generateBlobPath(fullyQualifiedTableName);

-       long flushStartMs = System.currentTimeMillis();
-       if (this.owningClient.flushLatency != null) {
-         latencyTimerContextMap.putIfAbsent(
-             blobPath.fileRegistrationPath, this.owningClient.flushLatency.time());
-       }
-
-       // Copy encryptionKeysPerTable from owning client
-       Map<FullyQualifiedTableName, EncryptionKey> encryptionKeysPerTable =
-           new ConcurrentHashMap<>();
-       this.owningClient
-           .getEncryptionKeysPerTable()
-           .forEach((k, v) -> encryptionKeysPerTable.put(k, new EncryptionKey(v)));
-
-       Supplier<BlobMetadata> supplier =
-           () -> {
-             try {
-               BlobMetadata blobMetadata =
-                   buildAndUpload(
-                       blobPath, blobData, fullyQualifiedTableName, encryptionKeysPerTable);
-               blobMetadata.getBlobStats().setFlushStartMs(flushStartMs);
-               return blobMetadata;
-             } catch (Throwable e) {
-               Throwable ex = e.getCause() == null ? e : e.getCause();
-               String errorMessage =
-                   String.format(
-                       "Building blob failed, client=%s, blob=%s, exception=%s,"
-                           + " detail=%s, trace=%s, all channels in the blob will be"
-                           + " invalidated",
-                       this.owningClient.getName(),
-                       blobPath.fileRegistrationPath,
-                       ex,
-                       ex.getMessage(),
-                       getStackTrace(ex));
-               logger.logError(errorMessage);
-               if (this.owningClient.getTelemetryService() != null) {
-                 this.owningClient
-                     .getTelemetryService()
-                     .reportClientFailure(this.getClass().getSimpleName(), errorMessage);
-               }
-
-               if (e instanceof IOException) {
-                 invalidateAllChannelsInBlob(blobData, errorMessage);
-                 return null;
-               } else if (e instanceof NoSuchAlgorithmException) {
-                 throw new SFException(e, ErrorCode.MD5_HASHING_NOT_AVAILABLE);
-               } else if (e instanceof InvalidAlgorithmParameterException
-                   | e instanceof NoSuchPaddingException
-                   | e instanceof IllegalBlockSizeException
-                   | e instanceof BadPaddingException
-                   | e instanceof InvalidKeyException) {
-                 throw new SFException(e, ErrorCode.ENCRYPTION_FAILURE);
-               } else {
-                 throw new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage());
-               }
-             }
-           };
-
-       blobs.add(
-           new Pair<>(
-               new BlobData<>(blobPath.fileRegistrationPath, blobData),
-               CompletableFuture.supplyAsync(supplier, this.buildUploadWorkers)));
+       // Kick off a build job
+       blobs.add(buildAndUploadBlob(fullyQualifiedTableName, blobPath, blob));

      logger.logInfo(
          "buildAndUpload task added for client={}, blob={}, buildUploadWorkers stats={}",
@@ -563,28 +407,78 @@ && shouldStopProcessing(
    this.registerService.addBlobs(blobs);
  }

-   /**
-    * Check whether we should stop merging more channels into the same chunk, we need to stop in a
-    * few cases:
-    *
-    * <p>When the blob size is larger than a certain threshold
-    *
-    * <p>When the chunk size is larger than a certain threshold
-    *
-    * <p>When the schemas are not the same
-    */
-   private boolean shouldStopProcessing(
-       float totalBufferSizeInBytes,
-       float totalBufferSizePerTableInBytes,
-       ChannelData<T> current,
-       ChannelData<T> prev) {
-     return totalBufferSizeInBytes + current.getBufferSize() > MAX_BLOB_SIZE_IN_BYTES
-         || totalBufferSizePerTableInBytes + current.getBufferSize()
-             > this.owningClient.getParameterProvider().getMaxChunkSizeInBytes()
-         || !Objects.equals(
-             current.getChannelContext().getEncryptionKeyId(),
-             prev.getChannelContext().getEncryptionKeyId())
-         || !current.getColumnEps().keySet().equals(prev.getColumnEps().keySet());
+   private List<List<List<ChannelData<T>>>> buildBlobData(Iterator<ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>> tablesToFlush) {
+     BlobDataBuilder<T> blobDataBuilder = new BlobDataBuilder<>(this.owningClient.getName(), this.owningClient.getParameterProvider());
+     while (tablesToFlush.hasNext()) {
+       ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>> next = tablesToFlush.next();
+       Collection<SnowflakeStreamingIngestChannelInternal<T>> tableChannels = next.values();
+       blobDataBuilder.appendDataForTable(tableChannels);
+     }
+
+     return blobDataBuilder.getAllBlobData();
+   }
+
+   private Pair<BlobData<T>, CompletableFuture<BlobMetadata>> buildAndUploadBlob(String fullyQualifiedTableName, BlobPath blobPath, List<List<ChannelData<T>>> blobData) {
+     long flushStartMs = System.currentTimeMillis();
+     if (this.owningClient.flushLatency != null) {
+       latencyTimerContextMap.putIfAbsent(
+           blobPath.fileRegistrationPath, this.owningClient.flushLatency.time());
+     }
+
+     // Copy encryptionKeysPerTable from owning client
+     Map<FullyQualifiedTableName, EncryptionKey> encryptionKeysPerTable =
+         new ConcurrentHashMap<>();
+     this.owningClient
+         .getEncryptionKeysPerTable()
+         .forEach((k, v) -> encryptionKeysPerTable.put(k, new EncryptionKey(v)));
+
+     Supplier<BlobMetadata> supplier =
+         () -> {
+           try {
+             BlobMetadata blobMetadata =
+                 buildAndUpload(
+                     blobPath, blobData, fullyQualifiedTableName, encryptionKeysPerTable);
+             blobMetadata.getBlobStats().setFlushStartMs(flushStartMs);
+             return blobMetadata;
+           } catch (Throwable e) {
+             Throwable ex = e.getCause() == null ? e : e.getCause();
+             String errorMessage =
+                 String.format(
+                     "Building blob failed, client=%s, blob=%s, exception=%s,"
+                         + " detail=%s, trace=%s, all channels in the blob will be"
+                         + " invalidated",
+                     this.owningClient.getName(),
+                     blobPath.fileRegistrationPath,
+                     ex,
+                     ex.getMessage(),
+                     getStackTrace(ex));
+             logger.logError(errorMessage);
+             if (this.owningClient.getTelemetryService() != null) {
+               this.owningClient
+                   .getTelemetryService()
+                   .reportClientFailure(this.getClass().getSimpleName(), errorMessage);
+             }
+
+             if (e instanceof IOException) {
+               invalidateAllChannelsInBlob(blobData, errorMessage);
+               return null;
+             } else if (e instanceof NoSuchAlgorithmException) {
+               throw new SFException(e, ErrorCode.MD5_HASHING_NOT_AVAILABLE);
+             } else if (e instanceof InvalidAlgorithmParameterException
+                 | e instanceof NoSuchPaddingException
+                 | e instanceof IllegalBlockSizeException
+                 | e instanceof BadPaddingException
+                 | e instanceof InvalidKeyException) {
+               throw new SFException(e, ErrorCode.ENCRYPTION_FAILURE);
+             } else {
+               throw new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage());
+             }
+           }
+         };
+
+     return new Pair<>(
+         new BlobData<>(blobPath.fileRegistrationPath, blobData),
+         CompletableFuture.supplyAsync(supplier, this.buildUploadWorkers));
  }

  /**
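Note on the new helpers: `buildBlobData` delegates all chunk grouping and blob splitting to `BlobDataBuilder`, which is introduced elsewhere in this change and is not visible in this hunk. The sketch below is only an illustration of the shape implied by the calls above (a constructor taking the client name and `ParameterProvider`, `appendDataForTable`, `getAllBlobData`); the split rules are assumed to mirror the deleted `shouldStopProcessing` checks (max blob size, max chunk size per table, max chunks per blob, matching encryption key id and column schema) and the sketch does not reproduce them.

```java
// Hypothetical sketch only; the real BlobDataBuilder is added elsewhere in this PR.
// Names and signatures are inferred from how buildBlobData() uses the class above.
class BlobDataBuilderSketch<T> {
  private final String clientName;
  private final ParameterProvider parameterProvider; // would drive the split limits
  private final List<List<List<ChannelData<T>>>> allBlobData = new ArrayList<>();
  private List<List<ChannelData<T>>> currentBlob = new ArrayList<>();

  BlobDataBuilderSketch(String clientName, ParameterProvider parameterProvider) {
    this.clientName = clientName;
    this.parameterProvider = parameterProvider;
  }

  // Collect data from every valid channel of one table and append it as a chunk of the
  // current blob; the real builder also opens a new blob when a size or compatibility
  // limit would be exceeded.
  void appendDataForTable(Collection<SnowflakeStreamingIngestChannelInternal<T>> channels) {
    List<ChannelData<T>> chunk = new ArrayList<>();
    for (SnowflakeStreamingIngestChannelInternal<T> channel : channels) {
      if (channel.isValid()) {
        ChannelData<T> data = channel.getData();
        if (data != null) {
          chunk.add(data);
        }
      }
    }
    if (!chunk.isEmpty()) {
      currentBlob.add(chunk);
    }
  }

  // Seal the in-progress blob and return everything accumulated so far.
  List<List<List<ChannelData<T>>>> getAllBlobData() {
    if (!currentBlob.isEmpty()) {
      allBlobData.add(currentBlob);
      currentBlob = new ArrayList<>();
    }
    return allBlobData;
  }
}
```

With the grouping pushed into the builder, `distributeFlushTasks` is reduced to iterating the pre-grouped blob data and handing each blob to `buildAndUploadBlob`, which keeps the original upload, latency-timer, and channel-invalidation behavior.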