Skip to content

Commit c564fd8

Browse files
edgaogisripa
authored and committed
add airbyte_meta in raw table creation
also add migrations to jdbc destinations write airbyte_meta in staging
1 parent d6454f8 commit c564fd8

File tree

12 files changed

+61
-26
lines changed

12 files changed

+61
-26
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,14 @@ protected BaseSerializedBuffer(final BufferStorage bufferStorage) throws Excepti
6565
* TODO: (ryankfu) move destination to use serialized record string instead of passing entire
6666
* AirbyteRecord
6767
*
68-
* @param recordString serialized record
69-
* @param emittedAt timestamp of the record in milliseconds
68+
* @param recordString serialized record
69+
* @param airbyteMetaString the serialized airbyte_meta entry
70+
* @param emittedAt timestamp of the record in milliseconds
7071
* @throws IOException
7172
*/
72-
protected void writeRecord(final String recordString, final long emittedAt) throws IOException {
73+
protected void writeRecord(final String recordString, String airbyteMetaString, final long emittedAt) throws IOException {
74+
// TODO Why are we deserializing as an airbyte record message? recordString should just be a naked data blob.
75+
// is this code ever actually called? do we always override it? can we make this method abstract?
7376
writeRecord(Jsons.deserialize(recordString, AirbyteRecordMessage.class).withEmittedAt(emittedAt));
7477
}
7578

@@ -111,7 +114,7 @@ public long accept(final AirbyteRecordMessage record) throws Exception {
111114
}
112115

113116
@Override
114-
public long accept(final String recordString, final long emittedAt) throws Exception {
117+
public long accept(final String recordString, final String airbyteMetaString, final long emittedAt) throws Exception {
115118
if (!isStarted) {
116119
if (useCompression) {
117120
compressedBuffer = new GzipCompressorOutputStream(byteCounter);
@@ -123,7 +126,7 @@ public long accept(final String recordString, final long emittedAt) throws Excep
123126
}
124127
if (inputStream == null && !isClosed) {
125128
final long startCount = byteCounter.getCount();
126-
writeRecord(recordString, emittedAt);
129+
writeRecord(recordString, airbyteMetaString, emittedAt);
127130
return byteCounter.getCount() - startCount;
128131
} else {
129132
throw new IllegalCallerException("Buffer is already closed, it cannot accept more messages");

airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,12 @@ public interface SerializableBuffer extends AutoCloseable {
4343
* the entire AirbyteRecordMessage
4444
*
4545
* @param recordString serialized record
46+
* @param airbyteMetaString The serialized airbyte_meta entry
4647
* @param emittedAt timestamp of the record in milliseconds
4748
* @return number of bytes written to the buffer
4849
* @throws Exception
4950
*/
50-
long accept(String recordString, long emittedAt) throws Exception;
51+
long accept(String recordString, String airbyteMetaString, long emittedAt) throws Exception;
5152

5253
/**
5354
* Flush a buffer implementation.

airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/partial_messages/PartialAirbyteRecordMessage.java

+8
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.fasterxml.jackson.annotation.JsonProperty;
88
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
99
import com.fasterxml.jackson.databind.JsonNode;
10+
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta;
1011
import io.airbyte.protocol.models.v0.StreamDescriptor;
1112
import java.util.Objects;
1213

@@ -27,6 +28,9 @@ public class PartialAirbyteRecordMessage {
2728
@JsonPropertyDescription("when the data was emitted from the source. epoch in millisecond.")
2829
private long emittedAt;
2930

31+
@JsonProperty("meta")
32+
private AirbyteRecordMessageMeta meta;
33+
3034
public PartialAirbyteRecordMessage() {}
3135

3236
@JsonProperty("namespace")
@@ -89,6 +93,10 @@ public PartialAirbyteRecordMessage withEmittedAt(final Long emittedAt) {
8993
return this;
9094
}
9195

96+
public AirbyteRecordMessageMeta getMeta() {
97+
return meta;
98+
}
99+
92100
@Override
93101
public boolean equals(final Object o) {
94102
if (this == o) {

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java

+21-7
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
import io.airbyte.integrations.base.destination.typing_deduping.NoopTyperDeduper;
3737
import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator;
3838
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
39+
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
3940
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
41+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
4042
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
4143
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
4244
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
@@ -54,7 +56,8 @@
5456
import org.slf4j.Logger;
5557
import org.slf4j.LoggerFactory;
5658

57-
public abstract class AbstractJdbcDestination extends JdbcConnector implements Destination {
59+
public abstract class AbstractJdbcDestination<DestinationState extends MinimumDestinationState>
60+
extends JdbcConnector implements Destination {
5861

5962
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcDestination.class);
6063

@@ -254,9 +257,19 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map<Stri
254257

255258
protected abstract JdbcSqlGenerator getSqlGenerator();
256259

257-
protected abstract JdbcDestinationHandler<? extends MinimumDestinationState> getDestinationHandler(final String databaseName,
258-
final JdbcDatabase database,
259-
final String rawTableSchema);
260+
protected abstract JdbcDestinationHandler<DestinationState> getDestinationHandler(final String databaseName,
261+
final JdbcDatabase database,
262+
final String rawTableSchema);
263+
264+
/**
265+
* Provide any migrations that the destination needs to run. Most destinations will need to provide an instance of
266+
* {@link io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator} at minimum.
267+
*/
268+
protected abstract List<Migration<DestinationState>> getMigrations(
269+
final JdbcDatabase database,
270+
final String databaseName,
271+
final SqlGenerator sqlGenerator,
272+
final DestinationHandler<DestinationState> destinationHandler);
260273

261274
/**
262275
* "database" key at root of the config json, for any other variants in config, override this
@@ -321,15 +334,16 @@ private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAi
321334
final String databaseName = getDatabaseName(config);
322335
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
323336
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
324-
final DestinationHandler<? extends MinimumDestinationState> destinationHandler =
337+
final DestinationHandler<DestinationState> destinationHandler =
325338
getDestinationHandler(databaseName, database, rawNamespaceOverride.orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
326339
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
327340
final TyperDeduper typerDeduper;
341+
List<Migration<DestinationState>> migrations = getMigrations(database, databaseName, sqlGenerator, destinationHandler);
328342
if (disableTypeDedupe) {
329-
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
343+
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, migrations);
330344
} else {
331345
typerDeduper =
332-
new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
346+
new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, migrations);
333347
}
334348
return typerDeduper;
335349
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/JdbcSqlOperations.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,18 @@ protected String createTableQueryV2(final String schemaName, final String tableN
115115
CREATE TABLE IF NOT EXISTS %s.%s (
116116
%s VARCHAR PRIMARY KEY,
117117
%s JSONB,
118+
%s JSONB,
118119
%s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
119120
%s TIMESTAMP WITH TIME ZONE DEFAULT NULL
120121
);
121122
""",
122-
schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, JavaBaseConstants.COLUMN_NAME_DATA,
123-
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT);
123+
schemaName,
124+
tableName,
125+
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
126+
JavaBaseConstants.COLUMN_NAME_DATA,
127+
JavaBaseConstants.COLUMN_NAME_AB_META,
128+
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
129+
JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT);
124130
}
125131

126132
// TODO: This method seems to be used by Postgres and others while staging to local temp files.
@@ -133,9 +139,10 @@ protected void writeBatchToFile(final File tmpFile, final List<PartialAirbyteMes
133139
// TODO we only need to do this if formatData is overridden. If not, we can just do jsonData =
134140
// record.getSerialized()
135141
final var jsonData = Jsons.serialize(formatData(Jsons.deserializeExact(record.getSerialized())));
142+
final var airbyteMeta = Jsons.serialize(record.getRecord().getMeta());
136143
final var extractedAt = Timestamp.from(Instant.ofEpochMilli(record.getRecord().getEmittedAt()));
137144
if (TypingAndDedupingFlag.isDestinationV2()) {
138-
csvPrinter.printRecord(uuid, jsonData, extractedAt, null);
145+
csvPrinter.printRecord(uuid, jsonData, airbyteMeta, extractedAt, null);
139146
} else {
140147
csvPrinter.printRecord(uuid, jsonData, extractedAt);
141148
}

airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ dependencies {
149149
api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
150150
api 'com.google.guava:guava:33.0.0-jre'
151151
api 'commons-io:commons-io:2.15.1'
152-
api ('io.airbyte.airbyte-protocol:protocol-models:0.5.0') { exclude group: 'com.google.api-client', module: 'google-api-client' }
152+
api ('io.airbyte.airbyte-protocol:protocol-models:0.6.0') { exclude group: 'com.google.api-client', module: 'google-api-client' }
153153
api 'javax.annotation:javax.annotation-api:1.3.2'
154154
api 'org.apache.commons:commons-compress:1.25.0'
155155
api 'org.apache.commons:commons-lang3:3.14.0'

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/csv/BaseSheetGenerator.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ public List<Object> getDataRow(final JsonNode formattedData) {
2929
return new LinkedList<>(getRecordColumns(formattedData));
3030
}
3131

32-
public List<Object> getDataRow(final UUID id, final String formattedString, final long emittedAt) {
32+
@Override
33+
public List<Object> getDataRow(final UUID id, final String formattedString, final String airbyteMetaString, final long emittedAt) {
3334
throw new UnsupportedOperationException("Not implemented in BaseSheetGenerator");
3435
}
3536

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ protected void writeRecord(final AirbyteRecordMessage record) throws IOException
7272
}
7373

7474
@Override
75-
protected void writeRecord(final String recordString, final long emittedAt) throws IOException {
76-
csvPrinter.printRecord(csvSheetGenerator.getDataRow(UUID.randomUUID(), recordString, emittedAt));
75+
protected void writeRecord(final String recordString, String airbyteMetaString, final long emittedAt) throws IOException {
76+
csvPrinter.printRecord(csvSheetGenerator.getDataRow(UUID.randomUUID(), recordString, airbyteMetaString, emittedAt));
7777
}
7878

7979
@Override

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public interface CsvSheetGenerator {
2424

2525
List<Object> getDataRow(JsonNode formattedData);
2626

27-
List<Object> getDataRow(UUID id, String formattedString, long emittedAt);
27+
List<Object> getDataRow(UUID id, String formattedString, String formattedAirbyteMetaString, long emittedAt);
2828

2929
final class Factory {
3030

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public List<String> getHeaderRow() {
4848

4949
@Override
5050
public List<Object> getDataRow(final UUID id, final AirbyteRecordMessage recordMessage) {
51-
return getDataRow(id, Jsons.serialize(recordMessage.getData()), recordMessage.getEmittedAt());
51+
return getDataRow(id, Jsons.serialize(recordMessage.getData()), Jsons.serialize(recordMessage.getMeta()), recordMessage.getEmittedAt());
5252
}
5353

5454
@Override
@@ -57,13 +57,14 @@ public List<Object> getDataRow(final JsonNode formattedData) {
5757
}
5858

5959
@Override
60-
public List<Object> getDataRow(final UUID id, final String formattedString, final long emittedAt) {
60+
public List<Object> getDataRow(final UUID id, final String formattedString, String formattedAirbyteMetaString, final long emittedAt) {
6161
if (useDestinationsV2Columns) {
6262
return List.of(
6363
id,
6464
Instant.ofEpochMilli(emittedAt),
6565
"",
66-
formattedString);
66+
formattedString,
67+
formattedAirbyteMetaString);
6768
} else {
6869
return List.of(
6970
id,

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public long accept(final AirbyteRecordMessage record) throws Exception {
102102
}
103103

104104
@Override
105-
public long accept(final String recordString, final long emittedAt) throws Exception {
105+
public long accept(final String recordString, final String airbyteMetaString, final long emittedAt) throws Exception {
106106
throw new UnsupportedOperationException("This method is not supported for ParquetSerializedBuffer");
107107
}
108108

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/java/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessag
7575
// todo (cgardens) - most writers just go ahead and re-serialize the contents of the record message.
7676
// we should either just pass the raw string or at least have a way to do that and create a default
7777
// impl that maintains backwards compatible behavior.
78-
writer.accept(record.getSerialized(), record.getRecord().getEmittedAt());
78+
writer.accept(record.getSerialized(), Jsons.serialize(record.getRecord().getMeta()), record.getRecord().getEmittedAt());
7979
} catch (final Exception e) {
8080
throw new RuntimeException(e);
8181
}

0 commit comments

Comments
 (0)