From 92203cd124f00dc7aa535a761005ca9dcabab708 Mon Sep 17 00:00:00 2001 From: Ryan Sun Date: Tue, 4 Mar 2025 09:24:44 -0800 Subject: [PATCH 1/6] Added params and finished Todos --- .../client/core/SFBaseStatement.java | 4 +- .../snowflake/client/core/SFStatement.java | 41 +++++++++++-------- .../client/jdbc/SnowflakeStatementV1.java | 9 +++- .../client/jdbc/MockConnectionTest.java | 11 +++++ 4 files changed, 45 insertions(+), 20 deletions(-) diff --git a/src/main/java/net/snowflake/client/core/SFBaseStatement.java b/src/main/java/net/snowflake/client/core/SFBaseStatement.java index ad4a4a276..4e25c50e7 100644 --- a/src/main/java/net/snowflake/client/core/SFBaseStatement.java +++ b/src/main/java/net/snowflake/client/core/SFBaseStatement.java @@ -84,10 +84,10 @@ public abstract SFBaseResultSet execute( throws SQLException, SFException; /** - * Executes the given SQL string. + * Executes the given SQL string, with dataframe AST parameter. * * @param sql The SQL string to execute, synchronously. - * @param dataframeAst ... + * @param dataframeAst encoded string representation of the dataframe AST * @param parametersBinding parameters to bind * @param caller the JDBC interface method that called this method, if any * @param execTimeData OOB telemetry object to record timings diff --git a/src/main/java/net/snowflake/client/core/SFStatement.java b/src/main/java/net/snowflake/client/core/SFStatement.java index f1c1d3311..0d44a0f79 100644 --- a/src/main/java/net/snowflake/client/core/SFStatement.java +++ b/src/main/java/net/snowflake/client/core/SFStatement.java @@ -115,7 +115,7 @@ private void sanityCheckQuery(String sql) throws SQLException { * Execute SQL query with an option for describe only * * @param sql sql statement - * @param dataframeAst ... + * @param dataframeAst encoded string representation of the dataframe AST * @param describeOnly true if describe only * @return query result set * @throws SQLException if connection is already closed @@ -130,23 +130,25 @@ private SFBaseResultSet executeQuery( CallingMethod caller, ExecTimeTelemetryData execTimeData) throws SQLException, SFException { - sanityCheckQuery(sql); + if (dataframeAst == null) { + sanityCheckQuery(sql); + + String trimmedSql = sql.trim(); + + // snowflake specific client side commands + if (isFileTransfer(trimmedSql)) { + // Server side value or Connection string value is false then disable the PUT/GET command + if ((session != null && !(session.getJdbcEnablePutGet() && session.getEnablePutGet()))) { + // PUT/GET command disabled either on server side or in the client connection string + logger.debug("Executing file transfer locally is disabled: {}", sql); + throw new SnowflakeSQLException("File transfers have been disabled."); + } - String trimmedSql = sql.trim(); + // PUT/GET command + logger.debug("Executing file transfer locally: {}", sql); - // snowflake specific client side commands - if (isFileTransfer(trimmedSql)) { - // Server side value or Connection string value is false then disable the PUT/GET command - if ((session != null && !(session.getJdbcEnablePutGet() && session.getEnablePutGet()))) { - // PUT/GET command disabled either on server side or in the client connection string - logger.debug("Executing file transfer locally is disabled: {}", sql); - throw new SnowflakeSQLException("File transfers have been disabled."); + return executeFileTransfer(sql); } - - // PUT/GET command - logger.debug("Executing file transfer locally: {}", sql); - - return executeFileTransfer(sql); } // NOTE: It is intentional two describeOnly parameters are specified. @@ -191,6 +193,7 @@ public SFPreparedStatementMetaData describe(String sql) throws SFException, SQLE *

* * @param sql sql statement + * @param dataframeAst encoded string representation of the dataframe AST * @param parameterBindings binding information * @param describeOnly true if just showing result set metadata * @param internal true if internal command not showing up in the history @@ -321,6 +324,7 @@ public Void call() throws SQLException { * A helper method to build URL and submit the SQL to snowflake for exec * * @param sql sql statement + * @param dataframeAst encoded string representation of the dataframe AST * @param mediaType media type * @param bindValues map of binding values * @param describeOnly whether only show the result set metadata @@ -767,7 +771,7 @@ private void cancelHelper(String sql, String mediaType, CancellationReason cance * Execute sql * * @param sql sql statement. - * @param dataframeAst ... + * @param dataframeAst encoded string representation of the dataframe AST * @param asyncExec is async exec * @param parametersBinding parameters to bind * @param caller the JDBC interface method that called this method, if any @@ -787,7 +791,10 @@ public SFBaseResultSet execute( throws SQLException, SFException { TelemetryService.getInstance().updateContext(session.getSnowflakeConnectionString()); - // todo: if (dataframeAst == null) + if (dataframeAst != null) { + return executeQuery(sql, dataframeAst, parametersBinding, false, asyncExec, caller, execTimeData); + } + sanityCheckQuery(sql); session.injectedDelay(); diff --git a/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java b/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java index a6e797260..f31baedf2 100644 --- a/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java +++ b/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java @@ -175,7 +175,13 @@ public ResultSet executeAsyncQuery(String sql) throws SQLException { return rs; } - // todo: add doc + /** + * Execute SQL query asynchronously + * + * @param dataframeAst encoded string representation of the dataframe AST + * @return ResultSet + * @throws SQLException if @link{#executeQueryInternal(String, Map)} throws an exception + */ public ResultSet executeDataframeAst(String dataframeAst) throws SQLException { ExecTimeTelemetryData execTimeData = new ExecTimeTelemetryData("ResultSet Statement.executeQuery(String)", this.batchID); @@ -284,6 +290,7 @@ private void setQueryIdWhenValidOrNull(String queryId) { * Internal method for executing a query with bindings accepted. * * @param sql sql statement + * @param dataframeAst encoded string representation of the dataframe AST * @param asyncExec execute query asynchronously * @param parameterBindings parameters bindings * @return query result set diff --git a/src/test/java/net/snowflake/client/jdbc/MockConnectionTest.java b/src/test/java/net/snowflake/client/jdbc/MockConnectionTest.java index 65118cec6..9fe101966 100644 --- a/src/test/java/net/snowflake/client/jdbc/MockConnectionTest.java +++ b/src/test/java/net/snowflake/client/jdbc/MockConnectionTest.java @@ -576,6 +576,17 @@ public SFBaseResultSet execute( return new MockJsonResultSet(mockedResponse, sfSession); } + @Override + public SFBaseResultSet execute( + String sql, + String dataframeAst, + Map parametersBinding, + CallingMethod caller, + ExecTimeTelemetryData execTimeData) + throws SQLException, SFException { + return new MockJsonResultSet(mockedResponse, sfSession); + } + @Override public SFBaseResultSet asyncExecute( String sql, From 814e514cd48b92be6c0b2afda9c940d196c9ab9c Mon Sep 17 00:00:00 2001 From: Ryan Sun Date: Tue, 4 Mar 2025 13:32:16 -0800 Subject: [PATCH 2/6] Fixed dataframe==null change and testing fixes --- .../snowflake/client/core/SFStatement.java | 28 +++++++++---------- .../client/jdbc/SnowflakeStatementV1.java | 2 +- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/src/main/java/net/snowflake/client/core/SFStatement.java b/src/main/java/net/snowflake/client/core/SFStatement.java index 0d44a0f79..b7ce724c5 100644 --- a/src/main/java/net/snowflake/client/core/SFStatement.java +++ b/src/main/java/net/snowflake/client/core/SFStatement.java @@ -791,25 +791,23 @@ public SFBaseResultSet execute( throws SQLException, SFException { TelemetryService.getInstance().updateContext(session.getSnowflakeConnectionString()); - if (dataframeAst != null) { - return executeQuery(sql, dataframeAst, parametersBinding, false, asyncExec, caller, execTimeData); - } - - sanityCheckQuery(sql); + if (dataframeAst == null) { + sanityCheckQuery(sql); - session.injectedDelay(); + session.injectedDelay(); - if (session.getPreparedStatementLogging()) { - logger.info("Execute: {}", sql); - } else { - logger.debug("Execute: {}", sql); - } + if (session.getPreparedStatementLogging()) { + logger.info("Execute: {}", sql); + } else { + logger.debug("Execute: {}", sql); + } - String trimmedSql = sql.trim(); + String trimmedSql = sql.trim(); - if (trimmedSql.length() >= 20 && trimmedSql.toLowerCase().startsWith("set-sf-property")) { - executeSetProperty(sql); - return null; + if (trimmedSql.length() >= 20 && trimmedSql.toLowerCase().startsWith("set-sf-property")) { + executeSetProperty(sql); + return null; + } } return executeQuery(sql, dataframeAst, parametersBinding, false, asyncExec, caller, execTimeData); } diff --git a/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java b/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java index f31baedf2..0ede8dc24 100644 --- a/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java +++ b/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java @@ -316,7 +316,7 @@ ResultSet executeQueryInternal( } else { sfResultSet = sfBaseStatement.execute( - sql, parameterBindings, SFBaseStatement.CallingMethod.EXECUTE_QUERY, execTimeData); + sql, dataframeAst, parameterBindings, SFBaseStatement.CallingMethod.EXECUTE_QUERY, execTimeData); resultSetMetadataHandler(sfResultSet); } sfResultSet.setSession(this.connection.getSFBaseSession()); From 744731460b4d03e148f77dbd26c800f82f45988b Mon Sep 17 00:00:00 2001 From: Ryan Sun Date: Wed, 5 Mar 2025 11:32:43 -0800 Subject: [PATCH 3/6] Added dataframeAst to QueryExecDTO, passes basic test --- src/main/java/net/snowflake/client/core/QueryExecDTO.java | 2 ++ src/main/java/net/snowflake/client/core/StmtUtil.java | 1 + 2 files changed, 3 insertions(+) diff --git a/src/main/java/net/snowflake/client/core/QueryExecDTO.java b/src/main/java/net/snowflake/client/core/QueryExecDTO.java index 71209262a..306a299bc 100644 --- a/src/main/java/net/snowflake/client/core/QueryExecDTO.java +++ b/src/main/java/net/snowflake/client/core/QueryExecDTO.java @@ -40,6 +40,7 @@ public class QueryExecDTO { public QueryExecDTO( String sqlText, + String dataframeAst, boolean describeOnly, Integer sequenceId, Map bindings, @@ -50,6 +51,7 @@ public QueryExecDTO( boolean internal, boolean asyncExec) { this.sqlText = sqlText; + this.dataframeAst = dataframeAst; this.describeOnly = describeOnly; this.sequenceId = sequenceId; this.bindings = bindings; diff --git a/src/main/java/net/snowflake/client/core/StmtUtil.java b/src/main/java/net/snowflake/client/core/StmtUtil.java index f1749e7fd..69ebd3bf3 100644 --- a/src/main/java/net/snowflake/client/core/StmtUtil.java +++ b/src/main/java/net/snowflake/client/core/StmtUtil.java @@ -330,6 +330,7 @@ public static StmtOutput execute(StmtInput stmtInput, ExecTimeTelemetryData exec QueryExecDTO sqlJsonBody = new QueryExecDTO( stmtInput.sql, + stmtInput.dataframeAst, stmtInput.describeOnly, stmtInput.sequenceId, stmtInput.bindValues, From d9e07a0382786b09c0b7dfc3d23e7d30b68563b6 Mon Sep 17 00:00:00 2001 From: Ryan Sun Date: Thu, 6 Mar 2025 16:04:14 -0800 Subject: [PATCH 4/6] Resolve comments and add DataframeAst test --- .../snowflake/client/core/SFStatement.java | 1 + .../client/jdbc/SnowflakeStatementV1.java | 2 +- .../client/jdbc/DataframeAstTest.java | 71 +++++++++++++++++++ 3 files changed, 73 insertions(+), 1 deletion(-) create mode 100644 src/test/java/net/snowflake/client/jdbc/DataframeAstTest.java diff --git a/src/main/java/net/snowflake/client/core/SFStatement.java b/src/main/java/net/snowflake/client/core/SFStatement.java index b7ce724c5..e371caed1 100644 --- a/src/main/java/net/snowflake/client/core/SFStatement.java +++ b/src/main/java/net/snowflake/client/core/SFStatement.java @@ -791,6 +791,7 @@ public SFBaseResultSet execute( throws SQLException, SFException { TelemetryService.getInstance().updateContext(session.getSnowflakeConnectionString()); + // if dataframeAst is passed, then no need for sql checks and can skip if (dataframeAst == null) { sanityCheckQuery(sql); diff --git a/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java b/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java index 0ede8dc24..177514c74 100644 --- a/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java +++ b/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java @@ -176,7 +176,7 @@ public ResultSet executeAsyncQuery(String sql) throws SQLException { } /** - * Execute SQL query asynchronously + * Execute SQL query * * @param dataframeAst encoded string representation of the dataframe AST * @return ResultSet diff --git a/src/test/java/net/snowflake/client/jdbc/DataframeAstTest.java b/src/test/java/net/snowflake/client/jdbc/DataframeAstTest.java new file mode 100644 index 000000000..1a6517634 --- /dev/null +++ b/src/test/java/net/snowflake/client/jdbc/DataframeAstTest.java @@ -0,0 +1,71 @@ +package net.snowflake.client.jdbc; + +import com.fasterxml.jackson.databind.ObjectMapper; +import net.snowflake.client.core.*; +import org.apache.http.client.methods.HttpPost; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.mock; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.sql.ResultSet; +import java.sql.SQLException; + +public class DataframeAstTest { + + @Test + public void testSendAst() throws SQLException, IOException { + String ast = "dummyAst"; + SnowflakeConnectionV1 mockedConn = mock(SnowflakeConnectionV1.class); + SFConnectionHandler mockedHandler = mock(SFConnectionHandler.class); + when(mockedConn.getHandler()).thenReturn(mockedHandler); + HttpClientSettingsKey mockedHttpClientSettingsKey = mock(HttpClientSettingsKey.class); + SFSession mockedSession = mock(SFSession.class); + SFBaseStatement sfBaseStatement = new SFStatement(mockedSession); + when(mockedHandler.getSFStatement()).thenReturn(sfBaseStatement); + when(mockedSession.getServerUrl()).thenReturn("dummy"); + when(mockedSession.getHttpClientKey()).thenReturn(mockedHttpClientSettingsKey); + when(mockedHttpClientSettingsKey.getGzipDisabled()).thenReturn(true); + + ArgumentCaptor captor = ArgumentCaptor.forClass(HttpPost.class); + try(MockedStatic mockedHttpUtil = Mockito.mockStatic(HttpUtil.class)) { + mockedHttpUtil.when(() -> HttpUtil.executeRequest( + captor.capture(), + anyInt(), + anyInt(), + anyInt(), + anyInt(), + anyInt(), + any(), + anyBoolean(), + anyBoolean(), + any(), + any() + )).thenReturn("dummy"); + SnowflakeStatementV1 stmt = new SnowflakeStatementV1( + mockedConn, + ResultSet.TYPE_FORWARD_ONLY, + ResultSet.CONCUR_READ_ONLY, + ResultSet.CLOSE_CURSORS_AT_COMMIT); + stmt.executeDataframeAst(ast); + } catch (Exception e) { + // do nothing, let it terminate early + } + HttpPost result = captor.getValue(); + ObjectMapper mapper = new ObjectMapper(); + ByteArrayInputStream bais = (ByteArrayInputStream) result.getEntity().getContent(); + int size = bais.available(); + byte[] buffer = new byte[size]; + bais.read(buffer); + String resultStr = new String(buffer, StandardCharsets.UTF_8); + assert mapper.readTree(resultStr).get("dataframeAst").asText().equals(ast); + } +} \ No newline at end of file From 68e86d7da73dbcee527571dbf53528bde1cbaef3 Mon Sep 17 00:00:00 2001 From: Ryan Sun Date: Thu, 6 Mar 2025 16:06:15 -0800 Subject: [PATCH 5/6] Renamed function from Execute SQL->DataframeAST query --- .../java/net/snowflake/client/jdbc/SnowflakeStatementV1.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java b/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java index 177514c74..88e4a8480 100644 --- a/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java +++ b/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java @@ -176,7 +176,7 @@ public ResultSet executeAsyncQuery(String sql) throws SQLException { } /** - * Execute SQL query + * Execute dataframeAst query * * @param dataframeAst encoded string representation of the dataframe AST * @return ResultSet From fc005b53362699d5fa425d64546db10dbd811750 Mon Sep 17 00:00:00 2001 From: Ryan Sun Date: Mon, 10 Mar 2025 15:54:18 -0700 Subject: [PATCH 6/6] Added comment to describe if (dataframe == null) --- src/main/java/net/snowflake/client/core/SFStatement.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/net/snowflake/client/core/SFStatement.java b/src/main/java/net/snowflake/client/core/SFStatement.java index e371caed1..125f0ddc2 100644 --- a/src/main/java/net/snowflake/client/core/SFStatement.java +++ b/src/main/java/net/snowflake/client/core/SFStatement.java @@ -130,6 +130,7 @@ private SFBaseResultSet executeQuery( CallingMethod caller, ExecTimeTelemetryData execTimeData) throws SQLException, SFException { + // if dataframeAst is passed, then can skip sql checks if (dataframeAst == null) { sanityCheckQuery(sql);