Skip to content

Commit 393f302

Browse files
SNOW-1950183 Resolve Iceberg ingestion filename mismatch between upload and registration (#958)
1 parent 7f1f7e4 commit 393f302

File tree

8 files changed

+227
-149
lines changed

8 files changed

+227
-149
lines changed

src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java

+12-12
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
2+
* Copyright (c) 2021-2025 Snowflake Computing Inc. All rights reserved.
33
*/
44

55
package net.snowflake.ingest.streaming.internal;
@@ -660,13 +660,8 @@ BlobMetadata upload(
660660

661661
Timer.Context uploadContext = Utils.createTimerContext(this.owningClient.uploadLatency);
662662

663-
// The returned etag is non-empty ONLY in case of iceberg uploads. With iceberg files, the XP
664-
// scanner expects the
665-
// MD5 value to exactly match the etag value in S3. This assumption doesn't hold when multipart
666-
// upload kicks in,
667-
// causing scan time failures and table corruption. By plugging in the etag value instead of the
668-
// md5 value,
669-
Optional<String> etag = storage.put(blobPath, blob);
663+
// The returned icebergPostUploadMetadata is non-empty if and only if this is an Iceberg upload.
664+
Optional<IcebergPostUploadMetadata> icebergPostUploadMetadata = storage.put(blobPath, blob);
670665

671666
if (uploadContext != null) {
672667
blobStats.setUploadDurationMs(uploadContext);
@@ -677,17 +672,22 @@ BlobMetadata upload(
677672
}
678673

679674
logger.logInfo(
680-
"Finish uploading blob={}, size={}, timeInMillis={}, etag={}",
675+
"Finish uploading blob={}, size={}, timeInMillis={}, icebergPostUploadMetadata={}",
681676
blobPath.fileRegistrationPath,
682677
blob.length,
683678
System.currentTimeMillis() - startTime,
684-
etag);
679+
icebergPostUploadMetadata);
685680

686681
// at this point we know for sure if the BDEC file has data for more than one chunk, i.e.
687682
// spans mixed tables or not
688683
return BlobMetadata.createBlobMetadata(
689-
blobPath.fileRegistrationPath,
690-
etag.isPresent() ? etag.get() : BlobBuilder.computeMD5(blob),
684+
icebergPostUploadMetadata
685+
.map(IcebergPostUploadMetadata::getBlobPath)
686+
.orElse(blobPath)
687+
.fileRegistrationPath,
688+
icebergPostUploadMetadata
689+
.flatMap(IcebergPostUploadMetadata::getEtag)
690+
.orElse(BlobBuilder.computeMD5(blob)),
691691
bdecVersion,
692692
metadata,
693693
blobStats,

src/main/java/net/snowflake/ingest/streaming/internal/IStorage.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
2+
* Copyright (c) 2024-2025 Snowflake Computing Inc. All rights reserved.
33
*/
44

55
package net.snowflake.ingest.streaming.internal;
@@ -17,8 +17,8 @@ interface IStorage {
1717
*
1818
* @param blobPath
1919
* @param blob
20-
* @return The String ETag returned by the upload. Can be null in situations where the underlying
21-
* layer does not have an ETag to return.
20+
* @return The IcebergPostUploadMetadata returned by the upload. Present if and only if the
21+
* implementation is creating Iceberg blobs.
2222
*/
23-
Optional<String> put(BlobPath blobPath, byte[] blob);
23+
Optional<IcebergPostUploadMetadata> put(BlobPath blobPath, byte[] blob);
2424
}

src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
/*
2-
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
2+
* Copyright (c) 2024-2025 Snowflake Computing Inc. All rights reserved.
33
*/
44

55
package net.snowflake.ingest.streaming.internal;
66

7+
import com.google.common.annotations.VisibleForTesting;
78
import java.util.Optional;
89

910
/**
1011
* Interface to manage {@link InternalStage} and {@link PresignedUrlExternalVolume} for {@link
1112
* FlushService}
1213
*/
13-
interface IStorageManager {
14+
@VisibleForTesting
15+
public interface IStorageManager {
1416

1517
/** Default max upload retries for streaming ingest storage */
1618
int DEFAULT_MAX_UPLOAD_RETRIES = 5;
src/main/java/net/snowflake/ingest/streaming/internal/IcebergPostUploadMetadata.java

+46-0
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright (c) 2025 Snowflake Computing Inc. All rights reserved.
3+
*/
4+
5+
package net.snowflake.ingest.streaming.internal;
6+
7+
import java.util.Optional;
8+
import javax.annotation.Nullable;
9+
10+
/** Stores Iceberg post-upload metadata. */
11+
class IcebergPostUploadMetadata {
12+
/**
13+
* With iceberg files, the XP scanner expects the MD5 value to exactly match the etag value in S3.
14+
* This assumption doesn't hold when multipart upload kicks in, causing scan time failures and
15+
* table corruption. By plugging in the etag value instead of the md5 value, we can ensure that
16+
* the XP scanner can successfully scan the file.
17+
*/
18+
private final @Nullable String etag;
19+
20+
/** The final uploaded blob path of the file within the table's external volume. */
21+
private final BlobPath blobPath;
22+
23+
/**
24+
* Constructor for IcebergPostUploadMetadata.
25+
*
26+
* @param etag The etag of the uploaded file; may be null.
27+
* @param blobPath The updated blob path of the file within the table's external volume.
28+
*/
29+
IcebergPostUploadMetadata(@Nullable String etag, BlobPath blobPath) {
30+
this.etag = etag;
31+
this.blobPath = blobPath;
32+
}
33+
34+
Optional<String> getEtag() {
35+
return Optional.ofNullable(etag);
36+
}
37+
38+
BlobPath getBlobPath() {
39+
return blobPath;
40+
}
41+
42+
@Override
43+
public String toString() {
44+
return String.format("IcebergPostUploadMetadata(etag=%s, blobPath=%s)", etag, blobPath);
45+
}
46+
}

0 commit comments

Comments
 (0)