Skip to content

Commit 82180ae

Browse files
committed
major implementation
1 parent 66894b7 commit 82180ae

File tree

5 files changed

+172
-131
lines changed

5 files changed

+172
-131
lines changed

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToAirbyteTypeWithMeta.kt

+15-46
Original file line numberDiff line numberDiff line change
@@ -5,55 +5,10 @@
55
package io.airbyte.cdk.load.data
66

77
import io.airbyte.cdk.load.message.Meta
8+
import io.airbyte.cdk.load.message.Meta.AirbyteMetaFields
89

910
class AirbyteTypeToAirbyteTypeWithMeta(private val flatten: Boolean) {
1011
fun convert(schema: AirbyteType): ObjectType {
11-
val properties =
12-
linkedMapOf(
13-
Meta.COLUMN_NAME_AB_RAW_ID to FieldType(StringType, nullable = false),
14-
Meta.COLUMN_NAME_AB_EXTRACTED_AT to FieldType(IntegerType, nullable = false),
15-
Meta.COLUMN_NAME_AB_META to
16-
FieldType(
17-
nullable = false,
18-
type =
19-
ObjectType(
20-
linkedMapOf(
21-
"sync_id" to FieldType(IntegerType, nullable = false),
22-
"changes" to
23-
FieldType(
24-
nullable = false,
25-
type =
26-
ArrayType(
27-
FieldType(
28-
nullable = false,
29-
type =
30-
ObjectType(
31-
linkedMapOf(
32-
"field" to
33-
FieldType(
34-
StringType,
35-
nullable = false
36-
),
37-
"change" to
38-
FieldType(
39-
StringType,
40-
nullable = false
41-
),
42-
"reason" to
43-
FieldType(
44-
StringType,
45-
nullable = false
46-
),
47-
)
48-
)
49-
)
50-
)
51-
)
52-
)
53-
)
54-
),
55-
Meta.COLUMN_NAME_AB_GENERATION_ID to FieldType(IntegerType, nullable = false)
56-
)
5712
if (flatten) {
5813
if (schema is ObjectType) {
5914
schema.properties.forEach { (name, field) -> properties[name] = field }
@@ -69,6 +24,20 @@ class AirbyteTypeToAirbyteTypeWithMeta(private val flatten: Boolean) {
6924
}
7025
return ObjectType(properties)
7126
}
27+
28+
companion object {
29+
val properties =
30+
linkedMapOf(
31+
Meta.COLUMN_NAME_AB_RAW_ID to
32+
FieldType(AirbyteMetaFields.RAW_ID.type, nullable = false),
33+
Meta.COLUMN_NAME_AB_EXTRACTED_AT to
34+
FieldType(AirbyteMetaFields.EXTRACTED_AT.type, nullable = false),
35+
Meta.COLUMN_NAME_AB_META to
36+
FieldType(AirbyteMetaFields.META.type, nullable = false),
37+
Meta.COLUMN_NAME_AB_GENERATION_ID to
38+
FieldType(AirbyteMetaFields.GENERATION_ID.type, nullable = false),
39+
)
40+
}
7241
}
7342

7443
fun AirbyteType.withAirbyteMeta(flatten: Boolean = false): ObjectType =

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt

+1-15
Original file line numberDiff line numberDiff line change
@@ -163,13 +163,12 @@ private class ObjectValueSerializer : JsonSerializer<ObjectValue>() {
163163
* @property type The type ([AirbyteType]) of the [AirbyteValue]
164164
* @property changes List of [Meta.Change]s that have been applied to this value
165165
* @property name Field name
166-
* @property fieldCategory [FieldCategory] of the field
166+
* @property fieldCategory [AirbyteMetaFields] of the field
167167
*/
168168
class EnrichedAirbyteValue(
169169
var value: AirbyteValue,
170170
val type: AirbyteType,
171171
val name: String,
172-
val fieldCategory: FieldCategory,
173172
val changes: MutableList<Meta.Change> = mutableListOf()
174173
) {
175174
init {
@@ -207,16 +206,3 @@ class EnrichedAirbyteValue(
207206
changes.add(truncateChange)
208207
}
209208
}
210-
211-
/**
212-
* The [EnrichedAirbyteValue] category allows us to quickly understand if the field is an Airbyte
213-
* controlled field or if it is declared by the source.
214-
*/
215-
enum class FieldCategory {
216-
RAW_ID,
217-
EXTRACTED_AT,
218-
META,
219-
GENERATION_ID,
220-
// For fields that don't match any of the predefined Airbyte columns
221-
CLIENT_DATA
222-
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/DestinationRecordToAirbyteValueWithMeta.kt

+11
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,14 @@ fun DestinationRecordAirbyteValue.dataWithAirbyteMeta(
6363
stream: DestinationStream,
6464
flatten: Boolean = false
6565
) = DestinationRecordToAirbyteValueWithMeta(stream, flatten).convert(data, emittedAtMs, meta)
66+
67+
fun Meta.Change.toAirbyteValue(): ObjectValue =
68+
ObjectValue(
69+
linkedMapOf(
70+
"field" to StringValue(field),
71+
"change" to StringValue(change.name),
72+
"reason" to StringValue(reason.name)
73+
)
74+
)
75+
76+
fun List<Meta.Change>.toAirbyteValues(): List<ObjectValue> = map { it.toAirbyteValue() }

0 commit comments

Comments
 (0)