Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination S3 Data Lake: Refactor value-mapping pipeline #55807

Merged
merged 29 commits into from
Mar 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,54 +5,20 @@
package io.airbyte.cdk.load.data

import io.airbyte.cdk.load.message.Meta
import io.airbyte.cdk.load.message.Meta.AirbyteMetaFields

class AirbyteTypeToAirbyteTypeWithMeta(private val flatten: Boolean) {
fun convert(schema: AirbyteType): ObjectType {
val properties =
linkedMapOf(
Meta.COLUMN_NAME_AB_RAW_ID to FieldType(StringType, nullable = false),
Meta.COLUMN_NAME_AB_EXTRACTED_AT to FieldType(IntegerType, nullable = false),
Meta.COLUMN_NAME_AB_RAW_ID to
FieldType(AirbyteMetaFields.RAW_ID.type, nullable = false),
Meta.COLUMN_NAME_AB_EXTRACTED_AT to
FieldType(AirbyteMetaFields.EXTRACTED_AT.type, nullable = false),
Meta.COLUMN_NAME_AB_META to
FieldType(
nullable = false,
type =
ObjectType(
linkedMapOf(
"sync_id" to FieldType(IntegerType, nullable = false),
"changes" to
FieldType(
nullable = false,
type =
ArrayType(
FieldType(
nullable = false,
type =
ObjectType(
linkedMapOf(
"field" to
FieldType(
StringType,
nullable = false
),
"change" to
FieldType(
StringType,
nullable = false
),
"reason" to
FieldType(
StringType,
nullable = false
),
)
)
)
)
)
)
)
),
Meta.COLUMN_NAME_AB_GENERATION_ID to FieldType(IntegerType, nullable = false)
FieldType(AirbyteMetaFields.META.type, nullable = false),
Meta.COLUMN_NAME_AB_GENERATION_ID to
FieldType(AirbyteMetaFields.GENERATION_ID.type, nullable = false),
)
if (flatten) {
if (schema is ObjectType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.databind.annotation.JsonSerialize
import com.fasterxml.jackson.databind.node.NullNode
import io.airbyte.cdk.load.message.Meta
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.*
import java.math.BigDecimal
import java.math.BigInteger
import java.time.LocalDate
Expand Down Expand Up @@ -164,14 +163,13 @@ private class ObjectValueSerializer : JsonSerializer<ObjectValue>() {
* @property type The type ([AirbyteType]) of the [AirbyteValue]
* @property changes List of [Meta.Change]s that have been applied to this value
* @property name Field name
* @property fieldCategory [FieldCategory] of the field
* @property fieldCategory [AirbyteMetaFields] of the field
*/
data class EnrichedAirbyteValue(
val value: AirbyteValue,
class EnrichedAirbyteValue(
var value: AirbyteValue,
val type: AirbyteType,
val changes: List<Meta.Change> = emptyList(),
val name: String,
val fieldCategory: FieldCategory
val changes: MutableList<Meta.Change> = mutableListOf()
) {
init {
require(name.isNotBlank()) { "Field name cannot be blank" }
Expand All @@ -183,16 +181,11 @@ data class EnrichedAirbyteValue(
* @param reason The [Reason] for nullification, defaults to DESTINATION_SERIALIZATION_ERROR
* @return A new [EnrichedAirbyteValue] with a null value and an additional change record
*/
fun toNullified(reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR): EnrichedAirbyteValue {
val nullChange =
Meta.Change(
field = name,
change = AirbyteRecordMessageMetaChange.Change.NULLED,
reason = reason
)
fun nullify(reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR) {
val nullChange = Meta.Change(field = name, change = Change.NULLED, reason = reason)

// Return a copy with null value and the new change added to the changes list
return copy(value = NullValue, changes = changes + nullChange)
value = NullValue
changes.add(nullChange)
}

/**
Expand All @@ -202,31 +195,14 @@ data class EnrichedAirbyteValue(
* @param newValue The new (truncated) value to use
* @return A new [EnrichedAirbyteValue] with the truncated value and an additional change record
*/
fun toTruncated(
fun truncate(
reason: Reason = Reason.DESTINATION_RECORD_SIZE_LIMITATION,
newValue: AirbyteValue
): EnrichedAirbyteValue {
val truncateChange =
Meta.Change(
field = name,
change = AirbyteRecordMessageMetaChange.Change.TRUNCATED,
reason = reason
)
) {
val truncateChange = Meta.Change(field = name, change = Change.TRUNCATED, reason = reason)

// Return a copy with the truncated value and the new change added to the changes list
return copy(value = newValue, changes = changes + truncateChange)
value = newValue
changes.add(truncateChange)
}
}

/**
 * Category of an [EnrichedAirbyteValue] field.
 *
 * It lets callers quickly tell whether a field is an Airbyte-controlled field or one that is
 * declared by the source.
 */
enum class FieldCategory {
// Predefined Airbyte columns.
RAW_ID,
EXTRACTED_AT,
META,
GENERATION_ID,
// For fields that don't match any of the predefined Airbyte columns
CLIENT_DATA
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,39 @@ import java.time.format.DateTimeFormatter
* common-sense conversions among numeric types, as well as upcasting any value to StringValue.
*/
object AirbyteValueCoercer {
/**
 * Coerces [value] into the representation expected for [type].
 *
 * [NullValue] is handed back as-is. Union and unknown types are passed through unchanged. Every
 * other type is delegated to the matching `coerceXxx` helper.
 *
 * @param value the value to coerce
 * @param type the declared type that selects which coercion is applied
 * @return the helper's result, the untouched [value] for pass-through types, or `null` when the
 * helper returned `null` or threw an [Exception]
 */
fun coerce(value: AirbyteValue, type: AirbyteType): AirbyteValue? =
    if (value == NullValue) {
        // Nulls are never modified.
        NullValue
    } else {
        try {
            when (type) {
                // Scalar types: one dedicated helper each.
                BooleanType -> coerceBoolean(value)
                DateType -> coerceDate(value)
                IntegerType -> coerceInt(value)
                NumberType -> coerceNumber(value)
                StringType -> coerceString(value)
                TimeTypeWithTimezone -> coerceTimeTz(value)
                TimeTypeWithoutTimezone -> coerceTimeNtz(value)
                TimestampTypeWithTimezone -> coerceTimestampTz(value)
                TimestampTypeWithoutTimezone -> coerceTimestampNtz(value)

                // Every array flavour shares one helper, as does every object flavour.
                is ArrayType,
                ArrayTypeWithoutSchema -> coerceArray(value)
                is ObjectType,
                ObjectTypeWithEmptySchema,
                ObjectTypeWithoutSchema -> coerceObject(value)

                // Unions are passed through untouched.
                is UnionType -> value
                // Likewise when the intended type is not known: leave the value unchanged.
                is UnknownType -> value
            }
        } catch (e: Exception) {
            // A throwing coercion is reported to the caller as null rather than propagated.
            null
        }
    }

/**
 * Coerces [value] to a [BooleanValue] by delegating to `requireType`.
 *
 * NOTE(review): presumably yields `null` when [value] is not already a [BooleanValue] — confirm
 * against `requireType`, which is not shown here.
 */
fun coerceBoolean(value: AirbyteValue): BooleanValue? = requireType<BooleanValue>(value)

fun coerceInt(value: AirbyteValue): IntegerValue? =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,14 @@ fun DestinationRecordAirbyteValue.dataWithAirbyteMeta(
stream: DestinationStream,
flatten: Boolean = false
) = DestinationRecordToAirbyteValueWithMeta(stream, flatten).convert(data, emittedAtMs, meta)

/**
 * Converts this [Meta.Change] into an [ObjectValue] with three string entries, inserted in the
 * order `field`, `change`, `reason`. The `change` and `reason` entries hold the `name` of the
 * respective values.
 */
fun Meta.Change.toAirbyteValue(): ObjectValue {
    // The map is built inline so its value type is inferred from ObjectValue's parameter.
    return ObjectValue(
        linkedMapOf(
            "field" to StringValue(this.field),
            "change" to StringValue(this.change.name),
            "reason" to StringValue(this.reason.name),
        )
    )
}

/**
 * Converts every [Meta.Change] in this list with [toAirbyteValue], keeping the list order.
 */
fun List<Meta.Change>.toAirbyteValues(): List<ObjectValue> {
    return this.map { change -> change.toAirbyteValue() }
}
Loading
Loading