Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ZstdZarrCompressor #149

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
URIs = "5c2747f8-b7ea-4ff2-ba2e-563bfd36b1d4"
ZipArchives = "49080126-0e18-4c2a-b176-c102e4b3760c"

[weakdeps]
CodecZstd = "6b39b394-51ab-5f42-8807-6242bab2b4c2"

[compat]
AWSS3 = "0.10"
Blosc = "0.5, 0.6, 0.7"
CodecZlib = "0.6, 0.7"
CodecZstd = "0.8.3"
DataStructures = "0.17, 0.18"
DiskArrays = "0.4.2"
HTTP = "^1.3"
Expand All @@ -34,6 +38,9 @@ URIs = "1"
ZipArchives = "2"
julia = "1.2"

[extensions]
CodecZstdExt = "CodecZstd"

[extras]
Conda = "8f4d0f93-b110-5947-807f-2305c1781a2d"
PyCall = "438e738f-606a-5dbb-bf0a-cddfbfd45ab0"
Expand Down
16 changes: 16 additions & 0 deletions docs/src/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,19 @@ Pages = ["ZGroup.jl"]
Modules = [Zarr]
Pages = ["Compressors.jl"]
```

Additional compressors can be loaded via Julia's package extension mechanism.

For example, the "zstd" compressor ID can be enabled by loading CodecZstd.jl.
This uses Zstandard directly rather than using Blosc.

```julia-repl
julia> using Zarr, CodecZstd

julia> zarray = zzeros(UInt16, 1024, 512, compressor="zstd", path="zarr_zstd_demo");

julia> zarray2 = zopen("zarr_zstd_demo");

julia> zarray == zarray2
true
```
54 changes: 54 additions & 0 deletions ext/CodecZstdExt.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
module CodecZstdExt

using Zarr: Zarr
using JSON: JSON
using CodecZstd: CodecZstd

"""
ZstdZarrCompressor(clevel::Int=0)
ZstdZarrCompressor(c::CodecZstd.ZstdCompressor, [d::CodecZstd.ZstdDecompressor])

Zstandard compression for Zarr.jl. This is a `Zarr.Compressor` wrapper around
`CodecZstd`. Construct with either the compression level, `clevel`, or by
providing an instance of a `ZstdCompressor`. `ZstdFrameCompressor` is
recommended.
"""
struct ZstdZarrCompressor <: Zarr.Compressor
compressor::CodecZstd.ZstdCompressor
decompressor::CodecZstd.ZstdDecompressor
end
Comment on lines +16 to +19
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't support multithreaded use IIUC. I think this should be like

Zarr.jl/src/Compressors.jl

Lines 129 to 131 in f436713

struct ZlibCompressor <: Compressor
clevel::Int
end
where the struct only contains the parameters of the codec.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially wrote it like this, but then I was thinking about all the other potential parameters, even if they do not need to be serialized. I think what we should implement is the ability to copy a compessor.

Frankly, I'm somewhat confused about why one actually needs to serialize the compression level into the array metadata. You do not need that information to decompress the data.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I don't understand this correctly, but what would happen in a scenario where a user opens an existing array and wants to add some new data? Of course one can set a different compression level for the new chunks, but for consistency of the dataset I think it is good to write all compression parameters to the metadata struct

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this implementation respects all this and other compressors in Zarr.jl currently don't work multithreaded as well so ok from my side

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition to the thread safety issues, this also leaks memory.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For other potential parameters what about something like:
https://github.com/nhz2/ChunkCodecs.jl/blob/799b154bd400633f0ae3bd1cf78d0cc95957f2cf/ChunkCodecLibZstd/src/encode.jl#L21-L25

struct ZstdEncodeOptions <: EncodeOptions
    compressionLevel::Cint
    checksum::Bool
    advanced_parameters::Vector{Pair{Cint, Cint}}
end

Where the advanced parameters are set with ZSTD_CCtx_setParameter after the compression level and checksum options are set.

# Use default ZstdDecompressor if only compressor is provided
function ZstdZarrCompressor(compressor::CodecZstd.ZstdCompressor)
return ZstdZarrCompressor(
compressor,
CodecZstd.ZstdDecompressor()
)
end
function ZstdZarrCompressor(clevel::Int)
return ZstdZarrCompressor(
CodecZstd.ZstdFrameCompressor(; level = clevel)
)
end
ZstdZarrCompressor(;clevel::Int=3) = ZstdZarrCompressor(clevel)

function Zarr.getCompressor(::Type{ZstdZarrCompressor}, d::Dict)
return ZstdZarrCompressor(d["level"])
end

function Zarr.zuncompress(a, z::ZstdZarrCompressor, T)
result = transcode(z.decompressor, a)
return Zarr._reinterpret(Base.nonmissingtype(T), result)
end

function Zarr.zcompress(a, z::ZstdZarrCompressor)
a_uint8 = Zarr._reinterpret(UInt8,a)[:]
transcode(z.compressor, a_uint8)
end

JSON.lower(z::ZstdZarrCompressor) = Dict("id"=>"zstd", "level" => z.compressor.level)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This level property is not documented or tested in CodecZstd so it might be removed in a future non-breaking release. This is easy to fix by either documenting this property in CodecZstd, or keeping track of the compression level in the ZstdZarrCompressor struct.


function __init__()
Zarr.compressortypes["zstd"] = ZstdZarrCompressor
end

end # module CodecZstdExt
52 changes: 50 additions & 2 deletions src/Compressors.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,46 @@ _reinterpret(::Type{T}, x::AbstractArray{S, 0}) where {T, S} = reinterpret(T, re
_reinterpret(::Type{T}, x::AbstractArray) where T = reinterpret(T, x)

abstract type Compressor end
getCompressor(compdict::Dict) = getCompressor(compressortypes[compdict["id"]],compdict)
function getCompressor(compdict::Dict)
if haskey(compressortypes, compdict["id"])
getCompressor(compressortypes[compdict["id"]],compdict)
else
throw(UnknownCompressorException(compdict["id"]))
end
end
getCompressor(::Nothing) = NoCompressor()

"""
UnknownCompressorException(compid::String)

Exception that occurs when an unknown compressor id string is encountered. If
a package that will enable the compressor is known, then we will recommend that
the user load that package.
"""
struct UnknownCompressorException <: Exception
compid::String
end
function Base.show(io::IO, e::UnknownCompressorException)
println(io, "Zarr compressor $(e.compid) is not loaded.")
if haskey(compressorpkgs, e.compid)
pkg = compressorpkgs[e.compid]
println(io, """
Loading the Julia package $(pkg).jl will trigger the compressor
extension package to load:
```
using Pkg
Pkg.add("$pkg")
using $pkg
```
""")
else
println(io, """
A compressor for $(e.compid) has not been implemented. Please file an
issue at https://github.com/JuliaIO/Zarr.jl/issues .
""")
end
end

#Compression when no filter is given
zcompress!(compressed,data,c,::Nothing) = zcompress!(compressed,data,c)
zuncompress!(data,compressed,c,::Nothing) = zuncompress!(data,compressed,c)
Expand Down Expand Up @@ -116,9 +153,20 @@ end

JSON.lower(::NoCompressor) = nothing

compressortypes = Dict("blosc"=>BloscCompressor, nothing=>NoCompressor)
const compressortypes = Dict("blosc"=>BloscCompressor, nothing=>NoCompressor)

"""
Zarr.compressorpkgs::Dict{String,Symbol}

Dictionary mapping compressor names to package names containing the compressor
implementations. Loading the packages in the values will trigger package
extensions to load.
"""
const compressorpkgs = Dict(
"blosc" => :Blosc,
"zlib" => :CodecZlib,
"zstd" => :CodecZstd
)

"""
ZlibCompressor(clevel=-1)
Expand Down
8 changes: 8 additions & 0 deletions src/ZArray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,14 @@ function zcreate(::Type{T},storage::AbstractStore,
attrs=Dict(),
writeable=true,
) where T

if compressor isa AbstractString
if haskey(compressortypes, String(compressor))
compressor = compressortypes[compressor]()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would make it impossible to set custom compression levels for the compression algorithm. Do we need another keyword argument for zcreate that gets passed to the compressor constructor?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here is that the simple option of just passing a string will give you default compression options. If you want to specify the compression level, you can use the compression constructor and pass the instatiated compressor instance.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i get and error when trying to pass a compression constructor:

julia> zzeros(Int16, 16, 16, compressor=CodecZstd.ZstdCompressor(; level=6), path=tempname())
ERROR: Cannot serialize type Ptr{CodecZstd.LibZstd.ZSTD_CCtx_s}

Copy link
Member Author

@mkitti mkitti Mar 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not work. Passing a TranscodingStream instance specifically is not going to work at the moment.

The constructor you need is the ZstdZarrCompressor type I created in this PR.

@nhz2 is also cooking up something here:
https://github.com/nhz2/ChunkCodecs.jl

Maybe we should invite him over to this side of the river to work on this?

else
throw(UnknownCompressorException(compressor))
end
end

length(dims) == length(chunks) || throw(DimensionMismatch("Dims must have the same length as chunks"))
N = length(dims)
Expand Down
1 change: 1 addition & 0 deletions test/Project.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[deps]
CodecZstd = "6b39b394-51ab-5f42-8807-6242bab2b4c2"
Conda = "8f4d0f93-b110-5947-807f-2305c1781a2d"
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
Expand Down
34 changes: 34 additions & 0 deletions test/ext.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using Zarr
using Test

@testset "Zarr Extension Packages" begin
@test_throws Zarr.UnknownCompressorException("zstd") zzeros(UInt8, 512, compressor="zstd")
@test_throws Zarr.UnknownCompressorException("asdf") zzeros(UInt8, 512, compressor="asdf")
d = Dict("id" => "zstd")
@test_throws Zarr.UnknownCompressorException("zstd") Zarr.getCompressor(d)

iob = IOBuffer()
show(iob, Zarr.UnknownCompressorException("zstd"))
@test occursin("CodecZstd.jl", String(take!(iob)))

iob = IOBuffer()
show(iob, Zarr.UnknownCompressorException("asdf"))
@test occursin("issue", String(take!(iob)))
@test Zarr.getCompressor(nothing) == Zarr.NoCompressor()
end

using CodecZstd
@testset "Zarr CodecZstd Extension" begin
CodecZstdExt = Base.get_extension(Zarr, :CodecZstdExt)
@test haskey(Zarr.compressortypes, "zstd")
@test Zarr.compressortypes["zstd"] == CodecZstdExt.ZstdZarrCompressor
td = tempname()
zarray = zzeros(UInt16, 16, 16, compressor="zstd", path=td)
zarray .= reshape(1:256,16,16)
@test isa(zarray, ZArray{UInt16})
@test zarray.metadata.compressor isa CodecZstdExt.ZstdZarrCompressor
zarray2 = zopen(td)
@test isa(zarray2, ZArray{UInt16})
@test zarray2.metadata.compressor isa CodecZstdExt.ZstdZarrCompressor
@test zarray2 == reshape(1:256,16,16)
end
4 changes: 1 addition & 3 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,7 @@ end
end

include("storage.jl")



include("python.jl")
include("ext.jl")

end # @testset "Zarr"
Loading