
Use Expr instead of HLG #9008

Merged: 2 commits merged into dask:main on Mar 24, 2025

Conversation

@fjetter (Member) commented Feb 13, 2025

Sibling to dask/dask#11736
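
For readers of this thread, here is a minimal, hedged sketch (not code from this PR; the cluster setup is purely illustrative) of what the change means in practice: the user-facing API stays the same, but the client ships the collection's underlying expression instead of a HighLevelGraph, and the scheduler materializes and orders the graph itself.

    # Hedged illustration only; user-facing behaviour is unchanged.
    import dask.array as da
    from distributed import Client

    client = Client()                  # assumes a throwaway local cluster
    x = da.ones(10, chunks=5).sum()

    # Before: the client built and shipped a HighLevelGraph.
    # After: the expression behind `x` is serialized as-is, and the scheduler
    # performs materialization and ordering (see the discussion further down).
    result = client.compute(x).result()
    assert result == 10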

github-actions bot (Contributor) commented Feb 13, 2025

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    27 files  ± 0      27 suites  ±0   11h 30m 17s ⏱️ +38s
 4 106 tests + 3   3 990 ✅ ± 0    112 💤 + 1  3 ❌ +1  1 🔥 +1 
51 479 runs  +38  49 166 ✅ +23  2 309 💤 +13  3 ❌ +1  1 🔥 +1 

For more details on these failures and errors, see this check.

Results for commit 462aedb. ± Comparison against base commit 4ef21ad.

This pull request removes 4 and adds 7 tests. Note that renamed tests count towards both.
distributed.tests.test_client ‑ test_auto_normalize_collection
distributed.tests.test_client ‑ test_auto_normalize_collection_sync
distributed.tests.test_client ‑ test_worker_clients_do_not_claim_ownership_of_serialize_futures[False]
distributed.tests.test_client ‑ test_worker_clients_do_not_claim_ownership_of_serialize_futures[True]
distributed.tests.test_client ‑ test_compute_no_collection_or_future
distributed.tests.test_client ‑ test_submit_persisted_collection_as_argument[False]
distributed.tests.test_client ‑ test_submit_persisted_collection_as_argument[True]
distributed.tests.test_client ‑ test_worker_clients_do_not_claim_ownership_of_serialize_futures[False-False]
distributed.tests.test_client ‑ test_worker_clients_do_not_claim_ownership_of_serialize_futures[False-True]
distributed.tests.test_client ‑ test_worker_clients_do_not_claim_ownership_of_serialize_futures[True-True]
distributed.tests.test_dask_collections ‑ test_persist

♻️ This comment has been updated with latest results.

x = da.ones(10, chunks=5)
assert len(x.dask) == 2

with dask.config.set(optimizations=[c._optimize_insert_futures]):
@fjetter (Member Author):

This is no longer possible.

@fjetter (Member Author) commented Mar 19, 2025

The docs build failure is due to the wrong version of dask being installed.

@fjetter marked this pull request as ready for review on March 20, 2025, 13:33
@fjetter (Member Author) commented Mar 20, 2025

There are two test failures that feel related, but I don't anticipate any major changes. I think this is ready for review.

@fjetter (Member Author) commented Mar 20, 2025

test_worker_clients_do_not_claim_ownership_of_serialize_futures is still not fixed. It's certainly a race condition, but the events apparently didn't help... 💢

Comment on lines +4887 to +4908
# *************************************
# BELOW THIS LINE HAS TO BE SYNCHRONOUS
#
# Everything that compares the submitted graph to the current state
# has to happen in the same event loop.
# *************************************

lost_keys = self._find_lost_dependencies(dsk, dependencies, keys)

if lost_keys:
self.report(
{
"op": "cancelled-keys",
"keys": lost_keys,
"reason": "lost dependencies",
},
client=client,
)
self.client_releases_keys(
keys=lost_keys, client=client, stimulus_id=stimulus_id
)

@fjetter (Member Author):

This is a nasty one and I think I'll want to handle it in another PR. The lost-keys check is currently performed between materialization and ordering (for no particular reason, just chance), both of which are offloaded to give the event loop a chance to breathe.
However, that also means tasks can be cancelled during the ordering step that would otherwise cause the computation to be flagged as lost. Instead of receiving a CancelledError, the user will see obscure state-transition issues.
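
For illustration only, a minimal asyncio sketch of the race described above; the names are hypothetical and this is not the actual scheduler code:

    import asyncio

    async def update_graph_sketch(dependencies, live_keys):
        # The lost-dependency check currently runs here, before ordering...
        lost = {k for k, deps in dependencies.items() if not deps <= live_keys}
        # ...but ordering is offloaded, so the event loop may run other
        # handlers (e.g. a client releasing keys) while we wait.
        await asyncio.sleep(0)  # stand-in for the offloaded ordering step
        # By the time the graph is applied to scheduler state, `lost` can be
        # stale: a key cancelled during ordering is never flagged as lost, so
        # the user sees odd state transitions instead of a CancelledError.
        return lost

    asyncio.run(update_graph_sketch({"y": {"x"}}, live_keys={"x"}))

As the comment in the quoted block says, everything that compares the submitted graph to current scheduler state has to happen in the same event-loop tick, i.e. after the last offloaded step.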

@fjetter (Member Author) commented Mar 21, 2025

From what I can tell, the remaining test failures are unrelated

@@ -64,7 +64,7 @@ repos:
- tornado
- pyarrow
- urllib3
- git+https://github.com/dask/dask
@fjetter (Member Author):

  • revert before merge

@hendrikmakait (Member) left a comment:

Overall LGTM, thanks, @fjetter

Comment on lines 1086 to 1093
df.shuffle(
"A",
# If we don't have enough partitions, we'll fall back to a
# simple shuffle
max_branch=npart - 1,
)
# Block optimizer from killing the shuffle
.map_partitions(lambda x: len(x)).sum()
@hendrikmakait (Member):

Suggested change: replace

    df.shuffle(
        "A",
        # If we don't have enough partitions, we'll fall back to a
        # simple shuffle
        max_branch=npart - 1,
    )
    # Block optimizer from killing the shuffle
    .map_partitions(lambda x: len(x)).sum()

with

    df.shuffle(
        "A",
        # If we don't have enough partitions, we'll fall back to a
        # simple shuffle
        max_branch=npart - 1,
        # Block optimizer from killing the shuffle
        force=True,
    )
    .sum()

Comment on lines 2827 to 2834
df.shuffle(
"A",
# If we don't have enough partitions, we'll fall back to a
# simple shuffle
max_branch=npart - 1,
)
# Block optimizer from killing the shuffle
.map_partitions(lambda x: len(x)).sum()
@hendrikmakait (Member):

Suggested change: replace

    df.shuffle(
        "A",
        # If we don't have enough partitions, we'll fall back to a
        # simple shuffle
        max_branch=npart - 1,
    )
    # Block optimizer from killing the shuffle
    .map_partitions(lambda x: len(x)).sum()

with

    df.shuffle(
        "A",
        # If we don't have enough partitions, we'll fall back to a
        # simple shuffle
        max_branch=npart - 1,
        # Block optimizer from killing the shuffle
        force=True,
    )
    .sum()

@fjetter merged commit c5ca1ff into dask:main on Mar 24, 2025
1 of 30 checks passed