-
Notifications
You must be signed in to change notification settings - Fork 462
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[RFC] feat!: kernel-based log replay #3137
base: main
Are you sure you want to change the base?
Conversation
ACTION NEEDED delta-rs follows the Conventional Commits specification for release automation. The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification. |
Not in its current form, but updating Snapshot and with that the log segment needs to definitely go in here... |
f8049db
to
e7c7766
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3137 +/- ##
==========================================
- Coverage 71.90% 71.45% -0.45%
==========================================
Files 137 144 +7
Lines 44263 45351 +1088
Branches 44263 45351 +1088
==========================================
+ Hits 31826 32406 +580
- Misses 10397 10797 +400
- Partials 2040 2148 +108 ☔ View full report in Codecov by Sentry. |
d59867e
to
4f8ff2d
Compare
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
4f8ff2d
to
5d2cf48
Compare
@roeap I assume with the introduction of the CommitCacheObjectStore, you would want to have the instantiate two object_stores on the log store, one for commits, and one for reading/writing parquet. With regards to the object store for reading/writing parquet, the folks at "seafowl" built an interesting caching layer for reading parquets https://github.com/splitgraph/seafowl/blob/main/src/object_store/cache.rs, I asked whether they could publish that as a crate, I think it could be really valuable for read operations during some operations that require scans |
Well .. this very naive caching implementation is mainly meant for now to not double down on some of the "regrets" from our pasts selves when it comes to the Snapshot implementation. By now the parquet read is very selective in delta-rs and delta-kernel-rs with column selection and row group filtering... as such the assumption is, that we do not need to cache data from checkpoints and focus on caching all these expensive json commit reads. This simplifies the data we keep in memory significantly - essentially just reconciled add action data. While not incurring too much of a penalty for repeated json (commit) reads. But this is mostly just a stop-gap for adopting kernel "the right way", or at least not in an obviously wrong way 😆. As you rightfully mention, there is much more that can be done. IIRC, datafusion also at least has the wiring to inject caching of parquet footers, which should make scanning snapshots for actions other then adds also much more efficient. Without having spend too much time thinking about it, I think the abstraction you mentioned is much nicer - i.e. we are aware of what type of file we are reading. For us this would in a kernel world mean we would hoist some caching up to a higher level, the json and parquet handler traits in One could argue that this is more or less what we are doing now, keeping all arrow state in memory, but I would say that we can build something much more efficient - and shareable across snapshots - at the engine layer. Also do things like local file cache etc .. One thing I discussed with @rtyler is to move the caching object store to a dedicated PR, as we can get that merged much quicker then this one - which may yet take some time :). Also, we can think about if we can (and should) iterate on our configuration system a bit. The tombstones config for instance has no effect for a while now. |
@roeap on your last note, I think that could be useful indeed to already provide the benefit of it. I haven't looked to depth in to that code, but I assume you can limit the cache size? |
Indeed you can - right now its a hard-coded count, bit in a separate PR this should be configurable. The crate also allows in a simple way to use other weights - e.g. limit by size, as well as choose eviction policies. Some of which we should allow users to configure, but hopefully we can just have great defaults based on what we know about delta tables 😆. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall!
Self { | ||
inner, | ||
check: Arc::new(cache_json), | ||
cache: Arc::new(Cache::new(100)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should add a Weight capacity here as well with a configurable env var to limit the Bytes held in memory
files: Option<RecordBatch>, | ||
} | ||
|
||
impl Snapshot for EagerSnapshot { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing that is missing is the DeltaTableConfig, I added this some time ago to the old snapshot because we some times need to be aware in the operation how the table got loaded.
/// Get the table config which is loaded with of the snapshot
pub fn load_config(&self) -> &DeltaTableConfig {
self.snapshot.load_config()
}
self.snapshot.table_root() | ||
} | ||
|
||
fn version(&self) -> Version { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No more version_timestamp as well?
fn next(&mut self) -> Option<Self::Item> { | ||
if self.index < self.paths.len() { | ||
let path = self.paths.value(self.index).to_string(); | ||
let add = AddVisitor::visit_add(self.index, path, self.getters.as_slice()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this always guaranteed to find the next add action?
)) | ||
} | ||
|
||
pub fn stats(&self) -> Option<&str> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does a logicalFileView have stats?
|
||
fn extract_column<'a>( | ||
mut parent: &'a dyn ProvidesColumnByName, | ||
col: &[impl AsRef<str>], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name threw me off, I thought it was multiple columns, but it's a single column_path
res.and_then(|(data, predicate)| { | ||
let batch: RecordBatch = | ||
ArrowEngineData::try_from_engine_data(data)?.into(); | ||
Ok(filter_record_batch(&batch, &BooleanArray::from(predicate))?) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why even filter when the predicate was None?
start_version: Option<Version>, | ||
limit: Option<usize>, | ||
) -> DeltaResult<Box<dyn Iterator<Item = (Version, CommitInfo)>>> { | ||
// let start_version = start_version.into(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Old line?
let end_version = start_version.unwrap_or_else(|| self.version()); | ||
let start_version = limit | ||
.and_then(|limit| { | ||
if limit == 0 { | ||
Some(end_version) | ||
} else { | ||
Some(end_version.saturating_sub(limit as u64 - 1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is highly confusing xd, the end version becomes the start versions when passed, and then the start_versions becomes the end version again when there is no limit :S
store: Arc<dyn ObjectStore>, | ||
version: impl Into<Option<Version>>, | ||
) -> DeltaResult<Self> { | ||
// TODO: how to deal with the dedicated IO runtime? Would this already be covered by the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We currently do that all the way at the beginning in logstore_with
Description
This PR aims to provide new implementations for the current
Snapshot
(now calledLazySnapshot
) andEagerSnapshot
back by thedelta-kernel-rs
library.This PR focusses on the implementation of the new snapshots, but avoids updating all usage and removing the old ones. I plan to provide some stacked PRs that actually use these in operations etc., hoping that this way reviews and feedback can be a bit more streamlined.
To reduce churn in the codebase, after the switch has been made, we introduce a trait
Snapshot
which is implemented by the new snapshots and should also be implemented forDeltaTableState
. We can now establish a more uniform API across theSnapshot
variants since Kernel's execution model allows us to avoidasync
in all APIs.One of the most significant conceptual changes is how eager the
EagerSnapshot
is. The parquet reading in bothdelta-rs
anddelta-kernel-rs
has evolved much since theEagerSnapshot
was first written and handles pushdown of columns and predicates much more effectively. TO mitigate the cost of repeated reads of commit data, we introduce a simple caching layer in form of anObjectStore
implementation that caches commit reads in memory. This is right now a simple brute force approach to allow for migration, but hopefully will be extended in the future to also avoid json parsing and caching parquet metadata reads.Any feedback on the direction this is taking is greatly appreciated.