
Commit f7b7735

Merge branch 'master' into btkcodedev/faunaMigrtepoetry

2 parents e7e6dda + c78f588

File tree

1,498 files changed: +75729 −55991 lines


.github/labeler.yml

-20
This file was deleted.

.github/workflows/assign-issue-to-project.yaml

-34
This file was deleted.

.github/workflows/community_ci.yml

+4-4
@@ -37,8 +37,8 @@ jobs:
           echo "The fork has changes in protected paths. This is not allowed."
           exit 1

-  connectors_early_ci:
-    name: Run connectors early CI on fork
+  connectors_pre_release_checks:
+    name: Run pre-release checks on fork
     if: github.event.pull_request.head.repo.fork == true
     needs: fail_on_protected_path_changes
     environment: community-ci-auto
@@ -75,7 +75,7 @@ jobs:
         with:
           context: "pull_request"
           sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
-          subcommand: "connectors --modified test --only-step=qa_checks --only-step=version_inc_check --global-status-check-context='Connectors early CI checks' --global-status-check-description='Running early CI checks on connectors'"
+          subcommand: "connectors --modified test --only-step=qa_checks --only-step=version_inc_check --global-status-check-context='Connector Pre-Release Checks' --global-status-check-description='Running pre-release checks...'"
           is_fork: "true"
           git_repo_url: ${{ github.event.pull_request.head.repo.clone_url }}
           git_branch: ${{ github.head_ref }}
@@ -144,7 +144,7 @@ jobs:
           github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
           s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
           s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
-          subcommand: "connectors --modified test"
+          subcommand: "connectors --modified test --skip-step=qa_checks --skip-step=version_inc_check"
           is_fork: "true"
       - name: Upload pipeline reports
         id: upload-artifact

.github/workflows/label-github-issues-by-path.yml

-17
This file was deleted.
@@ -1,19 +0,0 @@
-name: Trigger action in cloud based on push
-on:
-  push:
-    branches:
-      - master
-  workflow_dispatch:
-
-jobs:
-  repo-sync:
-    name: "Fire a Repo Dispatch event to airbyte-cloud"
-    runs-on: ubuntu-24.04
-    steps:
-      - name: Repository Dispatch
-        uses: peter-evans/repository-dispatch@v2
-        with:
-          token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
-          repository: airbytehq/airbyte-cloud
-          event-type: oss-push-to-master
-          client-payload: '{"ref": "${{ github.ref }}", "sha": "${{ github.sha }}"}'

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/FeedBootstrap.kt

+14-2
@@ -118,13 +118,25 @@ sealed class FeedBootstrap<T : Feed>(
             .filter { it.streams.contains(stream) }
             .firstOrNull()

+    // Ideally we would determine whether the sync is trigger-based CDC from the source
+    // connector configuration, but that information is not available here, so this heuristic is a workaround.
+    private val isTriggerBasedCdc: Boolean =
+        precedingGlobalFeed == null &&
+            metaFieldDecorator.globalCursor != null &&
+            stream.schema.none { it.id == metaFieldDecorator.globalCursor?.id } &&
+            stream.configuredCursor?.id == metaFieldDecorator.globalCursor?.id &&
+            stream.configuredSyncMode == ConfiguredSyncMode.INCREMENTAL
+
     private val defaultRecordData: ObjectNode =
         Jsons.objectNode().also { recordData: ObjectNode ->
             stream.schema.forEach { recordData.putNull(it.id) }
-            if (feed is Stream && precedingGlobalFeed != null) {
+            if (feed is Stream && precedingGlobalFeed != null || isTriggerBasedCdc) {
                 metaFieldDecorator.decorateRecordData(
                     timestamp = outputConsumer.recordEmittedAt.atOffset(ZoneOffset.UTC),
-                    globalStateValue = stateManager.scoped(precedingGlobalFeed).current(),
+                    globalStateValue =
+                        if (precedingGlobalFeed != null)
+                            stateManager.scoped(precedingGlobalFeed).current()
+                        else null,
                     stream,
                     recordData,
                 )
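For readers outside the CDK codebase, the heuristic reads as: treat a stream as trigger-based CDC when there is no preceding global (CDC) feed, yet the stream is configured for incremental sync on the decorator's global cursor and that cursor column is not part of the stream's own schema. Below is a self-contained Kotlin sketch of the same decision logic, using invented stand-in types rather than the real Stream/MetaFieldDecorator API:

// Hypothetical stand-ins for the CDK types referenced by isTriggerBasedCdc.
data class CursorId(val id: String)

data class StreamConfig(
    val schemaFieldIds: Set<String>,
    val configuredCursorId: String?,
    val isIncremental: Boolean,
)

// A stream looks like trigger-based CDC when there is no preceding global (CDC) feed,
// yet it is cursored on the decorator's global cursor, which is absent from its schema.
fun isTriggerBasedCdc(
    hasPrecedingGlobalFeed: Boolean,
    globalCursor: CursorId?,
    stream: StreamConfig,
): Boolean =
    !hasPrecedingGlobalFeed &&
        globalCursor != null &&
        stream.schemaFieldIds.none { it == globalCursor.id } &&
        stream.configuredCursorId == globalCursor.id &&
        stream.isIncremental

fun main() {
    val cdcCursor = CursorId("_ab_cdc_lsn")
    val stream = StreamConfig(setOf("k", "v"), configuredCursorId = "_ab_cdc_lsn", isIncremental = true)
    println(isTriggerBasedCdc(hasPrecedingGlobalFeed = false, cdcCursor, stream)) // true
}

When the heuristic fires, decorateRecordData receives globalStateValue = null, and a decorator such as TestMetaFieldDecorator (changed later in this commit) falls back to encoding the record timestamp into the cursor field.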

airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/FeedBootstrapTest.kt

+46
@@ -184,6 +184,52 @@ class FeedBootstrapTest {
         )
     }

+    @Test
+    fun testTriggerBasedCdcMetadataDecoration() {
+        // Create a stream without the global cursor in its schema to simulate trigger-based CDC
+        val triggerBasedStream =
+            Stream(
+                id = StreamIdentifier.from(StreamDescriptor().withName("tbl").withNamespace("ns")),
+                schema = setOf(k, v),
+                configuredSyncMode = ConfiguredSyncMode.INCREMENTAL,
+                configuredPrimaryKey = listOf(k),
+                // For trigger-based CDC the cursor is uniquely defined; this object is
+                // reused here only for the test case.
+                configuredCursor = GlobalCursor,
+            )
+
+        // Create a state manager and bootstrap without a global feed
+        val stateManager =
+            StateManager(initialStreamStates = mapOf(triggerBasedStream to Jsons.arrayNode()))
+        val bootstrap = triggerBasedStream.bootstrap(stateManager)
+        val consumer = bootstrap.streamRecordConsumers().toList().first().second
+
+        // Test that a record gets CDC metadata decoration even without a global feed
+        consumer.accept(Jsons.readTree("""{"k":3,"v":"trigger"}""") as ObjectNode, changes = null)
+
+        val recordOutput = outputConsumer.records().map(Jsons::writeValueAsString).first()
+        val recordJson = Jsons.readTree(recordOutput)
+        val data = recordJson.get("data")
+
+        // Verify CDC metadata fields are present and properly decorated
+        Assertions.assertNotNull(data.get("_ab_cdc_lsn"))
+        Assertions.assertNotNull(data.get("_ab_cdc_updated_at"))
+        Assertions.assertNotNull(data.get("_ab_cdc_deleted_at"))
+
+        // The _ab_cdc_lsn should be decorated with the transaction timestamp
+        Assertions.assertTrue(data.get("_ab_cdc_lsn").isTextual)
+
+        // _ab_cdc_updated_at should be a timestamp string
+        Assertions.assertTrue(data.get("_ab_cdc_updated_at").isTextual)
+
+        // _ab_cdc_deleted_at should be null for non-deleted records
+        Assertions.assertTrue(data.get("_ab_cdc_deleted_at").isNull)
+    }
+
     companion object {
         const val GLOBAL_RECORD_DATA =
             """{"k":1,"v":"foo","_ab_cdc_lsn":123,"_ab_cdc_updated_at":"2024-03-01T01:02:03.456789","_ab_cdc_deleted_at":null}"""

airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/TestMetaFieldDecorator.kt

+1-2
@@ -8,7 +8,6 @@ import com.fasterxml.jackson.databind.JsonNode
 import com.fasterxml.jackson.databind.node.ObjectNode
 import io.airbyte.cdk.command.OpaqueStateValue
 import io.airbyte.cdk.read.Stream
-import io.airbyte.cdk.util.Jsons
 import io.micronaut.context.annotation.Requires
 import io.micronaut.context.env.Environment
 import jakarta.inject.Singleton
@@ -43,7 +42,7 @@ class TestMetaFieldDecorator : MetaFieldDecorator {
         recordData.set<JsonNode>(
             GlobalCursor.id,
             if (globalStateValue == null) {
-                Jsons.nullNode()
+                CdcOffsetDateTimeMetaFieldType.jsonEncoder.encode(timestamp)
             } else {
                 CdcStringMetaFieldType.jsonEncoder.encode(globalStateValue.toString())
             }
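The practical effect of this change: when there is no global state value (as in the trigger-based CDC case above), the test decorator now stamps the record's emission timestamp into the global cursor field instead of writing a JSON null. A minimal stand-alone sketch of that fallback, with an invented function in place of the CDK's jsonEncoder machinery:

import java.time.OffsetDateTime
import java.time.ZoneOffset

// encodeGlobalCursor is a hypothetical stand-in for the decorator's encoder calls:
// with no global state value, fall back to the record timestamp; otherwise use the state.
fun encodeGlobalCursor(globalStateValue: Any?, timestamp: OffsetDateTime): String =
    globalStateValue?.toString() ?: timestamp.toString()

fun main() {
    val ts = OffsetDateTime.of(2024, 3, 1, 1, 2, 3, 456789000, ZoneOffset.UTC)
    println(encodeGlobalCursor(null, ts)) // 2024-03-01T01:02:03.456789Z (timestamp fallback)
    println(encodeGlobalCursor("lsn:123", ts)) // lsn:123 (state value wins)
}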

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt

+77
@@ -11,6 +11,9 @@ import com.fasterxml.jackson.databind.JsonSerializer
 import com.fasterxml.jackson.databind.SerializerProvider
 import com.fasterxml.jackson.databind.annotation.JsonSerialize
 import com.fasterxml.jackson.databind.node.NullNode
+import io.airbyte.cdk.load.message.Meta
+import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
+import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason
 import java.math.BigDecimal
 import java.math.BigInteger
 import java.time.LocalDate
@@ -153,3 +156,77 @@ private class ObjectValueSerializer : JsonSerializer<ObjectValue>() {
 }

 @JvmInline value class UnknownValue(val value: JsonNode) : AirbyteValue
+
+/**
+ * Represents an "enriched" (augmented) Airbyte value with additional metadata.
+ *
+ * @property value The actual [AirbyteValue]
+ * @property type The type ([AirbyteType]) of the [AirbyteValue]
+ * @property changes List of [Meta.Change]s that have been applied to this value
+ * @property name Field name
+ * @property fieldCategory [FieldCategory] of the field
+ */
+data class EnrichedAirbyteValue(
+    val value: AirbyteValue,
+    val type: AirbyteType,
+    val changes: List<Meta.Change> = emptyList(),
+    val name: String,
+    val fieldCategory: FieldCategory
+) {
+    init {
+        require(name.isNotBlank()) { "Field name cannot be blank" }
+    }
+
+    /**
+     * Creates a nullified version of this value with the specified reason.
+     *
+     * @param reason The [Reason] for nullification, defaults to DESTINATION_SERIALIZATION_ERROR
+     * @return A new [EnrichedAirbyteValue] with a null value and an additional change record
+     */
+    fun toNullified(reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR): EnrichedAirbyteValue {
+        val nullChange =
+            Meta.Change(
+                field = name,
+                change = AirbyteRecordMessageMetaChange.Change.NULLED,
+                reason = reason
+            )
+
+        // Return a copy with a null value and the new change added to the changes list
+        return copy(value = NullValue, changes = changes + nullChange)
+    }
+
+    /**
+     * Creates a truncated version of this value with the specified reason and new value.
+     *
+     * @param reason The [Reason] for truncation, defaults to DESTINATION_RECORD_SIZE_LIMITATION
+     * @param newValue The new (truncated) value to use
+     * @return A new [EnrichedAirbyteValue] with the truncated value and an additional change record
+     */
+    fun toTruncated(
+        reason: Reason = Reason.DESTINATION_RECORD_SIZE_LIMITATION,
+        newValue: AirbyteValue
+    ): EnrichedAirbyteValue {
+        val truncateChange =
+            Meta.Change(
+                field = name,
+                change = AirbyteRecordMessageMetaChange.Change.TRUNCATED,
+                reason = reason
+            )
+
+        // Return a copy with the truncated value and the new change added to the changes list
+        return copy(value = newValue, changes = changes + truncateChange)
+    }
+}
+
+/**
+ * The [EnrichedAirbyteValue] category allows us to quickly understand whether the field is an
+ * Airbyte-controlled field or one declared by the source.
+ */
+enum class FieldCategory {
+    RAW_ID,
+    EXTRACTED_AT,
+    META,
+    GENERATION_ID,
+    // For fields that don't match any of the predefined Airbyte columns
+    CLIENT_DATA
+}
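As a usage illustration, a destination might clamp oversized string values with toTruncated and record the change. This is a sketch, assuming StringValue and StringType from the CDK's AirbyteValue/AirbyteType hierarchy; enforceLimit and maxLen are invented for the example:

import io.airbyte.cdk.load.data.EnrichedAirbyteValue
import io.airbyte.cdk.load.data.FieldCategory
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.StringValue

// Invented helper: truncate a string field that exceeds a destination's length limit.
// toTruncated appends a TRUNCATED change with reason DESTINATION_RECORD_SIZE_LIMITATION.
fun enforceLimit(field: EnrichedAirbyteValue, maxLen: Int): EnrichedAirbyteValue {
    val v = field.value
    return if (v is StringValue && v.value.length > maxLen) {
        field.toTruncated(newValue = StringValue(v.value.take(maxLen)))
    } else {
        field
    }
}

fun main() {
    val field =
        EnrichedAirbyteValue(
            value = StringValue("a".repeat(100)),
            type = StringType,
            name = "comment",
            fieldCategory = FieldCategory.CLIENT_DATA,
        )
    // The truncated copy keeps the field name and type but gains one change record.
    println(enforceLimit(field, maxLen = 10).changes)
}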

airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt

+12-4
@@ -208,8 +208,7 @@ abstract class IntegrationTest(
             configContents,
             catalog.asProtocolObject(),
             useFileTransfer = useFileTransfer,
-            micronautProperties =
-                micronautProperties + fileTransferProperty + defaultMicronautProperties,
+            micronautProperties = micronautProperties + fileTransferProperty,
         )
         return runBlocking(Dispatchers.IO) {
             launch { destination.run() }
@@ -237,6 +236,10 @@
      *
      * A common pattern is to call [runSyncUntilStateAck], and then call `dumpAndDiffRecords(...,
      * allowUnexpectedRecord = true)` to verify that [records] were written to the destination.
+     *
+     * This forces the connector to run with microbatching enabled: without that option, tests
+     * using this method would take significantly longer, because they would need to push roughly
+     * 100MB to the destination before it would ack a state message.
      */
     fun runSyncUntilStateAck(
         configContents: String,
@@ -252,7 +255,7 @@
             configContents,
             DestinationCatalog(listOf(stream)).asProtocolObject(),
             useFileTransfer,
-            micronautProperties = micronautProperties + defaultMicronautProperties,
+            micronautProperties = micronautProperties + micronautPropertyEnableMicrobatching,
         )
         return runBlocking(Dispatchers.IO) {
             launch {
@@ -302,7 +305,12 @@
         val randomizedNamespaceRegex = Regex("test(\\d{8})[A-Za-z]{4}")
         val randomizedNamespaceDateFormatter: DateTimeFormatter =
             DateTimeFormatter.ofPattern("yyyyMMdd")
-        val defaultMicronautProperties: Map<Property, String> =
+
+        /**
+         * When set, this property forces the CDK to invoke processRecords once per record.
+         * This allows tests which depend on state acks to run quickly.
+         */
+        val micronautPropertyEnableMicrobatching: Map<Property, String> =
             mapOf(EnvVarConstants.RECORD_BATCH_SIZE to "1")

         /**
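To see why setting RECORD_BATCH_SIZE to "1" makes state-ack tests fast, here is a toy model of the buffering behavior the doc comment describes; the names and sizes are invented and do not reflect the CDK's actual pipeline:

// With a ~100MB threshold the destination acks state only after a large batch flushes;
// with a 1-byte threshold every record immediately crosses it, so each record flushes
// (and its state can be acked) right away.
fun flushPoints(recordSizesBytes: List<Long>, batchThresholdBytes: Long): List<Int> {
    val flushes = mutableListOf<Int>()
    var buffered = 0L
    recordSizesBytes.forEachIndexed { i, size ->
        buffered += size
        if (buffered >= batchThresholdBytes) {
            flushes += i + 1 // flush (and state ack) after this record
            buffered = 0
        }
    }
    return flushes
}

fun main() {
    val records = listOf(120L, 80L, 200L)
    println(flushPoints(records, batchThresholdBytes = 100L * 1024 * 1024)) // [] : never flushes
    println(flushPoints(records, batchThresholdBytes = 1L)) // [1, 2, 3] : one flush per record
}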
