Skip to content

Commit d7900a6

Browse files
use lists instead of streams in TestDatabase
1 parent c75bae8 commit d7900a6

File tree

5 files changed

+42
-41
lines changed

5 files changed

+42
-41
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java

+11-16
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
import java.util.HashMap;
2525
import java.util.List;
2626
import java.util.Map;
27-
import java.util.stream.Stream;
2827
import javax.sql.DataSource;
2928
import org.jooq.DSLContext;
3029
import org.jooq.SQLDialect;
30+
import org.jooq.exception.DataAccessException;
3131
import org.slf4j.Logger;
3232
import org.slf4j.LoggerFactory;
3333
import org.testcontainers.containers.JdbcDatabaseContainer;
@@ -87,7 +87,7 @@ public T onClose(String fmtSql, Object... fmtArgs) {
8787
* Executes a SQL statement after calling String.format on the arguments.
8888
*/
8989
public T with(String fmtSql, Object... fmtArgs) {
90-
execSQL(Stream.of(String.format(fmtSql, fmtArgs)));
90+
execSQL(List.of(String.format(fmtSql, fmtArgs)));
9191
return self();
9292
}
9393

@@ -113,9 +113,9 @@ final public boolean isInitialized() {
113113
return dslContext != null;
114114
}
115115

116-
abstract protected Stream<Stream<String>> inContainerBootstrapCmd();
116+
abstract protected List<List<String>> inContainerBootstrapCmd();
117117

118-
abstract protected Stream<String> inContainerUndoBootstrapCmd();
118+
abstract protected List<String> inContainerUndoBootstrapCmd();
119119

120120
abstract public DatabaseDriver getDatabaseDriver();
121121

@@ -167,22 +167,17 @@ public Database getDatabase() {
167167
return new Database(getDslContext());
168168
}
169169

170-
protected void execSQL(final Stream<String> sql) {
170+
protected void execSQL(final List<String> sqls) {
171171
try {
172-
getDatabase().query(ctx -> {
173-
sql.forEach(statement -> {
174-
LOGGER.debug("{}", statement);
175-
ctx.execute(statement);
176-
});
177-
return null;
178-
});
179-
} catch (SQLException e) {
172+
for (String sql : sqls) {
173+
getDslContext().execute(sql);
174+
}
175+
} catch (DataAccessException e) {
180176
throw new RuntimeException(e);
181177
}
182178
}
183179

184-
protected void execInContainer(Stream<String> cmds) {
185-
final List<String> cmd = cmds.toList();
180+
protected void execInContainer(List<String> cmd) {
186181
if (cmd.isEmpty()) {
187182
return;
188183
}
@@ -232,7 +227,7 @@ public B integrationTestConfigBuilder() {
232227

233228
@Override
234229
public void close() {
235-
execSQL(this.cleanupSQL.stream());
230+
execSQL(this.cleanupSQL);
236231
execInContainer(inContainerUndoBootstrapCmd());
237232
}
238233

airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.java

+16-12
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@
2020
import io.airbyte.commons.json.Jsons;
2121
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
2222
import java.sql.JDBCType;
23+
import java.util.Arrays;
24+
import java.util.Collections;
2325
import java.util.List;
2426
import java.util.Map;
2527
import java.util.Set;
26-
import java.util.stream.Stream;
2728
import org.jooq.SQLDialect;
2829
import org.junit.jupiter.api.AfterAll;
2930
import org.junit.jupiter.api.BeforeAll;
@@ -140,24 +141,27 @@ public BareBonesTestDatabase(PostgreSQLContainer<?> container) {
140141
}
141142

142143
@Override
143-
protected Stream<Stream<String>> inContainerBootstrapCmd() {
144-
final var sql = Stream.of(
144+
protected List<List<String>> inContainerBootstrapCmd() {
145+
final var sqls = List.of(
145146
String.format("CREATE DATABASE %s", getDatabaseName()),
146147
String.format("CREATE USER %s PASSWORD '%s'", getUserName(), getPassword()),
147148
String.format("GRANT ALL PRIVILEGES ON DATABASE %s TO %s", getDatabaseName(), getUserName()),
148149
String.format("ALTER USER %s WITH SUPERUSER", getUserName()));
149-
return Stream.of(Stream.concat(
150-
Stream.of("psql",
151-
"-d", getContainer().getDatabaseName(),
152-
"-U", getContainer().getUsername(),
153-
"-v", "ON_ERROR_STOP=1",
154-
"-a"),
155-
sql.flatMap(stmt -> Stream.of("-c", stmt))));
150+
List<String> cmd = Arrays.asList("psql",
151+
"-d", getContainer().getDatabaseName(),
152+
"-U", getContainer().getUsername(),
153+
"-v", "ON_ERROR_STOP=1",
154+
"-a");
155+
for (String sql : sqls) {
156+
cmd.add("-c");
157+
cmd.add(sql);
158+
}
159+
return List.of(cmd);
156160
}
157161

158162
@Override
159-
protected Stream<String> inContainerUndoBootstrapCmd() {
160-
return Stream.empty();
163+
protected List<String> inContainerUndoBootstrapCmd() {
164+
return Collections.emptyList();
161165
}
162166

163167
@Override

airbyte-integrations/connectors/source-mssql/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ plugins {
33
}
44

55
airbyteJavaConnector {
6-
cdkVersionRequired = '0.19.0'
6+
cdkVersionRequired = '0.19.1'
77
features = ['db-sources']
88
useLocalCdk = true
99
}

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/cdc/MssqlDebeziumStateUtil.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public JsonNode constructInitialDebeziumState(final Properties properties,
125125
assert Objects.nonNull(schemaHistory.schema());
126126

127127
final JsonNode asJson = serialize(offset, schemaHistory);
128-
LOGGER.info("Initial Debezium state constructed: {}", asJson);
128+
//LOGGER.info("Initial Debezium state constructed: {}", asJson);
129129

130130
if (asJson.get(MssqlCdcStateConstants.MSSQL_DB_HISTORY).asText().isBlank()) {
131131
throw new RuntimeException("Schema history snapshot returned empty history.");

airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java

+13-11
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
import java.io.IOException;
1212
import java.io.UncheckedIOException;
1313
import java.sql.SQLException;
14+
import java.util.Arrays;
15+
import java.util.Collections;
1416
import java.util.HashMap;
1517
import java.util.List;
1618
import java.util.Map;
17-
import java.util.stream.Collectors;
1819
import java.util.stream.Stream;
20+
import org.apache.commons.lang3.StringUtils;
1921
import org.jooq.SQLDialect;
2022
import org.slf4j.Logger;
2123
import org.slf4j.LoggerFactory;
@@ -157,10 +159,10 @@ public String getJdbcUrl() {
157159
}
158160

159161
@Override
160-
protected Stream<Stream<String>> inContainerBootstrapCmd() {
161-
return Stream.of(
162-
mssqlCmd(Stream.of(String.format("CREATE DATABASE %s", getDatabaseName()))),
163-
mssqlCmd(Stream.of(
162+
protected List<List<String>> inContainerBootstrapCmd() {
163+
return List.of(
164+
mssqlCmd(List.of(String.format("CREATE DATABASE %s", getDatabaseName()))),
165+
mssqlCmd(List.of(
164166
String.format("USE %s", getDatabaseName()),
165167
String.format("CREATE LOGIN %s WITH PASSWORD = '%s', DEFAULT_DATABASE = %s", getUserName(), getPassword(), getDatabaseName()),
166168
String.format("ALTER SERVER ROLE [sysadmin] ADD MEMBER %s", getUserName()),
@@ -174,22 +176,22 @@ protected Stream<Stream<String>> inContainerBootstrapCmd() {
174176
* aren't really worth it.
175177
*/
176178
@Override
177-
protected Stream<String> inContainerUndoBootstrapCmd() {
178-
return Stream.empty();
179+
protected List<String> inContainerUndoBootstrapCmd() {
180+
return Collections.emptyList();
179181
}
180182

181183
public void dropDatabaseAndUser() {
182-
execInContainer(mssqlCmd(Stream.of(
184+
execInContainer(mssqlCmd(List.of(
183185
String.format("USE master"),
184186
String.format("ALTER DATABASE %s SET single_user WITH ROLLBACK IMMEDIATE", getDatabaseName()),
185187
String.format("DROP DATABASE %s", getDatabaseName()))));
186188
}
187189

188-
public Stream<String> mssqlCmd(final Stream<String> sql) {
189-
return Stream.of("/opt/mssql-tools/bin/sqlcmd",
190+
public List<String> mssqlCmd(final List<String> sql) {
191+
return Arrays.asList("/opt/mssql-tools/bin/sqlcmd",
190192
"-U", getContainer().getUsername(),
191193
"-P", getContainer().getPassword(),
192-
"-Q", sql.collect(Collectors.joining("; ")),
194+
"-Q", StringUtils.join(sql, "; "),
193195
"-b", "-e");
194196
}
195197

0 commit comments

Comments
 (0)