Skip to content

Commit 0c54d6f

Browse files
SNOW-1234214 Native Arrow structured types - map support (#1686)
1 parent e4f2f94 commit 0c54d6f

9 files changed

+206
-53
lines changed

src/main/java/net/snowflake/client/core/ArrowSqlInput.java

+13-6
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.sql.Timestamp;
1515
import java.util.Iterator;
1616
import java.util.List;
17+
import java.util.Map;
1718
import java.util.TimeZone;
1819
import net.snowflake.client.core.json.Converters;
1920
import net.snowflake.client.core.structs.SQLDataCreationHelper;
@@ -24,16 +25,22 @@
2425
@SnowflakeJdbcInternalApi
2526
public class ArrowSqlInput extends BaseSqlInput {
2627

28+
private final Map<String, Object> input;
2729
private final Iterator<Object> structuredTypeFields;
2830
private int currentIndex = 0;
2931

3032
public ArrowSqlInput(
31-
JsonStringHashMap<String, Object> input,
33+
Map<String, Object> input,
3234
SFBaseSession session,
3335
Converters converters,
3436
List<FieldMetadata> fields) {
3537
super(session, converters, fields);
3638
this.structuredTypeFields = input.values().iterator();
39+
this.input = input;
40+
}
41+
42+
public Map<String, Object> getInput() {
43+
return input;
3744
}
3845

3946
@Override
@@ -172,14 +179,17 @@ public Timestamp readTimestamp(TimeZone tz) throws SQLException {
172179
if (value == null) {
173180
return null;
174181
}
182+
int columnType = ColumnTypeHelper.getColumnType(fieldMetadata.getType(), session);
183+
int columnSubType = fieldMetadata.getType();
175184
int scale = fieldMetadata.getScale();
176185
return mapSFExceptionToSQLException(
177186
() ->
178187
converters
179188
.getStructuredTypeDateTimeConverter()
180189
.getTimestamp(
181190
(JsonStringHashMap<String, Object>) value,
182-
fieldMetadata.getBase(),
191+
columnType,
192+
columnSubType,
183193
tz,
184194
scale));
185195
});
@@ -204,10 +214,7 @@ public <T> T readObject(Class<T> type) throws SQLException {
204214
SQLData instance = (SQLData) SQLDataCreationHelper.create(type);
205215
instance.readSQL(
206216
new ArrowSqlInput(
207-
(JsonStringHashMap<String, Object>) value,
208-
session,
209-
converters,
210-
fieldMetadata.getFields()),
217+
(Map<String, Object>) value, session, converters, fieldMetadata.getFields()),
211218
null);
212219
return (T) instance;
213220
});

src/main/java/net/snowflake/client/core/SFArrowResultSet.java

+32-6
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.sql.Time;
2020
import java.sql.Timestamp;
2121
import java.sql.Types;
22+
import java.util.Map;
2223
import java.util.TimeZone;
2324
import net.snowflake.client.core.arrow.ArrowVectorConverter;
2425
import net.snowflake.client.core.arrow.StructConverter;
@@ -104,7 +105,7 @@ public class SFArrowResultSet extends SFBaseResultSet implements DataConversionC
104105
*/
105106
private boolean formatDateWithTimezone;
106107

107-
@SnowflakeJdbcInternalApi protected Converters jsonConverters;
108+
@SnowflakeJdbcInternalApi protected Converters converters;
108109

109110
/**
110111
* Constructor takes a result from the API response that we get from executing a SQL statement.
@@ -124,7 +125,7 @@ public SFArrowResultSet(
124125
boolean sortResult)
125126
throws SQLException {
126127
this(resultSetSerializable, session.getTelemetryClient(), sortResult);
127-
this.jsonConverters =
128+
this.converters =
128129
new Converters(
129130
resultSetSerializable.getTimeZone(),
130131
session,
@@ -356,6 +357,31 @@ private boolean fetchNextRowSorted() throws SnowflakeSQLException {
356357
}
357358
}
358359

360+
@Override
361+
@SnowflakeJdbcInternalApi
362+
public Converters getConverters() {
363+
return converters;
364+
}
365+
366+
@Override
367+
public Date convertToDate(Object object, TimeZone tz) throws SFException {
368+
return converters.getStructuredTypeDateTimeConverter().getDate((int) object, tz);
369+
}
370+
371+
@Override
372+
public Time convertToTime(Object object, int scale) throws SFException {
373+
return converters.getStructuredTypeDateTimeConverter().getTime((long) object, scale);
374+
}
375+
376+
@Override
377+
public Timestamp convertToTimestamp(
378+
Object object, int columnType, int columnSubType, TimeZone tz, int scale) throws SFException {
379+
return converters
380+
.getStructuredTypeDateTimeConverter()
381+
.getTimestamp(
382+
(JsonStringHashMap<String, Object>) object, columnType, columnSubType, tz, scale);
383+
}
384+
359385
/**
360386
* Advance to next row
361387
*
@@ -510,7 +536,7 @@ public Object getObject(int columnIndex) throws SFException {
510536
if (converter instanceof VarCharConverter) {
511537
return createJsonSqlInput(columnIndex, obj);
512538
} else if (converter instanceof StructConverter) {
513-
return createArrowSqlInput(columnIndex, (JsonStringHashMap<String, Object>) obj);
539+
return createArrowSqlInput(columnIndex, (Map<String, Object>) obj);
514540
}
515541
}
516542
return obj;
@@ -522,19 +548,19 @@ private Object createJsonSqlInput(int columnIndex, Object obj) throws SFExceptio
522548
return new JsonSqlInput(
523549
jsonNode,
524550
session,
525-
jsonConverters,
551+
converters,
526552
resultSetMetaData.getColumnMetadata().get(columnIndex - 1).getFields(),
527553
sessionTimezone);
528554
} catch (JsonProcessingException e) {
529555
throw new SFException(e, ErrorCode.INVALID_STRUCT_DATA);
530556
}
531557
}
532558

533-
private Object createArrowSqlInput(int columnIndex, JsonStringHashMap<String, Object> input) {
559+
private Object createArrowSqlInput(int columnIndex, Map<String, Object> input) {
534560
return new ArrowSqlInput(
535561
input,
536562
session,
537-
jsonConverters,
563+
converters,
538564
resultSetMetaData.getColumnMetadata().get(columnIndex - 1).getFields());
539565
}
540566

src/main/java/net/snowflake/client/core/SFBaseResultSet.java

+10
Original file line numberDiff line numberDiff line change
@@ -213,4 +213,14 @@ public Converters getConverters() {
213213
public TimeZone getSessionTimeZone() {
214214
return resultSetSerializable.getTimeZone();
215215
}
216+
217+
@SnowflakeJdbcInternalApi
218+
public abstract Date convertToDate(Object object, TimeZone tz) throws SFException;
219+
220+
@SnowflakeJdbcInternalApi
221+
public abstract Time convertToTime(Object object, int scale) throws SFException;
222+
223+
@SnowflakeJdbcInternalApi
224+
public abstract Timestamp convertToTimestamp(
225+
Object object, int columnType, int columnSubType, TimeZone tz, int scale) throws SFException;
216226
}

src/main/java/net/snowflake/client/core/SFJsonResultSet.java

+19
Original file line numberDiff line numberDiff line change
@@ -426,4 +426,23 @@ private static Object convert(JsonStringToTypeConverter converter, JsonNode node
426426
return converter.convert(node.toString());
427427
}
428428
}
429+
430+
@Override
431+
public Date convertToDate(Object object, TimeZone tz) throws SFException {
432+
return (Date) converters.dateConverter(session).convert((String) object);
433+
}
434+
435+
@Override
436+
public Time convertToTime(Object object, int scale) throws SFException {
437+
return (Time) converters.timeConverter(session).convert((String) object);
438+
}
439+
440+
@Override
441+
public Timestamp convertToTimestamp(
442+
Object object, int columnType, int columnSubType, TimeZone tz, int scale) throws SFException {
443+
return (Timestamp)
444+
converters
445+
.timestampConverter(columnSubType, columnType, scale, session, null, tz)
446+
.convert((String) object);
447+
}
429448
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package net.snowflake.client.core.arrow;
2+
3+
import java.util.List;
4+
import java.util.stream.Collectors;
5+
import net.snowflake.client.core.DataConversionContext;
6+
import net.snowflake.client.core.SFException;
7+
import net.snowflake.client.jdbc.SnowflakeType;
8+
import org.apache.arrow.vector.complex.MapVector;
9+
import org.apache.arrow.vector.util.JsonStringHashMap;
10+
11+
public class MapConverter extends AbstractArrowVectorConverter {
12+
13+
private final MapVector vector;
14+
15+
public MapConverter(MapVector valueVector, int columnIndex, DataConversionContext context) {
16+
super(SnowflakeType.MAP.name(), valueVector, columnIndex, context);
17+
this.vector = valueVector;
18+
}
19+
20+
@Override
21+
public Object toObject(int index) throws SFException {
22+
List<JsonStringHashMap<String, Object>> entriesList =
23+
(List<JsonStringHashMap<String, Object>>) vector.getObject(index);
24+
return entriesList.stream()
25+
.collect(
26+
Collectors.toMap(entry -> entry.get("key").toString(), entry -> entry.get("value")));
27+
}
28+
29+
@Override
30+
public String toString(int index) throws SFException {
31+
return vector.getObject(index).toString();
32+
}
33+
}

src/main/java/net/snowflake/client/core/arrow/StructuredTypeDateTimeConverter.java

+20-8
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@
1111
import java.sql.Date;
1212
import java.sql.Time;
1313
import java.sql.Timestamp;
14+
import java.sql.Types;
1415
import java.util.TimeZone;
1516
import net.snowflake.client.core.SFException;
1617
import net.snowflake.client.core.SnowflakeJdbcInternalApi;
1718
import net.snowflake.client.jdbc.ErrorCode;
18-
import net.snowflake.client.jdbc.SnowflakeType;
19+
import net.snowflake.client.jdbc.SnowflakeUtil;
1920
import org.apache.arrow.vector.util.JsonStringHashMap;
2021

2122
@SnowflakeJdbcInternalApi
@@ -45,22 +46,33 @@ public StructuredTypeDateTimeConverter(
4546
}
4647

4748
public Timestamp getTimestamp(
48-
JsonStringHashMap<String, Object> obj, SnowflakeType type, TimeZone tz, int scale)
49+
JsonStringHashMap<String, Object> obj,
50+
int columnType,
51+
int columnSubType,
52+
TimeZone tz,
53+
int scale)
4954
throws SFException {
5055
if (tz == null) {
5156
tz = TimeZone.getDefault();
5257
}
53-
switch (type) {
54-
case TIMESTAMP_LTZ:
58+
if (Types.TIMESTAMP == columnType) {
59+
if (SnowflakeUtil.EXTRA_TYPES_TIMESTAMP_LTZ == columnSubType) {
5560
return convertTimestampLtz(obj, scale);
56-
case TIMESTAMP_NTZ:
61+
} else {
5762
return convertTimestampNtz(obj, tz, scale);
58-
case TIMESTAMP_TZ:
59-
return convertTimestampTz(obj, scale);
63+
}
64+
} else if (Types.TIMESTAMP_WITH_TIMEZONE == columnType
65+
&& SnowflakeUtil.EXTRA_TYPES_TIMESTAMP_TZ == columnSubType) {
66+
return convertTimestampTz(obj, scale);
6067
}
6168
throw new SFException(
6269
ErrorCode.INVALID_VALUE_CONVERT,
63-
"Unexpected Arrow Field for " + type.name() + " and object type " + obj.getClass());
70+
"Unexpected Arrow Field for columnType "
71+
+ columnType
72+
+ " , column subtype "
73+
+ columnSubType
74+
+ " , and object type "
75+
+ obj.getClass());
6476
}
6577

6678
public Date getDate(int value, TimeZone tz) throws SFException {

src/main/java/net/snowflake/client/jdbc/ArrowResultChunk.java

+6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import net.snowflake.client.core.arrow.IntToFixedConverter;
2828
import net.snowflake.client.core.arrow.IntToScaledFixedConverter;
2929
import net.snowflake.client.core.arrow.IntToTimeConverter;
30+
import net.snowflake.client.core.arrow.MapConverter;
3031
import net.snowflake.client.core.arrow.SmallIntToFixedConverter;
3132
import net.snowflake.client.core.arrow.SmallIntToScaledFixedConverter;
3233
import net.snowflake.client.core.arrow.StructConverter;
@@ -55,6 +56,7 @@
5556
import org.apache.arrow.vector.VarBinaryVector;
5657
import org.apache.arrow.vector.VarCharVector;
5758
import org.apache.arrow.vector.VectorSchemaRoot;
59+
import org.apache.arrow.vector.complex.MapVector;
5860
import org.apache.arrow.vector.complex.StructVector;
5961
import org.apache.arrow.vector.ipc.ArrowStreamReader;
6062
import org.apache.arrow.vector.types.Types;
@@ -206,6 +208,10 @@ private static List<ArrowVectorConverter> initConverters(
206208
converters.add(new VarCharConverter(vector, i, context));
207209
break;
208210

211+
case MAP:
212+
converters.add(new MapConverter((MapVector) vector, i, context));
213+
break;
214+
209215
case OBJECT:
210216
if (vector instanceof StructVector) {
211217
converters.add(new StructConverter((StructVector) vector, i, context));

0 commit comments

Comments
 (0)