From 9d8ddbb6646d2e902b68d4f107608cfdd0189639 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Thu, 13 Feb 2025 01:49:46 +0800 Subject: [PATCH 1/4] Fix agg data race and add tests. --- cpp/src/arrow/acero/aggregate_internal.h | 23 ++++++++--- cpp/src/arrow/acero/aggregate_node_test.cc | 39 +++++++++++++++++++ cpp/src/arrow/acero/groupby_aggregate_node.cc | 6 +-- cpp/src/arrow/acero/scalar_aggregate_node.cc | 6 +-- 4 files changed, 63 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/acero/aggregate_internal.h b/cpp/src/arrow/acero/aggregate_internal.h index 7cdc424cbb76b..2cdf56934a3e4 100644 --- a/cpp/src/arrow/acero/aggregate_internal.h +++ b/cpp/src/arrow/acero/aggregate_internal.h @@ -195,6 +195,7 @@ class ScalarAggregateNode : public ExecNode, public TracedNode { Status StartProducing() override { NoteStartProducing(ToStringExtra(0)); + local_states_.resize(plan_->query_context()->max_concurrency()); return Status::OK(); } @@ -212,6 +213,17 @@ class ScalarAggregateNode : public ExecNode, public TracedNode { std::string ToStringExtra(int indent) const override; private: + struct ThreadLocalState { + // Holds the segment key values of the most recent input batch processed by a thread. + // The values are updated every time an input batch is processed by the thread + std::vector segmenter_values; + }; + + ThreadLocalState* GetLocalState() { + size_t thread_index = plan_->query_context()->GetThreadIndex(); + return &local_states_[thread_index]; + } + Status ResetKernelStates(); Status OutputResult(bool is_last); @@ -220,9 +232,6 @@ class ScalarAggregateNode : public ExecNode, public TracedNode { std::unique_ptr segmenter_; // Field indices corresponding to the segment-keys const std::vector segment_field_ids_; - // Holds the value of segment keys of the most recent input batch - // The values are updated every time an input batch is processed - std::vector segmenter_values_; const std::vector> target_fieldsets_; const std::vector aggs_; @@ -233,6 +242,9 @@ class ScalarAggregateNode : public ExecNode, public TracedNode { std::vector>> states_; AtomicCounter input_counter_; + + std::vector local_states_; + /// \brief Total number of output batches produced int total_output_batches_ = 0; }; @@ -309,6 +321,9 @@ class GroupByNode : public ExecNode, public TracedNode { struct ThreadLocalState { std::unique_ptr grouper; std::vector> agg_states; + // Holds values of the current batch in this thread that were selected for the + // segment-keys + std::vector segmenter_values; }; ThreadLocalState* GetLocalState() { @@ -330,8 +345,6 @@ class GroupByNode : public ExecNode, public TracedNode { int output_task_group_id_; /// \brief A segmenter for the segment-keys std::unique_ptr segmenter_; - /// \brief Holds values of the current batch that were selected for the segment-keys - std::vector segmenter_values_; const std::vector key_field_ids_; /// \brief Field indices corresponding to the segment-keys diff --git a/cpp/src/arrow/acero/aggregate_node_test.cc b/cpp/src/arrow/acero/aggregate_node_test.cc index f980496d527d1..73476a681e186 100644 --- a/cpp/src/arrow/acero/aggregate_node_test.cc +++ b/cpp/src/arrow/acero/aggregate_node_test.cc @@ -213,6 +213,26 @@ TEST(GroupByNode, NoSkipNulls) { AssertExecBatchesEqualIgnoringOrder(out_schema, {expected_batch}, out_batches.batches); } +TEST(GroupByNode, BasicParallel) { + const int64_t num_batches = 8; + + std::vector batches(num_batches, ExecBatchFromJSON({int32()}, "[[42]]")); + + Declaration plan = Declaration::Sequence( + {{"exec_batch_source", + ExecBatchSourceNodeOptions(schema({field("key", int32())}), batches)}, + {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_count_all", "count(*)"}}, + /*keys=*/{"key"}}}}); + + ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema out_batches, + DeclarationToExecBatches(plan)); + + ExecBatch expected_batch = ExecBatchFromJSON( + {int32(), int64()}, "[[42, " + std::to_string(num_batches) + "]]"); + AssertExecBatchesEqualIgnoringOrder(out_batches.schema, {expected_batch}, + out_batches.batches); +} + TEST(ScalarAggregateNode, AnyAll) { // GH-43768: boolean_any and boolean_all with constant input should work well // when min_count != 0. @@ -265,5 +285,24 @@ TEST(ScalarAggregateNode, AnyAll) { } } +TEST(ScalarAggregateNode, BasicParallel) { + const int64_t num_batches = 8; + + std::vector batches(num_batches, ExecBatchFromJSON({int32()}, "[[42]]")); + + Declaration plan = Declaration::Sequence( + {{"exec_batch_source", + ExecBatchSourceNodeOptions(schema({field("", int32())}), batches)}, + {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"count_all", "count(*)"}}}}}); + + ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema out_batches, + DeclarationToExecBatches(plan)); + + ExecBatch expected_batch = + ExecBatchFromJSON({int64()}, "[[" + std::to_string(num_batches) + "]]"); + AssertExecBatchesEqualIgnoringOrder(out_batches.schema, {expected_batch}, + out_batches.batches); +} + } // namespace acero } // namespace arrow diff --git a/cpp/src/arrow/acero/groupby_aggregate_node.cc b/cpp/src/arrow/acero/groupby_aggregate_node.cc index 2beef360b45d4..12596e56ce0aa 100644 --- a/cpp/src/arrow/acero/groupby_aggregate_node.cc +++ b/cpp/src/arrow/acero/groupby_aggregate_node.cc @@ -312,7 +312,7 @@ Result GroupByNode::Finalize() { segment_key_field_ids_.size()); // Segment keys come first - PlaceFields(out_data, 0, segmenter_values_); + PlaceFields(out_data, 0, state->segmenter_values); // Followed by keys ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, state->grouper->GetUniques()); std::move(out_keys.values.begin(), out_keys.values.end(), @@ -379,8 +379,8 @@ Status GroupByNode::InputReceived(ExecNode* input, ExecBatch batch) { auto exec_batch = full_batch.Slice(segment.offset, segment.length); auto batch = ExecSpan(exec_batch); RETURN_NOT_OK(Consume(batch)); - RETURN_NOT_OK( - ExtractSegmenterValues(&segmenter_values_, exec_batch, segment_key_field_ids_)); + RETURN_NOT_OK(ExtractSegmenterValues(&GetLocalState()->segmenter_values, exec_batch, + segment_key_field_ids_)); if (!segment.is_open) RETURN_NOT_OK(OutputResult(/*is_last=*/false)); return Status::OK(); }; diff --git a/cpp/src/arrow/acero/scalar_aggregate_node.cc b/cpp/src/arrow/acero/scalar_aggregate_node.cc index b34f7511cc12b..b374daebef78d 100644 --- a/cpp/src/arrow/acero/scalar_aggregate_node.cc +++ b/cpp/src/arrow/acero/scalar_aggregate_node.cc @@ -240,8 +240,8 @@ Status ScalarAggregateNode::InputReceived(ExecNode* input, ExecBatch batch) { // We add segment to the current segment group aggregation auto exec_batch = full_batch.Slice(segment.offset, segment.length); RETURN_NOT_OK(DoConsume(ExecSpan(exec_batch), thread_index)); - RETURN_NOT_OK( - ExtractSegmenterValues(&segmenter_values_, exec_batch, segment_field_ids_)); + RETURN_NOT_OK(ExtractSegmenterValues(&GetLocalState()->segmenter_values, exec_batch, + segment_field_ids_)); // If the segment closes the current segment group, we can output segment group // aggregation. @@ -292,7 +292,7 @@ Status ScalarAggregateNode::OutputResult(bool is_last) { batch.values.resize(kernels_.size() + segment_field_ids_.size()); // First, insert segment keys - PlaceFields(batch, /*base=*/0, segmenter_values_); + PlaceFields(batch, /*base=*/0, GetLocalState()->segmenter_values); // Followed by aggregate values std::size_t base = segment_field_ids_.size(); From f2af2d7c3388de6a3bf6f423697c620bb1563e79 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Wed, 19 Mar 2025 19:06:12 +0800 Subject: [PATCH 2/4] Re-fix --- cpp/src/arrow/acero/aggregate_internal.cc | 7 ++--- cpp/src/arrow/acero/aggregate_internal.h | 30 ++++++------------- cpp/src/arrow/acero/groupby_aggregate_node.cc | 6 ++-- cpp/src/arrow/acero/scalar_aggregate_node.cc | 6 ++-- 4 files changed, 17 insertions(+), 32 deletions(-) diff --git a/cpp/src/arrow/acero/aggregate_internal.cc b/cpp/src/arrow/acero/aggregate_internal.cc index 0c1bc3db365a6..68cbc8549c516 100644 --- a/cpp/src/arrow/acero/aggregate_internal.cc +++ b/cpp/src/arrow/acero/aggregate_internal.cc @@ -177,14 +177,11 @@ void AggregatesToString(std::stringstream* ss, const Schema& input_schema, *ss << ']'; } -Status ExtractSegmenterValues(std::vector* values_ptr, - const ExecBatch& input_batch, +Status ExtractSegmenterValues(std::vector& values, const ExecBatch& input_batch, const std::vector& field_ids) { + DCHECK_EQ(values.size(), field_ids.size()); DCHECK_GT(input_batch.length, 0); - std::vector& values = *values_ptr; int64_t row = input_batch.length - 1; - values.clear(); - values.resize(field_ids.size()); for (size_t i = 0; i < field_ids.size(); i++) { const Datum& value = input_batch.values[field_ids[i]]; if (value.is_scalar()) { diff --git a/cpp/src/arrow/acero/aggregate_internal.h b/cpp/src/arrow/acero/aggregate_internal.h index 2cdf56934a3e4..94622f9149059 100644 --- a/cpp/src/arrow/acero/aggregate_internal.h +++ b/cpp/src/arrow/acero/aggregate_internal.h @@ -143,11 +143,10 @@ Status HandleSegments(RowSegmenter* segmenter, const ExecBatch& batch, } /// @brief Extract values of segment keys from a segment batch -/// @param[out] values_ptr Vector to store the extracted segment key values +/// @param[out] values Vector to store the extracted segment key values /// @param[in] input_batch Segment batch. Must have the a constant value for segment key /// @param[in] field_ids Segment key field ids -Status ExtractSegmenterValues(std::vector* values_ptr, - const ExecBatch& input_batch, +Status ExtractSegmenterValues(std::vector& values, const ExecBatch& input_batch, const std::vector& field_ids); Result> ExtractValues(const ExecBatch& input_batch, @@ -171,6 +170,7 @@ class ScalarAggregateNode : public ExecNode, public TracedNode { TracedNode(this), segmenter_(std::move(segmenter)), segment_field_ids_(std::move(segment_field_ids)), + segmenter_values_(segment_field_ids_.size()), target_fieldsets_(std::move(target_fieldsets)), aggs_(std::move(aggs)), kernels_(std::move(kernels)), @@ -195,7 +195,6 @@ class ScalarAggregateNode : public ExecNode, public TracedNode { Status StartProducing() override { NoteStartProducing(ToStringExtra(0)); - local_states_.resize(plan_->query_context()->max_concurrency()); return Status::OK(); } @@ -213,17 +212,6 @@ class ScalarAggregateNode : public ExecNode, public TracedNode { std::string ToStringExtra(int indent) const override; private: - struct ThreadLocalState { - // Holds the segment key values of the most recent input batch processed by a thread. - // The values are updated every time an input batch is processed by the thread - std::vector segmenter_values; - }; - - ThreadLocalState* GetLocalState() { - size_t thread_index = plan_->query_context()->GetThreadIndex(); - return &local_states_[thread_index]; - } - Status ResetKernelStates(); Status OutputResult(bool is_last); @@ -232,6 +220,9 @@ class ScalarAggregateNode : public ExecNode, public TracedNode { std::unique_ptr segmenter_; // Field indices corresponding to the segment-keys const std::vector segment_field_ids_; + // Holds the value of segment keys of the most recent input batch + // The values are updated every time an input batch is processed + std::vector segmenter_values_; const std::vector> target_fieldsets_; const std::vector aggs_; @@ -242,9 +233,6 @@ class ScalarAggregateNode : public ExecNode, public TracedNode { std::vector>> states_; AtomicCounter input_counter_; - - std::vector local_states_; - /// \brief Total number of output batches produced int total_output_batches_ = 0; }; @@ -261,6 +249,7 @@ class GroupByNode : public ExecNode, public TracedNode { : ExecNode(input->plan(), {input}, {"groupby"}, std::move(output_schema)), TracedNode(this), segmenter_(std::move(segmenter)), + segmenter_values_(segment_key_field_ids.size()), key_field_ids_(std::move(key_field_ids)), segment_key_field_ids_(std::move(segment_key_field_ids)), agg_src_types_(std::move(agg_src_types)), @@ -321,9 +310,6 @@ class GroupByNode : public ExecNode, public TracedNode { struct ThreadLocalState { std::unique_ptr grouper; std::vector> agg_states; - // Holds values of the current batch in this thread that were selected for the - // segment-keys - std::vector segmenter_values; }; ThreadLocalState* GetLocalState() { @@ -345,6 +331,8 @@ class GroupByNode : public ExecNode, public TracedNode { int output_task_group_id_; /// \brief A segmenter for the segment-keys std::unique_ptr segmenter_; + /// \brief Holds values of the current batch that were selected for the segment-keys + std::vector segmenter_values_; const std::vector key_field_ids_; /// \brief Field indices corresponding to the segment-keys diff --git a/cpp/src/arrow/acero/groupby_aggregate_node.cc b/cpp/src/arrow/acero/groupby_aggregate_node.cc index 12596e56ce0aa..48af6d90e09d9 100644 --- a/cpp/src/arrow/acero/groupby_aggregate_node.cc +++ b/cpp/src/arrow/acero/groupby_aggregate_node.cc @@ -312,7 +312,7 @@ Result GroupByNode::Finalize() { segment_key_field_ids_.size()); // Segment keys come first - PlaceFields(out_data, 0, state->segmenter_values); + PlaceFields(out_data, 0, segmenter_values_); // Followed by keys ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, state->grouper->GetUniques()); std::move(out_keys.values.begin(), out_keys.values.end(), @@ -379,8 +379,8 @@ Status GroupByNode::InputReceived(ExecNode* input, ExecBatch batch) { auto exec_batch = full_batch.Slice(segment.offset, segment.length); auto batch = ExecSpan(exec_batch); RETURN_NOT_OK(Consume(batch)); - RETURN_NOT_OK(ExtractSegmenterValues(&GetLocalState()->segmenter_values, exec_batch, - segment_key_field_ids_)); + RETURN_NOT_OK( + ExtractSegmenterValues(segmenter_values_, exec_batch, segment_key_field_ids_)); if (!segment.is_open) RETURN_NOT_OK(OutputResult(/*is_last=*/false)); return Status::OK(); }; diff --git a/cpp/src/arrow/acero/scalar_aggregate_node.cc b/cpp/src/arrow/acero/scalar_aggregate_node.cc index b374daebef78d..eb1cf04022aaf 100644 --- a/cpp/src/arrow/acero/scalar_aggregate_node.cc +++ b/cpp/src/arrow/acero/scalar_aggregate_node.cc @@ -240,8 +240,8 @@ Status ScalarAggregateNode::InputReceived(ExecNode* input, ExecBatch batch) { // We add segment to the current segment group aggregation auto exec_batch = full_batch.Slice(segment.offset, segment.length); RETURN_NOT_OK(DoConsume(ExecSpan(exec_batch), thread_index)); - RETURN_NOT_OK(ExtractSegmenterValues(&GetLocalState()->segmenter_values, exec_batch, - segment_field_ids_)); + RETURN_NOT_OK( + ExtractSegmenterValues(segmenter_values_, exec_batch, segment_field_ids_)); // If the segment closes the current segment group, we can output segment group // aggregation. @@ -292,7 +292,7 @@ Status ScalarAggregateNode::OutputResult(bool is_last) { batch.values.resize(kernels_.size() + segment_field_ids_.size()); // First, insert segment keys - PlaceFields(batch, /*base=*/0, GetLocalState()->segmenter_values); + PlaceFields(batch, /*base=*/0, segmenter_values_); // Followed by aggregate values std::size_t base = segment_field_ids_.size(); From 0520063425afd21e819ad87ed54efa5dbf7d0ff5 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Wed, 19 Mar 2025 22:34:49 +0800 Subject: [PATCH 3/4] Address review comments --- cpp/src/arrow/acero/aggregate_internal.cc | 6 ++++-- cpp/src/arrow/acero/aggregate_internal.h | 4 ++-- cpp/src/arrow/acero/groupby_aggregate_node.cc | 2 +- cpp/src/arrow/acero/scalar_aggregate_node.cc | 2 +- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/acero/aggregate_internal.cc b/cpp/src/arrow/acero/aggregate_internal.cc index 68cbc8549c516..26e4888a596ad 100644 --- a/cpp/src/arrow/acero/aggregate_internal.cc +++ b/cpp/src/arrow/acero/aggregate_internal.cc @@ -177,10 +177,12 @@ void AggregatesToString(std::stringstream* ss, const Schema& input_schema, *ss << ']'; } -Status ExtractSegmenterValues(std::vector& values, const ExecBatch& input_batch, +Status ExtractSegmenterValues(std::vector* values_ptr, + const ExecBatch& input_batch, const std::vector& field_ids) { - DCHECK_EQ(values.size(), field_ids.size()); DCHECK_GT(input_batch.length, 0); + std::vector& values = *values_ptr; + DCHECK_EQ(values.size(), field_ids.size()); int64_t row = input_batch.length - 1; for (size_t i = 0; i < field_ids.size(); i++) { const Datum& value = input_batch.values[field_ids[i]]; diff --git a/cpp/src/arrow/acero/aggregate_internal.h b/cpp/src/arrow/acero/aggregate_internal.h index 94622f9149059..08e26796f7cd0 100644 --- a/cpp/src/arrow/acero/aggregate_internal.h +++ b/cpp/src/arrow/acero/aggregate_internal.h @@ -143,10 +143,10 @@ Status HandleSegments(RowSegmenter* segmenter, const ExecBatch& batch, } /// @brief Extract values of segment keys from a segment batch -/// @param[out] values Vector to store the extracted segment key values +/// @param[out] values_ptr Vector to store the extracted segment key values /// @param[in] input_batch Segment batch. Must have the a constant value for segment key /// @param[in] field_ids Segment key field ids -Status ExtractSegmenterValues(std::vector& values, const ExecBatch& input_batch, +Status ExtractSegmenterValues(std::vector* values_ptr, const ExecBatch& input_batch, const std::vector& field_ids); Result> ExtractValues(const ExecBatch& input_batch, diff --git a/cpp/src/arrow/acero/groupby_aggregate_node.cc b/cpp/src/arrow/acero/groupby_aggregate_node.cc index 48af6d90e09d9..2beef360b45d4 100644 --- a/cpp/src/arrow/acero/groupby_aggregate_node.cc +++ b/cpp/src/arrow/acero/groupby_aggregate_node.cc @@ -380,7 +380,7 @@ Status GroupByNode::InputReceived(ExecNode* input, ExecBatch batch) { auto batch = ExecSpan(exec_batch); RETURN_NOT_OK(Consume(batch)); RETURN_NOT_OK( - ExtractSegmenterValues(segmenter_values_, exec_batch, segment_key_field_ids_)); + ExtractSegmenterValues(&segmenter_values_, exec_batch, segment_key_field_ids_)); if (!segment.is_open) RETURN_NOT_OK(OutputResult(/*is_last=*/false)); return Status::OK(); }; diff --git a/cpp/src/arrow/acero/scalar_aggregate_node.cc b/cpp/src/arrow/acero/scalar_aggregate_node.cc index eb1cf04022aaf..b34f7511cc12b 100644 --- a/cpp/src/arrow/acero/scalar_aggregate_node.cc +++ b/cpp/src/arrow/acero/scalar_aggregate_node.cc @@ -241,7 +241,7 @@ Status ScalarAggregateNode::InputReceived(ExecNode* input, ExecBatch batch) { auto exec_batch = full_batch.Slice(segment.offset, segment.length); RETURN_NOT_OK(DoConsume(ExecSpan(exec_batch), thread_index)); RETURN_NOT_OK( - ExtractSegmenterValues(segmenter_values_, exec_batch, segment_field_ids_)); + ExtractSegmenterValues(&segmenter_values_, exec_batch, segment_field_ids_)); // If the segment closes the current segment group, we can output segment group // aggregation. From c80baea6a72bb42389b77cbc3deaf76925870441 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Wed, 19 Mar 2025 22:37:36 +0800 Subject: [PATCH 4/4] Format --- cpp/src/arrow/acero/aggregate_internal.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/aggregate_internal.h b/cpp/src/arrow/acero/aggregate_internal.h index 08e26796f7cd0..e2b1ab121f4c6 100644 --- a/cpp/src/arrow/acero/aggregate_internal.h +++ b/cpp/src/arrow/acero/aggregate_internal.h @@ -146,7 +146,8 @@ Status HandleSegments(RowSegmenter* segmenter, const ExecBatch& batch, /// @param[out] values_ptr Vector to store the extracted segment key values /// @param[in] input_batch Segment batch. Must have the a constant value for segment key /// @param[in] field_ids Segment key field ids -Status ExtractSegmenterValues(std::vector* values_ptr, const ExecBatch& input_batch, +Status ExtractSegmenterValues(std::vector* values_ptr, + const ExecBatch& input_batch, const std::vector& field_ids); Result> ExtractValues(const ExecBatch& input_batch,