Serialize High Level Layers with Pickle #5581
I believe @madsbk and @jakirkham did a fair amount of work around serialization generally, and specifically for HLGs. If we relaxed the requirement for, say,

As an aside, have you heard any folks express any security concerns? I have not.
Are these causing havoc? Do you have some examples you can share? At least the way this was implemented, Dask just creates

That said, given we are thinking about how to make things easier, I wonder if it makes sense to have some very simple, barebones framework for serialization in Dask that Distributed then extends. One advantage of this is we wouldn't need to
I agree, this would remove a lot of complexity. Most importantly, it would remove all the code surrounding the partial deserialization of the HLG.
I think pickle is a red herring. We could allow the scheduler to un-pickle things and it wouldn't solve our problem; it would just allow us to ignore the problem by being sloppier.

Generating tasks usually requires inlining user data or arbitrary functions into the task-tuples. This is trivial on the client; you insert the thing into the tuple, then eventually pickle the whole tuple and send it to the scheduler. It remains an opaque binary blob on the scheduler, and eventually gets un-pickled wholesale on the worker.

But if you're generating the tasks on the scheduler, you need the client to send you those user-defined things to get inserted into the graph. With this proposal, we'd instead just un-pickle the user-defined things on the scheduler, stick them into the tasks, and then re-serialize the entire task. It would be just like on the client. It certainly would make the logic easier! But that's just sloppy! We'd be de-serializing things just in order to immediately re-serialize them! We already do sloppy things like this today when transferring spilled-to-disk data (see end of #4424 (comment)) and should move away from it.

So I don't think the problem has much to do with pickle. The problem has to do with:
I think the currently-painful code could be resolved by
I personally don't care much about the security, versioning, or rewriting arguments. I just care about stability and performance. I could certainly be convinced that deserializing things on the scheduler only to immediately re-serialize them is very low overhead, and worth it for the simplification of the developer experience. I just think it's possible to have both a simple developer experience and no unnecessary deserialization if we take the time to define the interfaces and protocols well. But I'm fine if we decide that's not worth the time.
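To illustrate the round trip being objected to here, a minimal sketch using plain pickle and made-up variable names (not Distributed's actual task machinery):

import pickle
from operator import add

# Client side today: user data and functions are inlined straight into
# task tuples, and the whole tuple is pickled once.
user_data = {"threshold": 0.5}            # arbitrary user object
task = (add, user_data["threshold"], 1)   # inlined into the task tuple
blob = pickle.dumps(task)                 # sent to the scheduler as-is

# Scheduler side (status quo): the blob stays opaque; it is only routed.
stored_on_scheduler = blob

# Worker side: one unpickle, then execute.
func, *args = pickle.loads(stored_on_scheduler)
result = func(*args)

# The pattern being criticized: generating the task on the scheduler
# would mean unpickling the user object there just to re-pickle it.
obj = pickle.loads(pickle.dumps(user_data))          # wasted round trip
regenerated_task = pickle.dumps((add, obj["threshold"], 1))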
The problem is actually less about nested serialization, and more just about the representation of a task, and the fact that tasks have special serialization logic that's disjoint from normal comms serialization. Really the core pain point is that HLG code should emit two different representations of its tasks, depending on whether it's running locally or on the scheduler.
To make things worse, the

And if you want to use the normal-tuple representation instead (which is also supported on the scheduler; it just gets wrapped in
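As a rough picture of the "two representations" point, here is a sketch; the dict form below approximates, but is not exactly, the scheduler-side layout, and "x-1" stands in for a key produced by an upstream task:

import pickle
from operator import add

# Local / client-side form: a plain task tuple, with the callable and
# arguments inlined directly.
local_task = (add, "x-1", 10)

# Scheduler-side form: the callable and arguments are pre-serialized so
# the scheduler never has to unpickle user code itself.
scheduler_task = {
    "function": pickle.dumps(add),
    "args": pickle.dumps(("x-1", 10)),
}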
Jim, Mads, James, and I met this morning. We listed a few requirements:

Requirements
Maybe-requirements
Open questions
These requirements could be cleaned up. A lot of them are of the form "we shouldn't be doing this thing that we were doing" instead of saying "the solution should have such and such a property".

I think that the next step here is to see some design proposals on what we could do. Right now, most conversation is of the form "we're currently doing X, which is bad because of Y and Z". I think that these conversations are helpful in highlighting problems. We've also had them for a while now and it's probably time to move on towards constructive plans. I'll provide a base plan, which I hope others can supplant with better plans.

Base plan

We continue using non-strict messages. We use msgpack to traverse them. We label known user data with

This is exactly the same as the current system, except that now we allow for arbitrary objects with pickle. It's fast in the common case, easy to implement (it's mostly what we have today), and allows us to avoid all of the crap that evolved out of HighLevelLayers (which seemed to cause some cascade of serialized data within serialized data, which seems unpleasant). On the open questions, it does not provide scheduler security, and it chooses loose over strict specification.

I think that it covers all of the requirements listed above. I think that it allows us to clean up a bunch of crap that evolved out of HLGs. It doesn't achieve the goals of better specification, but will, I think, result in a net improvement on simplicity. I'm not pushing for this approach, but I think that it suits our needs.

Example

So if there was a message like the following:

msg = {
    "op": "foo",
    "data": [to_serialize(my_numpy_array)],
    "object": my_object,
}

msgpack would go through this, pack up the entire thing as it does today, but special-handle both

Layers (maybe counter-example?)

So let's say that the user creates a layer:

layer = MyLayer("metadata", 123, my_numpy_array)
msg = {
    "op": "update-single-layer",
    "layers": [layer],
}

We're not trying to call this thing user data, so the scheduler just gets a complete copy of it, unpickled locally. This includes the numpy array, which was handled with pickle rather than Dask serialization (we don't try to traverse and blend between pickle and dask serialization). This is a little bit dangerous because, as Gabe points out,
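For concreteness, a bare-bones sketch of the kind of traversal the base plan implies; the ToSerialize marker and the "__dask__"/"__pickled__" keys are placeholders, not Distributed's actual wire format, and pickle stands in for dask serialization:

import pickle
import msgpack

class ToSerialize:
    """Placeholder for distributed's to_serialize marker."""
    def __init__(self, obj):
        self.obj = obj

def pack_message(msg):
    # msgpack walks the message; anything it doesn't natively understand
    # is handed to `default`, where we either label it as user data
    # (dask serialization, approximated here with pickle) or fall back
    # to plain pickle for arbitrary objects.
    def default(obj):
        if isinstance(obj, ToSerialize):
            return {"__dask__": True, "frames": [pickle.dumps(obj.obj)]}
        return {"__pickled__": True, "payload": pickle.dumps(obj)}
    return msgpack.packb(msg, default=default)

packed = pack_message(
    {"op": "foo", "data": [ToSerialize([1, 2, 3])], "object": object()}
)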
I want to reiterate that I'm not pushing for the plan above. I would like to see designs that are better than that plan. If no designs materialize in, say, a month, then I would recommend that we empower people to go ahead and implement the plan above (it should be, I think, mostly a net improvement on what we have today).
It seems this issue is no longer just about HLG layer serialization getting to use pickle, or even about HLG serialization specifically, but about the serialization system within distributed in general. Should we update the title and description accordingly? Or maybe make #5581 (comment) into a new issue?
I don't have enough knowledge about the inner workings of the current Dask version to talk about details, but in general, when we were working on the Dask scheduler written in Rust, there were two broad problems with the communication protocol and serialization that we faced:
In a call, @jcrist also mentioned an alternate solution: "just use pickle everywhere", both for system messages (currently using msgpack) and for user data (currently using Dask's serialization). Short term, one way to work towards this would be to switch Dask serialization over to pickle everywhere, with some custom registered pickle handlers.
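A sketch of what "custom registered pickle handlers" could look like, using pickle's per-pickler dispatch_table; this is plain stdlib pickle with a numpy example, not Distributed's actual serialization registry:

import io
import pickle
import numpy as np

def rebuild_ndarray(data, dtype, shape):
    # Rebuild the array from its raw bytes plus metadata.
    return np.frombuffer(data, dtype=dtype).reshape(shape)

def reduce_ndarray(arr):
    # Custom handler: ship the buffer and the metadata needed to rebuild it.
    return rebuild_ndarray, (arr.tobytes(), arr.dtype.str, arr.shape)

def dumps_with_handlers(obj):
    buf = io.BytesIO()
    pickler = pickle.Pickler(buf, protocol=pickle.HIGHEST_PROTOCOL)
    # Per-pickler dispatch table: only this pickler uses the custom handler.
    pickler.dispatch_table = {np.ndarray: reduce_ndarray}
    pickler.dump(obj)
    return buf.getvalue()

roundtripped = pickle.loads(dumps_with_handlers({"x": np.arange(4)}))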
From a call today it seemed like a path forward might be the following phases:
Additionally, @jcrist (I think) mentioned that we might want to opt in to pickle, similar to how we opt in to dask serialization with the
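For reference, the existing opt-in pattern looks roughly like this; the PickleOptIn class below is purely illustrative of the analogous marker being suggested, not an actual API:

import numpy as np
from distributed.protocol import to_serialize

my_numpy_array = np.arange(10)

# Today: user data is explicitly opted in to dask serialization.
msg = {"op": "compute", "data": to_serialize(my_numpy_array)}

# The suggestion above: an analogous explicit marker for pickle
# (illustrative name only).
class PickleOptIn:
    def __init__(self, obj):
        self.obj = obj

msg2 = {"op": "update-graph", "payload": PickleOptIn({"arbitrary": object()})}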
I'll also say that there was increased comfort with the proposal above on the call, at least along the lines of "well it seems easy and it's clearly better than what we have now". There was still significant interest in having something more rigid/predictable, but that seemed like it would likely take several months to design and build. We're still open to proposals here, but right now the msgpack+(dask-serialization, pickle) approach is looking more likely.
Would it make sense to move some of the serialization logic to Dask itself? This might make it easier to handle things with msgpack + pickle there. If we need more customization, we could allow for the serialization functionality in Dask to be stubbed out and leverage that in Distributed.
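One way to read "stubbed out and leveraged in Distributed" is a tiny indirection layer in Dask that downstream projects can override; this is purely a sketch with made-up names, not an existing Dask module:

import pickle

# Dask-side defaults (hypothetical module, e.g. dask.serialization).
_dumps = pickle.dumps
_loads = pickle.loads

def register_codec(dumps, loads):
    """Let a downstream package (e.g. distributed) install its own codec."""
    global _dumps, _loads
    _dumps, _loads = dumps, loads

def serialize(obj):
    return _dumps(obj)

def deserialize(blob):
    return _loads(blob)

# Distributed side: swap in a richer implementation at import time, e.g.
# register_codec(distributed.protocol.dumps, distributed.protocol.loads)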
Basic question: Is the current plan to allow
That's my understanding from @mrocklin's comments above, but perhaps he can clarify.
Sorry - I should have followed up on this. Matt confirmed offline that the plan is indeed to consider allowing
2024-02-28 09:25:22,819 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
I am facing the above error.
Same issue. Did you get it resolved?
No, I haven't found a solution yet.
Seeing this, too:
dask/distributed versions are 2024.2.0
My problem was solved by downgrading dask/distributed to version 2023.3.2, the version before the big change in the way HighLevelGraphs are serialized. All versions after and including 2023.4.0 fail when facing HighLevelGraphs which serialize to objects larger than 2 GB (more or less). The failure is reflected by an error message like this:
The problem seems to be that msgpack is attempting to serialize a huge object in
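If you want to check whether a graph is anywhere near that size before submitting, one rough way is to measure its pickled payload directly; this assumes (on my part) that the failing frame scales with the pickled graph, and is not an official diagnostic:

import pickle
import dask.array as da

x = da.ones((1000, 1000), chunks=(100, 100)) + 1   # stand-in for your collection
hlg = x.__dask_graph__()                           # the HighLevelGraph
payload = pickle.dumps(hlg)
print(f"pickled graph: {len(payload) / 2**20:.2f} MiB")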
Sorry, I still see the error, even with dask/distributed 2024.2.1
The relevant code in msgpack is here, and it fails because
I'm getting a similar pickling issue for a HighLevelGraph with 20 layers and 21890 keys (created from an xarray apply_ufunc) - dask/distributed version 2024.5.2
@yoninachmany I assume you should receive this error message with pretty much every version of dask. At least in your example, you are using

From what I'm seeing, it looks like something in your code is using weakrefs, which cannot be serialized. I suspect this is some custom code on your end. You can probably figure out which layer is the culprit by doing something like:

from distributed.protocol import dumps

for name, layer in out_ds.dask.layers.items():
    try:
        dumps(layer)
    except Exception:
        print(f"{name} is the culprit")
The pickle vs cloudpickle thing here is not that important. I suggest inspecting whatever function you're putting into your ufunc and making sure that the objects don't use weakrefs. Maybe this is a pint thing; I haven't worked much with it.

@efocht sorry for the late reply.
I might have a fix for this in #8684, but I still need to run perf tests.

I'm going to close this issue now, since what it asks for was actually completed more than a year ago in #7564. I understand that there may be follow-up issues with this (as we saw above), but I would ask anybody who is encountering issues to open a new one (and link back to this if you like).
Right now serialization of high level graphs is a bit of a mess. It evolved organically and without much architectural thought. This seems to be a source of technical debt.
A lot of the challenge in serializing high level graphs comes from keeping the scheduler protected in a few ways ...
I like these constraints, but perhaps they're causing more havoc than they're worth. It might be time to reconsider these constraints and instead allow the client and scheduler to communicate by pickle. I think that this would allow us to remove a lot of currently painful code, and accelerate high level graph work in the future.
cc @quasiben and team