diff --git a/e2e-jar-test/pom.xml b/e2e-jar-test/pom.xml
index 7acf7a1e9..6e3652a4e 100644
--- a/e2e-jar-test/pom.xml
+++ b/e2e-jar-test/pom.xml
@@ -29,7 +29,7 @@
      <groupId>net.snowflake</groupId>
      <artifactId>snowflake-ingest-sdk</artifactId>
-     <version>2.1.2-SNAPSHOT</version>
+     <version>2.2.0</version>
diff --git a/pom.xml b/pom.xml
index e14af3d8e..b771ac80b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
    <groupId>net.snowflake</groupId>
    <artifactId>snowflake-ingest-sdk</artifactId>
-   <version>2.1.2-SNAPSHOT</version>
+   <version>2.2.0</version>
    <packaging>jar</packaging>
    <name>Snowflake Ingest SDK</name>
    <description>Snowflake Ingest SDK</description>
@@ -45,7 +45,7 @@
3.14.01.3.11.11.0
- 2.16.1
+ 2.17.032.0.1-jre3.3.6true
@@ -60,13 +60,13 @@
4.1.94.Final9.37.33.1
- 1.13.1
+ 1.14.12.0.9UTF-8
- 3.19.6
+ 4.27.2net.snowflake.ingest.internal1.7.36
- 1.1.10.4
+ 1.1.10.53.16.10.13.0
@@ -343,13 +343,13 @@
      <groupId>net.bytebuddy</groupId>
      <artifactId>byte-buddy</artifactId>
-     <version>1.10.19</version>
+     <version>1.14.9</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>net.bytebuddy</groupId>
      <artifactId>byte-buddy-agent</artifactId>
-     <version>1.10.19</version>
+     <version>1.14.9</version>
      <scope>test</scope>
@@ -358,6 +358,18 @@
      <version>3.7.7</version>
      <scope>test</scope>
    </dependency>
+   <dependency>
+     <groupId>org.openjdk.jmh</groupId>
+     <artifactId>jmh-core</artifactId>
+     <version>1.34</version>
+     <scope>test</scope>
+   </dependency>
+   <dependency>
+     <groupId>org.openjdk.jmh</groupId>
+     <artifactId>jmh-generator-annprocess</artifactId>
+     <version>1.34</version>
+     <scope>test</scope>
+   </dependency>
@@ -470,6 +482,13 @@
org.apache.parquetparquet-common
+
+
+
+ javax.annotation
+ javax.annotation-api
+
+ org.apache.parquet
@@ -491,7 +510,7 @@
      <groupId>com.github.luben</groupId>
      <artifactId>zstd-jni</artifactId>
-     <version>1.5.0-1</version>
+     <version>1.5.6-2</version>
      <scope>runtime</scope>
@@ -527,6 +546,16 @@
      <artifactId>mockito-core</artifactId>
      <scope>test</scope>
    </dependency>
+   <dependency>
+     <groupId>org.openjdk.jmh</groupId>
+     <artifactId>jmh-core</artifactId>
+     <scope>test</scope>
+   </dependency>
+   <dependency>
+     <groupId>org.openjdk.jmh</groupId>
+     <artifactId>jmh-generator-annprocess</artifactId>
+     <scope>test</scope>
+   </dependency>
    <dependency>
      <groupId>org.powermock</groupId>
      <artifactId>powermock-api-mockito2</artifactId>
@@ -723,8 +752,8 @@
true
+ to work around https://issues.apache.org/jira/browse/MNG-7982. Now the dependency analyzer complains that
+ the dependency is unused, so we ignore it here-->
org.apache.commons:commons-compressorg.apache.commons:commons-configuration2
@@ -819,9 +848,9 @@
failFast
+ The list of allowed licenses. If you see the build failing due to "There are some forbidden licenses used, please
+ check your dependencies", verify the conditions of the license and add the reference to it here.
+ -->
Apache License 2.0BSD 2-Clause License
@@ -844,6 +873,7 @@
BSD 2-Clause License
|The BSD LicenseThe MIT License|MIT License
+ 3-Clause BSD License|BSD-3-Clause
@@ -1133,9 +1163,9 @@
+ Plugin executes license processing Python script, which copies third party license files into the directory
+ target/generated-licenses-info/META-INF/third-party-licenses, which is then included in the shaded JAR.
+ -->
org.codehaus.mojoexec-maven-plugin
diff --git a/scripts/process_licenses.py b/scripts/process_licenses.py
index bb43fbbf0..4a0377a8e 100644
--- a/scripts/process_licenses.py
+++ b/scripts/process_licenses.py
@@ -132,18 +132,22 @@ def main():
dependency_without_license_count += 1
missing_licenses_str += f"{dependency_lookup_key}: {license_name}\n"
else:
- raise Exception(f"The dependency {dependency_lookup_key} does not ship a license file, but neither is it not defined in ADDITIONAL_LICENSES_MAP")
+ raise Exception(
+ f"The dependency {dependency_lookup_key} does not ship a license file, but neither is it not "
+ f"defined in ADDITIONAL_LICENSES_MAP")
with open(Path(target_dir, "ADDITIONAL_LICENCES"), "w") as additional_licenses_handle:
additional_licenses_handle.write(missing_licenses_str)
if dependency_count < 30:
- raise Exception(f"Suspiciously low number of dependency JARs detected in {dependency_jars_path}: {dependency_count}")
+ raise Exception(
+ f"Suspiciously low number of dependency JARs detected in {dependency_jars_path}: {dependency_count}")
print("License generation finished")
print(f"\tTotal dependencies: {dependency_count}")
print(f"\tTotal dependencies (with license): {dependency_with_license_count}")
print(f"\tTotal dependencies (without license): {dependency_without_license_count}")
print(f"\tIgnored dependencies: {dependency_ignored_count}")
+
if __name__ == "__main__":
main()
diff --git a/src/main/java/net/snowflake/ingest/connection/OAuthClient.java b/src/main/java/net/snowflake/ingest/connection/OAuthClient.java
index 61c736a42..a592899ea 100644
--- a/src/main/java/net/snowflake/ingest/connection/OAuthClient.java
+++ b/src/main/java/net/snowflake/ingest/connection/OAuthClient.java
@@ -93,6 +93,7 @@ public void refreshToken() throws IOException {
/** Helper method for making refresh request */
private HttpUriRequest makeRefreshTokenRequest() {
+ // TODO SNOW-1538108 Use SnowflakeServiceClient to make the request
HttpPost post = new HttpPost(oAuthCredential.get().getOAuthTokenEndpoint());
post.addHeader(HttpHeaders.CONTENT_TYPE, OAUTH_CONTENT_TYPE_HEADER);
post.addHeader(HttpHeaders.AUTHORIZATION, oAuthCredential.get().getAuthHeader());
diff --git a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java
index 55929400d..d2bf317d4 100644
--- a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java
+++ b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java
@@ -110,7 +110,7 @@ public class RequestBuilder {
// Don't change!
public static final String CLIENT_NAME = "SnowpipeJavaSDK";
- public static final String DEFAULT_VERSION = "2.1.2-SNAPSHOT";
+ public static final String DEFAULT_VERSION = "2.2.0";
public static final String JAVA_USER_AGENT = "JAVA";
@@ -678,12 +678,23 @@ public HttpGet generateHistoryRangeRequest(
*/
public HttpPost generateStreamingIngestPostRequest(
String payload, String endPoint, String message) {
- LOGGER.debug("Generate Snowpipe streaming request: endpoint={}, payload={}", endPoint, payload);
+ final String requestId = UUID.randomUUID().toString();
+ LOGGER.debug(
+ "Generate Snowpipe streaming request: endpoint={}, payload={}, requestId={}",
+ endPoint,
+ payload,
+ requestId);
// Make the corresponding URI
URI uri = null;
try {
uri =
- new URIBuilder().setScheme(scheme).setHost(host).setPort(port).setPath(endPoint).build();
+ new URIBuilder()
+ .setScheme(scheme)
+ .setHost(host)
+ .setPort(port)
+ .setPath(endPoint)
+ .setParameter(REQUEST_ID, requestId)
+ .build();
} catch (URISyntaxException e) {
throw new SFException(e, ErrorCode.BUILD_REQUEST_FAILURE, message);
}
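A minimal standalone sketch of the request-id change above, assuming Apache HttpClient's URIBuilder and a request_id query-parameter key (the real key is the SDK's REQUEST_ID constant); the host and endpoint are made up:

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.UUID;
import org.apache.http.client.utils.URIBuilder;

public class RequestIdUriSketch {
  public static void main(String[] args) throws URISyntaxException {
    // One id per request so server-side logs can be correlated with client logs
    String requestId = UUID.randomUUID().toString();
    URI uri =
        new URIBuilder()
            .setScheme("https")
            .setHost("myaccount.snowflakecomputing.com") // hypothetical host
            .setPort(443)
            .setPath("/v1/streaming/channels/status/") // hypothetical endpoint
            .setParameter("request_id", requestId) // assumed value of the REQUEST_ID constant
            .build();
    System.out.println(uri);
  }
}
```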
diff --git a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java
index cc8782dbd..4d3ea19aa 100644
--- a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java
+++ b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/
package net.snowflake.ingest.streaming;
@@ -150,7 +150,7 @@ public ZoneId getDefaultTimezone() {
}
public String getFullyQualifiedTableName() {
- return String.format("%s.%s.%s", this.dbName, this.schemaName, this.tableName);
+ return Utils.getFullyQualifiedTableName(this.dbName, this.schemaName, this.tableName);
}
public OnErrorOption getOnErrorOption() {
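Utils.getFullyQualifiedTableName itself is not part of this diff; as a rough sketch, and assuming it simply concatenates the parts the old String.format call used, it could look like:

```java
/**
 * Hypothetical stand-in for the name helpers in net.snowflake.ingest.utils.Utils; the real
 * implementations are not shown in this diff.
 */
public class NameUtilSketch {
  static String getFullyQualifiedTableName(String dbName, String schemaName, String tableName) {
    return String.format("%s.%s.%s", dbName, schemaName, tableName);
  }

  static String getFullyQualifiedChannelName(
      String dbName, String schemaName, String tableName, String channelName) {
    return String.format("%s.%s.%s.%s", dbName, schemaName, tableName, channelName);
  }

  public static void main(String[] args) {
    // Prints MY_DB.PUBLIC.EVENTS
    System.out.println(getFullyQualifiedTableName("MY_DB", "PUBLIC", "EVENTS"));
  }
}
```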
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
index 6d5dce17f..71a9d501e 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
@@ -16,7 +16,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
-import java.util.stream.Collectors;
import net.snowflake.ingest.connection.TelemetryService;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
@@ -400,10 +399,10 @@ public float getSize() {
  Set<String> verifyInputColumns(
      Map<String, Object> row, InsertValidationResponse.InsertError error, int rowIndex) {
    // Map of unquoted column name -> original column name
-    Map<String, String> inputColNamesMap =
-        row.keySet().stream()
-            .collect(Collectors.toMap(LiteralQuoteUtils::unquoteColumnName, value -> value));
-
+    Set<String> originalKeys = row.keySet();
+    Map<String, String> inputColNamesMap = new HashMap<>();
+    originalKeys.forEach(
+        key -> inputColNamesMap.put(LiteralQuoteUtils.unquoteColumnName(key), key));
// Check for extra columns in the row
    List<String> extraCols = new ArrayList<>();
for (String columnName : inputColNamesMap.keySet()) {
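A small self-contained sketch of the lookup map built above; unquote() is a hypothetical stand-in for LiteralQuoteUtils.unquoteColumnName, whose behavior is only approximated here:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ColumnNameMapSketch {
  // Hypothetical stand-in for LiteralQuoteUtils.unquoteColumnName (behavior approximated)
  static String unquote(String columnName) {
    if (columnName.length() >= 2 && columnName.startsWith("\"") && columnName.endsWith("\"")) {
      return columnName.substring(1, columnName.length() - 1);
    }
    return columnName.toUpperCase();
  }

  public static void main(String[] args) {
    Set<String> originalKeys = new HashSet<>(Arrays.asList("\"CaseSensitive\"", "plain_col"));
    // Map of unquoted column name -> original column name, as in verifyInputColumns above
    Map<String, String> inputColNamesMap = new HashMap<>();
    originalKeys.forEach(key -> inputColNamesMap.put(unquote(key), key));
    // e.g. {PLAIN_COL=plain_col, CaseSensitive="CaseSensitive"} (iteration order may vary)
    System.out.println(inputColNamesMap);
  }
}
```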
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java
index 989be0fa1..90c0f2ac9 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java
@@ -1,12 +1,15 @@
/*
- * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/
package net.snowflake.ingest.streaming.internal;
-import java.util.Iterator;
+import java.util.Collections;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import net.snowflake.ingest.utils.ErrorCode;
+import net.snowflake.ingest.utils.SFException;
/**
* In-memory cache that stores the active channels for a given Streaming Ingest client, and the
@@ -23,6 +26,20 @@ class ChannelCache {
          String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>
      cache = new ConcurrentHashMap<>();
+  /** Flush information for each table, including last flush time and whether a flush is needed */
+ static class FlushInfo {
+ final long lastFlushTime;
+ final boolean needFlush;
+
+ FlushInfo(long lastFlushTime, boolean needFlush) {
+ this.lastFlushTime = lastFlushTime;
+ this.needFlush = needFlush;
+ }
+ }
+
+ /** Flush information for each table, only used when max chunks in blob is 1 */
+  private final ConcurrentHashMap<String, FlushInfo> tableFlushInfo = new ConcurrentHashMap<>();
+
/**
* Add a channel to the channel cache
*
@@ -33,6 +50,11 @@ void addChannel(SnowflakeStreamingIngestChannelInternal channel) {
this.cache.computeIfAbsent(
channel.getFullyQualifiedTableName(), v -> new ConcurrentHashMap<>());
+    // Update the last flush time for the table, add jitter to avoid all channels flushing at the
+    // same time when the blobs are not interleaved
+ this.tableFlushInfo.putIfAbsent(
+ channel.getFullyQualifiedTableName(), new FlushInfo(System.currentTimeMillis(), false));
+
    SnowflakeStreamingIngestChannelInternal<T> oldChannel =
        channels.put(channel.getName(), channel);
    // Invalidate old channel if it exists to block new inserts and return error to users earlier
@@ -44,13 +66,84 @@ void addChannel(SnowflakeStreamingIngestChannelInternal channel) {
}
/**
- * Returns an iterator over the (table, channels) in this map.
+ * Get the last flush time for a table
+ *
+ * @param fullyQualifiedTableName fully qualified table name
+ * @return last flush time in milliseconds
+ */
+ Long getLastFlushTime(String fullyQualifiedTableName) {
+ FlushInfo tableFlushInfo = this.tableFlushInfo.get(fullyQualifiedTableName);
+ if (tableFlushInfo == null) {
+ throw new SFException(
+ ErrorCode.INTERNAL_ERROR,
+ String.format("Last flush time for table %s not found", fullyQualifiedTableName));
+ }
+ return tableFlushInfo.lastFlushTime;
+ }
+
+ /**
+   * Set the last flush time for a table
*
- * @return
+ * @param fullyQualifiedTableName fully qualified table name
+ * @param lastFlushTime last flush time in milliseconds
*/
-  Iterator<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
-      iterator() {
-    return this.cache.entrySet().iterator();
+ void setLastFlushTime(String fullyQualifiedTableName, Long lastFlushTime) {
+ this.tableFlushInfo.compute(
+ fullyQualifiedTableName,
+ (k, v) -> {
+ if (v == null) {
+ throw new SFException(
+ ErrorCode.INTERNAL_ERROR,
+ String.format("Last flush time for table %s not found", fullyQualifiedTableName));
+ }
+ return new FlushInfo(lastFlushTime, v.needFlush);
+ });
+ }
+
+ /**
+ * Get need flush flag for a table
+ *
+ * @param fullyQualifiedTableName fully qualified table name
+ * @return need flush flag
+ */
+ boolean getNeedFlush(String fullyQualifiedTableName) {
+ FlushInfo tableFlushInfo = this.tableFlushInfo.get(fullyQualifiedTableName);
+ if (tableFlushInfo == null) {
+ throw new SFException(
+ ErrorCode.INTERNAL_ERROR,
+ String.format("Need flush flag for table %s not found", fullyQualifiedTableName));
+ }
+ return tableFlushInfo.needFlush;
+ }
+
+ /**
+ * Set need flush flag for a table
+ *
+ * @param fullyQualifiedTableName fully qualified table name
+ * @param needFlush need flush flag
+ */
+ void setNeedFlush(String fullyQualifiedTableName, boolean needFlush) {
+ this.tableFlushInfo.compute(
+ fullyQualifiedTableName,
+ (k, v) -> {
+ if (v == null) {
+ throw new SFException(
+ ErrorCode.INTERNAL_ERROR,
+ String.format("Need flush flag for table %s not found", fullyQualifiedTableName));
+ }
+ return new FlushInfo(v.lastFlushTime, needFlush);
+ });
+ }
+
+ /** Returns an immutable set view of the mappings contained in the channel cache. */
+  Set<Map.Entry<String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
+      entrySet() {
+ return Collections.unmodifiableSet(cache.entrySet());
+ }
+
+ /** Returns an immutable set view of the keys contained in the channel cache. */
+  Set<String> keySet() {
+ return Collections.unmodifiableSet(cache.keySet());
}
/** Close all channels in the channel cache */
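A condensed sketch of the per-table flush bookkeeping added above, with generics and SFException error handling omitted; register() and the sample table name are illustrative stand-ins for what addChannel() does in the real class:

```java
import java.util.concurrent.ConcurrentHashMap;

public class TableFlushInfoSketch {
  static final class FlushInfo {
    final long lastFlushTime;
    final boolean needFlush;

    FlushInfo(long lastFlushTime, boolean needFlush) {
      this.lastFlushTime = lastFlushTime;
      this.needFlush = needFlush;
    }
  }

  private final ConcurrentHashMap<String, FlushInfo> tableFlushInfo = new ConcurrentHashMap<>();

  // Called when the first channel of a table is added
  void register(String fullyQualifiedTableName) {
    tableFlushInfo.putIfAbsent(
        fullyQualifiedTableName, new FlushInfo(System.currentTimeMillis(), false));
  }

  // Replace only the flag, keeping lastFlushTime (the real code throws if the table is unknown)
  void setNeedFlush(String fullyQualifiedTableName, boolean needFlush) {
    tableFlushInfo.compute(
        fullyQualifiedTableName, (k, v) -> new FlushInfo(v.lastFlushTime, needFlush));
  }

  boolean getNeedFlush(String fullyQualifiedTableName) {
    return tableFlushInfo.get(fullyQualifiedTableName).needFlush;
  }

  public static void main(String[] args) {
    TableFlushInfoSketch cache = new TableFlushInfoSketch();
    cache.register("DB.SCHEMA.T1");
    cache.setNeedFlush("DB.SCHEMA.T1", true);
    System.out.println(cache.getNeedFlush("DB.SCHEMA.T1")); // true
  }
}
```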
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java
index 3e5265719..fe9542267 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelFlushContext.java
@@ -1,9 +1,11 @@
/*
- * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
*/
package net.snowflake.ingest.streaming.internal;
+import net.snowflake.ingest.utils.Utils;
+
/**
* Channel immutable identification and encryption attributes.
*
@@ -36,12 +38,12 @@ class ChannelFlushContext {
String encryptionKey,
Long encryptionKeyId) {
this.name = name;
- this.fullyQualifiedName = String.format("%s.%s.%s.%s", dbName, schemaName, tableName, name);
+ this.fullyQualifiedName =
+ Utils.getFullyQualifiedChannelName(dbName, schemaName, tableName, name);
this.dbName = dbName;
this.schemaName = schemaName;
this.tableName = tableName;
- this.fullyQualifiedTableName =
- String.format("%s.%s.%s", this.getDbName(), this.getSchemaName(), this.getTableName());
+ this.fullyQualifiedTableName = Utils.getFullyQualifiedTableName(dbName, schemaName, tableName);
this.channelSequencer = channelSequencer;
this.encryptionKey = encryptionKey;
this.encryptionKeyId = encryptionKeyId;
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java
index b98782ab9..025647f14 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelsStatusRequest.java
@@ -1,14 +1,16 @@
/*
- * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/
package net.snowflake.ingest.streaming.internal;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
+import java.util.stream.Collectors;
+import net.snowflake.ingest.utils.Utils;
/** Class to deserialize a request from a channel status request */
-class ChannelsStatusRequest {
+class ChannelsStatusRequest implements IStreamingIngestRequest {
// Used to deserialize a channel request
static class ChannelStatusRequestDTO {
@@ -61,20 +63,12 @@ Long getClientSequencer() {
}
}
- // Optional Request ID. Used for diagnostic purposes.
- private String requestId;
-
// Channels in request
private List channels;
// Snowflake role used by client
private String role;
- @JsonProperty("request_id")
- String getRequestId() {
- return requestId;
- }
-
@JsonProperty("role")
public String getRole() {
return role;
@@ -85,11 +79,6 @@ public void setRole(String role) {
this.role = role;
}
- @JsonProperty("request_id")
- void setRequestId(String requestId) {
- this.requestId = requestId;
- }
-
@JsonProperty("channels")
void setChannels(List channels) {
this.channels = channels;
@@ -99,4 +88,20 @@ void setChannels(List channels) {
List getChannels() {
return channels;
}
+
+ @Override
+ public String getStringForLogging() {
+ return String.format(
+ "ChannelsStatusRequest(role=%s, channels=[%s])",
+ role,
+ channels.stream()
+ .map(
+ r ->
+ Utils.getFullyQualifiedChannelName(
+ r.getDatabaseName(),
+ r.getSchemaName(),
+ r.getTableName(),
+ r.getChannelName()))
+ .collect(Collectors.joining(", ")));
+ }
}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
index 278d4abea..72d89409f 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
@@ -18,6 +18,8 @@ public class ClientBufferParameters {
private Constants.BdecParquetCompression bdecParquetCompression;
+ private Constants.BinaryStringEncoding binaryStringEncoding;
+
/**
* Private constructor used for test methods
*
@@ -30,11 +32,13 @@ private ClientBufferParameters(
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
- Constants.BdecParquetCompression bdecParquetCompression) {
+ Constants.BdecParquetCompression bdecParquetCompression,
+ Constants.BinaryStringEncoding binaryStringEncoding) {
this.enableParquetInternalBuffering = enableParquetInternalBuffering;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
+ this.binaryStringEncoding = binaryStringEncoding;
}
/** @param clientInternal reference to the client object where the relevant parameters are set */
@@ -55,6 +59,10 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
clientInternal != null
? clientInternal.getParameterProvider().getBdecParquetCompressionAlgorithm()
: ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT;
+ this.binaryStringEncoding =
+ clientInternal != null
+ ? clientInternal.getParameterProvider().getBinaryStringEncoding()
+ : ParameterProvider.BINARY_STRING_ENCODING_DEFAULT;
}
/**
@@ -68,12 +76,14 @@ public static ClientBufferParameters test_createClientBufferParameters(
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
- Constants.BdecParquetCompression bdecParquetCompression) {
+ Constants.BdecParquetCompression bdecParquetCompression,
+ Constants.BinaryStringEncoding binaryStringEncoding) {
return new ClientBufferParameters(
enableParquetInternalBuffering,
maxChunkSizeInBytes,
maxAllowedRowSizeInBytes,
- bdecParquetCompression);
+ bdecParquetCompression,
+ binaryStringEncoding);
}
public boolean getEnableParquetInternalBuffering() {
@@ -91,4 +101,8 @@ public long getMaxAllowedRowSizeInBytes() {
public Constants.BdecParquetCompression getBdecParquetCompression() {
return bdecParquetCompression;
}
+
+ public Constants.BinaryStringEncoding getBinaryStringEncoding() {
+ return binaryStringEncoding;
+ }
}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureRequest.java
new file mode 100644
index 000000000..79b282079
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Class used to serialize client configure request */
+class ClientConfigureRequest implements IStreamingIngestRequest {
+ /**
+ * Constructor for client configure request
+ *
+ * @param role Role to be used for the request.
+ */
+ ClientConfigureRequest(String role) {
+ this.role = role;
+ }
+
+ @JsonProperty("role")
+ private String role;
+
+ // File name for the GCS signed url request
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty("file_name")
+ private String fileName;
+
+ String getRole() {
+ return role;
+ }
+
+ void setRole(String role) {
+ this.role = role;
+ }
+
+ String getFileName() {
+ return fileName;
+ }
+
+ void setFileName(String fileName) {
+ this.fileName = fileName;
+ }
+
+ @Override
+ public String getStringForLogging() {
+ return String.format("ClientConfigureRequest(role=%s, file_name=%s)", getRole(), getFileName());
+ }
+}
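For reference, a minimal standalone sketch of how a request shaped like the one above serializes with Jackson; the role value is made up and the SDK's HTTP plumbing is not shown:

```java
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ClientConfigureRequestSketch {
  @JsonProperty("role")
  public String role = "INGEST_ROLE"; // hypothetical role

  @JsonInclude(JsonInclude.Include.NON_NULL)
  @JsonProperty("file_name")
  public String fileName; // left null, so it is omitted from the JSON

  public static void main(String[] args) throws Exception {
    // Prints {"role":"INGEST_ROLE"}; file_name only appears once it is set
    System.out.println(new ObjectMapper().writeValueAsString(new ClientConfigureRequestSketch()));
  }
}
```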
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureResponse.java
new file mode 100644
index 000000000..03a1d3576
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientConfigureResponse.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Class used to deserialize responses from configure endpoint */
+@JsonIgnoreProperties(ignoreUnknown = true)
+class ClientConfigureResponse extends StreamingIngestResponse {
+ @JsonProperty("prefix")
+ private String prefix;
+
+ @JsonProperty("status_code")
+ private Long statusCode;
+
+ @JsonProperty("message")
+ private String message;
+
+ @JsonProperty("stage_location")
+ private FileLocationInfo stageLocation;
+
+ @JsonProperty("deployment_id")
+ private Long deploymentId;
+
+ String getPrefix() {
+ return prefix;
+ }
+
+ void setPrefix(String prefix) {
+ this.prefix = prefix;
+ }
+
+ @Override
+ Long getStatusCode() {
+ return statusCode;
+ }
+
+ void setStatusCode(Long statusCode) {
+ this.statusCode = statusCode;
+ }
+
+ String getMessage() {
+ return message;
+ }
+
+ void setMessage(String message) {
+ this.message = message;
+ }
+
+ FileLocationInfo getStageLocation() {
+ return stageLocation;
+ }
+
+ void setStageLocation(FileLocationInfo stageLocation) {
+ this.stageLocation = stageLocation;
+ }
+
+ Long getDeploymentId() {
+ return deploymentId;
+ }
+
+ void setDeploymentId(Long deploymentId) {
+ this.deploymentId = deploymentId;
+ }
+
+ String getClientPrefix() {
+ if (this.deploymentId == null) {
+ return this.prefix;
+ }
+ return this.prefix + "_" + this.deploymentId;
+ }
+}
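The client prefix rule above is small but worth pinning down; a sketch with made-up sample values:

```java
public class ClientPrefixSketch {
  // Mirrors ClientConfigureResponse.getClientPrefix(): append the deployment id when present
  static String clientPrefix(String prefix, Long deploymentId) {
    return deploymentId == null ? prefix : prefix + "_" + deploymentId;
  }

  public static void main(String[] args) {
    System.out.println(clientPrefix("abc123", null)); // abc123
    System.out.println(clientPrefix("abc123", 42L)); // abc123_42
  }
}
```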
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
index 814423c28..3e13c4ffc 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
@@ -24,17 +24,14 @@
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeParseException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
import java.util.function.Supplier;
import net.snowflake.client.jdbc.internal.google.common.collect.Sets;
import net.snowflake.client.jdbc.internal.snowflake.common.core.SnowflakeDateTimeFormat;
import net.snowflake.client.jdbc.internal.snowflake.common.util.Power10;
import net.snowflake.ingest.streaming.internal.serialization.ByteArraySerializer;
import net.snowflake.ingest.streaming.internal.serialization.ZonedDateTimeSerializer;
+import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.codec.DecoderException;
@@ -86,6 +83,18 @@ class DataValidationUtil {
objectMapper.registerModule(module);
}
+ // Caching the powers of 10 that are used for checking the range of numbers because computing them
+ // on-demand is expensive.
+ private static final BigDecimal[] POWER_10 = makePower10Table();
+
+ private static BigDecimal[] makePower10Table() {
+ BigDecimal[] power10 = new BigDecimal[Power10.sb16Size];
+ for (int i = 0; i < Power10.sb16Size; i++) {
+ power10[i] = new BigDecimal(Power10.sb16Table[i]);
+ }
+ return power10;
+ }
+
/**
* Validates and parses input as JSON. All types in the object tree must be valid variant types,
* see {@link DataValidationUtil#isAllowedSemiStructuredType}.
@@ -615,7 +624,7 @@ static int validateAndParseDate(String columnName, Object input, long insertRowI
* @return Validated array
*/
static byte[] validateAndParseBinary(
-      String columnName, Object input, Optional<Integer> maxLengthOptional, long insertRowIndex) {
+      String columnName,
+      Object input,
+      Optional<Integer> maxLengthOptional,
+      long insertRowIndex,
+      Constants.BinaryStringEncoding binaryStringEncoding) {
byte[] output;
if (input instanceof byte[]) {
// byte[] is a mutable object, we need to create a defensive copy to protect against
@@ -625,12 +634,30 @@ static byte[] validateAndParseBinary(
output = new byte[originalInputArray.length];
System.arraycopy(originalInputArray, 0, output, 0, originalInputArray.length);
} else if (input instanceof String) {
- try {
- String stringInput = ((String) input).trim();
- output = Hex.decodeHex(stringInput);
- } catch (DecoderException e) {
+      if (binaryStringEncoding == Constants.BinaryStringEncoding.BASE64) {
+ try {
+ String stringInput = ((String) input).trim();
+ // Remove double quotes if present
+ if (stringInput.length() >= 2 && stringInput.startsWith("\"") && stringInput.endsWith("\"")) {
+ stringInput = stringInput.substring(1, stringInput.length() - 1);
+ }
+ Base64.Decoder decoder = Base64.getDecoder();
+ output = decoder.decode(stringInput);
+ } catch (IllegalArgumentException e) {
+ throw valueFormatNotAllowedException(
+ columnName, "BINARY", "Not a valid base64 string", insertRowIndex);
+ }
+ } else if (binaryStringEncoding == Constants.BinaryStringEncoding.HEX) {
+ try {
+ String stringInput = ((String) input).trim();
+ output = Hex.decodeHex(stringInput);
+ } catch (DecoderException e) {
+ throw valueFormatNotAllowedException(
+ columnName, "BINARY", "Not a valid hex string", insertRowIndex);
+ }
+ } else {
throw valueFormatNotAllowedException(
- columnName, "BINARY", "Not a valid hex string", insertRowIndex);
+ columnName, "BINARY", "Unsupported binary string format " + binaryStringEncoding.name(), insertRowIndex);
}
} else {
throw typeNotAllowedException(
@@ -823,7 +850,11 @@ static int validateAndParseBoolean(String columnName, Object input, long insertR
static void checkValueInRange(
BigDecimal bigDecimalValue, int scale, int precision, final long insertRowIndex) {
- if (bigDecimalValue.abs().compareTo(BigDecimal.TEN.pow(precision - scale)) >= 0) {
+ BigDecimal comparand =
+ (precision >= scale) && (precision - scale) < POWER_10.length
+ ? POWER_10[precision - scale]
+ : BigDecimal.TEN.pow(precision - scale);
+ if (bigDecimalValue.abs().compareTo(comparand) >= 0) {
throw new SFException(
ErrorCode.INVALID_FORMAT_ROW,
String.format(
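Two of the changes above in one runnable sketch: decoding a base64 binary string with java.util.Base64 (the HEX path uses commons-codec in the SDK and is not shown) and checking a value against 10^(precision - scale) using a precomputed table rather than calling BigDecimal.pow per row. Table size and sample values are illustrative:

```java
import java.math.BigDecimal;
import java.util.Base64;

public class ValidationSketch {
  // Precomputed powers of ten, analogous in spirit to the POWER_10 cache above
  private static final BigDecimal[] POWER_10 = new BigDecimal[38];

  static {
    BigDecimal v = BigDecimal.ONE;
    for (int i = 0; i < POWER_10.length; i++) {
      POWER_10[i] = v;
      v = v.multiply(BigDecimal.TEN);
    }
  }

  static byte[] decodeBase64(String input) {
    return Base64.getDecoder().decode(input.trim());
  }

  static boolean inRange(BigDecimal value, int precision, int scale) {
    int exp = precision - scale;
    BigDecimal comparand =
        exp >= 0 && exp < POWER_10.length ? POWER_10[exp] : BigDecimal.TEN.pow(exp);
    return value.abs().compareTo(comparand) < 0;
  }

  public static void main(String[] args) {
    System.out.println(decodeBase64("AQID").length); // 3 bytes: 0x01 0x02 0x03
    System.out.println(inRange(new BigDecimal("123.45"), 5, 2)); // true: |123.45| < 10^3
    System.out.println(inRange(new BigDecimal("1234.5"), 5, 2)); // false: 1234.5 >= 10^3
  }
}
```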
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java
new file mode 100644
index 000000000..322b53acf
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import net.snowflake.ingest.streaming.DropChannelRequest;
+import net.snowflake.ingest.utils.Utils;
+
+/** Class used to serialize the {@link DropChannelRequest} */
+class DropChannelRequestInternal implements IStreamingIngestRequest {
+ @JsonProperty("request_id")
+ private String requestId;
+
+ @JsonProperty("role")
+ private String role;
+
+ @JsonProperty("channel")
+ private String channel;
+
+ @JsonProperty("table")
+ private String table;
+
+ @JsonProperty("database")
+ private String database;
+
+ @JsonProperty("schema")
+ private String schema;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty("client_sequencer")
+ Long clientSequencer;
+
+ DropChannelRequestInternal(
+ String requestId,
+ String role,
+ String database,
+ String schema,
+ String table,
+ String channel,
+ Long clientSequencer) {
+ this.requestId = requestId;
+ this.role = role;
+ this.database = database;
+ this.schema = schema;
+ this.table = table;
+ this.channel = channel;
+ this.clientSequencer = clientSequencer;
+ }
+
+ String getRequestId() {
+ return requestId;
+ }
+
+ String getRole() {
+ return role;
+ }
+
+ String getChannel() {
+ return channel;
+ }
+
+ String getTable() {
+ return table;
+ }
+
+ String getDatabase() {
+ return database;
+ }
+
+ String getSchema() {
+ return schema;
+ }
+
+ Long getClientSequencer() {
+ return clientSequencer;
+ }
+
+ String getFullyQualifiedTableName() {
+ return Utils.getFullyQualifiedTableName(database, schema, table);
+ }
+
+ @Override
+ public String getStringForLogging() {
+ return String.format(
+ "DropChannelRequest(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s,"
+ + " clientSequencer=%s)",
+ requestId, role, database, schema, table, channel, clientSequencer);
+ }
+}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FileLocationInfo.java b/src/main/java/net/snowflake/ingest/streaming/internal/FileLocationInfo.java
new file mode 100644
index 000000000..add98a6fb
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/FileLocationInfo.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+
+/** Class used to deserialize the volume information response sent by the server */
+class FileLocationInfo {
+ /** The stage type */
+ @JsonProperty("locationType")
+ private String locationType;
+
+ /** The container or bucket */
+ @JsonProperty("location")
+ private String location;
+
+ /** The path of the target file */
+ @JsonProperty("path")
+ private String path;
+
+ /** The credentials required for the stage */
+ @JsonProperty("creds")
+  private Map<String, String> credentials;
+
+ /** AWS/S3/GCS region (S3/GCS only) */
+ @JsonProperty("region")
+ private String region;
+
+ /** The Azure Storage endpoint (Azure only) */
+ @JsonProperty("endPoint")
+ private String endPoint;
+
+ /** The Azure Storage account (Azure only) */
+ @JsonProperty("storageAccount")
+ private String storageAccount;
+
+ /** GCS gives us back a presigned URL instead of a cred */
+ @JsonProperty("presignedUrl")
+ private String presignedUrl;
+
+ /** Whether to encrypt/decrypt files on the stage */
+ @JsonProperty("isClientSideEncrypted")
+ private boolean isClientSideEncrypted;
+
+ /** Whether to use s3 regional URL (AWS Only) */
+ @JsonProperty("useS3RegionalUrl")
+ private boolean useS3RegionalUrl;
+
+ /** A unique id for volume assigned by server */
+ @JsonProperty("volumeHash")
+ private String volumeHash;
+
+ String getLocationType() {
+ return locationType;
+ }
+
+ void setLocationType(String locationType) {
+ this.locationType = locationType;
+ }
+
+ String getLocation() {
+ return location;
+ }
+
+ void setLocation(String location) {
+ this.location = location;
+ }
+
+ String getPath() {
+ return path;
+ }
+
+ void setPath(String path) {
+ this.path = path;
+ }
+
+  Map<String, String> getCredentials() {
+    return credentials;
+  }
+
+  void setCredentials(Map<String, String> credentials) {
+ this.credentials = credentials;
+ }
+
+ String getRegion() {
+ return region;
+ }
+
+ void setRegion(String region) {
+ this.region = region;
+ }
+
+ String getEndPoint() {
+ return endPoint;
+ }
+
+ void setEndPoint(String endPoint) {
+ this.endPoint = endPoint;
+ }
+
+ String getStorageAccount() {
+ return storageAccount;
+ }
+
+ void setStorageAccount(String storageAccount) {
+ this.storageAccount = storageAccount;
+ }
+
+ String getPresignedUrl() {
+ return presignedUrl;
+ }
+
+ void setPresignedUrl(String presignedUrl) {
+ this.presignedUrl = presignedUrl;
+ }
+
+ boolean getIsClientSideEncrypted() {
+ return this.isClientSideEncrypted;
+ }
+
+ void setIsClientSideEncrypted(boolean isClientSideEncrypted) {
+ this.isClientSideEncrypted = isClientSideEncrypted;
+ }
+
+ boolean getUseS3RegionalUrl() {
+ return this.useS3RegionalUrl;
+ }
+
+ void setUseS3RegionalUrl(boolean useS3RegionalUrl) {
+ this.useS3RegionalUrl = useS3RegionalUrl;
+ }
+
+ String getVolumeHash() {
+ return this.volumeHash;
+ }
+
+ void setVolumeHash(String volumeHash) {
+ this.volumeHash = volumeHash;
+ }
+}
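A minimal standalone sketch of deserializing a made-up stage-location payload with Jackson, reusing a few of the field names from FileLocationInfo above; the JSON values are invented:

```java
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class FileLocationInfoSketch {
  @JsonIgnoreProperties(ignoreUnknown = true)
  public static class Location {
    @JsonProperty("locationType") public String locationType;
    @JsonProperty("location") public String location;
    @JsonProperty("path") public String path;
    @JsonProperty("creds") public Map<String, String> credentials;
  }

  public static void main(String[] args) throws Exception {
    String json =
        "{\"locationType\":\"S3\",\"location\":\"my-bucket\",\"path\":\"prefix/\","
            + "\"creds\":{\"AWS_KEY_ID\":\"x\"},\"unknownField\":1}";
    Location loc = new ObjectMapper().readValue(json, Location.class);
    System.out.println(loc.locationType + " " + loc.location + "/" + loc.path); // S3 my-bucket/prefix/
  }
}
```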
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
index f08196477..84e1a2561 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
@@ -1,10 +1,9 @@
/*
- * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/
package net.snowflake.ingest.streaming.internal;
-import static net.snowflake.ingest.utils.Constants.BLOB_EXTENSION_TYPE;
import static net.snowflake.ingest.utils.Constants.DISABLE_BACKGROUND_FLUSH;
import static net.snowflake.ingest.utils.Constants.MAX_BLOB_SIZE_IN_BYTES;
import static net.snowflake.ingest.utils.Constants.MAX_THREAD_COUNT;
@@ -19,13 +18,12 @@
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
-import java.util.Calendar;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.TimeZone;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -34,11 +32,10 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import javax.crypto.BadPaddingException;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
-import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.client.jdbc.internal.google.common.util.concurrent.ThreadFactoryBuilder;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
@@ -84,9 +81,6 @@ List>> getData() {
private static final Logging logger = new Logging(FlushService.class);
- // Increasing counter to generate a unique blob name per client
- private final AtomicLong counter;
-
// The client that owns this flush service
private final SnowflakeStreamingIngestClientInternal owningClient;
@@ -102,18 +96,21 @@ List>> getData() {
// Reference to the channel cache
private final ChannelCache channelCache;
- // Reference to the Streaming Ingest stage
- private final StreamingIngestStage targetStage;
+ // Reference to the Streaming Ingest storage manager
+ private final IStorageManager storageManager;
// Reference to register service
private final RegisterService registerService;
- // Indicates whether we need to schedule a flush
- @VisibleForTesting volatile boolean isNeedFlush;
-
- // Latest flush time
+ /**
+   * Client level last flush time and need flush flag. These two variables are used when max
+   * chunks in blob is not 1. When max chunks in blob is 1, the flush service ignores them and uses
+   * the table level last flush time and need flush flag instead. See {@link ChannelCache.FlushInfo}.
+ */
@VisibleForTesting volatile long lastFlushTime;
+ @VisibleForTesting volatile boolean isNeedFlush;
+
// Indicates whether it's running as part of the test
private final boolean isTestMode;
@@ -122,23 +119,24 @@ List>> getData() {
// blob encoding version
private final Constants.BdecVersion bdecVersion;
+ private volatile int numProcessors = Runtime.getRuntime().availableProcessors();
/**
- * Constructor for TESTING that takes (usually mocked) StreamingIngestStage
+ * Default constructor
*
- * @param client
- * @param cache
- * @param isTestMode
+ * @param client the owning client
+ * @param cache the channel cache
+ * @param storageManager the storage manager
+ * @param isTestMode whether the service is running in test mode
*/
FlushService(
SnowflakeStreamingIngestClientInternal client,
ChannelCache cache,
- StreamingIngestStage targetStage, // For testing
+ IStorageManager storageManager,
boolean isTestMode) {
this.owningClient = client;
this.channelCache = cache;
- this.targetStage = targetStage;
- this.counter = new AtomicLong(0);
+ this.storageManager = storageManager;
this.registerService = new RegisterService<>(client, isTestMode);
this.isNeedFlush = false;
this.lastFlushTime = System.currentTimeMillis();
@@ -148,40 +146,6 @@ List>> getData() {
createWorkers();
}
- /**
- * Default constructor
- *
- * @param client
- * @param cache
- * @param isTestMode
- */
- FlushService(
- SnowflakeStreamingIngestClientInternal client, ChannelCache cache, boolean isTestMode) {
- this.owningClient = client;
- this.channelCache = cache;
- try {
- this.targetStage =
- new StreamingIngestStage(
- isTestMode,
- client.getRole(),
- client.getHttpClient(),
- client.getRequestBuilder(),
- client.getName(),
- DEFAULT_MAX_UPLOAD_RETRIES);
- } catch (SnowflakeSQLException | IOException err) {
- throw new SFException(err, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE);
- }
-
- this.registerService = new RegisterService<>(client, isTestMode);
- this.counter = new AtomicLong(0);
- this.isNeedFlush = false;
- this.lastFlushTime = System.currentTimeMillis();
- this.isTestMode = isTestMode;
- this.latencyTimerContextMap = new ConcurrentHashMap<>();
- this.bdecVersion = this.owningClient.getParameterProvider().getBlobFormatVersion();
- createWorkers();
- }
-
/**
* Updates performance stats enabled
*
@@ -203,36 +167,65 @@ private CompletableFuture statsFuture() {
/**
* @param isForce if true will flush regardless of other conditions
- * @param timeDiffMillis Time in milliseconds since the last flush
+ * @param tablesToFlush list of tables to flush
+ * @param flushStartTime the time when the flush started
* @return
*/
- private CompletableFuture distributeFlush(boolean isForce, long timeDiffMillis) {
+ private CompletableFuture distributeFlush(
+      boolean isForce, Set<String> tablesToFlush, Long flushStartTime) {
return CompletableFuture.runAsync(
() -> {
- logFlushTask(isForce, timeDiffMillis);
- distributeFlushTasks();
+ logFlushTask(isForce, tablesToFlush, flushStartTime);
+ distributeFlushTasks(tablesToFlush);
+ long prevFlushEndTime = System.currentTimeMillis();
+ this.lastFlushTime = prevFlushEndTime;
this.isNeedFlush = false;
- this.lastFlushTime = System.currentTimeMillis();
- return;
+ tablesToFlush.forEach(
+ table -> {
+ this.channelCache.setLastFlushTime(table, prevFlushEndTime);
+ this.channelCache.setNeedFlush(table, false);
+ });
},
this.flushWorker);
}
  /** If tracing is enabled, always print; otherwise, check if it needs flush or is forceful. */
- private void logFlushTask(boolean isForce, long timeDiffMillis) {
+  private void logFlushTask(boolean isForce, Set<String> tablesToFlush, long flushStartTime) {
+ boolean isNeedFlush =
+ this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1
+ ? tablesToFlush.stream().anyMatch(channelCache::getNeedFlush)
+ : this.isNeedFlush;
+ long currentTime = System.currentTimeMillis();
+ final String logInfo;
+ if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) {
+ logInfo =
+ String.format(
+ "Tables=[%s]",
+ tablesToFlush.stream()
+ .map(
+ table ->
+ String.format(
+ "(name=%s, isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s)",
+ table,
+ channelCache.getNeedFlush(table),
+ flushStartTime - channelCache.getLastFlushTime(table),
+ currentTime - channelCache.getLastFlushTime(table)))
+ .collect(Collectors.joining(", ")));
+ } else {
+ logInfo =
+ String.format(
+ "isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s",
+ isNeedFlush, flushStartTime - this.lastFlushTime, currentTime - this.lastFlushTime);
+ }
+
final String flushTaskLogFormat =
String.format(
- "Submit forced or ad-hoc flush task on client=%s, isForce=%s,"
- + " isNeedFlush=%s, timeDiffMillis=%s, currentDiffMillis=%s",
- this.owningClient.getName(),
- isForce,
- this.isNeedFlush,
- timeDiffMillis,
- System.currentTimeMillis() - this.lastFlushTime);
+ "Submit forced or ad-hoc flush task on client=%s, isForce=%s, %s",
+ this.owningClient.getName(), isForce, logInfo);
if (logger.isTraceEnabled()) {
logger.logTrace(flushTaskLogFormat);
}
- if (!logger.isTraceEnabled() && (this.isNeedFlush || isForce)) {
+ if (!logger.isTraceEnabled() && (isNeedFlush || isForce)) {
logger.logDebug(flushTaskLogFormat);
}
}
@@ -248,27 +241,65 @@ private CompletableFuture registerFuture() {
}
/**
- * Kick off a flush job and distribute the tasks if one of the following conditions is met:
-   * <li>Flush is forced by the users
-   * <li>One or more buffers have reached the flush size
-   * <li>Periodical background flush when a time interval has reached
+   * Kick off a flush job and distribute the tasks. The flush service behaves differently based on
+   * the max chunks in blob:
+   *
+   * <ul>
+   *   <li>The max chunks in blob is not 1 (interleaving is allowed), every channel will be flushed
+   *       together if one of the following conditions is met:
+   *       <ul>
+   *         <li>Flush is forced by the users
+   *         <li>One or more buffers have reached the flush size
+   *         <li>Periodical background flush when a time interval has reached
+   *       </ul>
+   *   <li>The max chunks in blob is 1 (interleaving is not allowed), a channel will be flushed if
+   *       one of the following conditions is met:
+   *       <ul>
+   *         <li>Flush is forced by the users
+   *         <li>One or more buffers with the same target table as the channel have reached the
+   *             flush size
+   *         <li>Periodical background flush of the target table when a time interval has reached
+   *       </ul>
+   * </ul>
*
* @param isForce
* @return Completable future that will return when the blobs are registered successfully, or null
* if none of the conditions is met above
*/
CompletableFuture flush(boolean isForce) {
- long timeDiffMillis = System.currentTimeMillis() - this.lastFlushTime;
+ final long flushStartTime = System.currentTimeMillis();
+ final long flushingInterval =
+ this.owningClient.getParameterProvider().getCachedMaxClientLagInMs();
+
+    final Set<String> tablesToFlush;
+ if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) {
+ tablesToFlush =
+ this.channelCache.keySet().stream()
+ .filter(
+ key ->
+ isForce
+ || flushStartTime - this.channelCache.getLastFlushTime(key)
+ >= flushingInterval
+ || this.channelCache.getNeedFlush(key))
+ .collect(Collectors.toSet());
+ } else {
+ if (isForce
+ || (!DISABLE_BACKGROUND_FLUSH
+ && !isTestMode()
+ && (this.isNeedFlush || flushStartTime - this.lastFlushTime >= flushingInterval))) {
+ tablesToFlush = this.channelCache.keySet();
+ } else {
+ tablesToFlush = null;
+ }
+ }
if (isForce
|| (!DISABLE_BACKGROUND_FLUSH
&& !isTestMode()
- && (this.isNeedFlush
- || timeDiffMillis
- >= this.owningClient.getParameterProvider().getCachedMaxClientLagInMs()))) {
-
+ && tablesToFlush != null
+ && !tablesToFlush.isEmpty())) {
return this.statsFuture()
- .thenCompose((v) -> this.distributeFlush(isForce, timeDiffMillis))
+ .thenCompose((v) -> this.distributeFlush(isForce, tablesToFlush, flushStartTime))
.thenCompose((v) -> this.registerFuture());
}
return this.statsFuture();
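For the max-chunks-in-blob == 1 path above, the per-table decision reduces to a simple predicate over the cached flush state. A minimal, self-contained sketch (lag value and table names are made up; the real code reads them from ParameterProvider and ChannelCache):

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class FlushDecisionSketch {
  // A table is flushed when forced, overdue relative to the configured lag, or explicitly marked
  static Set<String> tablesToFlush(
      Map<String, Long> lastFlushTime,
      Map<String, Boolean> needFlush,
      boolean isForce,
      long now,
      long maxClientLagMs) {
    return lastFlushTime.keySet().stream()
        .filter(
            table ->
                isForce
                    || now - lastFlushTime.get(table) >= maxClientLagMs
                    || needFlush.getOrDefault(table, false))
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    Map<String, Long> last = new ConcurrentHashMap<>();
    Map<String, Boolean> need = new ConcurrentHashMap<>();
    long now = System.currentTimeMillis();
    last.put("DB.SCHEMA.T1", now - 2_000);
    last.put("DB.SCHEMA.T2", now - 100);
    need.put("DB.SCHEMA.T2", false);
    // With a 1s lag only T1 is overdue
    System.out.println(tablesToFlush(last, need, false, now, 1_000));
  }
}
```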
@@ -351,19 +382,27 @@ private void createWorkers() {
/**
   * Distribute the flush tasks by iterating through all the channels in the channel cache and kick
   * off a blob-building job when a certain size has been reached or we have reached the end
+ *
+ * @param tablesToFlush list of tables to flush
*/
- void distributeFlushTasks() {
+  void distributeFlushTasks(Set<String> tablesToFlush) {
Iterator<
Map.Entry<
                String, ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>>>>
- itr = this.channelCache.iterator();
+ itr =
+ this.channelCache.entrySet().stream()
+ .filter(e -> tablesToFlush.contains(e.getKey()))
+ .iterator();
List, CompletableFuture>> blobs = new ArrayList<>();
List> leftoverChannelsDataPerTable = new ArrayList<>();
+ // The API states that the number of available processors reported can change and therefore, we
+ // should poll it occasionally.
+ numProcessors = Runtime.getRuntime().availableProcessors();
while (itr.hasNext() || !leftoverChannelsDataPerTable.isEmpty()) {
List>> blobData = new ArrayList<>();
float totalBufferSizeInBytes = 0F;
- final String blobPath = getBlobPath(this.targetStage.getClientPrefix());
+ final String blobPath = this.storageManager.generateBlobPath();
// Distribute work at table level, split the blob if reaching the blob size limit or the
// channel has different encryption key ids
@@ -445,9 +484,9 @@ && shouldStopProcessing(
// Kick off a build job
if (blobData.isEmpty()) {
- // we decrement the counter so that we do not have gaps in the blob names created by this
- // client. See method getBlobPath() below.
- this.counter.decrementAndGet();
+ // we decrement the blob sequencer so that we do not have gaps in the blob names created by
+ // this client.
+ this.storageManager.decrementBlobSequencer();
} else {
long flushStartMs = System.currentTimeMillis();
if (this.owningClient.flushLatency != null) {
@@ -459,7 +498,13 @@ && shouldStopProcessing(
CompletableFuture.supplyAsync(
() -> {
try {
- BlobMetadata blobMetadata = buildAndUpload(blobPath, blobData);
+ // Get the fully qualified table name from the first channel in the blob.
+ // This only matters when the client is in Iceberg mode. In Iceberg mode,
+ // all channels in the blob belong to the same table.
+ String fullyQualifiedTableName =
+ blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName();
+ BlobMetadata blobMetadata =
+ buildAndUpload(blobPath, blobData, fullyQualifiedTableName);
blobMetadata.getBlobStats().setFlushStartMs(flushStartMs);
return blobMetadata;
} catch (Throwable e) {
@@ -542,9 +587,12 @@ private boolean shouldStopProcessing(
* @param blobPath Path of the destination blob in cloud storage
* @param blobData All the data for one blob. Assumes that all ChannelData in the inner List
* belongs to the same table. Will error if this is not the case
+ * @param fullyQualifiedTableName the table name of the first channel in the blob, only matters in
+ * Iceberg mode
* @return BlobMetadata for FlushService.upload
*/
-  BlobMetadata buildAndUpload(String blobPath, List<List<ChannelData<T>>> blobData)
+  BlobMetadata buildAndUpload(
+      String blobPath, List<List<ChannelData<T>>> blobData, String fullyQualifiedTableName)
throws IOException, NoSuchAlgorithmException, InvalidAlgorithmParameterException,
NoSuchPaddingException, IllegalBlockSizeException, BadPaddingException,
InvalidKeyException {
@@ -555,12 +603,18 @@ BlobMetadata buildAndUpload(String blobPath, List>> blobData
blob.blobStats.setBuildDurationMs(buildContext);
- return upload(blobPath, blob.blobBytes, blob.chunksMetadataList, blob.blobStats);
+ return upload(
+ this.storageManager.getStorage(fullyQualifiedTableName),
+ blobPath,
+ blob.blobBytes,
+ blob.chunksMetadataList,
+ blob.blobStats);
}
/**
* Upload a blob to Streaming Ingest dedicated stage
*
+ * @param storage the storage to upload the blob
* @param blobPath full path of the blob
* @param blob blob data
* @param metadata a list of chunk metadata
@@ -568,13 +622,17 @@ BlobMetadata buildAndUpload(String blobPath, List>> blobData
* @return BlobMetadata object used to create the register blob request
*/
BlobMetadata upload(
- String blobPath, byte[] blob, List metadata, BlobStats blobStats)
+ StreamingIngestStorage storage,
+ String blobPath,
+ byte[] blob,
+ List metadata,
+ BlobStats blobStats)
throws NoSuchAlgorithmException {
logger.logInfo("Start uploading blob={}, size={}", blobPath, blob.length);
long startTime = System.currentTimeMillis();
Timer.Context uploadContext = Utils.createTimerContext(this.owningClient.uploadLatency);
- this.targetStage.put(blobPath, blob);
+ storage.put(blobPath, blob);
if (uploadContext != null) {
blobStats.setUploadDurationMs(uploadContext);
@@ -626,48 +684,16 @@ void shutdown() throws InterruptedException {
}
}
- /** Set the flag to indicate that a flush is needed */
- void setNeedFlush() {
- this.isNeedFlush = true;
- }
-
/**
-   * Generate a blob path, which is: "YEAR/MONTH/DAY_OF_MONTH/HOUR_OF_DAY/MINUTE/<current utc
-   * timestamp + client unique prefix + thread id + counter>.BDEC"
+ * Set the flag to indicate that a flush is needed
*
- * @return the generated blob file path
+ * @param fullyQualifiedTableName the fully qualified table name
*/
- private String getBlobPath(String clientPrefix) {
- Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- return getBlobPath(calendar, clientPrefix);
- }
-
- /** For TESTING */
- String getBlobPath(Calendar calendar, String clientPrefix) {
- if (isTestMode && clientPrefix == null) {
- clientPrefix = "testPrefix";
+ void setNeedFlush(String fullyQualifiedTableName) {
+ this.isNeedFlush = true;
+ if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) {
+ this.channelCache.setNeedFlush(fullyQualifiedTableName, true);
}
-
- Utils.assertStringNotNullOrEmpty("client prefix", clientPrefix);
- int year = calendar.get(Calendar.YEAR);
- int month = calendar.get(Calendar.MONTH) + 1; // Gregorian calendar starts from 0
- int day = calendar.get(Calendar.DAY_OF_MONTH);
- int hour = calendar.get(Calendar.HOUR_OF_DAY);
- int minute = calendar.get(Calendar.MINUTE);
- long time = TimeUnit.MILLISECONDS.toSeconds(calendar.getTimeInMillis());
- long threadId = Thread.currentThread().getId();
- // Create the blob short name, the clientPrefix contains the deployment id
- String blobShortName =
- Long.toString(time, 36)
- + "_"
- + clientPrefix
- + "_"
- + threadId
- + "_"
- + this.counter.getAndIncrement()
- + "."
- + BLOB_EXTENSION_TYPE;
- return year + "/" + month + "/" + day + "/" + hour + "/" + minute + "/" + blobShortName;
}
/**
@@ -693,19 +719,13 @@ void invalidateAllChannelsInBlob(
}));
}
- /** Get the server generated unique prefix for this client */
- String getClientPrefix() {
- return this.targetStage.getClientPrefix();
- }
-
/**
* Throttle if the number of queued buildAndUpload tasks is bigger than the total number of
* available processors
*/
boolean throttleDueToQueuedFlushTasks() {
ThreadPoolExecutor buildAndUpload = (ThreadPoolExecutor) this.buildUploadWorkers;
- boolean throttleOnQueuedTasks =
- buildAndUpload.getQueue().size() > Runtime.getRuntime().availableProcessors();
+ boolean throttleOnQueuedTasks = buildAndUpload.getQueue().size() > numProcessors;
if (throttleOnQueuedTasks) {
logger.logWarn(
"Throttled due too many queue flush tasks (probably because of slow uploading speed),"
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java
new file mode 100644
index 000000000..51f4a82de
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+import java.util.Optional;
+
+/**
+ * Interface to manage {@link StreamingIngestStorage} for {@link FlushService}
+ *
+ * @param <T> The type of chunk data
+ * @param <TLocation> The type of location that's being managed (internal stage / external volume)
+ */
+interface IStorageManager<T, TLocation> {
+ /** Default max upload retries for streaming ingest storage */
+ int DEFAULT_MAX_UPLOAD_RETRIES = 5;
+
+ /**
+ * Given a fully qualified table name, return the target storage
+ *
+ * @param fullyQualifiedTableName the target fully qualified table name
+ * @return target stage
+ */
+  StreamingIngestStorage<T, TLocation> getStorage(String fullyQualifiedTableName);
+
+ /**
+ * Add a storage to the manager
+ *
+ * @param dbName the database name
+ * @param schemaName the schema name
+ * @param tableName the table name
+ * @param fileLocationInfo file location info from configure response
+ */
+ void addStorage(
+ String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo);
+
+ /**
+ * Gets the latest file location info (with a renewed short-lived access token) for the specified
+ * location
+ *
+ * @param location A reference to the target location
+ * @param fileName optional filename for single-file signed URL fetch from server
+ * @return the new location information
+ */
+  FileLocationInfo getRefreshedLocation(TLocation location, Optional<String> fileName);
+
+ /**
+ * Generate a unique blob path and increment the blob sequencer
+ *
+ * @return the blob path
+ */
+ String generateBlobPath();
+
+ /**
+   * Decrement the blob sequencer; this method is needed to prevent gaps in the generated file
+   * names. See {@link IStorageManager#generateBlobPath()} for more details.
+ */
+ void decrementBlobSequencer();
+
+ /**
+ * Get the unique client prefix generated by the Snowflake server
+ *
+ * @return the client prefix
+ */
+ String getClientPrefix();
+}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IStreamingIngestRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/IStreamingIngestRequest.java
new file mode 100644
index 000000000..a4b5e29d1
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/IStreamingIngestRequest.java
@@ -0,0 +1,13 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+/**
+ * The IStreamingIngestRequest interface is a marker interface used for type safety in the {@link
+ * SnowflakeServiceClient} for streaming ingest API requests.
+ */
+interface IStreamingIngestRequest {
+ String getStringForLogging();
+}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java
new file mode 100644
index 000000000..d33a80738
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+import static net.snowflake.ingest.utils.Constants.BLOB_EXTENSION_TYPE;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import net.snowflake.client.jdbc.SnowflakeSQLException;
+import net.snowflake.ingest.connection.IngestResponseException;
+import net.snowflake.ingest.utils.ErrorCode;
+import net.snowflake.ingest.utils.SFException;
+import net.snowflake.ingest.utils.Utils;
+
+class InternalStageLocation {
+ public InternalStageLocation() {}
+}
+
+/** Class to manage the single Snowflake internal stage */
+class InternalStageManager<T> implements IStorageManager<T, InternalStageLocation> {
+  /** Target stage for the client */
+  private final StreamingIngestStorage<T, InternalStageLocation> targetStage;
+
+ /** Increasing counter to generate a unique blob name per client */
+ private final AtomicLong counter;
+
+  /** Whether the manager is in test mode */
+ private final boolean isTestMode;
+
+ /** Snowflake service client used for configure calls */
+ private final SnowflakeServiceClient snowflakeServiceClient;
+
+ /** The name of the client */
+ private final String clientName;
+
+ /** The role of the client */
+ private final String role;
+
+ /** Client prefix generated by the Snowflake server */
+ private String clientPrefix;
+
+ /** Deployment ID generated by the Snowflake server */
+ private Long deploymentId;
+
+ /**
+ * Constructor for InternalStageManager
+ *
+   * @param isTestMode whether the manager is in test mode
+ * @param role the role of the client
+ * @param clientName the name of the client
+ * @param snowflakeServiceClient the Snowflake service client to use for configure calls
+ */
+ InternalStageManager(
+ boolean isTestMode,
+ String role,
+ String clientName,
+ SnowflakeServiceClient snowflakeServiceClient) {
+ this.snowflakeServiceClient = snowflakeServiceClient;
+ this.isTestMode = isTestMode;
+ this.clientName = clientName;
+ this.role = role;
+ this.counter = new AtomicLong(0);
+ try {
+ if (!isTestMode) {
+ ClientConfigureResponse response =
+ this.snowflakeServiceClient.clientConfigure(new ClientConfigureRequest(role));
+ this.clientPrefix = response.getClientPrefix();
+ this.deploymentId = response.getDeploymentId();
+ this.targetStage =
+ new StreamingIngestStorage(
+ this,
+ clientName,
+ response.getStageLocation(),
+ new InternalStageLocation(),
+ DEFAULT_MAX_UPLOAD_RETRIES);
+ } else {
+ this.clientPrefix = null;
+ this.deploymentId = null;
+ this.targetStage =
+ new StreamingIngestStorage(
+ this,
+ "testClient",
+ (StreamingIngestStorage.SnowflakeFileTransferMetadataWithAge) null,
+ new InternalStageLocation(),
+ DEFAULT_MAX_UPLOAD_RETRIES);
+ }
+ } catch (IngestResponseException | IOException e) {
+ throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage());
+ } catch (SnowflakeSQLException e) {
+ throw new SFException(e, ErrorCode.UNABLE_TO_CONNECT_TO_STAGE, e.getMessage());
+ }
+ }
+
+ /**
+   * Get the storage. In this case, the storage is always the target stage as there's only one
+   * stage in non-Iceberg mode.
+ *
+ * @param fullyQualifiedTableName the target fully qualified table name
+ * @return the target storage
+ */
+ @Override
+ @SuppressWarnings("unused")
+ public StreamingIngestStorage getStorage(
+ String fullyQualifiedTableName) {
+ // There's always only one stage for the client in non-iceberg mode
+ return targetStage;
+ }
+
+ /** Add storage to the manager. Do nothing as there's only one stage in non-Iceberg mode. */
+ @Override
+ public void addStorage(
+ String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) {}
+
+ /**
+ * Gets the latest file location info (with a renewed short-lived access token) for the specified
+ * location
+ *
+ * @param location A reference to the target location
+ * @param fileName optional filename for single-file signed URL fetch from server
+ * @return the new location information
+ */
+ @Override
+ public FileLocationInfo getRefreshedLocation(
+      InternalStageLocation location, Optional<String> fileName) {
+ try {
+ ClientConfigureRequest request = new ClientConfigureRequest(this.role);
+ fileName.ifPresent(request::setFileName);
+ ClientConfigureResponse response = snowflakeServiceClient.clientConfigure(request);
+ if (this.clientPrefix == null) {
+ this.clientPrefix = response.getClientPrefix();
+ this.deploymentId = response.getDeploymentId();
+ }
+ if (this.deploymentId != null && !this.deploymentId.equals(response.getDeploymentId())) {
+ throw new SFException(
+ ErrorCode.CLIENT_DEPLOYMENT_ID_MISMATCH,
+ this.deploymentId,
+ response.getDeploymentId(),
+ this.clientName);
+ }
+ return response.getStageLocation();
+ } catch (IngestResponseException | IOException e) {
+ throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage());
+ }
+ }
+
+ /**
+   * Generate a blob path, which is: "YEAR/MONTH/DAY_OF_MONTH/HOUR_OF_DAY/MINUTE/<blob short name>.BDEC"
+ *
+ * @return the generated blob file path
+ */
+ @Override
+ public String generateBlobPath() {
+ Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ return getBlobPath(calendar, this.clientPrefix);
+ }
+
+ @Override
+ public void decrementBlobSequencer() {
+ this.counter.decrementAndGet();
+ }
+
+ /** For TESTING */
+ @VisibleForTesting
+ public String getBlobPath(Calendar calendar, String clientPrefix) {
+ if (this.isTestMode && clientPrefix == null) {
+ clientPrefix = "testPrefix";
+ }
+
+ Utils.assertStringNotNullOrEmpty("client prefix", clientPrefix);
+ int year = calendar.get(Calendar.YEAR);
+    int month = calendar.get(Calendar.MONTH) + 1; // Calendar.MONTH is zero-based
+ int day = calendar.get(Calendar.DAY_OF_MONTH);
+ int hour = calendar.get(Calendar.HOUR_OF_DAY);
+ int minute = calendar.get(Calendar.MINUTE);
+ long time = TimeUnit.MILLISECONDS.toSeconds(calendar.getTimeInMillis());
+ long threadId = Thread.currentThread().getId();
+    // Create the blob short name; the clientPrefix contains the deployment id
+ String blobShortName =
+ Long.toString(time, 36)
+ + "_"
+ + clientPrefix
+ + "_"
+ + threadId
+ + "_"
+ + this.counter.getAndIncrement()
+ + "."
+ + BLOB_EXTENSION_TYPE;
+ return year + "/" + month + "/" + day + "/" + hour + "/" + minute + "/" + blobShortName;
+ }
+
+ /**
+ * Get the unique client prefix generated by the Snowflake server
+ *
+ * @return the client prefix
+ */
+ @Override
+ public String getClientPrefix() {
+ return this.clientPrefix;
+ }
+}
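To make the path scheme above concrete, here is a standalone sketch of the same layout: a UTC time prefix "year/month/day/hour/minute/" followed by "<epoch seconds in base 36>_<client prefix>_<thread id>_<counter>". The ".bdec" extension and the "testPrefix" value are assumptions for illustration, not taken from this diff.

    import java.util.Calendar;
    import java.util.TimeZone;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    // Standalone sketch of the blob path layout produced by getBlobPath above.
    public class BlobPathSketch {
      private static final AtomicLong counter = new AtomicLong(0);

      static String blobPath(Calendar calendar, String clientPrefix) {
        long timeInSeconds = TimeUnit.MILLISECONDS.toSeconds(calendar.getTimeInMillis());
        String shortName =
            Long.toString(timeInSeconds, 36)
                + "_"
                + clientPrefix
                + "_"
                + Thread.currentThread().getId()
                + "_"
                + counter.getAndIncrement()
                + ".bdec"; // extension assumed for illustration
        return calendar.get(Calendar.YEAR)
            + "/"
            + (calendar.get(Calendar.MONTH) + 1) // Calendar.MONTH is zero-based
            + "/"
            + calendar.get(Calendar.DAY_OF_MONTH)
            + "/"
            + calendar.get(Calendar.HOUR_OF_DAY)
            + "/"
            + calendar.get(Calendar.MINUTE)
            + "/"
            + shortName;
      }

      public static void main(String[] args) {
        Calendar utc = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        System.out.println(blobPath(utc, "testPrefix")); // e.g. 2024/6/1/12/30/sd1k2a_testPrefix_1_0.bdec
      }
    }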
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProvider.java b/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProvider.java
index f426e898d..777ae4fdc 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProvider.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProvider.java
@@ -9,9 +9,6 @@ public interface MemoryInfoProvider {
/** @return Max memory the JVM can allocate */
long getMaxMemory();
- /** @return Total allocated JVM memory so far */
- long getTotalMemory();
-
/** @return Free JVM memory */
long getFreeMemory();
}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProviderFromRuntime.java b/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProviderFromRuntime.java
index 3a957f225..d248ddfd9 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProviderFromRuntime.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProviderFromRuntime.java
@@ -4,20 +4,51 @@
package net.snowflake.ingest.streaming.internal;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
/** Reads memory information from JVM runtime */
public class MemoryInfoProviderFromRuntime implements MemoryInfoProvider {
- @Override
- public long getMaxMemory() {
- return Runtime.getRuntime().maxMemory();
+ private final long maxMemory;
+ private volatile long totalFreeMemory;
+ private final ScheduledExecutorService executorService;
+ private static final long FREE_MEMORY_UPDATE_INTERVAL_MS = 100;
+ private static final MemoryInfoProviderFromRuntime INSTANCE =
+ new MemoryInfoProviderFromRuntime(FREE_MEMORY_UPDATE_INTERVAL_MS);
+
+ private MemoryInfoProviderFromRuntime(long freeMemoryUpdateIntervalMs) {
+ maxMemory = Runtime.getRuntime().maxMemory();
+ totalFreeMemory =
+ Runtime.getRuntime().freeMemory() + (maxMemory - Runtime.getRuntime().totalMemory());
+ executorService =
+ new ScheduledThreadPoolExecutor(
+ 1,
+ r -> {
+ Thread th = new Thread(r, "MemoryInfoProviderFromRuntime");
+ th.setDaemon(true);
+ return th;
+ });
+ executorService.scheduleAtFixedRate(
+ this::updateFreeMemory, 0, freeMemoryUpdateIntervalMs, TimeUnit.MILLISECONDS);
+ }
+
+ private void updateFreeMemory() {
+ totalFreeMemory =
+ Runtime.getRuntime().freeMemory() + (maxMemory - Runtime.getRuntime().totalMemory());
+ }
+
+ public static MemoryInfoProviderFromRuntime getInstance() {
+ return INSTANCE;
}
@Override
- public long getTotalMemory() {
- return Runtime.getRuntime().totalMemory();
+ public long getMaxMemory() {
+ return maxMemory;
}
@Override
public long getFreeMemory() {
- return Runtime.getRuntime().freeMemory();
+ return totalFreeMemory;
}
}
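The change above replaces per-call Runtime queries with a cached estimate refreshed every 100 ms by a daemon thread, shared through a singleton. A usage sketch; the 10% threshold in the check is an illustrative assumption, not the SDK default.

    import net.snowflake.ingest.streaming.internal.MemoryInfoProvider;
    import net.snowflake.ingest.streaming.internal.MemoryInfoProviderFromRuntime;

    // Usage sketch: callers read cached memory numbers instead of querying
    // Runtime on every insertRows call; values may be up to ~100 ms stale by design.
    public class MemorySketch {
      public static void main(String[] args) {
        MemoryInfoProvider memoryInfo = MemoryInfoProviderFromRuntime.getInstance();
        long maxMemory = memoryInfo.getMaxMemory();
        long freeMemory = memoryInfo.getFreeMemory();
        // Hypothetical throttle check; the 10% threshold is illustrative only.
        boolean lowMemory = freeMemory * 100 / maxMemory < 10;
        System.out.println("low memory: " + lowMemory);
      }
    }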
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java
new file mode 100644
index 000000000..ff53f6729
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import net.snowflake.ingest.streaming.OpenChannelRequest;
+import net.snowflake.ingest.utils.Constants;
+
+/** Class used to serialize the {@link OpenChannelRequest} */
+class OpenChannelRequestInternal implements IStreamingIngestRequest {
+ @JsonProperty("request_id")
+ private String requestId;
+
+ @JsonProperty("role")
+ private String role;
+
+ @JsonProperty("channel")
+ private String channel;
+
+ @JsonProperty("table")
+ private String table;
+
+ @JsonProperty("database")
+ private String database;
+
+ @JsonProperty("schema")
+ private String schema;
+
+ @JsonProperty("write_mode")
+ private String writeMode;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty("offset_token")
+ private String offsetToken;
+
+ OpenChannelRequestInternal(
+ String requestId,
+ String role,
+ String database,
+ String schema,
+ String table,
+ String channel,
+ Constants.WriteMode writeMode,
+ String offsetToken) {
+ this.requestId = requestId;
+ this.role = role;
+ this.database = database;
+ this.schema = schema;
+ this.table = table;
+ this.channel = channel;
+ this.writeMode = writeMode.name();
+ this.offsetToken = offsetToken;
+ }
+
+ String getRequestId() {
+ return requestId;
+ }
+
+ String getRole() {
+ return role;
+ }
+
+ String getChannel() {
+ return channel;
+ }
+
+ String getTable() {
+ return table;
+ }
+
+ String getDatabase() {
+ return database;
+ }
+
+ String getSchema() {
+ return schema;
+ }
+
+ String getWriteMode() {
+ return writeMode;
+ }
+
+ String getOffsetToken() {
+ return offsetToken;
+ }
+
+ @Override
+ public String getStringForLogging() {
+ return String.format(
+ "OpenChannelRequestInternal(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s,"
+ + " writeMode=%s)",
+ requestId, role, database, schema, table, channel, writeMode);
+ }
+}
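Because offset_token carries @JsonInclude(Include.NON_NULL), it is dropped from the serialized payload when no offset token is supplied. A sketch of that behavior with a plain Jackson ObjectMapper (the request values are made up; the class is package-private, hence the package declaration):

    package net.snowflake.ingest.streaming.internal;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import net.snowflake.ingest.utils.Constants;

    // Sketch: with a null offsetToken, "offset_token" is omitted from the JSON body.
    class OpenChannelRequestSerializationSketch {
      public static void main(String[] args) throws Exception {
        OpenChannelRequestInternal request =
            new OpenChannelRequestInternal(
                "request-id-1",
                "MY_ROLE",
                "MY_DB",
                "MY_SCHEMA",
                "MY_TABLE",
                "MY_CHANNEL",
                Constants.WriteMode.CLOUD_STORAGE,
                null /* offsetToken */);
        // Prints request_id, role, database, schema, table, channel and write_mode only.
        System.out.println(new ObjectMapper().writeValueAsString(request));
      }
    }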
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java
index 16b1ededa..9950c44aa 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java
@@ -5,6 +5,7 @@
package net.snowflake.ingest.streaming.internal;
import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.parquet.hadoop.BdecParquetWriter;
@@ -34,6 +35,16 @@ public ParquetChunkData(
this.rows = rows;
this.parquetWriter = parquetWriter;
this.output = output;
- this.metadata = metadata;
+ // create a defensive copy of the parameter map because the argument map passed here
+ // may currently be shared across multiple threads.
+ this.metadata = createDefensiveCopy(metadata);
+ }
+
+  private Map<String, String> createDefensiveCopy(final Map<String, String> metadata) {
+    final Map<String, String> copy = new HashMap<>(metadata);
+ for (String k : metadata.keySet()) {
+ copy.put(k, metadata.get(k));
+ }
+ return copy;
}
}
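A small sketch of the hazard the defensive copy guards against: if the constructor kept the caller's map, a later put() from another thread would leak into the chunk metadata. The key name below is illustrative, not an SDK constant.

    import java.util.HashMap;
    import java.util.Map;

    // Copying at construction freezes the view the chunk sees.
    public class DefensiveCopySketch {
      public static void main(String[] args) {
        Map<String, String> shared = new HashMap<>();
        shared.put("exampleKey", "v1");
        Map<String, String> copy = new HashMap<>(shared); // what the constructor now does
        shared.put("exampleKey", "v2"); // simulated concurrent update by the caller
        System.out.println(copy.get("exampleKey")); // still "v1"
      }
    }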
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
index 39ec66dbb..3ad3db5f4 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
@@ -9,6 +9,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
@@ -124,6 +125,12 @@ private SerializationResult serializeFromParquetWriteBuffers(
if (mergedChannelWriter != null) {
mergedChannelWriter.close();
+ this.verifyRowCounts(
+ "serializeFromParquetWriteBuffers",
+ mergedChannelWriter,
+ rowCount,
+ channelsDataPerTable,
+ -1);
}
return new SerializationResult(
channelsMetadataList,
@@ -216,6 +223,9 @@ private SerializationResult serializeFromJavaObjects(
rows.forEach(parquetWriter::writeRow);
parquetWriter.close();
+ this.verifyRowCounts(
+ "serializeFromJavaObjects", parquetWriter, rowCount, channelsDataPerTable, rows.size());
+
return new SerializationResult(
channelsMetadataList,
columnEpStatsMapCombined,
@@ -224,4 +234,71 @@ private SerializationResult serializeFromJavaObjects(
mergedData,
chunkMinMaxInsertTimeInMs);
}
+
+ /**
+   * Validates that the row count in metadata matches the row count in the Parquet footer and the
+   * row count written by the Parquet writer
+ *
+ * @param serializationType Serialization type, used for logging purposes only
+ * @param writer Parquet writer writing the data
+   * @param totalMetadataRowCount Row count calculated during metadata collection
+   * @param channelsDataPerTable Channel data
+ * @param javaSerializationTotalRowCount Total row count when java object serialization is used.
+ * Used only for logging purposes if there is a mismatch.
+ */
+ private void verifyRowCounts(
+ String serializationType,
+ BdecParquetWriter writer,
+ long totalMetadataRowCount,
+      List<ChannelData<ParquetChunkData>> channelsDataPerTable,
+ long javaSerializationTotalRowCount) {
+ long parquetTotalRowsWritten = writer.getRowsWritten();
+
+    List<Long> parquetFooterRowsPerBlock = writer.getRowCountsFromFooter();
+ long parquetTotalRowsInFooter = 0;
+ for (long perBlockCount : parquetFooterRowsPerBlock) {
+ parquetTotalRowsInFooter += perBlockCount;
+ }
+
+ if (parquetTotalRowsInFooter != totalMetadataRowCount
+ || parquetTotalRowsWritten != totalMetadataRowCount) {
+
+ final String perChannelRowCountsInMetadata =
+ channelsDataPerTable.stream()
+ .map(x -> String.valueOf(x.getRowCount()))
+ .collect(Collectors.joining(","));
+
+ final String channelNames =
+ channelsDataPerTable.stream()
+ .map(x -> String.valueOf(x.getChannelContext().getName()))
+ .collect(Collectors.joining(","));
+
+ final String perBlockRowCountsInFooter =
+ parquetFooterRowsPerBlock.stream().map(String::valueOf).collect(Collectors.joining(","));
+
+ final long channelsCountInMetadata = channelsDataPerTable.size();
+
+ throw new SFException(
+ ErrorCode.INTERNAL_ERROR,
+ String.format(
+ "[%s]The number of rows in Parquet does not match the number of rows in metadata. "
+ + "parquetTotalRowsInFooter=%d "
+ + "totalMetadataRowCount=%d "
+ + "parquetTotalRowsWritten=%d "
+ + "perChannelRowCountsInMetadata=%s "
+ + "perBlockRowCountsInFooter=%s "
+ + "channelsCountInMetadata=%d "
+ + "countOfSerializedJavaObjects=%d "
+ + "channelNames=%s",
+ serializationType,
+ parquetTotalRowsInFooter,
+ totalMetadataRowCount,
+ parquetTotalRowsWritten,
+ perChannelRowCountsInMetadata,
+ perBlockRowCountsInFooter,
+ channelsCountInMetadata,
+ javaSerializationTotalRowCount,
+ channelNames));
+ }
+ }
}
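A condensed sketch of the consistency check added above: the metadata row count, the rows the writer reports having written, and the sum of per-block row counts from the Parquet footer must all agree, otherwise the chunk is rejected. The numbers below are made up; the real check reads them from BdecParquetWriter.

    import java.util.Arrays;
    import java.util.List;

    public class RowCountCheckSketch {
      public static void main(String[] args) {
        long totalMetadataRowCount = 3;
        long parquetTotalRowsWritten = 3; // writer.getRowsWritten()
        List<Long> footerRowsPerBlock = Arrays.asList(2L, 1L); // writer.getRowCountsFromFooter()
        long parquetTotalRowsInFooter =
            footerRowsPerBlock.stream().mapToLong(Long::longValue).sum();
        if (parquetTotalRowsInFooter != totalMetadataRowCount
            || parquetTotalRowsWritten != totalMetadataRowCount) {
          throw new IllegalStateException("Row count mismatch between Parquet footer and metadata");
        }
        System.out.println("row counts consistent");
      }
    }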
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
index 17aaa9136..f5835d2a1 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
@@ -207,7 +207,7 @@ private float addRow(
ColumnMetadata column = parquetColumn.columnMetadata;
ParquetValueParser.ParquetBufferValue valueWithSize =
ParquetValueParser.parseColumnValueToParquet(
- value, column, parquetColumn.type, forkedStats, defaultTimezone, insertRowsCurrIndex);
+ value, column, parquetColumn.type, forkedStats, defaultTimezone, insertRowsCurrIndex, clientBufferParameters.getBinaryStringEncoding());
indexedRow[colIndex] = valueWithSize.getValue();
size += valueWithSize.getSize();
}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetValueParser.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetValueParser.java
index 282a007d4..15770d4f9 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetValueParser.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetValueParser.java
@@ -10,6 +10,8 @@
import java.time.ZoneId;
import java.util.Optional;
import javax.annotation.Nullable;
+
+import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;
@@ -80,12 +82,12 @@ float getSize() {
* @return parsed value and byte size of Parquet internal representation
*/
static ParquetBufferValue parseColumnValueToParquet(
- Object value,
- ColumnMetadata columnMetadata,
- PrimitiveType.PrimitiveTypeName typeName,
- RowBufferStats stats,
- ZoneId defaultTimezone,
- long insertRowsCurrIndex) {
+ Object value,
+ ColumnMetadata columnMetadata,
+ PrimitiveType.PrimitiveTypeName typeName,
+ RowBufferStats stats,
+ ZoneId defaultTimezone,
+ long insertRowsCurrIndex, Constants.BinaryStringEncoding binaryStringEncoding) {
Utils.assertNotNull("Parquet column stats", stats);
float estimatedParquetSize = 0F;
estimatedParquetSize += DEFINITION_LEVEL_ENCODING_BYTE_LEN;
@@ -144,7 +146,7 @@ static ParquetBufferValue parseColumnValueToParquet(
int length = 0;
if (logicalType == AbstractRowBuffer.ColumnLogicalType.BINARY) {
value =
- getBinaryValueForLogicalBinary(value, stats, columnMetadata, insertRowsCurrIndex);
+ getBinaryValueForLogicalBinary(value, stats, columnMetadata, insertRowsCurrIndex, binaryStringEncoding);
length = ((byte[]) value).length;
} else {
String str = getBinaryValue(value, stats, columnMetadata, insertRowsCurrIndex);
@@ -414,14 +416,16 @@ private static byte[] getBinaryValueForLogicalBinary(
Object value,
RowBufferStats stats,
ColumnMetadata columnMetadata,
- final long insertRowsCurrIndex) {
+ final long insertRowsCurrIndex,
+ final Constants.BinaryStringEncoding binaryStringEncoding) {
String maxLengthString = columnMetadata.getByteLength().toString();
byte[] bytes =
DataValidationUtil.validateAndParseBinary(
columnMetadata.getName(),
value,
Optional.of(maxLengthString).map(Integer::parseInt),
- insertRowsCurrIndex);
+            insertRowsCurrIndex,
+            binaryStringEncoding);
stats.addBinaryValue(bytes);
return bytes;
}
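The new Constants.BinaryStringEncoding argument is threaded through so that string input for BINARY columns can be interpreted under a configurable encoding. A rough illustration of the difference, using plain JDK decoding rather than the SDK's DataValidationUtil, and assuming the encoding options distinguish hex from Base64 (the enum values themselves are not shown in this diff):

    import java.util.Base64;

    // Illustrative only: the same logical value supplied under two string encodings.
    public class BinaryEncodingSketch {
      static byte[] hexToBytes(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
          out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
      }

      public static void main(String[] args) {
        byte[] fromHex = hexToBytes("48656c6c6f");                  // hex-encoded "Hello"
        byte[] fromBase64 = Base64.getDecoder().decode("SGVsbG8="); // Base64-encoded "Hello"
        System.out.println(fromHex.length + " bytes / " + fromBase64.length + " bytes"); // both 5
      }
    }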
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RegisterBlobRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/RegisterBlobRequest.java
new file mode 100644
index 000000000..fcb7edf4f
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/RegisterBlobRequest.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Class used to serialize the blob register request */
+class RegisterBlobRequest implements IStreamingIngestRequest {
+ @JsonProperty("request_id")
+ private String requestId;
+
+ @JsonProperty("role")
+ private String role;
+
+ @JsonProperty("blobs")
+  private List<BlobMetadata> blobs;
+
+  RegisterBlobRequest(String requestId, String role, List<BlobMetadata> blobs) {
+ this.requestId = requestId;
+ this.role = role;
+ this.blobs = blobs;
+ }
+
+ String getRequestId() {
+ return requestId;
+ }
+
+ String getRole() {
+ return role;
+ }
+
+  List<BlobMetadata> getBlobs() {
+ return blobs;
+ }
+
+ @Override
+ public String getStringForLogging() {
+ return String.format(
+ "RegisterBlobRequest(requestId=%s, role=%s, blobs=[%s])",
+ requestId,
+ role,
+ blobs.stream().map(BlobMetadata::getPath).collect(Collectors.joining(", ")));
+ }
+}
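A sketch of the log line produced by getStringForLogging(): only blob paths are joined, keeping the entry compact even for large register batches. The paths and request id below are made up for illustration.

    import java.util.Arrays;
    import java.util.List;

    public class RegisterBlobLogSketch {
      public static void main(String[] args) {
        List<String> blobPaths = Arrays.asList("2024/6/1/12/30/a.bdec", "2024/6/1/12/31/b.bdec");
        String logLine =
            String.format(
                "RegisterBlobRequest(requestId=%s, role=%s, blobs=[%s])",
                "request-id-2", "MY_ROLE", String.join(", ", blobPaths));
        System.out.println(logLine);
      }
    }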
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java
new file mode 100644
index 000000000..67958618b
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java
@@ -0,0 +1,193 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_STATUS;
+import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE;
+import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_DROP_CHANNEL;
+import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_OPEN_CHANNEL;
+import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_REGISTER_BLOB;
+import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries;
+import static net.snowflake.ingest.utils.Constants.CHANNEL_STATUS_ENDPOINT;
+import static net.snowflake.ingest.utils.Constants.CLIENT_CONFIGURE_ENDPOINT;
+import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT;
+import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT;
+import static net.snowflake.ingest.utils.Constants.REGISTER_BLOB_ENDPOINT;
+import static net.snowflake.ingest.utils.Constants.RESPONSE_SUCCESS;
+
+import java.io.IOException;
+import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
+import net.snowflake.ingest.connection.IngestResponseException;
+import net.snowflake.ingest.connection.RequestBuilder;
+import net.snowflake.ingest.connection.ServiceResponseHandler;
+import net.snowflake.ingest.utils.ErrorCode;
+import net.snowflake.ingest.utils.Logging;
+import net.snowflake.ingest.utils.SFException;
+
+/**
+ * The SnowflakeServiceClient class is responsible for making API requests to the Snowflake service.
+ */
+class SnowflakeServiceClient {
+ private static final Logging logger = new Logging(SnowflakeServiceClient.class);
+
+ /** HTTP client used for making requests */
+ private final CloseableHttpClient httpClient;
+
+  /** Request builder for building streaming API requests */
+ private final RequestBuilder requestBuilder;
+
+ /**
+ * Default constructor
+ *
+ * @param httpClient the HTTP client used for making requests
+ * @param requestBuilder the request builder for building streaming API requests
+ */
+ SnowflakeServiceClient(CloseableHttpClient httpClient, RequestBuilder requestBuilder) {
+ this.httpClient = httpClient;
+ this.requestBuilder = requestBuilder;
+ }
+
+ /**
+ * Configures the client given a {@link ClientConfigureRequest}.
+ *
+ * @param request the client configuration request
+ * @return the response from the configuration request
+ */
+ ClientConfigureResponse clientConfigure(ClientConfigureRequest request)
+ throws IngestResponseException, IOException {
+ ClientConfigureResponse response =
+ executeApiRequestWithRetries(
+ ClientConfigureResponse.class,
+ request,
+ CLIENT_CONFIGURE_ENDPOINT,
+ "client configure",
+ STREAMING_CLIENT_CONFIGURE);
+ if (response.getStatusCode() != RESPONSE_SUCCESS) {
+ logger.logDebug(
+ "Client configure request failed, request={}, message={}",
+ request.getStringForLogging(),
+ response.getMessage());
+ throw new SFException(ErrorCode.CLIENT_CONFIGURE_FAILURE, response.getMessage());
+ }
+ return response;
+ }
+
+ /**
+ * Opens a channel given a {@link OpenChannelRequestInternal}.
+ *
+ * @param request the open channel request
+ * @return the response from the open channel request
+ */
+ OpenChannelResponse openChannel(OpenChannelRequestInternal request)
+ throws IngestResponseException, IOException {
+ OpenChannelResponse response =
+ executeApiRequestWithRetries(
+ OpenChannelResponse.class,
+ request,
+ OPEN_CHANNEL_ENDPOINT,
+ "open channel",
+ STREAMING_OPEN_CHANNEL);
+
+ if (response.getStatusCode() != RESPONSE_SUCCESS) {
+ logger.logDebug(
+ "Open channel request failed, request={}, response={}",
+ request.getStringForLogging(),
+ response.getMessage());
+ throw new SFException(ErrorCode.OPEN_CHANNEL_FAILURE, response.getMessage());
+ }
+ return response;
+ }
+
+ /**
+ * Drops a channel given a {@link DropChannelRequestInternal}.
+ *
+ * @param request the drop channel request
+ * @return the response from the drop channel request
+ */
+ DropChannelResponse dropChannel(DropChannelRequestInternal request)
+ throws IngestResponseException, IOException {
+ DropChannelResponse response =
+ executeApiRequestWithRetries(
+ DropChannelResponse.class,
+ request,
+ DROP_CHANNEL_ENDPOINT,
+ "drop channel",
+ STREAMING_DROP_CHANNEL);
+
+ if (response.getStatusCode() != RESPONSE_SUCCESS) {
+ logger.logDebug(
+ "Drop channel request failed, request={}, response={}",
+ request.getStringForLogging(),
+ response.getMessage());
+ throw new SFException(ErrorCode.DROP_CHANNEL_FAILURE, response.getMessage());
+ }
+ return response;
+ }
+
+ /**
+ * Gets the status of a channel given a {@link ChannelsStatusRequest}.
+ *
+ * @param request the channel status request
+ * @return the response from the channel status request
+ */
+ ChannelsStatusResponse getChannelStatus(ChannelsStatusRequest request)
+ throws IngestResponseException, IOException {
+ ChannelsStatusResponse response =
+ executeApiRequestWithRetries(
+ ChannelsStatusResponse.class,
+ request,
+ CHANNEL_STATUS_ENDPOINT,
+ "channel status",
+ STREAMING_CHANNEL_STATUS);
+
+ if (response.getStatusCode() != RESPONSE_SUCCESS) {
+ logger.logDebug(
+ "Channel status request failed, request={}, response={}",
+ request.getStringForLogging(),
+ response.getMessage());
+ throw new SFException(ErrorCode.CHANNEL_STATUS_FAILURE, response.getMessage());
+ }
+ return response;
+ }
+
+ /**
+ * Registers a blob given a {@link RegisterBlobRequest}.
+ *
+ * @param request the register blob request
+ * @param executionCount the number of times the request has been executed, used for logging
+ * @return the response from the register blob request
+ */
+ RegisterBlobResponse registerBlob(RegisterBlobRequest request, final int executionCount)
+ throws IngestResponseException, IOException {
+ RegisterBlobResponse response =
+ executeApiRequestWithRetries(
+ RegisterBlobResponse.class,
+ request,
+ REGISTER_BLOB_ENDPOINT,
+ "register blob",
+ STREAMING_REGISTER_BLOB);
+
+ if (response.getStatusCode() != RESPONSE_SUCCESS) {
+ logger.logDebug(
+ "Register blob request failed, request={}, response={}, executionCount={}",
+ request.getStringForLogging(),
+ response.getMessage(),
+ executionCount);
+ throw new SFException(ErrorCode.REGISTER_BLOB_FAILURE, response.getMessage());
+ }
+ return response;
+ }
+
+  private <T extends StreamingIngestResponse> T executeApiRequestWithRetries(
+      Class<T> responseClass,
+ IStreamingIngestRequest request,
+ String endpoint,
+ String operation,
+ ServiceResponseHandler.ApiName apiName)
+ throws IngestResponseException, IOException {
+ return executeWithRetries(
+ responseClass, endpoint, request, operation, apiName, this.httpClient, this.requestBuilder);
+ }
+}
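A usage sketch tying the pieces together: build a request, let the service client execute it with retries, and read the response. How the httpClient and requestBuilder are obtained is omitted here; in the SDK they come from the streaming client's connection setup. The classes are package-private, hence the package declaration.

    package net.snowflake.ingest.streaming.internal;

    import java.io.IOException;
    import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
    import net.snowflake.ingest.connection.IngestResponseException;
    import net.snowflake.ingest.connection.RequestBuilder;

    // Sketch: issue a client-configure call and read back the server-generated prefix.
    class ServiceClientUsageSketch {
      static String fetchClientPrefix(
          CloseableHttpClient httpClient, RequestBuilder requestBuilder, String role)
          throws IngestResponseException, IOException {
        SnowflakeServiceClient serviceClient = new SnowflakeServiceClient(httpClient, requestBuilder);
        ClientConfigureResponse response =
            serviceClient.clientConfigure(new ClientConfigureRequest(role));
        return response.getClientPrefix();
      }
    }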
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
index 58e81d116..ca0bbe782 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/
package net.snowflake.ingest.streaming.internal;
@@ -45,6 +45,10 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn
// Reference to the row buffer
private final RowBuffer rowBuffer;
+ private final long insertThrottleIntervalInMs;
+ private final int insertThrottleThresholdInBytes;
+ private final int insertThrottleThresholdInPercentage;
+ private final long maxMemoryLimitInBytes;
// Indicates whether the channel is closed
private volatile boolean isClosed;
@@ -61,6 +65,9 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn
// The latest cause of channel invalidation
private String invalidationCause;
+ private final MemoryInfoProvider memoryInfoProvider;
+ private volatile long freeMemoryInBytes = 0;
+
/**
* Constructor for TESTING ONLY which allows us to set the test mode
*
@@ -121,6 +128,17 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn
OffsetTokenVerificationFunction offsetTokenVerificationFunction) {
this.isClosed = false;
this.owningClient = client;
+
+ this.insertThrottleIntervalInMs =
+ this.owningClient.getParameterProvider().getInsertThrottleIntervalInMs();
+ this.insertThrottleThresholdInBytes =
+ this.owningClient.getParameterProvider().getInsertThrottleThresholdInBytes();
+ this.insertThrottleThresholdInPercentage =
+ this.owningClient.getParameterProvider().getInsertThrottleThresholdInPercentage();
+ this.maxMemoryLimitInBytes =
+ this.owningClient.getParameterProvider().getMaxMemoryLimitInBytes();
+
+ this.memoryInfoProvider = MemoryInfoProviderFromRuntime.getInstance();
this.channelFlushContext =
new ChannelFlushContext(
name, dbName, schemaName, tableName, channelSequencer, encryptionKey, encryptionKeyId);
@@ -373,7 +391,7 @@ public InsertValidationResponse insertRows(
Iterable