Skip to content

Commit 22f150b

Browse files
Merge branch 'master' into 11873-ai-agent-in-docs
2 parents b236020 + d8d480f commit 22f150b

File tree

1,128 files changed

+15104
-13938
lines changed

Some content is hidden

Large commits have some of their content hidden by default. Use the search box below to find content that may be hidden.

1,128 files changed

+15104
-13938
lines changed

.github/labeler.yml

-20
This file was deleted.

.github/workflows/assign-issue-to-project.yaml

-34
This file was deleted.

.github/workflows/community_ci.yml

+4-4
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ jobs:
3737
echo "The fork has changes in protected paths. This is not allowed."
3838
exit 1
3939
40-
connectors_early_ci:
41-
name: Run connectors early CI on fork
40+
connectors_pre_release_checks:
41+
name: Run pre-release checks on fork
4242
if: github.event.pull_request.head.repo.fork == true
4343
needs: fail_on_protected_path_changes
4444
environment: community-ci-auto
@@ -75,7 +75,7 @@ jobs:
7575
with:
7676
context: "pull_request"
7777
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
78-
subcommand: "connectors --modified test --only-step=qa_checks --only-step=version_inc_check --global-status-check-context='Connectors early CI checks' --global-status-check-description='Running early CI checks on connectors'"
78+
subcommand: "connectors --modified test --only-step=qa_checks --only-step=version_inc_check --global-status-check-context='Connector Pre-Release Checks' --global-status-check-description='Running pre-release checks...'"
7979
is_fork: "true"
8080
git_repo_url: ${{ github.event.pull_request.head.repo.clone_url }}
8181
git_branch: ${{ github.head_ref }}
@@ -144,7 +144,7 @@ jobs:
144144
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
145145
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
146146
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
147-
subcommand: "connectors --modified test"
147+
subcommand: "connectors --modified test --skip-step=qa_checks --skip-step=version_inc_check"
148148
is_fork: "true"
149149
- name: Upload pipeline reports
150150
id: upload-artifact

.github/workflows/label-github-issues-by-path.yml

-17
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,19 +0,0 @@
1-
name: Trigger action in cloud based on push
2-
on:
3-
push:
4-
branches:
5-
- master
6-
workflow_dispatch:
7-
8-
jobs:
9-
repo-sync:
10-
name: "Fire a Repo Dispatch event to airbyte-cloud"
11-
runs-on: ubuntu-24.04
12-
steps:
13-
- name: Repository Dispatch
14-
uses: peter-evans/repository-dispatch@v2
15-
with:
16-
token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
17-
repository: airbytehq/airbyte-cloud
18-
event-type: oss-push-to-master
19-
client-payload: '{"ref": "${{ github.ref }}", "sha": "${{ github.sha }}"}'

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/FeedBootstrap.kt

+14-2
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,25 @@ sealed class FeedBootstrap<T : Feed>(
118118
.filter { it.streams.contains(stream) }
119119
.firstOrNull()
120120

121+
// Ideally we should check if sync is trigger-based CDC by checking source connector
122+
// configuration. But we don't have that information here. So this is just a hacky solution
123+
private val isTriggerBasedCdc: Boolean =
124+
precedingGlobalFeed == null &&
125+
metaFieldDecorator.globalCursor != null &&
126+
stream.schema.none { it.id == metaFieldDecorator.globalCursor?.id } &&
127+
stream.configuredCursor?.id == metaFieldDecorator.globalCursor?.id &&
128+
stream.configuredSyncMode == ConfiguredSyncMode.INCREMENTAL
129+
121130
private val defaultRecordData: ObjectNode =
122131
Jsons.objectNode().also { recordData: ObjectNode ->
123132
stream.schema.forEach { recordData.putNull(it.id) }
124-
if (feed is Stream && precedingGlobalFeed != null) {
133+
if (feed is Stream && precedingGlobalFeed != null || isTriggerBasedCdc) {
125134
metaFieldDecorator.decorateRecordData(
126135
timestamp = outputConsumer.recordEmittedAt.atOffset(ZoneOffset.UTC),
127-
globalStateValue = stateManager.scoped(precedingGlobalFeed).current(),
136+
globalStateValue =
137+
if (precedingGlobalFeed != null)
138+
stateManager.scoped(precedingGlobalFeed).current()
139+
else null,
128140
stream,
129141
recordData,
130142
)

airbyte-cdk/bulk/core/extract/src/test/kotlin/io/airbyte/cdk/read/FeedBootstrapTest.kt

+46
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,52 @@ class FeedBootstrapTest {
184184
)
185185
}
186186

187+
@Test
188+
fun testTriggerBasedCdcMetadataDecoration() {
189+
// Create a stream without the global cursor in its schema to simulate trigger-based CDC
190+
val triggerBasedStream =
191+
Stream(
192+
id = StreamIdentifier.from(StreamDescriptor().withName("tbl").withNamespace("ns")),
193+
schema =
194+
setOf(
195+
k,
196+
v,
197+
),
198+
configuredSyncMode = ConfiguredSyncMode.INCREMENTAL,
199+
configuredPrimaryKey = listOf(k),
200+
configuredCursor =
201+
GlobalCursor, // For trigger based CDC the cursor is uniquely defined, we just
202+
// use this object for test case
203+
)
204+
205+
// Create state manager and bootstrap without a global feed
206+
val stateManager =
207+
StateManager(initialStreamStates = mapOf(triggerBasedStream to Jsons.arrayNode()))
208+
val bootstrap = triggerBasedStream.bootstrap(stateManager)
209+
val consumer = bootstrap.streamRecordConsumers().toList().first().second
210+
211+
// Test that a record gets CDC metadata decoration even without a global feed
212+
consumer.accept(Jsons.readTree("""{"k":3,"v":"trigger"}""") as ObjectNode, changes = null)
213+
214+
val recordOutput = outputConsumer.records().map(Jsons::writeValueAsString).first()
215+
val recordJson = Jsons.readTree(recordOutput)
216+
val data = recordJson.get("data")
217+
218+
// Verify CDC metadata fields are present and properly decorated
219+
Assertions.assertNotNull(data.get("_ab_cdc_lsn"))
220+
Assertions.assertNotNull(data.get("_ab_cdc_updated_at"))
221+
Assertions.assertNotNull(data.get("_ab_cdc_deleted_at"))
222+
223+
// The _ab_cdc_lsn should be decorated with the transaction timestamp
224+
Assertions.assertTrue(data.get("_ab_cdc_lsn").isTextual)
225+
226+
// _ab_cdc_updated_at should be a timestamp string
227+
Assertions.assertTrue(data.get("_ab_cdc_updated_at").isTextual)
228+
229+
// _ab_cdc_deleted_at should be null for non-deleted records
230+
Assertions.assertTrue(data.get("_ab_cdc_deleted_at").isNull)
231+
}
232+
187233
companion object {
188234
const val GLOBAL_RECORD_DATA =
189235
"""{"k":1,"v":"foo","_ab_cdc_lsn":123,"_ab_cdc_updated_at":"2024-03-01T01:02:03.456789","_ab_cdc_deleted_at":null}"""

airbyte-cdk/bulk/core/extract/src/testFixtures/kotlin/io/airbyte/cdk/discover/TestMetaFieldDecorator.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import com.fasterxml.jackson.databind.JsonNode
88
import com.fasterxml.jackson.databind.node.ObjectNode
99
import io.airbyte.cdk.command.OpaqueStateValue
1010
import io.airbyte.cdk.read.Stream
11-
import io.airbyte.cdk.util.Jsons
1211
import io.micronaut.context.annotation.Requires
1312
import io.micronaut.context.env.Environment
1413
import jakarta.inject.Singleton
@@ -43,7 +42,7 @@ class TestMetaFieldDecorator : MetaFieldDecorator {
4342
recordData.set<JsonNode>(
4443
GlobalCursor.id,
4544
if (globalStateValue == null) {
46-
Jsons.nullNode()
45+
CdcOffsetDateTimeMetaFieldType.jsonEncoder.encode(timestamp)
4746
} else {
4847
CdcStringMetaFieldType.jsonEncoder.encode(globalStateValue.toString())
4948
}

airbyte-cdk/bulk/toolkits/load-azure-blob-storage/src/main/kotlin/io/airbyte/cdk/load/file/azureBlobStorage/AzureBlobStreamingUpload.kt

+5-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import io.airbyte.cdk.load.command.azureBlobStorage.AzureBlobStorageConfiguratio
99
import io.airbyte.cdk.load.file.object_storage.StreamingUpload
1010
import io.airbyte.cdk.load.util.setOnce
1111
import io.github.oshai.kotlinlogging.KotlinLogging
12-
import java.io.BufferedInputStream
1312
import java.nio.ByteBuffer
1413
import java.util.Base64
1514
import java.util.concurrent.ConcurrentSkipListMap
@@ -42,14 +41,16 @@ class AzureBlobStreamingUpload(
4241

4342
// The stageBlock call can be done asynchronously or blocking.
4443
// Here we use the blocking call in a coroutine context.
45-
BufferedInputStream(part.inputStream()).use {
44+
part.inputStream().use {
4645
blockBlobClient.stageBlock(
4746
blockId,
4847
it,
4948
part.size.toLong(),
5049
)
5150
}
5251

52+
log.info { "Staged block #$index => $rawBlockId (encoded = $blockId)" }
53+
5354
// Keep track of the blocks in the order they arrived (or the index).
5455
blockIds[index] = blockId
5556
}
@@ -77,6 +78,8 @@ class AzureBlobStreamingUpload(
7778
blockBlobClient.setMetadata(filteredMetadata)
7879
}
7980
}
81+
} else {
82+
log.warn { "Complete called multiple times for ${blockBlobClient.blobName}" }
8083
}
8184

8285
return AzureBlob(blockBlobClient.blobName, config)

airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt

-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ class MSSQLCSVFormattingWriter(
117117
private val printer = finalSchema.toCsvPrinterWithHeader(outputStream)
118118
private val mssqlRowValidator = MSSQLCsvRowValidator(validateValuesPreLoad)
119119
override fun accept(record: DestinationRecordAirbyteValue) {
120-
121120
printer.printRecord(
122121
mssqlRowValidator
123122
.validate(record, this.finalSchema)

airbyte-ci/connectors/pipelines/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ If you are developing on pipelines, we recommend installing airbyte-ci with poet
8585
```bash
8686
cd airbyte-ci/connectors/pipelines/
8787
poetry install
88-
poetry shell
88+
poetry env activate
8989
cd ../../
9090
```
9191

airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/list/pipeline.py

-3
This file was deleted.

airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/manifest_only_connectors.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class ManifestOnlyConnectorUnitTests(PytestStep):
6464

6565
title = "Manifest-only unit tests"
6666
test_directory_name = "unit_tests"
67-
common_test_dependencies = ["pytest"]
67+
common_test_dependencies = ["pytest", "requests-mock"]
6868

6969
async def install_testing_environment(
7070
self,

airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ async def install_testing_environment(
168168
)
169169
if self.common_test_dependencies:
170170
container_with_test_deps = container_with_test_deps.with_user("root").with_exec(
171-
["pip", "install", f'{" ".join(self.common_test_dependencies)}']
171+
["pip", "install"] + self.common_test_dependencies
172172
)
173173

174174
container_with_test_deps = (

airbyte-ci/connectors/pipelines/pipelines/helpers/utils.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
MANIFEST_FILE_NAME = "manifest.yaml"
3636
METADATA_ICON_FILE_NAME = "icon.svg"
3737
DIFF_FILTER = "MADRT" # Modified, Added, Deleted, Renamed, Type changed
38-
IGNORED_FILE_EXTENSIONS: List[str] = []
38+
IGNORED_FILE_EXTENSIONS: List[str] = [".md"]
3939

4040

4141
# This utils will probably be redundant once https://github.com/dagger/dagger/issues/3764 is implemented

0 commit comments

Comments
 (0)