|
| 1 | +/* |
| 2 | + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. |
| 3 | + */ |
| 4 | + |
| 5 | +package io.airbyte.integrations.destination.singlestore; |
| 6 | + |
| 7 | +import static io.airbyte.cdk.db.jdbc.JdbcUtils.parseJdbcParameters; |
| 8 | + |
| 9 | +import com.fasterxml.jackson.databind.JsonNode; |
| 10 | +import com.google.common.collect.ImmutableMap; |
| 11 | +import io.airbyte.cdk.db.factory.DataSourceFactory; |
| 12 | +import io.airbyte.cdk.db.factory.DatabaseDriver; |
| 13 | +import io.airbyte.cdk.db.jdbc.JdbcUtils; |
| 14 | +import io.airbyte.commons.json.Jsons; |
| 15 | +import io.airbyte.commons.map.MoreMaps; |
| 16 | +import java.time.Duration; |
| 17 | +import java.util.Map; |
| 18 | +import javax.sql.DataSource; |
| 19 | +import org.jetbrains.annotations.NotNull; |
| 20 | + |
| 21 | +public final class SingleStoreConnectorFactory { |
| 22 | + |
| 23 | + private static final String SINGLE_STORE_METRIC_NAME = "Airbyte Destination Connector"; |
| 24 | + static final Map<String, String> DEFAULT_JDBC_PARAMETERS = ImmutableMap.of("allowLocalInfile", "true"); |
| 25 | + static final Map<String, String> DEFAULT_SSL_JDBC_PARAMETERS = MoreMaps.merge(ImmutableMap.of("sslMode", "trust"), DEFAULT_JDBC_PARAMETERS); |
| 26 | + |
| 27 | + private SingleStoreConnectorFactory() {} |
| 28 | + |
| 29 | + public static DataSource createDataSource(JsonNode config) { |
| 30 | + final String jdbcUrl = String.format("jdbc:singlestore://%s:%s/%s?_connector_name=%s", config.get(JdbcUtils.HOST_KEY).asText(), |
| 31 | + config.get(JdbcUtils.PORT_KEY).asText(), config.get(JdbcUtils.DATABASE_KEY).asText(), SINGLE_STORE_METRIC_NAME); |
| 32 | + final ImmutableMap.Builder<Object, Object> configBuilder = |
| 33 | + ImmutableMap.builder().put(JdbcUtils.USERNAME_KEY, config.get(JdbcUtils.USERNAME_KEY).asText()).put(JdbcUtils.JDBC_URL_KEY, jdbcUrl); |
| 34 | + if (config.has(JdbcUtils.PASSWORD_KEY)) { |
| 35 | + configBuilder.put(JdbcUtils.PASSWORD_KEY, config.get(JdbcUtils.PASSWORD_KEY).asText()); |
| 36 | + } |
| 37 | + if (config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) { |
| 38 | + configBuilder.put(JdbcUtils.JDBC_URL_PARAMS_KEY, config.get(JdbcUtils.JDBC_URL_PARAMS_KEY)); |
| 39 | + } |
| 40 | + var jdbcConfig = Jsons.jsonNode(configBuilder.build()); |
| 41 | + var connectionProperties = MoreMaps.merge(parseJdbcParameters(config, JdbcUtils.JDBC_URL_PARAMS_KEY), getDefaultConnectionProperties(config)); |
| 42 | + var builder = |
| 43 | + new DataSourceFactory.DataSourceBuilder( |
| 44 | + jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText(), |
| 45 | + jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() : null, |
| 46 | + DatabaseDriver.SINGLESTORE.getDriverClassName(), |
| 47 | + jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText()) |
| 48 | + .withConnectionProperties(connectionProperties); |
| 49 | + if (connectionProperties.get("connectTimeout") != null) { |
| 50 | + builder.withConnectionTimeout(Duration.ofMillis(Long.parseLong(connectionProperties.get("connectTimeout")))); |
| 51 | + } |
| 52 | + return modifyDataSourceBuilder(builder).build(); |
| 53 | + } |
| 54 | + |
| 55 | + private static Map<String, String> getDefaultConnectionProperties(@NotNull JsonNode config) { |
| 56 | + if (JdbcUtils.useSsl(config)) { |
| 57 | + return DEFAULT_SSL_JDBC_PARAMETERS; |
| 58 | + } else { |
| 59 | + return DEFAULT_JDBC_PARAMETERS; |
| 60 | + } |
| 61 | + } |
| 62 | + |
| 63 | + private static DataSourceFactory.DataSourceBuilder modifyDataSourceBuilder(@NotNull DataSourceFactory.DataSourceBuilder builder) { |
| 64 | + return builder.withConnectionTimeout(Duration.ofSeconds(60)) |
| 65 | + .withConnectionInitSql(""" |
| 66 | + CREATE OR REPLACE FUNCTION can_cast(v VARCHAR(254), t VARCHAR(30)) RETURNS BOOL AS |
| 67 | + DECLARE |
| 68 | + v_pat VARCHAR(255) = CONCAT(v, "%"); |
| 69 | + BEGIN |
| 70 | + IF v is NULL OR t = 'varchar' THEN |
| 71 | + RETURN TRUE; |
| 72 | + ELSIF t = 'bigint' THEN |
| 73 | + RETURN v !:> BIGINT !:> VARCHAR(255) = REPLACE(v, ' ', ''); |
| 74 | + ELSIF t = 'date' THEN |
| 75 | + RETURN v !:> DATE !:> VARCHAR(255) = REPLACE(v, ' ', ''); |
| 76 | + ELSIF t = 'timestamp' THEN |
| 77 | + RETURN v !:> TIMESTAMP(6) !:> VARCHAR(255) LIKE REGEXP_REPLACE(REPLACE(v_pat, 'T', ' '), 'z|Z', ''); |
| 78 | + ELSIF t = 'time' THEN |
| 79 | + RETURN v !:> TIME(6) !:> VARCHAR(255) LIKE v_pat; |
| 80 | + ELSIF t = 'json' THEN |
| 81 | + RETURN (v:> VARCHAR(255) = '') OR (v !:> JSON IS NOT NULL); |
| 82 | + ELSIF t = 'decimal' THEN |
| 83 | + RETURN (v !:> DECIMAL(38, 9) !:> VARCHAR(255)) LIKE v_pat; |
| 84 | + ELSIF t = 'boolean' THEN |
| 85 | + RETURN UCASE(v) = 'TRUE' OR UCASE(v) = 'FALSE'; |
| 86 | + ELSE |
| 87 | + RETURN FALSE; |
| 88 | + END IF; |
| 89 | + END |
| 90 | + """); |
| 91 | + } |
| 92 | + |
| 93 | +} |
0 commit comments