|
17 | 17 |
|
18 | 18 | import com.fasterxml.jackson.databind.JsonNode;
|
19 | 19 | import com.fasterxml.jackson.databind.node.ObjectNode;
|
| 20 | +import com.google.common.annotations.VisibleForTesting; |
20 | 21 | import com.google.common.base.Preconditions;
|
21 | 22 | import com.google.common.collect.ImmutableList;
|
22 | 23 | import com.google.common.collect.ImmutableMap;
|
|
42 | 43 | import io.airbyte.cdk.integrations.source.relationaldb.state.StateManagerFactory;
|
43 | 44 | import io.airbyte.cdk.integrations.source.relationaldb.streamstatus.StreamStatusTraceEmitterIterator;
|
44 | 45 | import io.airbyte.commons.exceptions.ConfigErrorException;
|
| 46 | +import io.airbyte.commons.features.EnvVariableFeatureFlags; |
| 47 | +import io.airbyte.commons.features.FeatureFlags; |
45 | 48 | import io.airbyte.commons.functional.CheckedConsumer;
|
46 | 49 | import io.airbyte.commons.json.Jsons;
|
47 | 50 | import io.airbyte.commons.stream.AirbyteStreamStatusHolder;
|
@@ -102,13 +105,26 @@ SELECT CASE WHEN (SELECT TOP 1 1 FROM "%s"."%s" WHERE "%s" IS NULL)=1 then 1 els
|
102 | 105 | private MssqlInitialLoadStateManager initialLoadStateManager = null;
|
103 | 106 | public static final String JDBC_DELIMITER = ";";
|
104 | 107 | private List<String> schemas;
|
| 108 | + private int stateEmissionFrequency; |
| 109 | + private final FeatureFlags featureFlags; |
105 | 110 |
|
106 | 111 | public static Source sshWrappedSource(final MssqlSource source) {
|
107 | 112 | return new SshWrappedSource(source, JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
|
108 | 113 | }
|
109 | 114 |
|
110 | 115 | public MssqlSource() {
|
| 116 | + this(new EnvVariableFeatureFlags()); |
| 117 | + } |
| 118 | + |
| 119 | + public MssqlSource(final FeatureFlags featureFlags) { |
111 | 120 | super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, new MssqlSourceOperations());
|
| 121 | + this.featureFlags = featureFlags; |
| 122 | + this.stateEmissionFrequency = INTERMEDIATE_STATE_EMISSION_FREQUENCY; |
| 123 | + } |
| 124 | + |
| 125 | + @Override |
| 126 | + public FeatureFlags getFeatureFlags() { |
| 127 | + return featureFlags; |
112 | 128 | }
|
113 | 129 |
|
114 | 130 | @Override
|
@@ -444,7 +460,12 @@ protected void assertSqlServerAgentRunning(final JdbcDatabase database) throws S
|
444 | 460 |
|
445 | 461 | @Override
|
446 | 462 | protected int getStateEmissionFrequency() {
|
447 |
| - return INTERMEDIATE_STATE_EMISSION_FREQUENCY; |
| 463 | + return this.stateEmissionFrequency; |
| 464 | + } |
| 465 | + |
| 466 | + @VisibleForTesting |
| 467 | + protected void setStateEmissionFrequencyForDebug(final int stateEmissionFrequency) { |
| 468 | + this.stateEmissionFrequency = stateEmissionFrequency; |
448 | 469 | }
|
449 | 470 |
|
450 | 471 | @Override
|
@@ -517,7 +538,7 @@ private void readSsl(final JsonNode sslMethod, final List<String> additionalPara
|
517 | 538 |
|
518 | 539 | if (config.has("certificate")) {
|
519 | 540 | String certificate = config.get("certificate").asText();
|
520 |
| - String password = RandomStringUtils.randomAlphanumeric(100); |
| 541 | + String password = RandomStringUtils.secure().nextAlphanumeric(100); |
521 | 542 | final URI keyStoreUri;
|
522 | 543 | try {
|
523 | 544 | keyStoreUri = SSLCertificateUtils.keyStoreFromCertificate(certificate, password, null, null);
|
|
0 commit comments