Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1890076: Ensure correct file is downloaded as stream #2043

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/main/java/net/snowflake/client/jdbc/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public enum ErrorCode {
GCP_SERVICE_ERROR(200061, SqlState.SYSTEM_ERROR),
AUTHENTICATOR_REQUEST_TIMEOUT(200062, SqlState.CONNECTION_EXCEPTION),
INVALID_STRUCT_DATA(200063, SqlState.DATA_EXCEPTION),
DISABLEOCSP_INSECUREMODE_VALUE_MISMATCH(200064, SqlState.INVALID_PARAMETER_VALUE);
DISABLEOCSP_INSECUREMODE_VALUE_MISMATCH(200064, SqlState.INVALID_PARAMETER_VALUE),
TOO_MANY_FILES_TO_DOWNLOAD_AS_STREAM(200065, SqlState.DATA_EXCEPTION);

public static final String errorMessageResource = "net.snowflake.client.jdbc.jdbc_error_messages";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import net.snowflake.client.core.SFBaseSession;
import net.snowflake.client.core.SFException;
Expand All @@ -61,12 +62,14 @@ public class SnowflakeConnectionV1 implements Connection, SnowflakeConnection {

/** Refer to all created and open statements from this connection */
private final Set<Statement> openStatements = ConcurrentHashMap.newKeySet();

// Injected delay for the purpose of connection timeout testing
// Any statement execution will sleep for the specified number of milliseconds
private final AtomicInteger _injectedDelay = new AtomicInteger(0);
private boolean isClosed;
private SQLWarning sqlWarnings = null;
private List<DriverPropertyInfo> missingProperties = null;

/**
* Amount of milliseconds a user is willing to tolerate for network related issues (e.g. HTTP
* 503/504) or database transient issues (e.g. GS not responding)
Expand All @@ -76,12 +79,14 @@ public class SnowflakeConnectionV1 implements Connection, SnowflakeConnection {
* <p>Default: 300 seconds
*/
private int networkTimeoutInMilli = 0; // in milliseconds

/* this should be set to Connection.TRANSACTION_READ_COMMITTED
* There may not be many implications here since the call to
* setTransactionIsolation doesn't do anything.
*/
private int transactionIsolation = Connection.TRANSACTION_NONE;
private SFBaseSession sfSession;

/** The SnowflakeConnectionImpl that provides the underlying physical-layer implementation */
private SFConnectionHandler sfConnectionHandler;

Expand Down Expand Up @@ -1037,6 +1042,12 @@ public InputStream downloadStream(String stageName, String sourceFileName, boole
// no file will be downloaded to this location
getCommand.append(" file:///tmp/ /*jdbc download stream*/");

// We cannot match whole sourceFileName since it may be different e.g. for git repositories so
// we match only raw filename
String[] split = sourceFileName.split("/");
String fileName = Pattern.quote(split[split.length - 1]);
getCommand.append(" PATTERN=\".*").append(fileName).append("$\"");

SFBaseFileTransferAgent transferAgent =
sfConnectionHandler.getFileTransferAgent(getCommand.toString(), stmt.getSFBaseStatement());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1696,6 +1696,18 @@ public InputStream downloadStream(String fileName) throws SnowflakeSQLException
remoteLocation remoteLocation = extractLocationAndPath(stageInfo.getLocation());

// when downloading files as stream there should be only one file in source files
// let's fail fast when more than one file matches instead of fetching random one
if (sourceFiles.size() > 1) {
throw new SnowflakeSQLException(
queryID,
SqlState.NO_DATA,
ErrorCode.TOO_MANY_FILES_TO_DOWNLOAD_AS_STREAM.getMessageCode(),
session,
"There are more than one file matching "
+ fileName
+ ": "
+ String.join(",", sourceFiles));
}
String sourceLocation =
sourceFiles.stream()
.findFirst()
Expand Down
73 changes: 73 additions & 0 deletions src/test/java/net/snowflake/client/jdbc/StreamLatestIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,22 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import net.snowflake.client.annotations.DontRunOnGithubActions;
import net.snowflake.client.category.TestTags;
import org.apache.commons.io.IOUtils;
Expand Down Expand Up @@ -237,4 +241,73 @@ public void testSpecialCharactersInFileName() throws SQLException, IOException {
}
}
}

/** Added > 3.21.0. Fixed regression introduced in 3.19.1 */
@Test
public void shouldDownloadStreamInDeterministicWay() throws Exception {
try (Connection conn = getConnection();
Statement stat = conn.createStatement()) {
String randomStage = "test" + UUID.randomUUID().toString().replaceAll("-", "");
try {
stat.execute("CREATE OR REPLACE STAGE " + randomStage);
String randomDir = UUID.randomUUID().toString();
String sourceFilePathWithoutExtension = getFullPathFileInResource("test_file");
String sourceFilePathWithExtension = getFullPathFileInResource("test_file.csv");
String stageDest = String.format("@%s/%s", randomStage, randomDir);
putFile(stat, sourceFilePathWithExtension, stageDest, false);
putFile(stat, sourceFilePathWithoutExtension, stageDest, false);
putFile(stat, sourceFilePathWithExtension, stageDest, true);
putFile(stat, sourceFilePathWithoutExtension, stageDest, true);
expectsFilesOnStage(stat, stageDest, 4);
String stageName = "@" + randomStage;
downloadStreamExpectingContent(
conn, stageName, randomDir + "/test_file.gz", true, "I am a file without extension");
downloadStreamExpectingContent(
conn, stageName, randomDir + "/test_file.csv.gz", true, "I am a file with extension");
downloadStreamExpectingContent(
conn, stageName, randomDir + "/test_file", false, "I am a file without extension");
downloadStreamExpectingContent(
conn, stageName, randomDir + "/test_file.csv", false, "I am a file with extension");
} finally {
stat.execute("DROP STAGE IF EXISTS " + randomStage);
}
}
}

private static void downloadStreamExpectingContent(
Connection conn,
String stageName,
String fileName,
boolean decompress,
String expectedFileContent)
throws IOException, SQLException {
try (InputStream inputStream =
conn.unwrap(SnowflakeConnectionV1.class)
.downloadStream(stageName, fileName, decompress);
InputStreamReader isr = new InputStreamReader(inputStream);
BufferedReader br = new BufferedReader(isr)) {
String content = br.lines().collect(Collectors.joining("\n"));
assertEquals(expectedFileContent, content);
}
}

private static void expectsFilesOnStage(Statement stmt, String stageDest, int expectCount)
throws SQLException {
int filesInStageDir = 0;
try (ResultSet rs = stmt.executeQuery("LIST " + stageDest)) {
while (rs.next()) {
++filesInStageDir;
}
}
assertEquals(expectCount, filesInStageDir);
}

private static boolean putFile(
Statement stmt, String localFileName, String stageDest, boolean autoCompress)
throws SQLException {
return stmt.execute(
String.format(
"PUT file://%s %s AUTO_COMPRESS=%s",
localFileName, stageDest, String.valueOf(autoCompress).toUpperCase()));
}
}
1 change: 1 addition & 0 deletions src/test/resources/test_file
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
I am a file without extension
1 change: 1 addition & 0 deletions src/test/resources/test_file.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
I am a file with extension
Loading