
Commit 5e5a3a3

Authored by: github-actions[bot], maxi297, johnny-schmidt, tautvydas-v, ian-at-airbyte
Fork Sync: Update from parent repository (#129)
Upstream commits included in this sync:

* ✨ Source Intercom: adding a mock server test (airbytehq#54715)
* [Destination MS SQL V2] Correct Part Size, No buffered input stream (airbytehq#55252)
* [source-hibob] Changed check stream from payrolls to profiles airbytehq#55674 (airbytehq#55675)
* Add llms.txt to docs.airbyte.com (airbytehq#55261)
* ✨ TikTok Marketing Source: Add `Pixels`, `PixelInstantPageEvents`, `PixelEventsStatistics` streams (airbytehq#55669)
* chore(ci): remove stale and unused workflows (airbytehq#55260)
* ci: for community CI, rename 'early ci' to 'pre-release checks', skip duplicated tests in 'connector tests' (airbytehq#55241)
* (feat: Salesloft) - Add emails_scoped_fields stream (airbytehq#55229)
* 🐛 Source Outreach: remove stream_state interpolation (airbytehq#55180)
* fix(source-stripe): disable progressive rollout (airbytehq#55682)
* fix(source-instagram): Disable cache for InstagramMediaChildrenTransformation (airbytehq#55685)
* [Destination MSSQL V2] Bulk Load Local Performance Test (airbytehq#55687)
* [Destination-S3] File Xfer Local Performance Test (airbytehq#55220)
* 🐛 bug(source-hubspot): fix deals_archived and marketing_emails issues for CAT (airbytehq#54177)
* ✨ Source Quickbooks: Migrate to manifest-only (airbytehq#55263)
* ✨ Source Zendesk Chat: Migrate to Manifest-only (airbytehq#47319)
* ✨ Source Facebook Marketing: Add `learning_stage_info` field to AdSets stream (airbytehq#50418)
* Source Sendgrid: Update manifest for adapting changes with AsyncRetriever (airbytehq#55185)
* docs: fix broken markup in Python CDK Basic Concepts page (airbytehq#55699)
* Source Zendesk Talk: Restore Unit Test (airbytehq#50956)
* chore: 2.0.0 release (airbytehq#55684)
* Adding EnrichedAirbyteValue and DeclaredField (airbytehq#55218)
* Add Airbyte Academy section (airbytehq#49964)
* [source-faker] - Bump to stable 6.2.21 (airbytehq#55705)
* fix: remove duplicate breaking changes from destination-mssql metadata (airbytehq#55718)
* fix: restore definition ID (airbytehq#55720)
* ✨ Source Freshdesk: Migrate to Manifest-only (airbytehq#54687)
* chore(source-s3): update base image to 4.0.0 and use caret dependencies (do not merge) (airbytehq#55202)
* fix(Source-LinkedIn-Ads): Update outdated schema (airbytehq#55724)
* Bump @babel/runtime-corejs3 from 7.23.6 to 7.26.10 in /docusaurus (airbytehq#55708)
* 🐛 Source S3: Up CDK to fix schema type issue (airbytehq#55694)
* source-shipstation contribution from suhl79 (airbytehq#55738)
* destination-teradata: Upgrade JDBC driver (airbytehq#55183)
* 🐛 Destination Databricks: Fix destination check test table collisions when multiple connections write to the same schema (airbytehq#55232)
* (source-sendgrid) - Configure max concurrent async job count (airbytehq#55744)
* pass streams to debezium sources on cold start (airbytehq#55734)
* Destination S3 Data Lake: exclude invalid fields from identifier fields (airbytehq#55700)
* [source-mysql] pin to cdk 0.342 (airbytehq#55754)
* docs: add enterprise connector documentation (airbytehq#55751)
* Add UTM source (airbytehq#55733)
* Matteogp/docs sap hana update 1 (airbytehq#55696)
* Destination S3 Data Lake: Handle number in primary key (airbytehq#55755)
* Fix reversed assertions in MySQL source tests (airbytehq#55756)
* Update CDK to pass DestinationRecordRaw around (airbytehq#55737)
* fix(ci): remove empty notify-on-push workflow file (airbytehq#55757)
* Update OTEL metrics, add new linting exceptions (airbytehq#55752)
* 11526 second pass through iceberg documentation (airbytehq#55736)
* [source-mysql] don't do sampling for source-mysql (airbytehq#55761)
* 🚨 Source Fauna: Migrate to poetry (airbytehq#41051)

Signed-off-by: Artem Inzhyyants, dependabot[bot]

Co-authored-by: Maxime Carbonneau-Leclerc, Johnny Schmidt, tautvydas-v, Ian Alton, Tope Folorunso, Octavia Squidington III, Natik Gadzhi, Aaron ("AJ") Steers, Tyler B, kyleromines, Artem Inzhyyants, Anatolii Yatsuk, Aldo Gonzalez, Dhroov Makwana, jake horban, Marcos Marx, marcosmarxm, btkcodedev, devin-ai-integration[bot], Jonathan Pearlin, Francis Genet, Patrick Nilan, Alfredo Garcia, dependabot[bot], suhl79, Satish Chinthanippu, Sena Heydari, Matt Bayley, Edward Gao, Matteo Palarchio, Wenqi Hu, Yue Li
1 parent 89a8e8e commit 5e5a3a3

File tree: 321 files changed, +45709 −26619 lines


.github/labeler.yml (deleted, −20 lines)

.github/workflows/assign-issue-to-project.yaml (deleted, −34 lines)

.github/workflows/community_ci.yml

+4-4
@@ -37,8 +37,8 @@ jobs:
           echo "The fork has changes in protected paths. This is not allowed."
           exit 1

-  connectors_early_ci:
-    name: Run connectors early CI on fork
+  connectors_pre_release_checks:
+    name: Run pre-release checks on fork
     if: github.event.pull_request.head.repo.fork == true
     needs: fail_on_protected_path_changes
     environment: community-ci-auto
@@ -75,7 +75,7 @@ jobs:
       with:
         context: "pull_request"
         sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
-        subcommand: "connectors --modified test --only-step=qa_checks --only-step=version_inc_check --global-status-check-context='Connectors early CI checks' --global-status-check-description='Running early CI checks on connectors'"
+        subcommand: "connectors --modified test --only-step=qa_checks --only-step=version_inc_check --global-status-check-context='Connector Pre-Release Checks' --global-status-check-description='Running pre-release checks...'"
         is_fork: "true"
         git_repo_url: ${{ github.event.pull_request.head.repo.clone_url }}
         git_branch: ${{ github.head_ref }}
@@ -144,7 +144,7 @@ jobs:
         github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
         s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
         s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
-        subcommand: "connectors --modified test"
+        subcommand: "connectors --modified test --skip-step=qa_checks --skip-step=version_inc_check"
         is_fork: "true"
       - name: Upload pipeline reports
         id: upload-artifact

.github/workflows/label-github-issues-by-path.yml (deleted, −17 lines)

.github/workflows/notify-on-push-to-master.yml (deleted, −19 lines)

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt

+2-2
@@ -9,7 +9,7 @@ import io.airbyte.cdk.load.command.DestinationConfiguration
 import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.message.BatchEnvelope
 import io.airbyte.cdk.load.message.ChannelMessageQueue
-import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.DestinationRecordRaw
 import io.airbyte.cdk.load.message.MultiProducerChannel
 import io.airbyte.cdk.load.message.PartitionedQueue
 import io.airbyte.cdk.load.message.PipelineEvent
@@ -120,7 +120,7 @@ class SyncBeanFactory {
     @Named("recordQueue")
     fun recordQueue(
         loadStrategy: LoadStrategy? = null,
-    ): PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>> {
+    ): PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>> {
         return PartitionedQueue(
             Array(loadStrategy?.inputPartitions ?: 1) {
                 ChannelMessageQueue(Channel(Channel.UNLIMITED))
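The `recordQueue` factory above sizes the partitioned queue from the load strategy's `inputPartitions`, falling back to a single partition when no strategy is bound. A minimal standalone sketch of that sizing idea, using plain stdlib deques instead of the CDK's coroutine-backed `ChannelMessageQueue` (all type names here are simplified stand-ins, not the real CDK API):

```kotlin
// Stand-in: a load strategy only contributes its desired partition count.
interface LoadStrategy { val inputPartitions: Int }

// Simplified partitioned queue: one ArrayDeque per partition, no coroutines.
class PartitionedQueue<T>(private val parts: Array<ArrayDeque<T>>) {
    val partitionCount: Int get() = parts.size
    fun publish(value: T, partition: Int) = parts[partition].addLast(value)
    fun poll(partition: Int): T? = parts[partition].removeFirstOrNull()
}

// Mirrors the factory: the strategy may be absent, in which case use 1 partition.
fun <T> recordQueue(loadStrategy: LoadStrategy? = null): PartitionedQueue<T> =
    PartitionedQueue(Array(loadStrategy?.inputPartitions ?: 1) { ArrayDeque() })

fun main() {
    check(recordQueue<String>().partitionCount == 1) // default: one partition
    val strategy = object : LoadStrategy { override val inputPartitions = 4 }
    val q = recordQueue<String>(strategy)
    q.publish("rec", partition = 3)
    check(q.poll(3) == "rec")
    println("partitions: ${q.partitionCount}")
}
```

The `Array(n) { init }` constructor is what lets the partition count stay a runtime value supplied by dependency injection.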

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt

+77
@@ -11,6 +11,9 @@ import com.fasterxml.jackson.databind.JsonSerializer
 import com.fasterxml.jackson.databind.SerializerProvider
 import com.fasterxml.jackson.databind.annotation.JsonSerialize
 import com.fasterxml.jackson.databind.node.NullNode
+import io.airbyte.cdk.load.message.Meta
+import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
+import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason
 import java.math.BigDecimal
 import java.math.BigInteger
 import java.time.LocalDate
@@ -153,3 +156,77 @@ private class ObjectValueSerializer : JsonSerializer<ObjectValue>() {
 }

 @JvmInline value class UnknownValue(val value: JsonNode) : AirbyteValue
+
+/**
+ * Represents an "enriched" (/augmented) Airbyte value with additional metadata.
+ *
+ * @property value The actual [AirbyteValue]
+ * @property type The type ([AirbyteType]) of the [AirbyteValue]
+ * @property changes List of [Meta.Change]s that have been applied to this value
+ * @property name Field name
+ * @property fieldCategory [FieldCategory] of the field
+ */
+data class EnrichedAirbyteValue(
+    val value: AirbyteValue,
+    val type: AirbyteType,
+    val changes: List<Meta.Change> = emptyList(),
+    val name: String,
+    val fieldCategory: FieldCategory
+) {
+    init {
+        require(name.isNotBlank()) { "Field name cannot be blank" }
+    }
+
+    /**
+     * Creates a nullified version of this value with the specified reason.
+     *
+     * @param reason The [Reason] for nullification, defaults to DESTINATION_SERIALIZATION_ERROR
+     * @return A new [EnrichedAirbyteValue] with a null value and an additional change record
+     */
+    fun toNullified(reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR): EnrichedAirbyteValue {
+        val nullChange =
+            Meta.Change(
+                field = name,
+                change = AirbyteRecordMessageMetaChange.Change.NULLED,
+                reason = reason
+            )
+
+        // Return a copy with null value and the new change added to the changes list
+        return copy(value = NullValue, changes = changes + nullChange)
+    }
+
+    /**
+     * Creates a truncated version of this value with the specified reason and new value.
+     *
+     * @param reason The [Reason] for truncation, defaults to DESTINATION_RECORD_SIZE_LIMITATION
+     * @param newValue The new (truncated) value to use
+     * @return A new [EnrichedAirbyteValue] with the truncated value and an additional change record
+     */
+    fun toTruncated(
+        reason: Reason = Reason.DESTINATION_RECORD_SIZE_LIMITATION,
+        newValue: AirbyteValue
+    ): EnrichedAirbyteValue {
+        val truncateChange =
+            Meta.Change(
+                field = name,
+                change = AirbyteRecordMessageMetaChange.Change.TRUNCATED,
+                reason = reason
+            )
+
+        // Return a copy with the truncated value and the new change added to the changes list
+        return copy(value = newValue, changes = changes + truncateChange)
+    }
+}
+
+/**
+ * The [EnrichedAirbyteValue] category allows us to quickly understand if the field is an Airbyte
+ * controlled field or if it is declared by the source.
+ */
+enum class FieldCategory {
+    RAW_ID,
+    EXTRACTED_AT,
+    META,
+    GENERATION_ID,
+    // For fields that don't match any of the predefined Airbyte columns
+    CLIENT_DATA
+}
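The `toNullified`/`toTruncated` helpers above implement copy-on-write change tracking: the original value is never mutated, and every transformation appends a `Meta.Change` record explaining what happened. A minimal standalone sketch of that pattern, using simplified stand-in types rather than the CDK's real `Meta`, `Reason`, and `AirbyteValue` classes:

```kotlin
// Simplified stand-ins for the CDK types; all names here are illustrative only.
data class Change(val field: String, val change: String, val reason: String)

sealed interface Value
object NullValue : Value
data class StringVal(val s: String) : Value

data class Enriched(
    val value: Value,
    val name: String,
    val changes: List<Change> = emptyList(),
) {
    // Mirrors toNullified(): copy with a null value plus a NULLED change record.
    fun toNullified(reason: String = "DESTINATION_SERIALIZATION_ERROR") =
        copy(value = NullValue, changes = changes + Change(name, "NULLED", reason))

    // Mirrors toTruncated(): swap in the shorter value and record why.
    fun toTruncated(newValue: Value, reason: String = "DESTINATION_RECORD_SIZE_LIMITATION") =
        copy(value = newValue, changes = changes + Change(name, "TRUNCATED", reason))
}

fun main() {
    val original = Enriched(StringVal("a".repeat(1000)), name = "bio")
    val truncated = original.toTruncated(StringVal("a".repeat(100)))
    check(original.changes.isEmpty())  // the original copy is untouched
    check(truncated.changes.single().change == "TRUNCATED")
    println("changes recorded: ${truncated.changes.size}")
}
```

Because `data class copy` produces a new immutable instance, downstream consumers can hold both the pre- and post-transformation views of a field without defensive copying.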

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt

+41
@@ -11,6 +11,7 @@ import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.data.AirbyteType
 import io.airbyte.cdk.load.data.AirbyteValue
 import io.airbyte.cdk.load.data.AirbyteValueDeepCoercingMapper
+import io.airbyte.cdk.load.data.EnrichedAirbyteValue
 import io.airbyte.cdk.load.data.IntegerValue
 import io.airbyte.cdk.load.data.StringValue
 import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
@@ -143,6 +144,9 @@ data class DestinationRecord(
             serialized.length.toLong()
         )
     }
+    fun asDestinationRecordRaw(): DestinationRecordRaw {
+        return DestinationRecordRaw(stream, message, serialized)
+    }
 }

 /**
@@ -163,6 +167,43 @@ data class DestinationRecordAirbyteValue(
     val serializedSizeBytes: Long = 0L
 )

+data class EnrichedDestinationRecordAirbyteValue(
+    val stream: DestinationStream.Descriptor,
+    val declaredFields: Map<String, EnrichedAirbyteValue>,
+    val airbyteMetaFields: Map<String, EnrichedAirbyteValue>,
+    val undeclaredFields: Map<String, JsonNode>,
+    val emittedAtMs: Long,
+    val meta: Meta?,
+    val serializedSizeBytes: Long = 0L
+)
+
+data class DestinationRecordRaw(
+    val stream: DestinationStream.Descriptor,
+    private val rawData: AirbyteMessage,
+    private val serialized: String
+) {
+    fun asRawJson(): JsonNode {
+        return rawData.record.data
+    }
+
+    fun asDestinationRecordAirbyteValue(): DestinationRecordAirbyteValue {
+        return DestinationRecordAirbyteValue(
+            stream,
+            rawData.record.data.toAirbyteValue(),
+            rawData.record.emittedAt,
+            Meta(
+                rawData.record.meta?.changes?.map { Meta.Change(it.field, it.change, it.reason) }
+                    ?: emptyList()
+            ),
+            serialized.length.toLong()
+        )
+    }
+
+    fun asEnrichedDestinationRecordAirbyteValue(): EnrichedDestinationRecordAirbyteValue {
+        TODO()
+    }
+}
+
 data class DestinationFile(
     override val stream: DestinationStream.Descriptor,
     val emittedAtMs: Long,
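The new `DestinationRecordRaw` above keeps the original serialized payload and only converts it to the structured `DestinationRecordAirbyteValue` form on demand, so size accounting can use the raw string length without parsing. A standalone sketch of that wrap-then-convert-on-demand idea (the field-parsing step is a deliberately naive stand-in, not the CDK's Jackson plumbing):

```kotlin
// Stand-in "raw record": holds the serialized form, converts lazily on request.
class RecordRaw(val stream: String, private val serialized: String) {
    // Size accounting reads the raw string without ever parsing it.
    val sizeBytes: Long get() = serialized.length.toLong()

    // Parsed view, computed once on first access (stand-in for toAirbyteValue()).
    val asFields: Map<String, String> by lazy {
        serialized.removeSurrounding("{", "}")
            .split(",")
            .filter { it.contains("=") }
            .associate { kv -> kv.split("=", limit = 2).let { it[0] to it[1] } }
    }
}

fun main() {
    val raw = RecordRaw("users", "{id=1,name=ada}")
    check(raw.sizeBytes == 15L)  // length of the serialized string, no parse needed
    check(raw.asFields == mapOf("id" to "1", "name" to "ada"))
    println(raw.asFields["name"])
}
```

Deferring the parse this way means pipeline stages that only route or count bytes never pay deserialization cost, which is presumably the motivation for threading `DestinationRecordRaw` through the queues in this commit.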

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipelineStep.kt

+2-3
@@ -4,7 +4,7 @@

 package io.airbyte.cdk.load.pipeline

-import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.DestinationRecordRaw
 import io.airbyte.cdk.load.message.PartitionedQueue
 import io.airbyte.cdk.load.message.PipelineEvent
 import io.airbyte.cdk.load.message.QueueWriter
@@ -24,8 +24,7 @@ import jakarta.inject.Singleton
 class DirectLoadPipelineStep<S : DirectLoader>(
     val accumulator: DirectLoadRecordAccumulator<S, StreamKey>,
     @Named("recordQueue")
-    val inputQueue:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+    val inputQueue: PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
     @Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
     @Value("\${airbyte.destination.core.record-batch-size-override:null}")
     val batchSizeOverride: Long? = null,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadRecordAccumulator.kt

+3-6
@@ -5,7 +5,7 @@
 package io.airbyte.cdk.load.pipeline

 import io.airbyte.cdk.load.message.Batch
-import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.DestinationRecordRaw
 import io.airbyte.cdk.load.message.WithBatchState
 import io.airbyte.cdk.load.message.WithStream
 import io.airbyte.cdk.load.write.DirectLoader
@@ -25,15 +25,12 @@ data class DirectLoadAccResult(override val state: Batch.State) : WithBatchState
 @Requires(bean = DirectLoaderFactory::class)
 class DirectLoadRecordAccumulator<S : DirectLoader, K : WithStream>(
     val directLoaderFactory: DirectLoaderFactory<S>
-) : BatchAccumulator<S, K, DestinationRecordAirbyteValue, DirectLoadAccResult> {
+) : BatchAccumulator<S, K, DestinationRecordRaw, DirectLoadAccResult> {
     override fun start(key: K, part: Int): S {
         return directLoaderFactory.create(key.stream, part)
     }

-    override fun accept(
-        record: DestinationRecordAirbyteValue,
-        state: S
-    ): Pair<S, DirectLoadAccResult?> {
+    override fun accept(record: DestinationRecordRaw, state: S): Pair<S, DirectLoadAccResult?> {
         state.accept(record).let {
             return when (it) {
                 is Incomplete -> Pair(state, null)
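The `accept` contract in this accumulator returns the loader state paired with an optional completion result, so the pipeline learns when a batch is done without the loader pushing anywhere itself. A minimal standalone illustration of that shape (`CountingLoader` and the result types are invented for the example, not CDK classes):

```kotlin
// Invented result types echoing the Incomplete/Complete shape in the diff.
sealed interface LoadResult
object Incomplete : LoadResult
data class Complete(val records: Int) : LoadResult

// Toy loader: declares a batch complete after every `flushEvery` records.
class CountingLoader(private val flushEvery: Int) {
    private var seen = 0
    fun accept(record: String): LoadResult {
        seen++
        return if (seen % flushEvery == 0) Complete(seen) else Incomplete
    }
}

// Mirrors DirectLoadRecordAccumulator.accept: forward the record, map the
// loader's verdict to (state, optional-batch-result).
fun acceptRecord(record: String, state: CountingLoader): Pair<CountingLoader, Complete?> =
    when (val r = state.accept(record)) {
        is Incomplete -> state to null
        is Complete -> state to r
    }

fun main() {
    val loader = CountingLoader(flushEvery = 2)
    check(acceptRecord("r1", loader).second == null)       // batch still open
    check(acceptRecord("r2", loader).second == Complete(2)) // batch finished
    println("batch completed after 2 records")
}
```

Returning `null` for "keep going" keeps the hot path allocation-free apart from the `Pair`, and the sealed hierarchy makes the `when` exhaustive at compile time.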

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt

+3-3
@@ -4,7 +4,7 @@

 package io.airbyte.cdk.load.pipeline

-import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.DestinationRecordRaw
 import io.micronaut.context.annotation.Secondary
 import jakarta.inject.Singleton
 import kotlin.math.abs
@@ -14,13 +14,13 @@ import kotlin.math.abs
  * partitioned by a hash of the stream name and namespace.
  */
 interface InputPartitioner {
-    fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int
+    fun getPartition(record: DestinationRecordRaw, numParts: Int): Int
 }

 @Singleton
 @Secondary
 class ByStreamInputPartitioner : InputPartitioner {
-    override fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int {
+    override fun getPartition(record: DestinationRecordRaw, numParts: Int): Int {
         return abs(record.stream.hashCode()) % numParts
     }
 }
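`ByStreamInputPartitioner`'s scheme is simple: hash the stream descriptor and take it modulo the partition count, so every record of a given stream lands on the same partition. Sketched standalone below; the `Descriptor` type is a stand-in for `DestinationStream.Descriptor`, and note that, like any `abs(hashCode()) % n` scheme, an `Int.MIN_VALUE` hash would survive `abs` and yield a negative index.

```kotlin
import kotlin.math.abs

// Stand-in for DestinationStream.Descriptor.
data class Descriptor(val namespace: String?, val name: String)

// Same scheme as ByStreamInputPartitioner: stable hash modulo partition count,
// so routing is deterministic per stream across the whole sync.
fun partitionFor(stream: Descriptor, numParts: Int): Int =
    abs(stream.hashCode()) % numParts

fun main() {
    val users = Descriptor("public", "users")
    val orders = Descriptor("public", "orders")
    val p = partitionFor(users, numParts = 4)
    check(p == partitionFor(users.copy(), 4)) // equal descriptors, equal partition
    check(p in 0..3)
    println("users -> $p, orders -> ${partitionFor(orders, 4)}")
}
```

Per-stream affinity like this preserves record ordering within a stream while still letting independent streams load in parallel.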

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt

+2-2
@@ -11,7 +11,7 @@ import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.message.BatchEnvelope
 import io.airbyte.cdk.load.message.ChannelMessageQueue
 import io.airbyte.cdk.load.message.CheckpointMessageWrapped
-import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.DestinationRecordRaw
 import io.airbyte.cdk.load.message.DestinationStreamEvent
 import io.airbyte.cdk.load.message.MessageQueue
 import io.airbyte.cdk.load.message.MessageQueueSupplier
@@ -145,7 +145,7 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
     // New interface shim
     @Named("recordQueue")
     private val recordQueueForPipeline:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
     @Named("batchStateUpdateQueue") private val batchUpdateQueue: ChannelMessageQueue<BatchUpdate>,
     private val loadPipeline: LoadPipeline?,
     private val partitioner: InputPartitioner,
