Commit a546642

Merge branch 'master' into daryna/source-greenhouce/milliseconds-cursor

2 parents 65456ae + 78aec78

2,086 files changed: +118,875 −89,877 lines

.github/labeler.yml (−20)
This file was deleted.

.github/workflows/assign-issue-to-project.yaml (−34)
This file was deleted.

.github/workflows/community_ci.yml (+4 −4)

@@ -37,8 +37,8 @@ jobs:
         echo "The fork has changes in protected paths. This is not allowed."
         exit 1

-  connectors_early_ci:
-    name: Run connectors early CI on fork
+  connectors_pre_release_checks:
+    name: Run pre-release checks on fork
     if: github.event.pull_request.head.repo.fork == true
     needs: fail_on_protected_path_changes
     environment: community-ci-auto
@@ -75,7 +75,7 @@ jobs:
       with:
         context: "pull_request"
         sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
-        subcommand: "connectors --modified test --only-step=qa_checks --only-step=version_inc_check --global-status-check-context='Connectors early CI checks' --global-status-check-description='Running early CI checks on connectors'"
+        subcommand: "connectors --modified test --only-step=qa_checks --only-step=version_inc_check --global-status-check-context='Connector Pre-Release Checks' --global-status-check-description='Running pre-release checks...'"
         is_fork: "true"
         git_repo_url: ${{ github.event.pull_request.head.repo.clone_url }}
         git_branch: ${{ github.head_ref }}
@@ -144,7 +144,7 @@ jobs:
         github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
         s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
         s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
-        subcommand: "connectors --modified test"
+        subcommand: "connectors --modified test --skip-step=qa_checks --skip-step=version_inc_check"
         is_fork: "true"
     - name: Upload pipeline reports
       id: upload-artifact

.github/workflows/label-github-issues-by-path.yml (−17)
This file was deleted.

Deleted workflow (file name not captured in this view) (−19)

@@ -1,19 +0,0 @@
-name: Trigger action in cloud based on push
-on:
-  push:
-    branches:
-      - master
-  workflow_dispatch:
-
-jobs:
-  repo-sync:
-    name: "Fire a Repo Dispatch event to airbyte-cloud"
-    runs-on: ubuntu-24.04
-    steps:
-      - name: Repository Dispatch
-        uses: peter-evans/repository-dispatch@v2
-        with:
-          token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
-          repository: airbytehq/airbyte-cloud
-          event-type: oss-push-to-master
-          client-payload: '{"ref": "${{ github.ref }}", "sha": "${{ github.sha }}"}'

.secrets (−1)
This file was deleted.

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/FeedBootstrap.kt (+14 −2)

@@ -118,13 +118,25 @@ sealed class FeedBootstrap<T : Feed>(
             .filter { it.streams.contains(stream) }
             .firstOrNull()

+    // Ideally we would determine whether the sync is trigger-based CDC from the source
+    // connector configuration, but that information is not available here, so this
+    // heuristic is a workaround.
+    private val isTriggerBasedCdc: Boolean =
+        precedingGlobalFeed == null &&
+            metaFieldDecorator.globalCursor != null &&
+            stream.schema.none { it.id == metaFieldDecorator.globalCursor?.id } &&
+            stream.configuredCursor?.id == metaFieldDecorator.globalCursor?.id &&
+            stream.configuredSyncMode == ConfiguredSyncMode.INCREMENTAL
+
     private val defaultRecordData: ObjectNode =
         Jsons.objectNode().also { recordData: ObjectNode ->
             stream.schema.forEach { recordData.putNull(it.id) }
-            if (feed is Stream && precedingGlobalFeed != null) {
+            if (feed is Stream && precedingGlobalFeed != null || isTriggerBasedCdc) {
                 metaFieldDecorator.decorateRecordData(
                     timestamp = outputConsumer.recordEmittedAt.atOffset(ZoneOffset.UTC),
-                    globalStateValue = stateManager.scoped(precedingGlobalFeed).current(),
+                    globalStateValue =
+                        if (precedingGlobalFeed != null)
+                            stateManager.scoped(precedingGlobalFeed).current()
+                        else null,
                     stream,
                     recordData,
                 )
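
Read as a predicate, the new heuristic marks a stream as trigger-based CDC only when all five conditions hold: no preceding global feed, the decorator defines a global cursor, that cursor is absent from the stream's schema, the stream's configured cursor is that global cursor, and the sync mode is incremental. A minimal sketch of the same predicate, using hypothetical stand-in types (StreamInfo and its fields are not CDK names):

    // Hypothetical stand-ins for the CDK types referenced in the diff above.
    data class StreamInfo(
        val schemaFieldIds: Set<String>,
        val configuredCursorId: String?,
        val isIncremental: Boolean,
    )

    // Sketch of the five-condition heuristic, assuming the reading described above.
    fun isTriggerBasedCdc(
        hasPrecedingGlobalFeed: Boolean,
        globalCursorId: String?, // the decorator's global cursor id, e.g. "_ab_cdc_lsn"
        stream: StreamInfo,
    ): Boolean =
        !hasPrecedingGlobalFeed && // no global (CDC) feed precedes this stream
            globalCursorId != null && // the connector defines a global cursor at all
            globalCursorId !in stream.schemaFieldIds && // the cursor is not a real source column
            stream.configuredCursorId == globalCursorId && // yet it is configured as the cursor
            stream.isIncremental // and the stream syncs incrementally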

airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/FeedBootstrapTest.kt (+46)

@@ -184,6 +184,52 @@ class FeedBootstrapTest {
         )
     }

+    @Test
+    fun testTriggerBasedCdcMetadataDecoration() {
+        // Create a stream without the global cursor in its schema to simulate trigger-based CDC
+        val triggerBasedStream =
+            Stream(
+                id = StreamIdentifier.from(StreamDescriptor().withName("tbl").withNamespace("ns")),
+                schema = setOf(k, v),
+                configuredSyncMode = ConfiguredSyncMode.INCREMENTAL,
+                configuredPrimaryKey = listOf(k),
+                // For trigger-based CDC the cursor is uniquely defined; we just use this
+                // object for the test case.
+                configuredCursor = GlobalCursor,
+            )
+
+        // Create state manager and bootstrap without a global feed
+        val stateManager =
+            StateManager(initialStreamStates = mapOf(triggerBasedStream to Jsons.arrayNode()))
+        val bootstrap = triggerBasedStream.bootstrap(stateManager)
+        val consumer = bootstrap.streamRecordConsumers().toList().first().second
+
+        // Test that a record gets CDC metadata decoration even without a global feed
+        consumer.accept(Jsons.readTree("""{"k":3,"v":"trigger"}""") as ObjectNode, changes = null)
+
+        val recordOutput = outputConsumer.records().map(Jsons::writeValueAsString).first()
+        val recordJson = Jsons.readTree(recordOutput)
+        val data = recordJson.get("data")
+
+        // Verify CDC metadata fields are present and properly decorated
+        Assertions.assertNotNull(data.get("_ab_cdc_lsn"))
+        Assertions.assertNotNull(data.get("_ab_cdc_updated_at"))
+        Assertions.assertNotNull(data.get("_ab_cdc_deleted_at"))
+
+        // The _ab_cdc_lsn should be decorated with the transaction timestamp
+        Assertions.assertTrue(data.get("_ab_cdc_lsn").isTextual)
+
+        // _ab_cdc_updated_at should be a timestamp string
+        Assertions.assertTrue(data.get("_ab_cdc_updated_at").isTextual)
+
+        // _ab_cdc_deleted_at should be null for non-deleted records
+        Assertions.assertTrue(data.get("_ab_cdc_deleted_at").isNull)
+    }
+
     companion object {
         const val GLOBAL_RECORD_DATA =
             """{"k":1,"v":"foo","_ab_cdc_lsn":123,"_ab_cdc_updated_at":"2024-03-01T01:02:03.456789","_ab_cdc_deleted_at":null}"""

airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/TestMetaFieldDecorator.kt (+1 −2)

@@ -8,7 +8,6 @@ import com.fasterxml.jackson.databind.JsonNode
 import com.fasterxml.jackson.databind.node.ObjectNode
 import io.airbyte.cdk.command.OpaqueStateValue
 import io.airbyte.cdk.read.Stream
-import io.airbyte.cdk.util.Jsons
 import io.micronaut.context.annotation.Requires
 import io.micronaut.context.env.Environment
 import jakarta.inject.Singleton
@@ -43,7 +42,7 @@ class TestMetaFieldDecorator : MetaFieldDecorator {
         recordData.set<JsonNode>(
             GlobalCursor.id,
             if (globalStateValue == null) {
-                Jsons.nullNode()
+                CdcOffsetDateTimeMetaFieldType.jsonEncoder.encode(timestamp)
             } else {
                 CdcStringMetaFieldType.jsonEncoder.encode(globalStateValue.toString())
             }
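
Note how this dovetails with the FeedBootstrap change above: when no global state value is available, the test decorator now encodes the record's emission timestamp into the global cursor field instead of writing a JSON null, which appears to be what the new trigger-based CDC test depends on when it asserts that _ab_cdc_lsn is textual.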

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationCatalog.kt (+5)

@@ -35,6 +35,11 @@ data class DestinationCatalog(val streams: List<DestinationStream> = emptyList()
             ?: throw IllegalArgumentException("Stream not found: namespace=$namespace, name=$name")
     }

+    fun getStream(descriptor: DestinationStream.Descriptor): DestinationStream {
+        return byDescriptor[descriptor]
+            ?: throw IllegalArgumentException("Stream not found: $descriptor")
+    }
+
     fun asProtocolObject(): ConfiguredAirbyteCatalog =
         ConfiguredAirbyteCatalog().withStreams(streams.map { it.asProtocolObject() })
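
The new overload resolves a stream directly from a DestinationStream.Descriptor rather than from separate namespace/name arguments. A hedged usage sketch (the Descriptor construction assumes it is a data class taking namespace and name, and the catalog value is illustrative):

    // Illustrative only: look up a stream by its descriptor. Per the diff above, a
    // missing stream raises IllegalArgumentException("Stream not found: $descriptor").
    val descriptor = DestinationStream.Descriptor(namespace = "public", name = "users")
    val stream: DestinationStream = catalog.getStream(descriptor)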

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt (+40 −1)

@@ -9,10 +9,17 @@ import io.airbyte.cdk.load.command.DestinationConfiguration
 import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.message.BatchEnvelope
 import io.airbyte.cdk.load.message.ChannelMessageQueue
+import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
 import io.airbyte.cdk.load.message.MultiProducerChannel
+import io.airbyte.cdk.load.message.PartitionedQueue
+import io.airbyte.cdk.load.message.PipelineEvent
+import io.airbyte.cdk.load.message.StreamKey
+import io.airbyte.cdk.load.pipeline.BatchUpdate
 import io.airbyte.cdk.load.state.ReservationManager
+import io.airbyte.cdk.load.state.Reserved
 import io.airbyte.cdk.load.task.implementor.FileAggregateMessage
 import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage
+import io.airbyte.cdk.load.write.LoadStrategy
 import io.github.oshai.kotlinlogging.KotlinLogging
 import io.micronaut.context.annotation.Factory
 import io.micronaut.context.annotation.Value
@@ -94,5 +101,37 @@ class SyncBeanFactory {

     @Singleton
     @Named("openStreamQueue")
-    class OpenStreamQueue : ChannelMessageQueue<DestinationStream>()
+    class OpenStreamQueue : ChannelMessageQueue<DestinationStream>(Channel(Channel.UNLIMITED))
+
+    /**
+     * If the client uses a new-style LoadStrategy, then we need to checkpoint by checkpoint id
+     * instead of record index.
+     */
+    @Singleton
+    @Named("checkpointById")
+    fun isCheckpointById(loadStrategy: LoadStrategy? = null): Boolean = loadStrategy != null
+
+    /**
+     * A single record queue for the whole sync, containing all streams, optionally split into a
+     * configurable number of partitions. The number of partitions is controlled by the specified
+     * LoadStrategy, if any.
+     */
+    @Singleton
+    @Named("recordQueue")
+    fun recordQueue(
+        loadStrategy: LoadStrategy? = null,
+    ): PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>> {
+        return PartitionedQueue(
+            Array(loadStrategy?.inputPartitions ?: 1) {
+                ChannelMessageQueue(Channel(Channel.UNLIMITED))
+            }
+        )
+    }
+
+    /** A queue for updating batch states, which is not partitioned. */
+    @Singleton
+    @Named("batchStateUpdateQueue")
+    fun batchStateUpdateQueue(): ChannelMessageQueue<BatchUpdate> {
+        return ChannelMessageQueue(Channel(100))
+    }
 }
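
The sizing rule in recordQueue is worth calling out: the partition count comes from the LoadStrategy's inputPartitions when a strategy bean is present, and collapses to a single partition otherwise. A toy sketch of that shape (ToyPartitionedQueue is a stand-in for illustration, not the CDK's PartitionedQueue):

    import kotlinx.coroutines.channels.Channel

    // Toy stand-in: one unbounded channel per partition, so producers can shard
    // events across consumers by partition index.
    class ToyPartitionedQueue<T>(partitions: Int) {
        private val channels = Array(partitions) { Channel<T>(Channel.UNLIMITED) }
        suspend fun publish(value: T, partition: Int) =
            channels[partition % channels.size].send(value)
        fun consume(partition: Int): Channel<T> = channels[partition % channels.size]
    }

    // Mirrors the factory's sizing rule: the LoadStrategy drives the partition count;
    // without one, everything funnels through a single partition.
    fun <T> makeRecordQueue(inputPartitions: Int?): ToyPartitionedQueue<T> =
        ToyPartitionedQueue(inputPartitions ?: 1)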

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt (+77)

@@ -11,6 +11,9 @@ import com.fasterxml.jackson.databind.JsonSerializer
 import com.fasterxml.jackson.databind.SerializerProvider
 import com.fasterxml.jackson.databind.annotation.JsonSerialize
 import com.fasterxml.jackson.databind.node.NullNode
+import io.airbyte.cdk.load.message.Meta
+import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
+import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason
 import java.math.BigDecimal
 import java.math.BigInteger
 import java.time.LocalDate
@@ -153,3 +156,77 @@ private class ObjectValueSerializer : JsonSerializer<ObjectValue>() {
 }

 @JvmInline value class UnknownValue(val value: JsonNode) : AirbyteValue
+
+/**
+ * Represents an "enriched" (augmented) Airbyte value with additional metadata.
+ *
+ * @property value The actual [AirbyteValue]
+ * @property type The type ([AirbyteType]) of the [AirbyteValue]
+ * @property changes List of [Meta.Change]s that have been applied to this value
+ * @property name Field name
+ * @property fieldCategory [FieldCategory] of the field
+ */
+data class EnrichedAirbyteValue(
+    val value: AirbyteValue,
+    val type: AirbyteType,
+    val changes: List<Meta.Change> = emptyList(),
+    val name: String,
+    val fieldCategory: FieldCategory
+) {
+    init {
+        require(name.isNotBlank()) { "Field name cannot be blank" }
+    }
+
+    /**
+     * Creates a nullified version of this value with the specified reason.
+     *
+     * @param reason The [Reason] for nullification, defaults to DESTINATION_SERIALIZATION_ERROR
+     * @return A new [EnrichedAirbyteValue] with a null value and an additional change record
+     */
+    fun toNullified(reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR): EnrichedAirbyteValue {
+        val nullChange =
+            Meta.Change(
+                field = name,
+                change = AirbyteRecordMessageMetaChange.Change.NULLED,
+                reason = reason
+            )
+
+        // Return a copy with a null value and the new change appended to the changes list
+        return copy(value = NullValue, changes = changes + nullChange)
+    }
+
+    /**
+     * Creates a truncated version of this value with the specified reason and new value.
+     *
+     * @param reason The [Reason] for truncation, defaults to DESTINATION_RECORD_SIZE_LIMITATION
+     * @param newValue The new (truncated) value to use
+     * @return A new [EnrichedAirbyteValue] with the truncated value and an additional change record
+     */
+    fun toTruncated(
+        reason: Reason = Reason.DESTINATION_RECORD_SIZE_LIMITATION,
+        newValue: AirbyteValue
+    ): EnrichedAirbyteValue {
+        val truncateChange =
+            Meta.Change(
+                field = name,
+                change = AirbyteRecordMessageMetaChange.Change.TRUNCATED,
+                reason = reason
+            )
+
+        // Return a copy with the truncated value and the new change appended to the changes list
+        return copy(value = newValue, changes = changes + truncateChange)
+    }
+}
+
+/**
+ * The [EnrichedAirbyteValue] category allows us to quickly understand whether the field is an
+ * Airbyte-controlled field or one declared by the source.
+ */
+enum class FieldCategory {
+    RAW_ID,
+    EXTRACTED_AT,
+    META,
+    GENERATION_ID,
+    // For fields that don't match any of the predefined Airbyte columns
+    CLIENT_DATA
+}