Skip to content

Commit eade938

Browse files
authoredJan 9, 2024
apacheGH-39456: [Go][Parquet] Arrow DATE64 Type Coerced to Parquet DATE Logical Type (apache#39460)
### Rationale for this change

Closes: apache#39456

### What changes are included in this PR?

Update the physical and logical type mapping from Arrow to Parquet for the DATE64 type.

### Are these changes tested?

Yes:
- Update the expected schema mapping in an existing test
- Add tests asserting the new behavior:
  - Arrow DATE64 will roundtrip -> Parquet -> Arrow as DATE32
  - Arrow DATE64 values _not aligned_ to an exact date boundary will be truncated to the milliseconds at the boundary of the greatest full day on Parquet roundtrip

### Are there any user-facing changes?

Yes, users of `pqarrow.FileWriter` will produce Parquet files containing the `DATE` logical type instead of `TIMESTAMP[ms]` when writing Arrow data containing DATE64 field(s).

The proposed implementation truncates `int64` values to be divisible by 86400000 rather than validating that this is already the case, as some implementations do. I am happy to add this validation if it would be preferred, but the truncating behavior will likely break fewer existing users.

I'm not sure whether this is technically considered a breaking change to a public API and if/how it should be communicated. Any direction regarding this would be appreciated.

* Closes: apache#39456

Authored-by: Joel Lubinitsky <joel@cherre.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
1 parent 92520c6 commit eade938

File tree

3 files changed

+87
-3
lines changed

3 files changed

+87
-3
lines changed
 

‎go/parquet/pqarrow/encode_arrow_test.go

+84
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,52 @@ func makeDateTimeTypesTable(mem memory.Allocator, expected bool, addFieldMeta bo
125125
return array.NewTable(arrsc, cols, int64(len(isValid)))
126126
}
127127

128+
func makeDateTypeTable(mem memory.Allocator, expected bool, partialDays bool) arrow.Table {
129+
const (
130+
millisPerHour int64 = 1000 * 60 * 60
131+
millisPerDay int64 = millisPerHour * 24
132+
)
133+
isValid := []bool{true, true, true, false, true, true}
134+
135+
var field arrow.Field
136+
if expected {
137+
field = arrow.Field{Name: "date", Type: arrow.FixedWidthTypes.Date32, Nullable: true}
138+
} else {
139+
field = arrow.Field{Name: "date", Type: arrow.FixedWidthTypes.Date64, Nullable: true}
140+
}
141+
142+
field.Metadata = arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"1"})
143+
144+
arrsc := arrow.NewSchema([]arrow.Field{field}, nil)
145+
146+
d32Values := []arrow.Date32{1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000}
147+
148+
d64Values := make([]arrow.Date64, len(d32Values))
149+
for i := range d64Values {
150+
// Calculate number of milliseconds at date boundary
151+
d64Values[i] = arrow.Date64(int64(d32Values[i]) * millisPerDay)
152+
if partialDays {
153+
// Offset 1 or more hours past the date boundary
154+
hoursIntoDay := int64(i) * millisPerHour
155+
d64Values[i] += arrow.Date64(hoursIntoDay)
156+
}
157+
}
158+
159+
bldr := array.NewRecordBuilder(mem, arrsc)
160+
defer bldr.Release()
161+
162+
if expected {
163+
bldr.Field(0).(*array.Date32Builder).AppendValues(d32Values, isValid)
164+
} else {
165+
bldr.Field(0).(*array.Date64Builder).AppendValues(d64Values, isValid)
166+
}
167+
168+
rec := bldr.NewRecord()
169+
defer rec.Release()
170+
171+
return array.NewTableFromRecords(arrsc, []arrow.Record{rec})
172+
}
173+
128174
func TestWriteArrowCols(t *testing.T) {
129175
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
130176
defer mem.AssertSize(t, 0)
@@ -831,6 +877,44 @@ func (ps *ParquetIOTestSuite) TestDateTimeTypesWithInt96ReadWriteTable() {
831877
}
832878
}
833879

880+
func (ps *ParquetIOTestSuite) TestDate64ReadWriteTable() {
881+
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
882+
defer mem.AssertSize(ps.T(), 0)
883+
884+
date64InputTable := makeDateTypeTable(mem, false, false)
885+
defer date64InputTable.Release()
886+
buf := writeTableToBuffer(ps.T(), mem, date64InputTable, date64InputTable.NumRows(), pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
887+
defer buf.Release()
888+
889+
reader := ps.createReader(mem, buf.Bytes())
890+
roundTripOutputTable := ps.readTable(reader)
891+
defer roundTripOutputTable.Release()
892+
893+
date32ExpectedOutputTable := makeDateTypeTable(mem, true, false)
894+
defer date32ExpectedOutputTable.Release()
895+
896+
ps.Truef(array.TableEqual(date32ExpectedOutputTable, roundTripOutputTable), "expected table: %s\ngot table: %s", date32ExpectedOutputTable, roundTripOutputTable)
897+
}
898+
899+
func (ps *ParquetIOTestSuite) TestDate64ReadWriteTableWithPartialDays() {
900+
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
901+
defer mem.AssertSize(ps.T(), 0)
902+
903+
date64InputTableNotAlignedToDateBoundary := makeDateTypeTable(mem, false, true)
904+
defer date64InputTableNotAlignedToDateBoundary.Release()
905+
buf := writeTableToBuffer(ps.T(), mem, date64InputTableNotAlignedToDateBoundary, date64InputTableNotAlignedToDateBoundary.NumRows(), pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
906+
defer buf.Release()
907+
908+
reader := ps.createReader(mem, buf.Bytes())
909+
roundTripOutputTable := ps.readTable(reader)
910+
defer roundTripOutputTable.Release()
911+
912+
date32ExpectedOutputTable := makeDateTypeTable(mem, true, true)
913+
defer date32ExpectedOutputTable.Release()
914+
915+
ps.Truef(array.TableEqual(date32ExpectedOutputTable, roundTripOutputTable), "expected table: %s\ngot table: %s", date32ExpectedOutputTable, roundTripOutputTable)
916+
}
917+
834918
func (ps *ParquetIOTestSuite) TestLargeBinaryReadWriteTable() {
835919
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
836920
defer mem.AssertSize(ps.T(), 0)

‎go/parquet/pqarrow/schema.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,8 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties
326326
typ = parquet.Types.Int32
327327
logicalType = schema.DateLogicalType{}
328328
case arrow.DATE64:
329-
typ = parquet.Types.Int64
330-
logicalType = schema.NewTimestampLogicalType(true, schema.TimeUnitMillis)
329+
typ = parquet.Types.Int32
330+
logicalType = schema.DateLogicalType{}
331331
case arrow.TIMESTAMP:
332332
typ, logicalType, err = getTimestampMeta(field.Type.(*arrow.TimestampType), props, arrprops)
333333
if err != nil {

‎go/parquet/pqarrow/schema_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ func TestConvertArrowFlatPrimitives(t *testing.T) {
187187
arrowFields = append(arrowFields, arrow.Field{Name: "date", Type: arrow.FixedWidthTypes.Date32, Nullable: false})
188188

189189
parquetFields = append(parquetFields, schema.Must(schema.NewPrimitiveNodeLogical("date64", parquet.Repetitions.Required,
190-
schema.NewTimestampLogicalType(true, schema.TimeUnitMillis), parquet.Types.Int64, 0, -1)))
190+
schema.DateLogicalType{}, parquet.Types.Int32, 0, -1)))
191191
arrowFields = append(arrowFields, arrow.Field{Name: "date64", Type: arrow.FixedWidthTypes.Date64, Nullable: false})
192192

193193
parquetFields = append(parquetFields, schema.Must(schema.NewPrimitiveNodeLogical("time32", parquet.Repetitions.Required,

0 commit comments

Comments
 (0)
Please sign in to comment.