[Destination-S3] Uses New Load CDK LoadStrategy Interface (temporarily disabled) #54695
Conversation
seems reasonable. Had a few minor comments, plus questions about the PartToObjectAccumulator, and about how we're injecting the queues.
Can I get a link to the PR adding the bulk loader? I was a bit nervous about the Micronaut wiring you described on Tuesday (with all the Replaces stuff, queues, etc.) - it felt like we're moving towards the same problems that platform has, with it being difficult to understand which beans are in play at any given moment (probably even worse, since we're doing stuff across classpaths, so IntelliJ doesn't know how to discover all the beans)
but that code isn't in this PR, so 🤷
Everything that is implicitly included by some [...] The confusing part will be when something @Replaces something, but in practice you [...]
lgtm, had a few nits
@Value("\${airbyte.destination.core.record-batch-size-override:null}") | ||
val batchSizeOverride: Long? = null, | ||
) : LoadPipelineStep { | ||
override val numWorkers: Int = objectLoader.numPartWorkers | ||
private val streamCompletionMap = ConcurrentHashMap<DestinationStream.Descriptor, AtomicLong>() |
does streamCompletionMap need to be in the LoadPipelineStepTask constructor? seems a bit weird to require every Step implementation to pass in this hashmap
it needs to be shared across tasks and it needs to be affined to the step (ie, it can't be global).
I think breaking out a factory makes sense. I was thinking something like (rough code sketch after the list):
LoadPipelineStepTaskFactory:
createFirstStep(... arguments except input queue/input partitioner/batch queue/num workers/stream completions/output queue)
createMiddleStep(...same as above but also takes input queue, but injects the partitioner)
createFinalStep(...same as middle but doesn't require output)
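In code, the shape could look roughly like this. This is a hypothetical sketch only: the names, signatures, and placeholder types are illustrative, not the real CDK API. The point is that the factory owns the step wiring (input queue, partitioner, batch queue, stream-completion map, output queue), so Step implementations only pass what varies per step.

// Hypothetical sketch; the actual CDK factory has different signatures.
interface LoadPipelineStepTaskFactory {
    // First step: reads the record input; everything else is injected by the factory.
    fun <K, V> createFirstStep(accumulator: Accumulator<K, V>, partition: Int, numWorkers: Int): Task

    // Middle step: the caller supplies its input queue; the factory injects the partitioner.
    fun <K, V> createMiddleStep(accumulator: Accumulator<K, V>, inputQueue: Queue<V>, partition: Int, numWorkers: Int): Task

    // Final step: like a middle step, but no output queue is required.
    fun <K, V> createFinalStep(accumulator: Accumulator<K, V>, inputQueue: Queue<V>, partition: Int, numWorkers: Int): Task
}

// Placeholder types so the sketch stands alone (stand-ins for the real CDK types).
interface Accumulator<K, V>
interface Queue<V>
interface Task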
I did that and went ahead and updated DirectLoader. (I'm punting on Object loader, since it's about to change a little with BulkLoad and I want to avoid the rebase.)
ie,
@Singleton
@Requires(bean = DirectLoaderFactory::class)
class DirectLoadPipelineStep<S : DirectLoader>(
    val directLoaderFactory: DirectLoaderFactory<S>,
    val accumulator: DirectLoadRecordAccumulator<S, StreamKey>,
    val taskFactory: LoadPipelineStepTaskFactory,
) : LoadPipelineStep {
    private val log = KotlinLogging.logger {}

    override val numWorkers: Int = directLoaderFactory.inputPartitions

    override fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *> {
        log.info { "Creating DirectLoad pipeline step task for partition $partition" }
        return taskFactory.createOnlyStep<S, StreamKey, DirectLoadAccResult>(accumulator, partition, numWorkers)
    }
}
I also reran tests for S3DataLake (temporarily unpinned)
13001de (I fixed the constructor, too)
        as
            PartitionedQueue<
                PipelineEvent<ObjectKey, ObjectLoaderUploadCompleter.UploadResult>>?,
        null,
nit: let's have an explicit flushStrategy = null here - it wasn't immediately obvious that this is how we implement "the enclosing step should be configured not to flush" from UploadCompleter.finish
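For illustration, the suggestion would make that argument read something like this at the call site (a sketch of the nit, not the actual diff):

flushStrategy = null, // explicit: this step is configured not to flush on its own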
That's hidden by the factory now.
In practice, forcing only makes sense on the first step (it will finish early, and that effect will propagate downstream.)
seems reasonable - I'll take your word that the factory createFirstStep / etc. methods make sense, since you're only calling createOnlyStep here
(but I think you meant to replace the constructor calls with factory calls, in the ObjectFooStep classes?)
@@ -28,7 +28,8 @@ class ObjectLoaderUploadCompleterStep(
     @Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
 ) : LoadPipelineStep {
     override val numWorkers: Int = objectLoader.numUploadCompleters
-    private val streamCompletionMap = ConcurrentHashMap<DestinationStream.Descriptor, AtomicLong>()
+    private val streamCompletionMap =
+        ConcurrentHashMap<DestinationStream.Descriptor, AtomicInteger>()

     override fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *> {
         return LoadPipelineStepTask(
did you mean to replace this with a factory call?
I didn't make the changes to the object steps yet, but I will go ahead and do it
I was trying to avoid a rebase but the changes are actually really light
@Value("\${airbyte.destination.core.record-batch-size-override:null}") | ||
val batchSizeOverride: Long? = null, | ||
) { | ||
private val streamCompletions = ConcurrentHashMap<DestinationStream.Descriptor, AtomicInteger>() |
needs to be affined to the step (ie, it can't be global)
should this be a function so that we get a new instance on each invocation?
it's worse than that 🙃
we can't do that, because each individual task would get its own map. we need the counts to be affined to the step and shared across all the workers in the step.
After some local testing, the right pattern is a map of Pair<StepIndex, Descriptor> -> Count (sketched below). Not too bad, since the index is deterministic on the first, final, and only steps, so the factory can set it.
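In code, that pattern might look roughly like the following. This is a self-contained sketch with placeholder types (Descriptor here is a stand-in for DestinationStream.Descriptor), not the actual CDK code; it just shows one map keyed by (step index, stream) so all workers in a step share a counter while different steps stay isolated.

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Placeholder for DestinationStream.Descriptor.
data class Descriptor(val namespace: String?, val name: String)

class StreamCompletionCounts {
    // One shared map, keyed by (step index, stream descriptor).
    private val counts = ConcurrentHashMap<Pair<Int, Descriptor>, AtomicLong>()

    // Each worker of step `stepIndex` calls this once when it sees end-of-stream for `stream`;
    // exactly one caller sees `true`, after all `numWorkers` workers have reported.
    fun markComplete(stepIndex: Int, stream: Descriptor, numWorkers: Int): Boolean =
        counts.computeIfAbsent(stepIndex to stream) { AtomicLong(0) }
            .incrementAndGet() == numWorkers.toLong()
}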
one nonblocking comment, feel free to
@@ -221,6 +226,7 @@ class LoadPipelineStepTaskFactory(
     flushStrategy: PipelineFlushStrategy?,
     part: Int,
     numWorkers: Int,
+    taskIndex: Int,
nonblocking: maybe nicer to use taskName: String? - then we don't have magic numbers like -1, 2, etc. floating around, and it's maybe also useful for future debugging
What
Migrate S3 (and object storage in general) to the new ObjectLoader interface using the LoadPipeline. This has been cloud tested for performance and reliability, but I'm leaving it disabled so that other connectors can use the code w/o waiting on a full S3 release.

How
Creates a new LoadStrategy called ObjectLoader. (See the documentation on the interface for usage guidelines for the connector dev.) This will also be the foundation for any GCS, Azure, or other file-like interfaces. This is also the foundation for BulkLoad, in progress here.

If the ObjectLoader bean is present, also creates
- ObjectLoaderRecordToPartAccumulator: straight port of RecordToPartAccumulator, except that it uses the per-record interface instead of the old batch iterator interface
- ObjectLoaderPartToObjectAccumulator: same as above, but a straight port of PartToObjectAccumulator (each wrapped in a LoadPipelineStep)
- ObjectLoaderPartQueue, for passing finished parts from the first step to the second step (ObjectLoaderPartStep and ObjectLoaderObjectStep)
- ObjectLoaderPipeline, which sequences the two steps into a LoadPipeline for injection (presence of any LoadPipeline is enough to trigger the core using the new interface)

Additionally, there are round-robin partitioners to distribute the incoming records and generated parts to the steps.

Also, I made a couple of minor changes to the pipeline interface:
- it no longer passes Reserved<...> around; it just has a callback for post-processing the message (it was impossible to make a 2-step pipeline otherwise)
- suspend -- this caused ~20% performance increase for s3 (and had no effect on iceberg)
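To make the two-step shape concrete, here is a stripped-down, self-contained sketch. All classes below are simplified stand-ins invented for illustration; only the roles (record-to-part accumulation, a part queue between steps, part-to-object assembly) mirror the PR, and none of this is the actual CDK or connector code.

import java.util.concurrent.ArrayBlockingQueue

// Simplified stand-ins, not the real CDK types.
data class Record(val payload: String)
class Part(val bytes: ByteArray, val isLast: Boolean)

// Role of the first step (the record-to-part accumulator): buffer records into parts.
class RecordToPartAccumulatorSketch(private val partSize: Int) {
    private val buffer = StringBuilder()
    fun accept(record: Record): Part? {
        buffer.append(record.payload)
        return if (buffer.length >= partSize) flush(isLast = false) else null
    }
    fun finish(): Part = flush(isLast = true)
    private fun flush(isLast: Boolean): Part =
        Part(buffer.toString().toByteArray(), isLast).also { buffer.clear() }
}

// Role of the second step (the part-to-object accumulator): assemble parts into the final object.
class PartToObjectAccumulatorSketch {
    private val parts = mutableListOf<ByteArray>()
    fun accept(part: Part): ByteArray? {
        parts.add(part.bytes)
        return if (part.isLast) parts.reduce { a, b -> a + b } else null
    }
}

fun main() {
    // Role of the part queue: carries finished parts from the first step to the second.
    val partQueue = ArrayBlockingQueue<Part>(16)
    val step1 = RecordToPartAccumulatorSketch(partSize = 8)
    val step2 = PartToObjectAccumulatorSketch()

    listOf(Record("hello "), Record("object "), Record("storage"))
        .forEach { r -> step1.accept(r)?.let(partQueue::put) }
    partQueue.put(step1.finish())

    var obj: ByteArray? = null
    while (partQueue.isNotEmpty()) obj = step2.accept(partQueue.take()) ?: obj
    println(obj?.decodeToString()) // prints: hello object storage
}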