
Incremental (delta) update #928

Open

ilongin wants to merge 36 commits into main
Conversation

@ilongin (Contributor) commented Feb 19, 2025

This PR adds the ability to do incremental, or delta, updates of a dataset. The idea is to avoid re-building the whole dataset from the source whenever the source has some changes (new or modified files in an S3 bucket, for example). Instead, we build a diff chain between the last version of the dataset and the source, re-apply the chain methods (mappers, filters, etc.) that were used to create the original chain from the source, and then merge the diff with the last version of the dataset. This way we get much better performance.

To run a delta update, the user just re-runs the whole script that creates the dataset, with one small modification: adding delta=True to the DataChain.save() method.

Example:

from datachain import C, DataChain, File, func

def my_embedding(file: File) -> list[float]:
    return [0.5, 0.5]

(
      DataChain.from_storage(
          "s3://ldb-public/remote/data-lakes/dogs-and-cats/", type="image", anon=True
      )
      .filter(C.name.glob("*cat*"))
      .filter(C.name.glob("*.jpg"))
      .map(emb=my_embedding)
      .mutate(dist=func.cosine_distance("emb", (0.1, 0.2)))
      .save("cats", delta=True)
)

Comparison between the current approach and the alternative one:

| Comparison | Current approach | Alternative approach |
| --- | --- | --- |
| Description | We calculate the diff between the last version of the starting dataset (listing or any other "normal" dataset) and the current version of the resulting dataset. We then apply the chain functions to that diff and merge the result with the current version of the resulting dataset to create a new version of it. | From the dataset dependencies of the resulting dataset we get the starting dataset name and version (listing or any other "normal" dataset) and calculate the diff between that and the last version of the starting dataset. We then apply the chain functions to that diff and merge the result with the current version of the resulting dataset to create a new version of it. |
| Union / merge works | No (it results in duplicates) | No (it results in duplicates) |
| Someone removes the starting dataset and all of its versions (e.g. a listing) | Everything works as expected (the listing is re-created and the delta update works fine) | We wouldn't have the exact listing version for the delta, so the whole dataset would need to be re-created as on the first run (without the delta performance gains) |
| File object is removed in the resulting dataset (e.g. by one of the chain functions) | Delta update doesn't work | Delta update should work as expected |
| Supported queries | All except union / merge | All except union / merge |
| How it looks in the UI | The same as locally (just adding the delta=True flag in .save()) | The same as locally (just adding the delta=True flag in .save()) |
| Pros | If the starting dataset (e.g. a listing) is removed, we can still do a delta update and keep the performance gain | If the file signal is removed from the final dataset, the delta update still works |
| Cons | If the file signal is removed from the resulting dataset, the delta update doesn't work | If the starting dataset (e.g. a listing) is removed, the whole dataset has to be re-created without the delta performance gains |
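
As a rough sketch of both approaches (conceptual pseudocode, not the actual implementation: apply_original_chain() and source_version_from_dependencies() are hypothetical helpers standing in for "re-apply the user's chain" and "look up the source version recorded in the dataset's dependencies"):

result_latest = DataChain.from_dataset("cats")     # current version of the resulting dataset
source_latest = DataChain.from_dataset("listing")  # last version of the starting dataset

# Current approach: diff the starting dataset against the resulting dataset.
diff = source_latest.diff(result_latest)
apply_original_chain(diff).union(result_latest).save("cats")

# Alternative approach: diff the starting dataset against the version of it
# recorded in the resulting dataset's dependencies.
source_used = source_version_from_dependencies("cats")
diff = source_latest.diff(source_used)
apply_original_chain(diff).union(result_latest).save("cats")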

Q&A

Q: What is the difference between the approaches regarding supported queries?
A: They should support the same types of queries.

Q: How will the different approaches look in code?
A: The same as normal queries, just without union or merge, and with the delta flag added to .save().

Q: In the second approach the idea was to use delta on the source side (per source). This would allow having a single-file source, for example. Both approaches don't allow that atm AFAIU. Are there cases like this?
A: In both approaches, when we speak about the "source" we actually mean the source or starting dataset from which the resulting dataset is created. It can be a listing dataset if .from_storage() is used, or just a regular dataset if .from_dataset() is used, for example. If I understood the question correctly, the suggestion is to get the distinct source columns and then do a diff for each of those. Currently this is not possible: in order to get multiple sources into the resulting dataset, some kind of union / merge must have happened in the chain, and even if union were supported we don't know the chain of the dataset that was added with the union. For example:

(
    DataChain.from_storage("s3://ldb-public/remote/data-lakes/dogs-and-cats/", anon=True)
    .union(DataChain.from_dataset("one_file"))
    .save("cats", delta=True)
)

In this chain we add the dataset one_file, and we don't know how it was created because its chain is not in this script. Maybe its chain was created in some other script and looks something like:

(
    DataChain.from_storage("gs://datachain-demo")
    .limit(1)
    .save("one_file")
)

but we don't know this, so we wouldn't be able to apply limit(1) after doing a diff for the source gs://datachain-demo, and we would end up with the whole gs://datachain-demo bucket instead of one file in the resulting dataset. Maybe this could be done in the future if we gain the ability to walk the tree of datasets and extract their chains from other scripts, but that is way out of the scope of this task and I'm not sure it's even possible at the moment.
Regarding the single file: yes, that one is tricky. With the current implementation it doesn't break, but the dataset is recalculated from scratch every time (delta doesn't work because the created dataset has no dependency - no listing is created, since we just extract data from the file and create dataset rows). Example:

import os
from pathlib import Path

import pandas as pd

from datachain import DataChain

DF_DATA = {
    "first_name": ["Alice", "Bob", "Charlie", "David", "Eva"],
    "age": [25, 30, 35, 40, 45],
    "city": ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix"],
}
path = Path(os.getcwd()) / "test.parquet"
pd.DataFrame(DF_DATA).to_parquet(path)
DataChain.from_storage(path.as_uri()).parse_tabular().save("tabular", delta=True)

Q: We need actual code examples (e.g. union / merge where it breaks).
A: The code doesn't break on union / merge; instead it just creates duplicates. Example:

(
    DataChain.from_storage("gs://datachain-demo/datacomp-small/metadata")
    .limit(1)
    .save("one_file")
)

(
    DataChain.from_storage("s3://ldb-public/remote/data-lakes/dogs-and-cats/", anon=True)
    .filter(C("file.path").glob("*cat*"))
    .union(DataChain.from_dataset("one_file"))
    .save("cats", delta=True)
)

Here, on the second run of the script, if there is any diff between the old and new version of s3://ldb-public/remote/data-lakes/dogs-and-cats/, we end up with a duplicate file from gs://datachain-demo/datacomp-small/metadata: that file is added again when the chain (which has the union in it) is applied to the diff and then merged (union) with the current latest version of the resulting dataset, which already has that file. Currently I don't see a way to fix this, and even if there is some smart way to avoid it, I would leave it for a follow-up.
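
Conceptually, the second run of the script above does something like this under the current approach (listing_diff and previous_cats are illustrative names, not actual implementation variables):

listing_diff = ...   # only the added / modified files of s3://ldb-public/remote/data-lakes/dogs-and-cats/

applied = (
    listing_diff
    .filter(C("file.path").glob("*cat*"))
    .union(DataChain.from_dataset("one_file"))  # the chain re-adds the one_file row
)
previous_cats.union(applied).save("cats")       # the previous version already contains that row,
                                                # so the new version ends up with it twice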

Q: How about group by and other stuff - will those work?
A: Group by cannot work either, and I'm not sure it can ever work with delta updates even in theory: the whole point of delta is to apply the chain (which is where the group by lives) to just the subset of data that has been added or changed, but for a group by to be correct it must be applied to the whole dataset.
Other stuff should work IMO.
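
A toy illustration of why an aggregation computed on the diff alone goes wrong (plain Python with collections.Counter, not datachain code):

from collections import Counter

previous = ["cat"] * 100         # labels already in the saved dataset
diff = ["cat"] * 5               # only the newly added rows

print(Counter(previous + diff))  # Counter({'cat': 105}) - what a correct group by should produce
print(Counter(diff))             # Counter({'cat': 5})   - what a delta run would compute
# Merging the per-diff result with the previous version would leave two "cat"
# groups (100 and 5) instead of one group of 105.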

@ilongin ilongin marked this pull request as draft February 19, 2025 15:13
@ilongin ilongin linked an issue Feb 19, 2025 that may be closed by this pull request

@dmpetrov (Member) commented:

@ilongin it would be great to extract all logic outside of the fat file dc.py to increment.py or dc_incremental.py

Also, should we call it incremental or delta? :) Delta seems better but I don't like it due to a conflict with Delta Lake. Any ideas? :)


@ilongin (Contributor Author) commented Feb 21, 2025:

> @ilongin it would be great to extract all logic outside of the fat file dc.py to increment.py or dc_incremental.py
>
> Also, should we call it incremental or delta? :) Delta seems better but I don't like it due to a conflict with Delta Lake. Any ideas? :)

@dmpetrov one question just to be 100% sure: how do we deal with the different statuses: added, modified, removed, same?

My assumption is:

  1. Added records are appended to the previous dataset (its current last version)
  2. Modified records replace the matching records from the previous dataset in the new dataset
  3. Deleted records -> do nothing about them, but maybe we should remove them in the new dataset??
  4. Same -> nothing to do here

Currently DataChain.diff() returns only added and changed records by default... for other statuses explicit flags must be set.
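
As a sketch of how that maps onto DataChain.diff() (source_latest and source_previous are placeholders; the added and modified flags appear elsewhere in this thread, while the exact names of the flags for the other statuses are my assumption):

# Defaults: only added and modified records are returned.
delta_rows = source_latest.diff(source_previous)

# To also act on the other statuses, the flags would have to be passed
# explicitly, e.g. something like:
all_statuses = source_latest.diff(
    source_previous, added=True, modified=True, deleted=True, same=False
)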

Regarding the name: delta makes more sense if we are not just appending new records, otherwise it's more like incremental. But I don't have a strong opinion here - both sound reasonable to me.


@dmpetrov (Member) commented:

> Currently DataChain.diff() returns only added and changed records by default...

Let's use the same default for the incremental update.

> delta makes more sense

Then let's use Delta 🙂

codecov bot commented Feb 24, 2025:

Codecov Report

Attention: Patch coverage is 95.12195% with 2 lines in your changes missing coverage. Please review.

Project coverage is 88.09%. Comparing base (40b9c40) to head (b2430c4).

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| src/datachain/diff/__init__.py | 50.00% | 1 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #928      +/-   ##
==========================================
+ Coverage   88.02%   88.09%   +0.06%     
==========================================
  Files         133      134       +1     
  Lines       12075    12103      +28     
  Branches     1671     1674       +3     
==========================================
+ Hits        10629    10662      +33     
+ Misses       1029     1025       -4     
+ Partials      417      416       -1     
| Flag | Coverage Δ |
| --- | --- |
| datachain | 88.01% <95.12%> (+0.06%) ⬆️ |



@ilongin ilongin marked this pull request as ready for review February 25, 2025 15:34


PR Overview

This PR introduces incremental (delta) update functionality to optimize dataset updates by computing and merging diffs rather than re-processing the entire source, and it updates related tests and supporting modules accordingly.

  • Implements a new delta_update function in the core module.
  • Adds functional and unit tests covering delta updates from both datasets and storage.
  • Updates the save and query methods (and related documentation) to support delta processing and refactors file signal retrieval.

Reviewed Changes

| File | Description |
| --- | --- |
| src/datachain/delta.py | Adds delta_update for incremental diff computation. |
| tests/func/test_delta.py | Introduces tests to validate delta updates from various sources. |
| tests/unit/lib/test_signal_schema.py | Adds unit tests for file signal retrieval. |
| src/datachain/lib/dc.py | Updates the save method to allow a delta flag. |
| src/datachain/query/dataset.py | Refactors StartingStep to QueryStep; updates type annotation. |
| src/datachain/lib/signal_schema.py | Replaces contains_file() with get_file_signal() for clarity. |

Copilot reviewed 6 out of 6 changed files in this pull request and generated no comments.

Comments suppressed due to low confidence (2)

src/datachain/delta.py:36

  • Consider reviewing the logic of appending the original chain's steps onto the diff chain to ensure that the order of applied transformations is correct and consistent with user expectations.
diff._query.steps += dc._query.steps

tests/func/test_delta.py:146

  • Duplicate file entries (e.g., 'images/img6.jpg' and 'images/img8.jpg' appear twice) in the expected output may indicate an unintended behavior in the union operation. Please confirm whether duplicates are intentional or if filtering/aggregation is needed.
"images/img6.jpg",
@dreadatour (Contributor) left a comment:


Looks good to me 👍🔥



def test_get_file_signal():
assert SignalSchema({"name": str, "f": File}).get_file_signal() == "f"
Contributor:

Should we also test for nested file signal or is it out of the scope of this method?

E.g.:

class CustomModel(DataModel):
    file: File
    foo: str
    bar: float

assert SignalSchema({"name": str, "custom": CustomModel}).get_file_signal() == "custom.file"

Contributor Author:

For now it only works for top-level File objects. In the future we can add nested support as well if needed.

) -> "Self":
"""Save to a Dataset. It returns the chain itself.

Parameters:
name : dataset name. Empty name saves to a temporary dataset that will be
removed after process ends. Temp dataset are useful for optimization.
version : version of a dataset. Default - the last version that exist.
delta : If True, we optimize on creation of the new dataset versions
by calculating diff between source and the last version and applying
Member:

could you describe a bit - how do we do this exactly? do we calculate diff for each "source" (e.g. bucket)?

@ilongin (Contributor Author) commented Mar 11, 2025:

Added a better explanation. Regarding the question about sources: there is only one source, which can be cloud storage or even a dataset (but it needs to have a File object in it). So by "source" we mean the starting point for dataset creation.

Member:

what if there are two sources (merge of two datasets)?

File `source` and `path` for matching, and File `version` and `etag`
for checking if the record is changed.
Note that this takes in account only added and changed records in
source while deleted recordsare not removed in the new dataset version.
Member:

Suggested change
source while deleted recordsare not removed in the new dataset version.
source while deleted records are not removed in the new dataset version.

Member:

> by calculating diff between source and the last version of dataset

I'm not sure I understand this. I assume source == bucket here (or a dataset), and the dataset - is it the actual named dataset from the save("name", delta=True) operation? How can we compare the source with some dataset? I'm not sure...

Member:

> Diff is calculated using `DataChain.diff()` method which looks into
> File `source` and `path` for matching, and File `version` and `etag`
> for checking if the record is changed.

I see.

This won't work in so many cases though, no? A file got renamed or dropped, there are > 1 sources, etc., etc.

Why can't we compare the source with itself to get the diff, apply the chain to that diff (or multiple diffs if there are multiple sources), and at the end merge the result with the previous version of the saved dataset? I think those semantics would be cleaner. WDYT?

Contributor Author:

I started from the premise that source = some other dataset with File objects, to make it more generic, since every DataChain construct starts with some dataset (whether it's a listing or another "normal" one).
You are right that this doesn't work if a File is removed, and your suggestion does sound better (I will try to re-implement it that way).

The only thing I'm not sure about is what you said regarding "multiple diffs if there are multiple sources". From this I assumed you mean: 1. get the distinct sources from the dataset (e.g. distinct bucket names), 2. get their listings, 3. calculate the diff of each listing with itself, 4. apply the chain to each diff and merge. The disadvantage of this approach is that we then cannot do a delta update with another dataset as the starting point, and it's more complex to implement, since the "get their listings" part is not easy to resolve: we have partial listings (the source column holds just the bucket name, but the listing it was created from could be something more specific, depending on what was passed to DataChain.from_storage()). It also gets tricky if someone filters by path, maybe with a glob, in .from_storage() itself, e.g. DataChain.from_storage("s3://ldb-public/remote/data-lakes/dogs-and-cats/cat*"). Finally, applying the chain to each of these small per-source diffs is not possible, since a source can come from a union / merge with another chain, and the methods (e.g. filters, mappers) that were in the chain before the union / merge cannot be applied to it without breaking the code.
Because of this I suggest keeping the "dataset" notation for the original source, and with your suggestion from above it would be something like:

  1. Get the original starting dataset from which the chain is created (a listing dataset, another dataset, etc.)
  2. Calculate the diff between the latest version and the previous version of that starting dataset
  3. Apply the chain to that diff and union it with the current version

To answer your question below as well: diff doesn't work with union / merge in the sense that it won't catch changes from the other chain that was "added" to the original one, but the code will not break, i.e. the user can still use union / merge - the whole chain added via union / merge will just be recalculated, so there are no performance gains for it.

Contributor Author:

Now I realize that "compare the source with itself to get a diff" is also a little bit tricky. What if someone created a dataset from a listing that was then at version 5, and multiple versions (re-listings) were added since, so it is now at version 10? We should compare version 10 to version 5, but we don't save anywhere that the dataset was created from version 5... In general it's questionable what comparing with self even means.

Member:

> 1. We don't save with which version of the source listing we created the dataset, so we don't know which version of the source to diff. This is explained in more detail in my last post here as well.

I think we can / and probably should be saving this (regardless of this epic). Are there any issues with that?

> 1. If someone removes the listings we will not be able to do a diff with previous versions and we won't be able to do an incremental update.

I guess it will be the same for any implementation to a certain degree. If someone removes the previous dataset, we also won't be able to do an incremental update, right?

Are there any other fundamental problems with this? What is the difference between this approach and the alternatives with regard to the types of queries they would support? How will the different approaches look in code? Will they affect how this can be done in the UI?

==================

Can we please create a table with the different approaches: a brief description, brief examples, pros/cons, and more examples of how they would look and work (or not) for different types of queries.

Contributor Author:

> I think we can / and probably should be saving this (regardless of this epic). Are there any issues with that?

You are right actually, I totally forgot about dataset dependencies, which can be used for this.

> I guess it will be the same for any implementation to a certain degree. If someone removes the previous dataset we also won't be able to do an incremental update, right?

When the whole listing is removed, delta still works fine in the current implementation, while in the alternative approach the whole dataset needs to be re-created as if there were no delta (it will still work, just without the delta performance enhancement).

> Are there any other fundamental problems with this? What is the difference between this approach and the alternatives with regard to the types of queries they would support? How will the different approaches look in code? Will they affect how this can be done in the UI?
>
> Can we please create a table with the different approaches: a brief description, brief examples, pros/cons, and more examples of how they would look and work (or not) for different types of queries.

I will write the table in the description. It's hard to say whether there are fundamental problems with the alternative approach until I implement it, but with the current approach the only issue is the removal of File objects.

Contributor Author:

Added a new function called delta_update_alternative() which implements the alternative approach (what you suggested), so we can compare them, and also added a comparison table between the approaches to the description.
TL;DR: the alternative approach does look slightly better now.

Member:

thanks for the table!

  • we need actual examples of code (e.g. union / merge where it breaks)
  • how about group by and other stuff - will those work?
  • in the second approach the idea was to use delta on the source side (per source). This would allow having a single-file source, for example. Both approaches don't allow that atm AFAIU. Are there cases like this?

@ilongin (Contributor Author) commented Mar 26, 2025:

Added some answers to those in the PR description. I will add more code examples if needed.

I found one major flaw of the alternative approach though (or of any approach that allows File objects to disappear from the final dataset): it is impossible to filter out old records that were modified in the new version of the source dataset.
My original approach was:

diff = source_dc_latest.diff(source_dc)  # diff the source with itself to get `added` and `modified` rows
res = DataChain.from_dataset(name, latest_version).diff(diff, added=True, modified=False)  # diff the current dataset with the source diff to filter out the old rows that were modified in the new version
res = res.union(diff)  # final result

Step 2 is problematic, since we cannot diff the current dataset against anything if we assume the File objects might not be there.
I will try to think of some other approach, but ATM I don't see a viable solution.
If we allow the final dataset to contain both the old and the new modified rows then this is not an issue, but the requirement in this issue was to include only the newest versions of files in the final result.
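
To illustrate the problematic case for step 2: a chain like the one below (an illustrative sketch reusing my_embedding from the first example; the dataset name is made up) drops the File signal from the saved dataset, so a later diff has no file.source / file.path columns left to match on.

(
    DataChain.from_storage("s3://ldb-public/remote/data-lakes/dogs-and-cats/", anon=True)
    .map(emb=my_embedding)
    .select("emb")                       # File columns no longer reach the saved dataset
    .save("cat_embeddings", delta=True)
)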

@ilongin ilongin requested review from skshetry and shcheklein March 13, 2025 14:02
Development

Successfully merging this pull request may close these issues.

Incremental update
5 participants