From f61388bca85c60b1595768566eea6530bc9ce43d Mon Sep 17 00:00:00 2001 From: Brian Kroeker Date: Fri, 3 Jan 2025 10:07:56 -0800 Subject: [PATCH] This splits the logic to structure channel data into blob collections out into a separate class and refactors the main nested loop into separate methods within that new class. The collections produced by the new class should be identical to those prior to the change. --- .../streaming/internal/BlobDataBuilder.java | 155 ++++++++++ .../streaming/internal/ChannelCache.java | 18 +- .../internal/ChannelsStatusRequest.java | 2 +- .../streaming/internal/FlushService.java | 278 ++++++------------ ...wflakeStreamingIngestChannelFlushable.java | 23 ++ ...owflakeStreamingIngestChannelInternal.java | 27 +- ...nowflakeStreamingIngestClientInternal.java | 24 +- .../streaming/internal/ChannelCacheTest.java | 12 +- 8 files changed, 309 insertions(+), 230 deletions(-) create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/BlobDataBuilder.java create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFlushable.java diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobDataBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobDataBuilder.java new file mode 100644 index 000000000..8892f2493 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobDataBuilder.java @@ -0,0 +1,155 @@ +package net.snowflake.ingest.streaming.internal; + +import net.snowflake.ingest.utils.Logging; +import net.snowflake.ingest.utils.ParameterProvider; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static net.snowflake.ingest.utils.Constants.MAX_BLOB_SIZE_IN_BYTES; + +/** + * Responsible for accepting data from channels and collating into collections that will be used to build the actual blobs + *
+ * A chunk is represented as a list of channel data from a single table
+ * A blob is represented as a list of chunks that must share the same schema (but not necessarily the same table)
+ *
+ * This class returns a list of blobs + */ +class BlobDataBuilder { + private static final Logging logger = new Logging(BlobDataBuilder.class); + private final List>>> allBlobs; + private final ParameterProvider parameterProvider; + private final String clientName; + private List>> currentBlob; + private ChannelData prevChannelData = null; + private float totalCurrentBlobSizeInBytes = 0F; + private float totalBufferSizeInBytes = 0F; + + public BlobDataBuilder(String clientName, ParameterProvider parameterProvider) { + this.clientName = clientName; + this.parameterProvider = parameterProvider; + this.currentBlob = new ArrayList<>(); + this.allBlobs = new ArrayList<>(); + } + + public List>>> getAllBlobData() { + addCurrentBlob(); + return allBlobs; + } + + public void appendDataForTable(Collection> tableChannels) { + List> chunk = getChunkForTable(tableChannels); + appendChunk(chunk); + } + + private List> getChunkForTable(Collection> tableChannels) { + List> channelsDataPerTable = Collections.synchronizedList(new ArrayList<>()); + // Use parallel stream since getData could be the performance bottleneck when we have a + // high number of channels + tableChannels.parallelStream() + .forEach( + channel -> { + if (channel.isValid()) { + ChannelData data = channel.getData(); + if (data != null) { + channelsDataPerTable.add(data); + } + } + }); + return channelsDataPerTable; + } + + private void appendChunk(List> chunkData) { + if (chunkData.isEmpty()) { + return; + } + + if (currentBlob.size() >= parameterProvider.getMaxChunksInBlob()) { + // Create a new blob if the current one already contains max allowed number of chunks + logger.logInfo( + "Max allowed number of chunks in the current blob reached. chunkCount={}" + + " maxChunkCount={}", + currentBlob.size(), + parameterProvider.getMaxChunksInBlob()); + + addCurrentBlob(); + } + + int i, start = 0; + for (i = 0; i < chunkData.size(); i++) { + ChannelData channelData = chunkData.get(i); + if (prevChannelData != null && shouldStopProcessing( + totalCurrentBlobSizeInBytes, + totalBufferSizeInBytes, + channelData, + prevChannelData)) { + logger.logInfo( + "Creation of another blob is needed because of blob/chunk size limit or" + + " different encryption ids or different schema, client={}, table={}," + + " blobSize={}, chunkSize={}, nextChannelSize={}, encryptionId1={}," + + " encryptionId2={}, schema1={}, schema2={}", + clientName, + channelData.getChannelContext().getTableName(), + totalCurrentBlobSizeInBytes, + totalBufferSizeInBytes, + channelData.getBufferSize(), + channelData.getChannelContext().getEncryptionKeyId(), + prevChannelData.getChannelContext().getEncryptionKeyId(), + channelData.getColumnEps().keySet(), + prevChannelData.getColumnEps().keySet()); + + if (i != start) { + currentBlob.add(chunkData.subList(start, i)); + start = i; + } + + addCurrentBlob(); + } + + totalCurrentBlobSizeInBytes += channelData.getBufferSize(); + totalBufferSizeInBytes += channelData.getBufferSize(); + prevChannelData = channelData; + } + + if (i != start) { + currentBlob.add(chunkData.subList(start, i)); + } + } + + private void addCurrentBlob() { + if (!currentBlob.isEmpty()) { + allBlobs.add(currentBlob); + currentBlob = new ArrayList<>(); + } + totalBufferSizeInBytes = 0; + totalCurrentBlobSizeInBytes = 0; + } + + /** + * Check whether we should stop merging more channels into the same chunk, we need to stop in a + * few cases: + * + *
+ * <li>When the blob size is larger than a certain threshold
+ *
+ * <li>When the chunk size is larger than a certain threshold
+ *
+ * <li>
When the schemas are not the same + */ + private boolean shouldStopProcessing( + float totalBufferSizeInBytes, + float totalBufferSizePerTableInBytes, + ChannelData current, + ChannelData prev) { + return totalBufferSizeInBytes + current.getBufferSize() > MAX_BLOB_SIZE_IN_BYTES + || totalBufferSizePerTableInBytes + current.getBufferSize() + > parameterProvider.getMaxChunkSizeInBytes() + || !Objects.equals( + current.getChannelContext().getEncryptionKeyId(), + prev.getChannelContext().getEncryptionKeyId()) + || !current.getColumnEps().keySet().equals(prev.getColumnEps().keySet()); + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java index 90c0f2ac9..ad3f487de 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java @@ -23,7 +23,7 @@ class ChannelCache { // Cache to hold all the valid channels, the key for the outer map is FullyQualifiedTableName and // the key for the inner map is ChannelName private final ConcurrentHashMap< - String, ConcurrentHashMap>> + String, ConcurrentHashMap>> cache = new ConcurrentHashMap<>(); /** Flush information for each table including last flush time and if flush is needed */ @@ -45,8 +45,8 @@ static class FlushInfo { * * @param channel */ - void addChannel(SnowflakeStreamingIngestChannelInternal channel) { - ConcurrentHashMap> channels = + void addChannel(SnowflakeStreamingIngestChannelFlushable channel) { + ConcurrentHashMap> channels = this.cache.computeIfAbsent( channel.getFullyQualifiedTableName(), v -> new ConcurrentHashMap<>()); @@ -55,7 +55,7 @@ void addChannel(SnowflakeStreamingIngestChannelInternal channel) { this.tableFlushInfo.putIfAbsent( channel.getFullyQualifiedTableName(), new FlushInfo(System.currentTimeMillis(), false)); - SnowflakeStreamingIngestChannelInternal oldChannel = + SnowflakeStreamingIngestChannelFlushable oldChannel = channels.put(channel.getName(), channel); // Invalidate old channel if it exits to block new inserts and return error to users earlier if (oldChannel != null) { @@ -136,7 +136,7 @@ void setNeedFlush(String fullyQualifiedTableName, boolean needFlush) { } /** Returns an immutable set view of the mappings contained in the channel cache. */ - Set>>> + Set>>> entrySet() { return Collections.unmodifiableSet(cache.entrySet()); } @@ -155,11 +155,11 @@ void closeAllChannels() { /** Remove a channel in the channel cache if the channel sequencer matches */ // TODO: background cleaner to cleanup old stale channels that are not closed? 
- void removeChannelIfSequencersMatch(SnowflakeStreamingIngestChannelInternal channel) { + void removeChannelIfSequencersMatch(SnowflakeStreamingIngestChannelFlushable channel) { cache.computeIfPresent( channel.getFullyQualifiedTableName(), (k, v) -> { - SnowflakeStreamingIngestChannelInternal channelInCache = v.get(channel.getName()); + SnowflakeStreamingIngestChannelFlushable channelInCache = v.get(channel.getName()); // We need to compare the channel sequencer in case the old channel was already been // removed return channelInCache != null @@ -180,10 +180,10 @@ void invalidateChannelIfSequencersMatch( Long channelSequencer, String invalidationCause) { String fullyQualifiedTableName = String.format("%s.%s.%s", dbName, schemaName, tableName); - ConcurrentHashMap> channelsMapPerTable = + ConcurrentHashMap> channelsMapPerTable = cache.get(fullyQualifiedTableName); if (channelsMapPerTable != null) { - SnowflakeStreamingIngestChannelInternal channel = channelsMapPerTable.get(channelName); + SnowflakeStreamingIngestChannelFlushable channel = channelsMapPerTable.get(channelName); if (channel != null && channel.getChannelSequencer().equals(channelSequencer)) { channel.invalidate("invalidate with matched sequencer", invalidationCause); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java index 025647f14..1cf5272e3 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java @@ -29,7 +29,7 @@ static class ChannelStatusRequestDTO { // Client Sequencer private final Long clientSequencer; - ChannelStatusRequestDTO(SnowflakeStreamingIngestChannelInternal channel) { + ChannelStatusRequestDTO(SnowflakeStreamingIngestChannelFlushable channel) { this.channelName = channel.getName(); this.databaseName = channel.getDBName(); this.schemaName = channel.getSchemaName(); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index f0f87e889..15da0160d 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -5,7 +5,6 @@ package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.utils.Constants.DISABLE_BACKGROUND_FLUSH; -import static net.snowflake.ingest.utils.Constants.MAX_BLOB_SIZE_IN_BYTES; import static net.snowflake.ingest.utils.Constants.MAX_THREAD_COUNT; import static net.snowflake.ingest.utils.Constants.THREAD_SHUTDOWN_TIMEOUT_IN_SEC; import static net.snowflake.ingest.utils.Utils.getStackTrace; @@ -17,14 +16,7 @@ import java.security.InvalidAlgorithmParameterException; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -377,180 +369,32 @@ private void createWorkers() { Runtime.getRuntime().availableProcessors()); } - /** - * Distribute the flush tasks by iterating through all the channels in the channel cache and kick - 
* off a build blob work when certain size has reached or we have reached the end - * - * @param tablesToFlush list of tables to flush - */ + private Iterator>> getChannelsToFlush(Set tablesToFlush) { + return this.channelCache.entrySet().stream() + .filter(e -> tablesToFlush.contains(e.getKey())) + .map(Map.Entry::getValue) + .iterator(); + } + void distributeFlushTasks(Set tablesToFlush) { - Iterator< - Map.Entry< - String, ConcurrentHashMap>>> - itr = - this.channelCache.entrySet().stream() - .filter(e -> tablesToFlush.contains(e.getKey())) - .iterator(); List, CompletableFuture>> blobs = new ArrayList<>(); - List> leftoverChannelsDataPerTable = new ArrayList<>(); // The API states that the number of available processors reported can change and therefore, we // should poll it occasionally. numProcessors = Runtime.getRuntime().availableProcessors(); - while (itr.hasNext() || !leftoverChannelsDataPerTable.isEmpty()) { - List>> blobData = new ArrayList<>(); - float totalBufferSizeInBytes = 0F; - - // Distribute work at table level, split the blob if reaching the blob size limit or the - // channel has different encryption key ids - while (itr.hasNext() || !leftoverChannelsDataPerTable.isEmpty()) { - List> channelsDataPerTable = Collections.synchronizedList(new ArrayList<>()); - if (!leftoverChannelsDataPerTable.isEmpty()) { - channelsDataPerTable.addAll(leftoverChannelsDataPerTable); - leftoverChannelsDataPerTable.clear(); - } else if (blobData.size() - >= this.owningClient.getParameterProvider().getMaxChunksInBlob()) { - // Create a new blob if the current one already contains max allowed number of chunks - logger.logInfo( - "Max allowed number of chunks in the current blob reached. chunkCount={}" - + " maxChunkCount={}", - blobData.size(), - this.owningClient.getParameterProvider().getMaxChunksInBlob()); - break; - } else { - ConcurrentHashMap> table = - itr.next().getValue(); - // Use parallel stream since getData could be the performance bottleneck when we have a - // high number of channels - table.values().parallelStream() - .forEach( - channel -> { - if (channel.isValid()) { - ChannelData data = channel.getData(); - if (data != null) { - channelsDataPerTable.add(data); - } - } - }); - } - - if (!channelsDataPerTable.isEmpty()) { - int idx = 0; - float totalBufferSizePerTableInBytes = 0F; - while (idx < channelsDataPerTable.size()) { - ChannelData channelData = channelsDataPerTable.get(idx); - // Stop processing the rest of channels when needed - if (idx > 0 - && shouldStopProcessing( - totalBufferSizeInBytes, - totalBufferSizePerTableInBytes, - channelData, - channelsDataPerTable.get(idx - 1))) { - leftoverChannelsDataPerTable.addAll( - channelsDataPerTable.subList(idx, channelsDataPerTable.size())); - logger.logInfo( - "Creation of another blob is needed because of blob/chunk size limit or" - + " different encryption ids or different schema, client={}, table={}," - + " blobSize={}, chunkSize={}, nextChannelSize={}, encryptionId1={}," - + " encryptionId2={}, schema1={}, schema2={}", - this.owningClient.getName(), - channelData.getChannelContext().getTableName(), - totalBufferSizeInBytes, - totalBufferSizePerTableInBytes, - channelData.getBufferSize(), - channelData.getChannelContext().getEncryptionKeyId(), - channelsDataPerTable.get(idx - 1).getChannelContext().getEncryptionKeyId(), - channelData.getColumnEps().keySet(), - channelsDataPerTable.get(idx - 1).getColumnEps().keySet()); - break; - } - totalBufferSizeInBytes += channelData.getBufferSize(); - totalBufferSizePerTableInBytes 
+= channelData.getBufferSize(); - idx++; - } - // Add processed channels to the current blob, stop if we need to create a new blob - blobData.add(channelsDataPerTable.subList(0, idx)); - if (idx != channelsDataPerTable.size()) { - break; - } - } - } - - if (blobData.isEmpty()) { - continue; - } - // Kick off a build job + List>>> allBlobData = buildBlobData(getChannelsToFlush(tablesToFlush)); + for (List>> blob : allBlobData) { // Get the fully qualified table name from the first channel in the blob. // This only matters when the client is in Iceberg mode. In Iceberg mode, // all channels in the blob belong to the same table. - String fullyQualifiedTableName = - blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName(); - + final String fullyQualifiedTableName = + blob.get(0).get(0).getChannelContext().getFullyQualifiedTableName(); final BlobPath blobPath = this.storageManager.generateBlobPath(fullyQualifiedTableName); - long flushStartMs = System.currentTimeMillis(); - if (this.owningClient.flushLatency != null) { - latencyTimerContextMap.putIfAbsent( - blobPath.fileRegistrationPath, this.owningClient.flushLatency.time()); - } - - // Copy encryptionKeysPerTable from owning client - Map encryptionKeysPerTable = - new ConcurrentHashMap<>(); - this.owningClient - .getEncryptionKeysPerTable() - .forEach((k, v) -> encryptionKeysPerTable.put(k, new EncryptionKey(v))); - - Supplier supplier = - () -> { - try { - BlobMetadata blobMetadata = - buildAndUpload( - blobPath, blobData, fullyQualifiedTableName, encryptionKeysPerTable); - blobMetadata.getBlobStats().setFlushStartMs(flushStartMs); - return blobMetadata; - } catch (Throwable e) { - Throwable ex = e.getCause() == null ? e : e.getCause(); - String errorMessage = - String.format( - "Building blob failed, client=%s, blob=%s, exception=%s," - + " detail=%s, trace=%s, all channels in the blob will be" - + " invalidated", - this.owningClient.getName(), - blobPath.fileRegistrationPath, - ex, - ex.getMessage(), - getStackTrace(ex)); - logger.logError(errorMessage); - if (this.owningClient.getTelemetryService() != null) { - this.owningClient - .getTelemetryService() - .reportClientFailure(this.getClass().getSimpleName(), errorMessage); - } - - if (e instanceof IOException) { - invalidateAllChannelsInBlob(blobData, errorMessage); - return null; - } else if (e instanceof NoSuchAlgorithmException) { - throw new SFException(e, ErrorCode.MD5_HASHING_NOT_AVAILABLE); - } else if (e instanceof InvalidAlgorithmParameterException - | e instanceof NoSuchPaddingException - | e instanceof IllegalBlockSizeException - | e instanceof BadPaddingException - | e instanceof InvalidKeyException) { - throw new SFException(e, ErrorCode.ENCRYPTION_FAILURE); - } else { - throw new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage()); - } - } - }; - - blobs.add( - new Pair<>( - new BlobData<>(blobPath.fileRegistrationPath, blobData), - CompletableFuture.supplyAsync(supplier, this.buildUploadWorkers))); + // Kick off a build job + blobs.add(buildAndUploadBlob(fullyQualifiedTableName, blobPath, blob)); logger.logInfo( "buildAndUpload task added for client={}, blob={}, buildUploadWorkers stats={}", @@ -563,28 +407,78 @@ && shouldStopProcessing( this.registerService.addBlobs(blobs); } - /** - * Check whether we should stop merging more channels into the same chunk, we need to stop in a - * few cases: - * - *
- * <li>When the blob size is larger than a certain threshold
- *
- * <li>When the chunk size is larger than a certain threshold
- *
- * <li>
When the schemas are not the same - */ - private boolean shouldStopProcessing( - float totalBufferSizeInBytes, - float totalBufferSizePerTableInBytes, - ChannelData current, - ChannelData prev) { - return totalBufferSizeInBytes + current.getBufferSize() > MAX_BLOB_SIZE_IN_BYTES - || totalBufferSizePerTableInBytes + current.getBufferSize() - > this.owningClient.getParameterProvider().getMaxChunkSizeInBytes() - || !Objects.equals( - current.getChannelContext().getEncryptionKeyId(), - prev.getChannelContext().getEncryptionKeyId()) - || !current.getColumnEps().keySet().equals(prev.getColumnEps().keySet()); + private List>>> buildBlobData(Iterator>> tablesToFlush) { + BlobDataBuilder blobDataBuilder = new BlobDataBuilder<>(this.owningClient.getName(), this.owningClient.getParameterProvider()); + while (tablesToFlush.hasNext()) { + ConcurrentHashMap> next = tablesToFlush.next(); + Collection> tableChannels = next.values(); + blobDataBuilder.appendDataForTable(tableChannels); + } + + return blobDataBuilder.getAllBlobData(); + } + + private Pair, CompletableFuture> buildAndUploadBlob(String fullyQualifiedTableName, BlobPath blobPath, List>> blobData) { + long flushStartMs = System.currentTimeMillis(); + if (this.owningClient.flushLatency != null) { + latencyTimerContextMap.putIfAbsent( + blobPath.fileRegistrationPath, this.owningClient.flushLatency.time()); + } + + // Copy encryptionKeysPerTable from owning client + Map encryptionKeysPerTable = + new ConcurrentHashMap<>(); + this.owningClient + .getEncryptionKeysPerTable() + .forEach((k, v) -> encryptionKeysPerTable.put(k, new EncryptionKey(v))); + + Supplier supplier = + () -> { + try { + BlobMetadata blobMetadata = + buildAndUpload( + blobPath, blobData, fullyQualifiedTableName, encryptionKeysPerTable); + blobMetadata.getBlobStats().setFlushStartMs(flushStartMs); + return blobMetadata; + } catch (Throwable e) { + Throwable ex = e.getCause() == null ? 
e : e.getCause(); + String errorMessage = + String.format( + "Building blob failed, client=%s, blob=%s, exception=%s," + + " detail=%s, trace=%s, all channels in the blob will be" + + " invalidated", + this.owningClient.getName(), + blobPath.fileRegistrationPath, + ex, + ex.getMessage(), + getStackTrace(ex)); + logger.logError(errorMessage); + if (this.owningClient.getTelemetryService() != null) { + this.owningClient + .getTelemetryService() + .reportClientFailure(this.getClass().getSimpleName(), errorMessage); + } + + if (e instanceof IOException) { + invalidateAllChannelsInBlob(blobData, errorMessage); + return null; + } else if (e instanceof NoSuchAlgorithmException) { + throw new SFException(e, ErrorCode.MD5_HASHING_NOT_AVAILABLE); + } else if (e instanceof InvalidAlgorithmParameterException + | e instanceof NoSuchPaddingException + | e instanceof IllegalBlockSizeException + | e instanceof BadPaddingException + | e instanceof InvalidKeyException) { + throw new SFException(e, ErrorCode.ENCRYPTION_FAILURE); + } else { + throw new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage()); + } + } + }; + + return new Pair<>( + new BlobData<>(blobPath.fileRegistrationPath, blobData), + CompletableFuture.supplyAsync(supplier, this.buildUploadWorkers)); } /** diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFlushable.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFlushable.java new file mode 100644 index 000000000..790e16745 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelFlushable.java @@ -0,0 +1,23 @@ +package net.snowflake.ingest.streaming.internal; + +import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +interface SnowflakeStreamingIngestChannelFlushable extends SnowflakeStreamingIngestChannel { + Long getChannelSequencer(); + + ChannelRuntimeState getChannelState(); + + ChannelData getData(); + + void invalidate(String message, String invalidationCause); + + void markClosed(); + + CompletableFuture flush(boolean closing); + + // TODO: need to verify with the table schema when supporting sub-columns + void setupSchema(List columns); +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index ee06eedcc..63d009ec2 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -36,7 +36,7 @@ * * @param type of column data {@link ParquetChunkData}) */ -class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIngestChannel { +class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIngestChannelFlushable { private static final Logging logger = new Logging(SnowflakeStreamingIngestChannelInternal.class); @@ -156,13 +156,15 @@ public String getTableName() { return this.channelFlushContext.getTableName(); } - Long getChannelSequencer() { + @Override + public Long getChannelSequencer() { return this.channelFlushContext.getChannelSequencer(); } /** @return current state of the channel */ + @Override @VisibleForTesting - ChannelRuntimeState getChannelState() { + public ChannelRuntimeState getChannelState() { 
return this.channelState; } @@ -181,7 +183,8 @@ public String getFullyQualifiedTableName() { * * @return a ChannelData object */ - ChannelData getData() { + @Override + public ChannelData getData() { ChannelData data = this.rowBuffer.flush(); if (data != null) { data.setChannelContext(channelFlushContext); @@ -196,7 +199,8 @@ public boolean isValid() { } /** Mark the channel as invalid, and release resources */ - void invalidate(String message, String invalidationCause) { + @Override + public void invalidate(String message, String invalidationCause) { this.channelState.invalidate(); this.invalidationCause = invalidationCause; this.rowBuffer.close("invalidate"); @@ -215,7 +219,8 @@ public boolean isClosed() { } /** Mark the channel as closed */ - void markClosed() { + @Override + public void markClosed() { this.isClosed = true; logger.logInfo( "Channel is marked as closed, name={}, channel sequencer={}, row sequencer={}", @@ -230,7 +235,8 @@ void markClosed() { * @param closing whether the flush is called as part of channel closing * @return future which will be complete when the flush the data is registered */ - CompletableFuture flush(boolean closing) { + @Override + public CompletableFuture flush(boolean closing) { // Skip this check for closing because we need to set the channel to closed first and then flush // in case there is any leftover rows if (isClosed() && !closing) { @@ -268,7 +274,7 @@ public CompletableFuture close(boolean drop) { return flush(true) .thenRunAsync( () -> { - List> uncommittedChannels = + List> uncommittedChannels = this.owningClient.verifyChannelsAreFullyCommitted( Collections.singletonList(this)); @@ -280,7 +286,7 @@ public CompletableFuture close(boolean drop) { throw new SFException( ErrorCode.CHANNELS_WITH_UNCOMMITTED_ROWS, uncommittedChannels.stream() - .map(SnowflakeStreamingIngestChannelInternal::getFullyQualifiedName) + .map(SnowflakeStreamingIngestChannelFlushable::getFullyQualifiedName) .collect(Collectors.toList())); } if (drop) { @@ -301,7 +307,8 @@ public CompletableFuture close(boolean drop) { * @param columns */ // TODO: need to verify with the table schema when supporting sub-columns - void setupSchema(List columns) { + @Override + public void setupSchema(List columns) { logger.logDebug("Setup schema for channel={}, schema={}", getFullyQualifiedName(), columns); this.rowBuffer.setupSchema(columns); columns.forEach(c -> tableColumns.putIfAbsent(c.getName(), new ColumnProperties(c))); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 8eb5cc066..dda4eb7d6 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -330,7 +330,7 @@ public boolean isClosed() { * @return a SnowflakeStreamingIngestChannel object */ @Override - public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest request) { + public SnowflakeStreamingIngestChannelFlushable openChannel(OpenChannelRequest request) { if (isClosed) { throw new SFException(ErrorCode.CLOSED_CLIENT); } @@ -382,7 +382,7 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest getName()); // Channel is now registered, add it to the in-memory channel pool - SnowflakeStreamingIngestChannelInternal channel = + 
SnowflakeStreamingIngestChannelFlushable channel = SnowflakeStreamingIngestChannelFactory.builder(response.getChannelName()) .setDBName(response.getDBName()) .setSchemaName(response.getSchemaName()) @@ -464,9 +464,9 @@ public void dropChannel(DropChannelRequest request) { @Override public Map getLatestCommittedOffsetTokens( List channels) { - List> internalChannels = + List> internalChannels = channels.stream() - .map(c -> (SnowflakeStreamingIngestChannelInternal) c) + .map(c -> (SnowflakeStreamingIngestChannelFlushable) c) .collect(Collectors.toList()); List channelsStatus = getChannelsStatus(internalChannels).getChannels(); @@ -486,7 +486,7 @@ public Map getLatestCommittedOffsetTokens( * @return a ChannelsStatusResponse object */ ChannelsStatusResponse getChannelsStatus( - List> channels) { + List> channels) { try { ChannelsStatusRequest request = new ChannelsStatusRequest(); List requestDTOs = @@ -499,7 +499,7 @@ ChannelsStatusResponse getChannelsStatus( ChannelsStatusResponse response = snowflakeServiceClient.getChannelStatus(request); for (int idx = 0; idx < channels.size(); idx++) { - SnowflakeStreamingIngestChannelInternal channel = channels.get(idx); + SnowflakeStreamingIngestChannelFlushable channel = channels.get(idx); ChannelsStatusResponse.ChannelStatusResponseDTO channelStatus = response.getChannels().get(idx); if (channelStatus.getStatusCode() != RESPONSE_SUCCESS) { @@ -808,7 +808,7 @@ void setNeedFlush(String fullyQualifiedTableName) { } /** Remove the channel in the channel cache if the channel sequencer matches */ - void removeChannelIfSequencersMatch(SnowflakeStreamingIngestChannelInternal channel) { + void removeChannelIfSequencersMatch(SnowflakeStreamingIngestChannelFlushable channel) { this.channelCache.removeChannelIfSequencersMatch(channel); } @@ -843,8 +843,8 @@ FlushService getFlushService() { * @param channels a list of channels we want to check against * @return a list of channels that has uncommitted rows */ - List> verifyChannelsAreFullyCommitted( - List> channels) { + List> verifyChannelsAreFullyCommitted( + List> channels) { if (channels.isEmpty()) { return channels; } @@ -853,16 +853,16 @@ List> verifyChannelsAreFullyCommitted int retry = 0; boolean isTimeout = true; List oldChannelsStatus = new ArrayList<>(); - List> channelsWithError = new ArrayList<>(); + List> channelsWithError = new ArrayList<>(); do { List channelsStatus = getChannelsStatus(channels).getChannels(); - List> tempChannels = new ArrayList<>(); + List> tempChannels = new ArrayList<>(); List tempChannelsStatus = new ArrayList<>(); for (int idx = 0; idx < channelsStatus.size(); idx++) { ChannelsStatusResponse.ChannelStatusResponseDTO channelStatus = channelsStatus.get(idx); - SnowflakeStreamingIngestChannelInternal channel = channels.get(idx); + SnowflakeStreamingIngestChannelFlushable channel = channels.get(idx); long rowSequencer = channel.getChannelState().getRowSequencer(); logger.logInfo( "Get channel status name={}, status={}, clientSequencer={}, rowSequencer={}," diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java index 1ce9407f5..307c58710 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java @@ -166,7 +166,7 @@ public void testAddChannel() { Assert.assertTrue(!channel.isValid()); Assert.assertTrue(channelDup.isValid()); Assert.assertEquals(1, 
cache.getSize()); - ConcurrentHashMap> channels = + ConcurrentHashMap> channels = cache.entrySet().iterator().next().getValue(); Assert.assertEquals(1, channels.size()); Assert.assertTrue(channelDup == channels.get(channelName)); @@ -179,15 +179,15 @@ public void testIterator() { Iterator< Map.Entry< String, - ConcurrentHashMap>>> + ConcurrentHashMap>>> iter = cache.entrySet().iterator(); Map.Entry< String, - ConcurrentHashMap>> + ConcurrentHashMap>> firstTable = iter.next(); Map.Entry< String, - ConcurrentHashMap>> + ConcurrentHashMap>> secondTable = iter.next(); Assert.assertFalse(iter.hasNext()); if (firstTable.getKey().equals(channel1.getFullyQualifiedTableName())) { @@ -209,10 +209,10 @@ public void testCloseAllChannels() { Iterator< Map.Entry< String, - ConcurrentHashMap>>> + ConcurrentHashMap>>> iter = cache.entrySet().iterator(); while (iter.hasNext()) { - for (SnowflakeStreamingIngestChannelInternal channel : iter.next().getValue().values()) { + for (SnowflakeStreamingIngestChannelFlushable channel : iter.next().getValue().values()) { Assert.assertTrue(channel.isClosed()); } }
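
Reviewer note (not part of the patch): the heart of the change is the chunk/blob grouping rule that moves from the nested loop in distributeFlushTasks into BlobDataBuilder.appendChunk, shouldStopProcessing, and addCurrentBlob. The following stand-alone Java sketch mirrors that rule with simplified types so it can be read and compiled in isolation; FakeChannelData, BlobGroupingSketch, and the hard-coded size limits are illustrative stand-ins, not the SDK's ChannelData<T>, ParameterProvider, or real thresholds.

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/**
 * Stand-alone sketch of the grouping rule implemented by BlobDataBuilder.
 * All names and limits here are illustrative stand-ins, not SDK classes.
 */
public class BlobGroupingSketch {

  /** Minimal stand-in for one channel's flushed data. */
  static class FakeChannelData {
    final long bufferSize;
    final Long encryptionKeyId;
    final Set<String> columnNames; // stand-in for getColumnEps().keySet()

    FakeChannelData(long bufferSize, Long encryptionKeyId, Set<String> columnNames) {
      this.bufferSize = bufferSize;
      this.encryptionKeyId = encryptionKeyId;
      this.columnNames = columnNames;
    }
  }

  // Illustrative limits; the real class reads these from Constants/ParameterProvider.
  private static final long MAX_BLOB_SIZE_IN_BYTES = 512L * 1024 * 1024;
  private static final long MAX_CHUNK_SIZE_IN_BYTES = 128L * 1024 * 1024;
  private static final int MAX_CHUNKS_IN_BLOB = 100;

  private final List<List<List<FakeChannelData>>> allBlobs = new ArrayList<>();
  private List<List<FakeChannelData>> currentBlob = new ArrayList<>();
  private FakeChannelData prevChannelData = null;
  private long totalCurrentBlobSizeInBytes = 0;
  private long totalBufferSizeInBytes = 0;

  /** Append one table's channel data, splitting chunks/blobs at the same boundaries as appendChunk. */
  public void appendDataForTable(List<FakeChannelData> chunkData) {
    if (chunkData.isEmpty()) {
      return;
    }
    if (currentBlob.size() >= MAX_CHUNKS_IN_BLOB) {
      addCurrentBlob(); // current blob already holds the max number of chunks
    }
    int start = 0;
    int i;
    for (i = 0; i < chunkData.size(); i++) {
      FakeChannelData channelData = chunkData.get(i);
      if (prevChannelData != null && shouldStopProcessing(channelData)) {
        if (i != start) {
          currentBlob.add(new ArrayList<>(chunkData.subList(start, i))); // close the current chunk
          start = i;
        }
        addCurrentBlob(); // and start a new blob
      }
      totalCurrentBlobSizeInBytes += channelData.bufferSize;
      totalBufferSizeInBytes += channelData.bufferSize;
      prevChannelData = channelData;
    }
    if (i != start) {
      currentBlob.add(new ArrayList<>(chunkData.subList(start, i)));
    }
  }

  /** Blob size limit, chunk size limit, a different encryption key, or a different schema forces a split. */
  private boolean shouldStopProcessing(FakeChannelData current) {
    return totalCurrentBlobSizeInBytes + current.bufferSize > MAX_BLOB_SIZE_IN_BYTES
        || totalBufferSizeInBytes + current.bufferSize > MAX_CHUNK_SIZE_IN_BYTES
        || !Objects.equals(current.encryptionKeyId, prevChannelData.encryptionKeyId)
        || !current.columnNames.equals(prevChannelData.columnNames);
  }

  private void addCurrentBlob() {
    if (!currentBlob.isEmpty()) {
      allBlobs.add(currentBlob);
      currentBlob = new ArrayList<>();
    }
    totalCurrentBlobSizeInBytes = 0;
    totalBufferSizeInBytes = 0;
  }

  /** Finalize the in-progress blob and return blobs -> chunks -> per-channel data. */
  public List<List<List<FakeChannelData>>> getAllBlobData() {
    addCurrentBlob();
    return allBlobs;
  }
}

As in the patch, both byte counters are reset only when a blob is finalized, so the blobs -> chunks -> channel-data nesting this sketch produces follows the same boundaries that FlushService.distributeFlushTasks previously computed inline, which is what the commit message means by the collections being identical before and after the refactor.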