Warn if tasks are submitted with identical keys but different run_spec
#8185
Conversation
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
27 files ±0   27 suites ±0   10h 1m 29s ⏱️ -9m 16s
For more details on these failures, see this check.
Results for commit 07974fd. ± Comparison against base commit 045dc64.
♻️ This comment has been updated with latest results.
I'm quite conflicted about this PR. I see a lot of pitfalls, and the only benefit would be to push the failure sooner in the process and make it more explicit.
I wonder if it wouldn't be simpler to just blindly retain the previous run_spec in case of resubmission?
Should we have a high-bandwidth brainstorming session about this?
distributed/scheduler.py
Outdated
elif (
    # run_spec in the submitted graph may be None. This happens
    # when an already persisted future is part of the graph
    dsk.get(k) is not None
Is it even possible that k is not in dsk, short of a malformed graph?
As it turns out, there are plenty of possibilities. Most notably, this is the case when there are persisted futures as part of the graph.
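For illustration, a minimal sketch (mine, not from this PR) of the situation the diff comment describes: once a collection is persisted, building on it makes the already-persisted keys part of the next submitted graph, and for those keys the submitted run_spec may be None.

import dask.array as da
from distributed import Client

client = Client()                         # local cluster, for illustration only
a = da.ones((10,), chunks=5).persist()    # a's keys now live on the cluster as futures
b = (a + 1).sum()                         # the new graph depends on a's keys...
print(b.compute())                        # ...which reach the scheduler again without a fresh run_spec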
I'm growing more convinced about this approach by the minute. Our assumption that the key uniquely identifies the entire task (not just the data) is so strongly baked into the system that we cannot afford to slip. I will not be able to make time today for a high-bandwidth session, but I will also not be working on this today. If you are interested, I recommend checking out some of the remaining test failures. For instance, in P2P it appears that we have a genuine, potentially data-lossy bug in some situations because of this.
I agree that it's extremely unhealthy to let a task change its contents after it's been created.
Not too excited about this. We'd basically replace one inconsistency with another.
Why? Assuming the user doesn't manually submit different tasks with the same hand-crafted key (if they want to shoot themselves in the foot, it's their problem), wouldn't it nicely work around the issue of the optimizer submitting different DAGs that produce the same output?
I do not want to assume that there is only one possible way to achieve this corrupt state and guard against just that. When using the futures API, it is not that difficult to produce a state like this. Providing "hand-crafted" keys is part of our API and users are using it (e.g. #3965 and #7551; unrelated, but also problems caused by users providing keys that break our assumptions). This is even the most natural way to scatter data. I also have no idea how future optimizers will behave. I don't believe that blindly keeping the first known run_spec is the right way; I don't like guessing intent. Besides, the code right now actually does this (albeit buggily)!
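To make the futures-API point concrete, a hedged sketch (not from this PR) of how hand-crafted keys can collide:

from distributed import Client

client = Client()
f1 = client.submit(lambda: 1, key="x")
f2 = client.submit(lambda: 2, key="x")   # same key, different run_spec
print(f1.result(), f2.result())          # only one definition of "x" can win on the scheduler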
force-pushed from cc4f46b to 2e302a1
Status update
This PR is blocked by
force-pushed from a70bbdd to cf5c294
        marks=pytest.mark.xfail(reason="https://github.com/dask/dask/issues/9888"),
    ),
],
)
force-pushed from b6df5d4 to e9c59d5
Houston, we have a problem. The new check highlights behaviour in p2p shuffle where a piece of the graph is eagerly sent up to the scheduler at graph definition time, then again with the same keys but a different run_spec. To my understanding this is a separate issue from dask/dask#9888, as it doesn't go away with the config option. You can easily see it in

ddf = dd.from_pandas(df, npartitions=4)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
    ddf = ddf.set_index("a").sort_values("b")
    result = ddf.compute()

however, it trips the run_spec check, because
Some keys are the same, but run_spec changes; this triggers a race condition where the scheduler didn't have the time to forget those keys yet. Example exception:
The use case won't fail in main because the key is always close to its released->forgotten transition when this happens (although in other cases it may be in memory; I'm not sure). To me, the hard failure is very healthy behaviour, but at the same time it means we can't merge this PR until we fix shuffle to prevent the above.
Looking at the tokens you are posting, this still looks like dask/dask#9888. The config option you are referring to does not control blockwise fusion but rather low-level task fusion. Both are problematic but, interestingly, the low-level fusion is a little more robust. This would also mean that this goes away with dask-expr (although I'm sure there are cases where we can trigger a similar problem for arrays; one step at a time).
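For context, a hedged example of the knob being referred to (the config key is dask's low-level fusion switch; whether it matters for a given graph is an assumption):

import dask

# "optimization.fuse.active" controls low-level task fusion only;
# high-level blockwise fusion is a separate optimization pass.
with dask.config.set({"optimization.fuse.active": False}):
    ...  # build and compute the collection here to compare behaviour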
force-pushed from 2099447 to 50bed33
force-pushed from 50bed33 to 7591996
Closes dask/dask#10905
distributed/scheduler.py
Outdated
    tok_lhs != tok_rhs or deps_lhs != deps_rhs
) and ts.group not in tgs_with_bad_run_spec:
    tgs_with_bad_run_spec.add(ts.group)
    logger.warning(
nit: Should we add a debug log regardless of ts.group not in tgs_with_bad_run_spec?
done
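For readers following along, a self-contained sketch of the pattern that was just discussed (names borrowed from the quoted diff; the exact message and call site are assumptions):

import logging

logger = logging.getLogger("distributed.scheduler")
tgs_with_bad_run_spec: set = set()

def report_run_spec_mismatch(key, group, tok_lhs, tok_rhs, deps_lhs, deps_rhs):
    """Warn once per TaskGroup, but leave a debug breadcrumb for every key."""
    if tok_lhs == tok_rhs and deps_lhs == deps_rhs:
        return
    # The debug log fires regardless of whether this group already warned
    logger.debug("Detected different run_spec for key %r", key)
    if group not in tgs_with_bad_run_spec:
        tgs_with_bad_run_spec.add(group)
        logger.warning("Detected different run_spec for key %r", key)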
distributed/scheduler.py
Outdated
    tok_lhs != tok_rhs or deps_lhs != deps_rhs
) and ts.group not in tgs_with_bad_run_spec:
    tgs_with_bad_run_spec.add(ts.group)
    logger.warning(
I'm wondering if it would be useful to propagate this warning to the user who submitted the graph instead of potentially burying it in the scheduler logs. Do you have thoughts on this?
AFAIK we have no means to propagate warnings to the client? We'd need to build the infrastructure for it first.
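As a stopgap, a user can still pull recent scheduler log records from the client side; a hedged example (the return format is assumed to be (level, message) pairs):

from distributed import Client

client = Client()
for level, message in client.get_scheduler_logs(n=100):
    if "run_spec" in message:
        print(level, message)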
I wasn't entirely sure whether we had something that does this. Since that's not the case, forget what I said.
distributed/scheduler.py
Outdated
tok_lhs: Any = tokenize(ts.run_spec, ensure_deterministic=True)
tok_rhs: Any = tokenize(dsk[k], ensure_deterministic=True)
Should we deal with the case where one of the two is deterministic but not the other?
Good point
Done
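One possible shape for that handling, as a sketch (my assumption, not necessarily what the PR implements): if either side cannot be tokenized deterministically, skip the comparison rather than risk a spurious warning.

from dask.base import tokenize

def comparable_tokens(run_spec_old, run_spec_new):
    """Return a pair of tokens, or None if either side is non-deterministic."""
    try:
        return (
            tokenize(run_spec_old, ensure_deterministic=True),
            tokenize(run_spec_new, ensure_deterministic=True),
        )
    except Exception:
        # dask raises when it cannot produce a deterministic token;
        # in that case there is nothing reliable to compare.
        return None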
force-pushed from 7591996 to 728b89e
force-pushed from 728b89e to cad1467
Co-authored-by: Hendrik Makait <[email protected]>
@hendrikmakait all comments have been addressed
force-pushed from f13dd0b to 07974fd
Thanks, @crusaderky! LGTM, assuming CI is happy.
Since version 2024.2.1, I am getting the "Detected different run_spec" warning. Does this point to an issue in my code or with the dask.array.linalg code? Log:
Not an error, a warning. As a user, you could work around it by making sure you don't resubmit the same parts of your graph multiple times.

a = ...  # dask collection
b = f(a)
a = a.persist()
b = b.persist()

The warning will disappear if you change it to

a = ...  # dask collection
a = a.persist()
b = f(a)
b = b.persist()
This mitigates the impact of dask/dask#9888 by raising an exception when we detect that the submitted run_spec differs from the already known run_spec.
I believe that to handle such a key collision gracefully we'd need to store not only the key itself but also a hash/token of the run_spec, and pass this signature around everywhere. Alternatively, there was a proposal to use the TaskState object ID for this, but the required changes are the same.
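A minimal illustration of that idea, with hypothetical names (not code from this PR):

from dask.base import tokenize

def task_signature(key, run_spec):
    # Hypothetical helper: identify a task by its key *and* a token of its run_spec,
    # so a resubmission with the same key but a different definition is distinguishable.
    return (key, tokenize(run_spec))

sig_a = task_signature("x", (sum, [1, 2]))
sig_b = task_signature("x", (sum, [1, 2, 3]))
assert sig_a != sig_b   # same key, different run_spec -> different signature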
A surprising number of tests are failing with this change and I have to investigate more.