|
4 | 4 |
|
5 | 5 | package net.snowflake.client.core;
|
6 | 6 |
|
| 7 | +import static net.snowflake.client.jdbc.SnowflakeUtil.getTimestampFromType; |
| 8 | + |
7 | 9 | import com.fasterxml.jackson.core.JsonProcessingException;
|
8 | 10 | import com.fasterxml.jackson.databind.JsonNode;
|
9 | 11 | import com.fasterxml.jackson.databind.ObjectMapper;
|
| 12 | +import com.fasterxml.jackson.databind.node.ArrayNode; |
10 | 13 | import java.math.BigDecimal;
|
| 14 | +import java.sql.Array; |
11 | 15 | import java.sql.Date;
|
12 | 16 | import java.sql.Time;
|
13 | 17 | import java.sql.Timestamp;
|
14 | 18 | import java.sql.Types;
|
| 19 | +import java.time.Instant; |
| 20 | +import java.time.ZoneOffset; |
| 21 | +import java.util.HashMap; |
| 22 | +import java.util.Iterator; |
| 23 | +import java.util.Map; |
| 24 | +import java.util.Spliterator; |
| 25 | +import java.util.Spliterators; |
15 | 26 | import java.util.TimeZone;
|
| 27 | +import java.util.concurrent.atomic.AtomicInteger; |
| 28 | +import java.util.stream.Stream; |
| 29 | +import java.util.stream.StreamSupport; |
16 | 30 | import net.snowflake.client.core.json.Converters;
|
17 | 31 | import net.snowflake.client.jdbc.ErrorCode;
|
| 32 | +import net.snowflake.client.jdbc.FieldMetadata; |
| 33 | +import net.snowflake.client.jdbc.SnowflakeColumnMetadata; |
18 | 34 | import net.snowflake.client.log.SFLogger;
|
19 | 35 | import net.snowflake.client.log.SFLoggerFactory;
|
| 36 | +import net.snowflake.client.util.TypeConverter; |
| 37 | +import net.snowflake.common.core.SFTimestamp; |
| 38 | +import net.snowflake.common.core.SnowflakeDateTimeFormat; |
20 | 39 |
|
21 | 40 | /** Abstract class used to represent snowflake result set in json format */
|
22 | 41 | public abstract class SFJsonResultSet extends SFBaseResultSet {
|
@@ -88,35 +107,22 @@ public Object getObject(int columnIndex) throws SFException {
|
88 | 107 | } else {
|
89 | 108 | throw new SFException(ErrorCode.FEATURE_UNSUPPORTED, "data type: " + type);
|
90 | 109 | }
|
| 110 | + case Types.ARRAY: |
| 111 | + if (Boolean.valueOf(System.getProperty(STRUCTURED_TYPE_ENABLED_PROPERTY_NAME))) { |
| 112 | + return getArray(columnIndex); |
| 113 | + } else { |
| 114 | + throw new SFException(ErrorCode.FEATURE_UNSUPPORTED, "data type: " + type); |
| 115 | + } |
91 | 116 |
|
92 | 117 | default:
|
93 | 118 | throw new SFException(ErrorCode.FEATURE_UNSUPPORTED, "data type: " + type);
|
94 | 119 | }
|
95 | 120 | }
|
96 | 121 |
|
97 |
| - private Object getSqlInput(String input, int columnIndex) throws SFException { |
98 |
| - try { |
99 |
| - JsonNode jsonNode = OBJECT_MAPPER.readTree(input); |
100 |
| - return new JsonSqlInput( |
101 |
| - jsonNode, |
102 |
| - session, |
103 |
| - converters, |
104 |
| - resultSetMetaData.getColumnMetadata().get(columnIndex - 1).getFields()); |
105 |
| - } catch (JsonProcessingException e) { |
106 |
| - throw new SFException(e, ErrorCode.INVALID_STRUCT_DATA); |
107 |
| - } |
108 |
| - } |
109 |
| - |
110 |
| - /** |
111 |
| - * Sometimes large BIGINTS overflow the java Long type. In these cases, return a BigDecimal type |
112 |
| - * instead. |
113 |
| - * |
114 |
| - * @param columnIndex the column index |
115 |
| - * @return an object of type long or BigDecimal depending on number size |
116 |
| - * @throws SFException |
117 |
| - */ |
118 |
| - private Object getBigInt(int columnIndex, Object obj) throws SFException { |
119 |
| - return converters.getNumberConverter().getBigInt(obj, columnIndex); |
| 122 | + @Override |
| 123 | + public Array getArray(int columnIndex) throws SFException { |
| 124 | + Object obj = getObjectInternal(columnIndex); |
| 125 | + return getArrayInternal((String) obj); |
120 | 126 | }
|
121 | 127 |
|
122 | 128 | @Override
|
@@ -250,4 +256,194 @@ public Date getDate(int columnIndex, TimeZone tz) throws SFException {
|
250 | 256 | private Timestamp getTimestamp(int columnIndex) throws SFException {
|
251 | 257 | return getTimestamp(columnIndex, TimeZone.getDefault());
|
252 | 258 | }
|
| 259 | + |
  /** Returns the converter bundle used to coerce raw JSON column values into JDBC types. */
  public Converters getConverters() {
    return converters;
  }
| 263 | + |
| 264 | + private Object getSqlInput(String input, int columnIndex) throws SFException { |
| 265 | + try { |
| 266 | + JsonNode jsonNode = OBJECT_MAPPER.readTree(input); |
| 267 | + return new JsonSqlInput( |
| 268 | + jsonNode, |
| 269 | + session, |
| 270 | + converters, |
| 271 | + resultSetMetaData.getColumnMetadata().get(columnIndex - 1).getFields()); |
| 272 | + } catch (JsonProcessingException e) { |
| 273 | + throw new SFException(e, ErrorCode.INVALID_STRUCT_DATA); |
| 274 | + } |
| 275 | + } |
| 276 | + |
  /**
   * Parses a JSON-encoded ARRAY value into a {@link SfSqlArray} whose Java element type matches
   * the array's declared Snowflake element type.
   *
   * <p>NOTE(review): metadata is read from column 0 / field 1 regardless of which column the
   * value came from — confirm this is intentional; it looks like the column index should be
   * threaded through from {@code getArray(int)}.
   *
   * @param obj the raw JSON text of the array value
   * @return a typed SfSqlArray, or {@code null} for element types with no mapping (see note on
   *     the default case)
   * @throws SFException with INVALID_STRUCT_DATA when the text is not valid JSON
   */
  private SfSqlArray getArrayInternal(String obj) throws SFException {
    try {
      // NOTE(review): hardcoded first column / first field — see method javadoc.
      SnowflakeColumnMetadata arrayMetadata = resultSetMetaData.getColumnMetadata().get(0);
      FieldMetadata fieldMetadata = arrayMetadata.getField(1);

      int columnSubType = fieldMetadata.getType();
      int columnType = ColumnTypeHelper.getColumnType(columnSubType, session);
      int scale = fieldMetadata.getScale();

      ArrayNode arrayNode = (ArrayNode) OBJECT_MAPPER.readTree(obj);

      Iterator nodeElements = arrayNode.elements();

      // Each case builds a per-element TypeConverter, streams the JSON nodes through it, and
      // materializes the result into an array of the matching Java type.
      switch (columnSubType) {
        case Types.INTEGER:
        case Types.SMALLINT:
        case Types.TINYINT:
          TypeConverter integerConverter =
              value -> converters.getNumberConverter().getInt(value, Types.INTEGER);
          return new SfSqlArray(
              columnSubType, getStream(nodeElements, integerConverter).toArray(Integer[]::new));
        case Types.BIGINT:
        case Types.DECIMAL:
        case Types.NUMERIC:
          // Large values may overflow long, so the element type (Long vs BigDecimal) is decided
          // after inspecting the converted values.
          TypeConverter bigIntConverter =
              value -> converters.getNumberConverter().getBigInt(value, Types.BIGINT);
          return new SfSqlArray(
              columnSubType, convertToNumericArray(nodeElements, bigIntConverter));
        case Types.CHAR:
        case Types.VARCHAR:
        case Types.LONGNVARCHAR:
          TypeConverter varcharConverter = value -> value.toString();
          return new SfSqlArray(
              columnSubType, getStream(nodeElements, varcharConverter).toArray(String[]::new));
        case Types.BINARY:
          TypeConverter bytesConverter =
              value ->
                  converters.getBytesConverter().getBytes(value, columnType, Types.BINARY, scale);
          return new SfSqlArray(
              columnSubType, getStream(nodeElements, bytesConverter).toArray(Object[]::new));
        case Types.FLOAT:
        case Types.DOUBLE:
          TypeConverter doubleConverter =
              value -> converters.getNumberConverter().getDouble(value, Types.DOUBLE);
          return new SfSqlArray(
              columnSubType, getStream(nodeElements, doubleConverter).toArray(Double[]::new));
        case Types.DATE:
          // Dates arrive as text in the session's DATE_OUTPUT_FORMAT; parse then normalize via
          // UTC to a java.sql.Date.
          TypeConverter dateConverter =
              value -> {
                SnowflakeDateTimeFormat formatter =
                    SnowflakeDateTimeFormat.fromSqlFormat(
                        (String) session.getCommonParameters().get("DATE_OUTPUT_FORMAT"));
                SFTimestamp timestamp = formatter.parse((String) value);
                return Date.valueOf(
                    Instant.ofEpochMilli(timestamp.getTime()).atZone(ZoneOffset.UTC).toLocalDate());
              };
          return new SfSqlArray(
              columnSubType, getStream(nodeElements, dateConverter).toArray(Date[]::new));
        case Types.TIME:
          // Same approach as DATE, using the session's TIME_OUTPUT_FORMAT.
          TypeConverter timeConverter =
              value -> {
                SnowflakeDateTimeFormat formatter =
                    SnowflakeDateTimeFormat.fromSqlFormat(
                        (String) session.getCommonParameters().get("TIME_OUTPUT_FORMAT"));
                SFTimestamp timestamp = formatter.parse((String) value);
                return Time.valueOf(
                    Instant.ofEpochMilli(timestamp.getTime()).atZone(ZoneOffset.UTC).toLocalTime());
              };
          return new SfSqlArray(
              columnSubType, getStream(nodeElements, timeConverter).toArray(Time[]::new));
        case Types.TIMESTAMP:
          TypeConverter timestampConverter =
              value -> {
                // Try the type-specific parse first; fall back to the generic converter.
                Timestamp result = getTimestampFromType(columnSubType, (String) value, session);
                if (result != null) {
                  return result;
                }
                return converters
                    .getDateTimeConverter()
                    .getTimestamp(value, columnType, columnSubType, null, scale);
              };
          return new SfSqlArray(
              columnSubType, getStream(nodeElements, timestampConverter).toArray(Timestamp[]::new));
        case Types.BOOLEAN:
          TypeConverter booleanConverter =
              value -> converters.getBooleanConverter().getBoolean(value, columnType);
          return new SfSqlArray(
              columnSubType, getStream(nodeElements, booleanConverter).toArray(Boolean[]::new));
        case Types.STRUCT:
          // Nested objects are exposed as Map; Jackson failures are rewrapped as SFException.
          TypeConverter structConverter =
              value -> {
                try {
                  return OBJECT_MAPPER.readValue(value, Map.class);
                } catch (JsonProcessingException e) {
                  throw new SFException(e, ErrorCode.INVALID_STRUCT_DATA);
                }
              };
          return new SfSqlArray(
              columnSubType, getStream(nodeElements, structConverter).toArray(Map[]::new));
        case Types.ARRAY:
          // Nested arrays of objects are exposed as Map[] per element.
          TypeConverter arrayConverter =
              value -> {
                try {
                  return OBJECT_MAPPER.readValue(value, HashMap[].class);
                } catch (JsonProcessingException e) {
                  throw new SFException(e, ErrorCode.INVALID_STRUCT_DATA);
                }
              };
          return new SfSqlArray(
              columnSubType, getStream(nodeElements, arrayConverter).toArray(Map[][]::new));
        default:
          // NOTE(review): unmapped element types silently yield null rather than throwing
          // FEATURE_UNSUPPORTED — confirm this is the intended contract.
          return null;
      }
    } catch (JsonProcessingException e) {
      throw new SFException(e, ErrorCode.INVALID_STRUCT_DATA);
    }
  }
| 394 | + |
| 395 | + private Object[] convertToNumericArray(Iterator nodeElements, TypeConverter bigIntConverter) { |
| 396 | + AtomicInteger bigDecimalCount = new AtomicInteger(); |
| 397 | + Object[] elements = |
| 398 | + getStream(nodeElements, bigIntConverter) |
| 399 | + .map( |
| 400 | + elem -> { |
| 401 | + if (elem instanceof BigDecimal) { |
| 402 | + bigDecimalCount.incrementAndGet(); |
| 403 | + } |
| 404 | + return elem; |
| 405 | + }) |
| 406 | + .toArray( |
| 407 | + size -> { |
| 408 | + boolean shouldbbeReturnAsBigDecimal = bigDecimalCount.get() > 0; |
| 409 | + Class<?> returnedClass = |
| 410 | + shouldbbeReturnAsBigDecimal ? BigDecimal.class : Long.class; |
| 411 | + return java.lang.reflect.Array.newInstance(returnedClass, size); |
| 412 | + }); |
| 413 | + return elements; |
| 414 | + } |
| 415 | + |
| 416 | + private Stream getStream(Iterator nodeElements, TypeConverter converter) { |
| 417 | + return StreamSupport.stream( |
| 418 | + Spliterators.spliteratorUnknownSize(nodeElements, Spliterator.ORDERED), false) |
| 419 | + .map( |
| 420 | + elem -> { |
| 421 | + try { |
| 422 | + return convert(converter, (JsonNode) elem); |
| 423 | + } catch (SFException e) { |
| 424 | + throw new RuntimeException(e); |
| 425 | + } |
| 426 | + }); |
| 427 | + } |
| 428 | + |
| 429 | + private static Object convert(TypeConverter converter, JsonNode elem) throws SFException { |
| 430 | + JsonNode node = elem; |
| 431 | + if (node.isValueNode()) { |
| 432 | + return converter.convert(node.asText()); |
| 433 | + } else { |
| 434 | + return converter.convert(node.toString()); |
| 435 | + } |
| 436 | + } |
| 437 | + |
  /**
   * Sometimes large BIGINTS overflow the java Long type. In these cases, return a BigDecimal type
   * instead.
   *
   * @param columnIndex the column index
   * @param obj the raw column value to convert
   * @return an object of type long or BigDecimal depending on number size
   * @throws SFException if the value cannot be converted to a number
   */
  private Object getBigInt(int columnIndex, Object obj) throws SFException {
    return converters.getNumberConverter().getBigInt(obj, columnIndex);
  }
253 | 449 | }
|
0 commit comments