diff --git a/distributed/protocol/arrow.py b/distributed/protocol/arrow.py index f16780f0535..b2acc29cf29 100644 --- a/distributed/protocol/arrow.py +++ b/distributed/protocol/arrow.py @@ -47,3 +47,14 @@ def deserialize_table(header, frames): blob = frames[0] reader = pyarrow.RecordBatchStreamReader(pyarrow.BufferReader(blob)) return reader.read_all() + + +@dask_serialize.register(pyarrow.fs.FileInfo) +def serialize_fileinfo(fileinfo): + return {}, [(fileinfo.path, fileinfo.size, fileinfo.mtime_ns)] + + +@dask_deserialize.register(pyarrow.fs.FileInfo) +def serialize_filesystem(header, frames): + path, size, mtime_ns = frames[0] + return pyarrow.fs.FileInfo(path=path, size=size, mtime_ns=mtime_ns) diff --git a/distributed/protocol/tests/test_arrow.py b/distributed/protocol/tests/test_arrow.py index d3a76cdb478..bf4265a51ae 100644 --- a/distributed/protocol/tests/test_arrow.py +++ b/distributed/protocol/tests/test_arrow.py @@ -46,3 +46,15 @@ def test_dumps_compression(): msg = {"op": "update", "data": to_serialize(t)} result = distributed.protocol.loads(distributed.protocol.dumps(msg)) assert result["data"].equals(t) + + +def test_dump_fileinfo(): + from pyarrow.fs import FileInfo + + finfo = FileInfo(path="path", size=1, mtime_ns=2) + dtfinfo = deserialize(*serialize(finfo)) + # Does not implement __equal__ + assert type(dtfinfo) == FileInfo + assert dtfinfo.path == finfo.path + assert dtfinfo.size == finfo.size + assert dtfinfo.mtime_ns == finfo.mtime_ns