Commit 324a590

Merge branch 'master' into sfc-gh-anavalos-test-unshaded
2 parents a0a1d18 + d807380 commit 324a590

File tree: 12 files changed, +276 −181 lines changed

README.md (+40 −22)
@@ -8,14 +8,15 @@ Snowflake Ingest Service Java SDK
 The Snowflake Ingest Service SDK allows users to ingest files into their
 Snowflake data warehouse in a programmatic fashion via key-pair
 authentication. Currently, we support ingestion through the following APIs:
+
 1. [Snowpipe](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-gs.html#client-requirement-java-or-python-sdk)
 2. [Snowpipe Streaming](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview)

 # Dependencies

 The Snowflake Ingest Service SDK depends on the following libraries:

-* snowflake-jdbc (3.16.1+)
+* snowflake-jdbc (3.16.1+)
 * slf4j-api
 * com.github.luben:zstd-jni (1.5.0-1)

@@ -63,13 +64,16 @@ dependencies {

 ## Jar Versions

-The Snowflake Ingest SDK provides shaded and unshaded versions of its jar. The shaded version bundles the dependencies into its own jar,
-whereas the unshaded version declares its dependencies in `pom.xml`, which are fetched as standard transitive dependencies by the build system like Maven or Gradle.
-The shaded JAR can help avoid potential dependency conflicts, but the unshaded version provides finer graned control over transitive dependencies.
+The Snowflake Ingest SDK provides shaded and unshaded versions of its jar. The shaded version bundles the dependencies
+into its own jar, whereas the unshaded version declares its dependencies in `pom.xml`, which are fetched as standard transitive
+dependencies by a build system like Maven or Gradle.
+The shaded JAR can help avoid potential dependency conflicts, but the unshaded version provides finer-grained control
+over transitive dependencies.

 ## Using with snowflake-jdbc-fips

-For use cases, which need to use `snowflake-jdbc-fips` instead of the default `snowflake-jdbc`, we recommend to take the following steps:
+For use cases that need to use `snowflake-jdbc-fips` instead of the default `snowflake-jdbc`, we recommend taking the
+following steps:

 - Use the unshaded version of the Ingest SDK.
 - Exclude these transitive dependencies:
@@ -78,7 +82,8 @@ For use cases, which need to use `snowflake-jdbc-fips` instead of the default `s
   - `org.bouncycastle:bcprov-jdk18on`
 - Add a dependency on `snowflake-jdbc-fips`.

-See [this test](https://github.com/snowflakedb/snowflake-ingest-java/tree/master/e2e-jar-test/fips) for an example how to use Snowflake Ingest SDK together with Snowflake FIPS JDBC Driver.
+See [this test](https://github.com/snowflakedb/snowflake-ingest-java/tree/master/e2e-jar-test/fips) for an example of how
+to use the Snowflake Ingest SDK together with the Snowflake FIPS JDBC Driver.
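The steps above map onto a consumer `pom.xml` roughly as follows. This is a hedged sketch, not the authoritative setup: it assumes the unshaded artifact is consumed under the same `net.snowflake:snowflake-ingest-sdk` coordinates, `${snowflake.jdbc.fips.version}` is a placeholder property, and the complete exclusion list (truncated in this diff) is in the linked fips e2e test.

```xml
<dependency>
  <groupId>net.snowflake</groupId>
  <artifactId>snowflake-ingest-sdk</artifactId>
  <version>3.1.2</version>
  <exclusions>
    <!-- Exclude the BouncyCastle artifacts named above; the full list
         (truncated in this diff) is in the linked e2e-jar-test/fips example. -->
    <exclusion>
      <groupId>org.bouncycastle</groupId>
      <artifactId>bcprov-jdk18on</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>net.snowflake</groupId>
  <artifactId>snowflake-jdbc-fips</artifactId>
  <version>${snowflake.jdbc.fips.version}</version> <!-- placeholder version -->
</dependency>
```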

 # Example

@@ -89,16 +94,23 @@ Check out `SnowflakeIngestBasicExample.java`
 ## Snowpipe Streaming

 Check out `SnowflakeStreamingIngestExample.java`, which performs the following operations:
-1. Reads a JSON file which contains details regarding Snowflake Account, User, Role and Private Key. Take a look at `profile_streaming.json.example` for more details.
-   1. [Here](https://docs.snowflake.com/en/user-guide/key-pair-auth.html#configuring-key-pair-authentication) are the steps required to generate a private key.
-2. Creates a `SnowflakeStreamingIngestClient` which can be used to open one or more Streaming Channels pointing to the same or different tables.
+
+1. Reads a JSON file which contains details regarding the Snowflake Account, User, Role and Private Key. Take a look at
+   `profile_streaming.json.example` for more details.
+   1. [Here](https://docs.snowflake.com/en/user-guide/key-pair-auth.html#configuring-key-pair-authentication) are the
+      steps required to generate a private key.
+2. Creates a `SnowflakeStreamingIngestClient` which can be used to open one or more Streaming Channels pointing to the
+   same or different tables.
 3. Creates a `SnowflakeStreamingIngestChannel` against a Database, Schema and Table.
-   1. Please note: The database, schema and table is expected to be present before opening the Channel. Example SQL queries to create them:
+   1. Please note: the database, schema and table are expected to be present before opening the Channel. Example SQL
+      queries to create them:
+
    ```sql
    create or replace database MY_DATABASE;
    create or replace schema MY_SCHEMA;
    create or replace table MY_TABLE(c1 number);
    ```
+
 4. Inserts 1000 rows into the channel created in the 3rd step using the `insertRows` API on the Channel object
    1. The `insertRows` API also takes an optional `offsetToken` String which can be associated with this batch of rows.
 5. Calls `getLatestCommittedOffsetToken` on the channel until the appropriate offset is found in Snowflake.
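Steps 4–5 above can be sketched as follows. This is a minimal, self-contained illustration, not the SDK itself: `FakeChannel` is a hypothetical in-memory stand-in for `SnowflakeStreamingIngestChannel`, modeling only the `insertRows` and `getLatestCommittedOffsetToken` calls named in the steps (the real service commits offsets asynchronously, which is why the example polls).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for SnowflakeStreamingIngestChannel (not the real API). */
class FakeChannel {
  private final List<Map<String, Object>> rows = new ArrayList<>();
  private volatile String committedOffsetToken = null;

  void insertRows(List<Map<String, Object>> batch, String offsetToken) {
    rows.addAll(batch);
    // The real service commits asynchronously; here we commit immediately for simplicity.
    committedOffsetToken = offsetToken;
  }

  String getLatestCommittedOffsetToken() {
    return committedOffsetToken;
  }

  int rowCount() {
    return rows.size();
  }
}

public class StreamingStepsSketch {
  public static void main(String[] args) throws InterruptedException {
    FakeChannel channel = new FakeChannel();

    // Step 4: insert 1000 rows, associating the batch with an offset token.
    List<Map<String, Object>> batch = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      batch.add(Map.of("c1", i));
    }
    channel.insertRows(batch, "999");

    // Step 5: poll until the expected offset token is reported as committed.
    while (!"999".equals(channel.getLatestCommittedOffsetToken())) {
      Thread.sleep(100);
    }
    System.out.println(channel.rowCount());
  }
}
```

Against the real SDK the same shape applies, except that the channel comes from `client.openChannel(...)` and the polling loop genuinely waits on server-side commits.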
@@ -123,28 +135,34 @@ mvn package
 However, for general usage, pulling a pre-built jar from maven is
 recommended.

-If you would like to run SnowflakeIngestBasicExample.java or SnowflakeStreamingIngestExample.java in the example folder,
+If you would like to run SnowflakeIngestBasicExample.java or SnowflakeStreamingIngestExample.java in the example folder,
 please edit `pom.xml` and change the scope of the dependency `slf4j-simple` from `test` to `runtime` in order to enable
-console log output.
-
+console log output. To change the logging level or message format, update the corresponding system property or a
+simplelogger.properties file on the classpath;
+see https://www.slf4j.org/api/org/slf4j/simple/SimpleLogger.html for more details. If you run into formatting errors,
+please run format.sh to format the code.

 # Testing (SimpleIngestIT Test)

-- Modify `TestUtils.java` file and replace *PROFILE_PATH* with `profile.json.example` for testing.
+- Modify the `TestUtils.java` file and replace *PROFILE_PATH* with `profile.json.example` for testing.

-- `profile.json` is used because an encrypted file will be
-  decrypted for Github Actions testing. Check `End2EndTest.yml`
+- `profile.json` is used because an encrypted file will be
+  decrypted for GitHub Actions testing. Check `End2EndTest.yml`

-- Use an unencrypted version(Only for testing) of private key while generating keys(private and public pair) using OpenSSL.
+- Use an unencrypted version (only for testing) of the private key while generating keys (private and public pair) using
+  OpenSSL.

-- Here is the link for documentation [Key Pair
-  Generator](https://docs.snowflake.com/en/user-guide/key-pair-auth.html)
+- Here is the link for the documentation: [Key Pair
+  Generator](https://docs.snowflake.com/en/user-guide/key-pair-auth.html)

 # Contributing to this repo

-Each PR must pass all required github action merge gates before approval and merge. In addition to those tests, you will need:
+Each PR must pass all required GitHub Actions merge gates before approval and merge. In addition to those tests, you will
+need:

-- Formatter: run this script [`./format.sh`](https://github.com/snowflakedb/snowflake-ingest-java/blob/master/format.sh) from root
-- CLA: all contributers must sign the Snowflake CLA. This is a one time signature, please provide your email so we can work with you to get this signed after you open a PR.
+- Formatter: run this script [`./format.sh`](https://github.com/snowflakedb/snowflake-ingest-java/blob/master/format.sh)
+  from root
+- CLA: all contributors must sign the Snowflake CLA. This is a one-time signature; please provide your email so we can
+  work with you to get this signed after you open a PR.

 Thank you for contributing! We will review and approve PRs as soon as we can.

e2e-jar-test/pom.xml (+1 −1)

@@ -32,7 +32,7 @@
     <dependency>
       <groupId>net.snowflake</groupId>
       <artifactId>snowflake-ingest-sdk</artifactId>
-      <version>3.1.1</version>
+      <version>3.1.2</version>
     </dependency>

     <dependency>
pom.xml (+1 −1)

@@ -9,7 +9,7 @@
   <!-- Artifact name and version information -->
   <groupId>net.snowflake</groupId>
   <artifactId>snowflake-ingest-sdk</artifactId>
-  <version>3.1.1</version>
+  <version>3.1.2</version>
   <packaging>jar</packaging>
   <name>Snowflake Ingest SDK</name>
   <description>Snowflake Ingest SDK</description>

src/main/java/net/snowflake/ingest/connection/RequestBuilder.java (+1 −1)

@@ -110,7 +110,7 @@ public class RequestBuilder {
   // Don't change!
   public static final String CLIENT_NAME = "SnowpipeJavaSDK";

-  public static final String DEFAULT_VERSION = "3.1.1";
+  public static final String DEFAULT_VERSION = "3.1.2";

   public static final String JAVA_USER_AGENT = "JAVA";
src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java (+12 −12)

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2021-2025 Snowflake Computing Inc. All rights reserved.
  */

 package net.snowflake.ingest.streaming.internal;

@@ -660,13 +660,8 @@ BlobMetadata upload(

     Timer.Context uploadContext = Utils.createTimerContext(this.owningClient.uploadLatency);

-    // The returned etag is non-empty ONLY in case of iceberg uploads. With iceberg files, the XP
-    // scanner expects the
-    // MD5 value to exactly match the etag value in S3. This assumption doesn't hold when multipart
-    // upload kicks in,
-    // causing scan time failures and table corruption. By plugging in the etag value instead of the
-    // md5 value,
-    Optional<String> etag = storage.put(blobPath, blob);
+    // The returned icebergPostUploadMetadata is non-empty if and only if this is an Iceberg upload.
+    Optional<IcebergPostUploadMetadata> icebergPostUploadMetadata = storage.put(blobPath, blob);

     if (uploadContext != null) {
       blobStats.setUploadDurationMs(uploadContext);

@@ -677,17 +672,22 @@ BlobMetadata upload(
     }

     logger.logInfo(
-        "Finish uploading blob={}, size={}, timeInMillis={}, etag={}",
+        "Finish uploading blob={}, size={}, timeInMillis={}, icebergPostUploadMetadata={}",
         blobPath.fileRegistrationPath,
         blob.length,
         System.currentTimeMillis() - startTime,
-        etag);
+        icebergPostUploadMetadata);

     // at this point we know for sure if the BDEC file has data for more than one chunk, i.e.
     // spans mixed tables or not
     return BlobMetadata.createBlobMetadata(
-        blobPath.fileRegistrationPath,
-        etag.isPresent() ? etag.get() : BlobBuilder.computeMD5(blob),
+        icebergPostUploadMetadata
+            .map(IcebergPostUploadMetadata::getBlobPath)
+            .orElse(blobPath)
+            .fileRegistrationPath,
+        icebergPostUploadMetadata
+            .flatMap(IcebergPostUploadMetadata::getEtag)
+            .orElse(BlobBuilder.computeMD5(blob)),
         bdecVersion,
         metadata,
         blobStats,
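The new `createBlobMetadata` arguments replace a ternary on the etag with chained `Optional` accessors. The pattern can be shown stand-alone; `BlobPath` and `IcebergPostUploadMetadata` below are hypothetical, reduced stand-ins for the internal classes (record accessors instead of field access), not the SDK types themselves.

```java
import java.util.Optional;

// Hypothetical, reduced stand-ins mirroring the types used in the diff.
record BlobPath(String fileRegistrationPath) {}

record IcebergPostUploadMetadata(String etag, BlobPath blobPath) {
  Optional<String> getEtag() { return Optional.ofNullable(etag); }
  BlobPath getBlobPath() { return blobPath; }
}

public class UploadMetadataSketch {
  // Iceberg uploads may rewrite the blob path; fall back to the original otherwise.
  static String registrationPath(Optional<IcebergPostUploadMetadata> meta, BlobPath original) {
    return meta.map(IcebergPostUploadMetadata::getBlobPath).orElse(original).fileRegistrationPath();
  }

  // Prefer the S3 etag when present (Iceberg), else the locally computed MD5.
  static String checksum(Optional<IcebergPostUploadMetadata> meta, String md5) {
    return meta.flatMap(IcebergPostUploadMetadata::getEtag).orElse(md5);
  }

  public static void main(String[] args) {
    BlobPath original = new BlobPath("stage/blob1");
    Optional<IcebergPostUploadMetadata> none = Optional.empty();
    Optional<IcebergPostUploadMetadata> iceberg =
        Optional.of(new IcebergPostUploadMetadata("etag-123", new BlobPath("vol/blob1")));

    System.out.println(registrationPath(none, original) + " " + checksum(none, "md5-abc"));
    System.out.println(registrationPath(iceberg, original) + " " + checksum(iceberg, "md5-abc"));
  }
}
```

Note the `flatMap` for the etag: the metadata can be present while its etag is null, so a plain `map(...).orElse(...)` would not compile against `Optional<Optional<String>>`.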

src/main/java/net/snowflake/ingest/streaming/internal/IStorage.java (+4 −4)

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2024-2025 Snowflake Computing Inc. All rights reserved.
  */

 package net.snowflake.ingest.streaming.internal;

@@ -17,8 +17,8 @@ interface IStorage {
    *
    * @param blobPath
    * @param blob
-   * @return The String ETag returned by the upload. Can be null in situations where the underlying
-   *     layer does not have an ETag to return.
+   * @return The IcebergPostUploadMetadata returned by the upload. Present if and only if the
+   *     implementation is creating Iceberg blobs.
    */
-  Optional<String> put(BlobPath blobPath, byte[] blob);
+  Optional<IcebergPostUploadMetadata> put(BlobPath blobPath, byte[] blob);
 }
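Under the new signature, each `IStorage` implementation decides whether post-upload metadata exists: non-Iceberg storages return `Optional.empty()`. A brief sketch with the same hypothetical stand-ins (`InMemoryStage` is assumed for illustration, not part of the SDK):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins for the internal types referenced by the interface.
record BlobPath(String fileRegistrationPath) {}

record IcebergPostUploadMetadata(String etag, BlobPath blobPath) {}

/** The interface shape after this commit (simplified stand-in). */
interface IStorage {
  Optional<IcebergPostUploadMetadata> put(BlobPath blobPath, byte[] blob);
}

/** A non-Iceberg storage: stores the blob and reports no post-upload metadata. */
class InMemoryStage implements IStorage {
  final Map<String, byte[]> blobs = new HashMap<>();

  @Override
  public Optional<IcebergPostUploadMetadata> put(BlobPath blobPath, byte[] blob) {
    blobs.put(blobPath.fileRegistrationPath(), blob);
    return Optional.empty(); // not an Iceberg upload, so no etag or rewritten path
  }
}

public class IStorageSketch {
  public static void main(String[] args) {
    InMemoryStage stage = new InMemoryStage();
    Optional<IcebergPostUploadMetadata> meta =
        stage.put(new BlobPath("stage/blob1"), new byte[] {1, 2, 3});
    System.out.println(meta.isPresent());
    System.out.println(stage.blobs.size());
  }
}
```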

src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java (+4 −2)

@@ -1,16 +1,18 @@
 /*
- * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2024-2025 Snowflake Computing Inc. All rights reserved.
  */

 package net.snowflake.ingest.streaming.internal;

+import com.google.common.annotations.VisibleForTesting;
 import java.util.Optional;

 /**
  * Interface to manage {@link InternalStage} and {@link PresignedUrlExternalVolume} for {@link
  * FlushService}
  */
-interface IStorageManager {
+@VisibleForTesting
+public interface IStorageManager {

   /** Default max upload retries for streaming ingest storage */
   int DEFAULT_MAX_UPLOAD_RETRIES = 5;
src/main/java/net/snowflake/ingest/streaming/internal/IcebergPostUploadMetadata.java (new file, +46)

@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2025 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+import java.util.Optional;
+import javax.annotation.Nullable;
+
+/** Stores Iceberg post-upload metadata. */
+class IcebergPostUploadMetadata {
+  /**
+   * With Iceberg files, the XP scanner expects the MD5 value to exactly match the etag value in S3.
+   * This assumption doesn't hold when multipart upload kicks in, causing scan-time failures and
+   * table corruption. By plugging in the etag value instead of the MD5 value, we can ensure that
+   * the XP scanner can successfully scan the file.
+   */
+  private final @Nullable String etag;
+
+  /** The final uploaded blob path of the file within the table's external volume. */
+  private final BlobPath blobPath;
+
+  /**
+   * Constructor for IcebergPostUploadMetadata.
+   *
+   * @param etag The etag of the uploaded file.
+   * @param blobPath The updated blob path of the file within the table's external volume.
+   */
+  IcebergPostUploadMetadata(@Nullable String etag, BlobPath blobPath) {
+    this.etag = etag;
+    this.blobPath = blobPath;
+  }
+
+  Optional<String> getEtag() {
+    return Optional.ofNullable(etag);
+  }
+
+  BlobPath getBlobPath() {
+    return blobPath;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("IcebergPostUploadMetadata(etag=%s, blobPath=%s)", etag, blobPath);
+  }
+}
