GH-45788: [C++][Acero] Fix data race in aggregate node #45789

Status: Open · wants to merge 1 commit into base: main
cpp/src/arrow/acero/aggregate_internal.h (18 additions, 5 deletions)

@@ -195,6 +195,7 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {

Status StartProducing() override {
NoteStartProducing(ToStringExtra(0));
local_states_.resize(plan_->query_context()->max_concurrency());
return Status::OK();
}

@@ -212,6 +213,17 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
std::string ToStringExtra(int indent) const override;

private:
struct ThreadLocalState {
// Holds the segment key values of the most recent input batch processed by a thread.
// The values are updated every time an input batch is processed by the thread
std::vector<Datum> segmenter_values;
};

ThreadLocalState* GetLocalState() {
size_t thread_index = plan_->query_context()->GetThreadIndex();
return &local_states_[thread_index];
}

Status ResetKernelStates();

Status OutputResult(bool is_last);
@@ -220,9 +232,6 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
std::unique_ptr<RowSegmenter> segmenter_;
// Field indices corresponding to the segment-keys
const std::vector<int> segment_field_ids_;
// Holds the value of segment keys of the most recent input batch
// The values are updated every time an input batch is processed
std::vector<Datum> segmenter_values_;

const std::vector<std::vector<int>> target_fieldsets_;
const std::vector<Aggregate> aggs_;
@@ -233,6 +242,9 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
std::vector<std::vector<std::unique_ptr<KernelState>>> states_;

AtomicCounter input_counter_;

std::vector<ThreadLocalState> local_states_;

/// \brief Total number of output batches produced
int total_output_batches_ = 0;
};
@@ -309,6 +321,9 @@ class GroupByNode : public ExecNode, public TracedNode {
struct ThreadLocalState {
std::unique_ptr<Grouper> grouper;
std::vector<std::unique_ptr<KernelState>> agg_states;
// Holds values of the current batch in this thread that were selected for the
// segment-keys
std::vector<Datum> segmenter_values;
};

ThreadLocalState* GetLocalState() {
@@ -330,8 +345,6 @@ class GroupByNode : public ExecNode, public TracedNode {
int output_task_group_id_;
/// \brief A segmenter for the segment-keys
std::unique_ptr<RowSegmenter> segmenter_;
/// \brief Holds values of the current batch that were selected for the segment-keys
std::vector<Datum> segmenter_values_;

const std::vector<int> key_field_ids_;
/// \brief Field indices corresponding to the segment-keys
cpp/src/arrow/acero/aggregate_node_test.cc (39 additions, 0 deletions)

@@ -213,6 +213,26 @@ TEST(GroupByNode, NoSkipNulls) {
AssertExecBatchesEqualIgnoringOrder(out_schema, {expected_batch}, out_batches.batches);
}

TEST(GroupByNode, BasicParallel) {
const int64_t num_batches = 8;

std::vector<ExecBatch> batches(num_batches, ExecBatchFromJSON({int32()}, "[[42]]"));

Declaration plan = Declaration::Sequence(
{{"exec_batch_source",
ExecBatchSourceNodeOptions(schema({field("key", int32())}), batches)},
{"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_count_all", "count(*)"}},
/*keys=*/{"key"}}}});

ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema out_batches,
DeclarationToExecBatches(plan));

ExecBatch expected_batch = ExecBatchFromJSON(
{int32(), int64()}, "[[42, " + std::to_string(num_batches) + "]]");
AssertExecBatchesEqualIgnoringOrder(out_batches.schema, {expected_batch},
out_batches.batches);
}

TEST(ScalarAggregateNode, AnyAll) {
// GH-43768: boolean_any and boolean_all with constant input should work well
// when min_count != 0.
@@ -265,5 +285,24 @@ TEST(ScalarAggregateNode, AnyAll) {
}
}

TEST(ScalarAggregateNode, BasicParallel) {
const int64_t num_batches = 8;

std::vector<ExecBatch> batches(num_batches, ExecBatchFromJSON({int32()}, "[[42]]"));

Declaration plan = Declaration::Sequence(
{{"exec_batch_source",
ExecBatchSourceNodeOptions(schema({field("", int32())}), batches)},
{"aggregate", AggregateNodeOptions{/*aggregates=*/{{"count_all", "count(*)"}}}}});

ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema out_batches,
DeclarationToExecBatches(plan));

ExecBatch expected_batch =
ExecBatchFromJSON({int64()}, "[[" + std::to_string(num_batches) + "]]");
AssertExecBatchesEqualIgnoringOrder(out_batches.schema, {expected_batch},
out_batches.batches);
}

} // namespace acero
} // namespace arrow
cpp/src/arrow/acero/groupby_aggregate_node.cc (3 additions, 3 deletions)

@@ -312,7 +312,7 @@ Result<ExecBatch> GroupByNode::Finalize() {
segment_key_field_ids_.size());

// Segment keys come first
PlaceFields(out_data, 0, segmenter_values_);
PlaceFields(out_data, 0, state->segmenter_values);

[Review comment] @pitrou (Member), Mar 17, 2025:
So, the Finalize step only considers the segmenter values for state[0]? I'm not sure I understand why.

// Followed by keys
ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, state->grouper->GetUniques());
std::move(out_keys.values.begin(), out_keys.values.end(),
@@ -379,8 +379,8 @@ Status GroupByNode::InputReceived(ExecNode* input, ExecBatch batch) {
auto exec_batch = full_batch.Slice(segment.offset, segment.length);
auto batch = ExecSpan(exec_batch);
RETURN_NOT_OK(Consume(batch));
RETURN_NOT_OK(
ExtractSegmenterValues(&segmenter_values_, exec_batch, segment_key_field_ids_));
RETURN_NOT_OK(ExtractSegmenterValues(&GetLocalState()->segmenter_values, exec_batch,
segment_key_field_ids_));
if (!segment.is_open) RETURN_NOT_OK(OutputResult(/*is_last=*/false));
return Status::OK();
};
cpp/src/arrow/acero/scalar_aggregate_node.cc (3 additions, 3 deletions)

@@ -240,8 +240,8 @@ Status ScalarAggregateNode::InputReceived(ExecNode* input, ExecBatch batch) {
// We add segment to the current segment group aggregation
auto exec_batch = full_batch.Slice(segment.offset, segment.length);
RETURN_NOT_OK(DoConsume(ExecSpan(exec_batch), thread_index));
RETURN_NOT_OK(
ExtractSegmenterValues(&segmenter_values_, exec_batch, segment_field_ids_));
RETURN_NOT_OK(ExtractSegmenterValues(&GetLocalState()->segmenter_values, exec_batch,
segment_field_ids_));

[Review comment] Member:
OutputResult seems non-thread-safe; how can InputReceived be called from several threads at once? Am I missing something?

[Reply] Contributor Author:

> OutputResult seems non-thread safe,

True.

> how can InputReceived be called from several threads at once?

InputReceived can't be parallel if there are any segment keys [1]. So if InputReceived is parallel, the segmenter must be a NoKeysSegmenter, which only generates "open and extending" segments, and those never trigger OutputResult.

[1]

if (is_cpu_parallel && segment_keys.size() > 0) {
  return Status::NotImplemented("Segmented aggregation in a multi-threaded plan");
}
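
A minimal, self-contained C++ sketch of the invariant described above (Segment, SegmentKeylessBatch, and HandleBatch are invented names for this illustration, not Acero's classes): a keyless segmenter only ever reports open segments, and the per-segment handler flushes output only for closed segments, so the non-thread-safe output path is never reached while input batches are consumed in parallel.

#include <iostream>
#include <vector>

// Stand-in for a segment produced by the row segmenter.
struct Segment {
  bool is_open;  // a keyless (trivial) segmenter always reports open segments
};

// Stand-in for NoKeysSegmenter: every batch yields a single open segment.
std::vector<Segment> SegmentKeylessBatch() { return {Segment{/*is_open=*/true}}; }

// Mirrors the per-segment handler: only a closed segment would flush output.
void HandleBatch(const std::vector<Segment>& segments, int* output_flushes) {
  for (const auto& segment : segments) {
    // ... consume the sliced batch, update thread-local segmenter values ...
    if (!segment.is_open) {
      ++*output_flushes;  // corresponds to OutputResult(/*is_last=*/false)
    }
  }
}

int main() {
  int output_flushes = 0;
  for (int batch = 0; batch < 8; ++batch) {
    HandleBatch(SegmentKeylessBatch(), &output_flushes);
  }
  std::cout << "output flushes during input: " << output_flushes << "\n";  // prints 0
  return 0;
}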

[Review comment] Member:
Ah, so the data race occurred in the non-segmented case? It's weird that we have to change the segmenting state to thread-local to fix that :)

[Reply] @zanmato1984 (Contributor Author), Mar 19, 2025:

> Ah, so the data race occurred in the non-segmented case?

Yes. The race happens with the trivial segmenter, which essentially does nothing but still clears segmenter_values concurrently every time.

It is weird, and I may even withdraw this fix. Please wait for my answer to the other comment (it's long and I'm still writing). Thank you.
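
To make the race concrete, here is a minimal, self-contained C++ sketch (Datum, ConsumeBatchRacy, and ConsumeBatchLocal are illustrative stand-ins, not Acero's API) contrasting the old shared segmenter_values vector, which every worker thread clears and refills, with the per-thread vectors this PR introduces, indexed by a stable thread index as in GetLocalState(). ThreadSanitizer would be expected to flag the shared variant's concurrent writes, while the thread-local variant is race-free.

#include <cstddef>
#include <thread>
#include <vector>

// Toy stand-in for arrow::Datum.
struct Datum {
  int value = 0;
};

// Before the fix: a single vector shared by all worker threads. Each thread clears
// and refills it for every batch, which is the unsynchronized concurrent write that
// ThreadSanitizer reports.
std::vector<Datum> shared_segmenter_values;

void ConsumeBatchRacy(int batch_value) {
  shared_segmenter_values.clear();                   // racy write
  shared_segmenter_values.push_back({batch_value});  // racy write
}

// After the fix: one vector per thread, sized up front (as local_states_ is resized
// in StartProducing()) and indexed by a stable per-thread index.
std::vector<std::vector<Datum>> local_segmenter_values;

void ConsumeBatchLocal(std::size_t thread_index, int batch_value) {
  auto& values = local_segmenter_values[thread_index];
  values.clear();                  // touches only this thread's state
  values.push_back({batch_value});
}

int main() {
  const std::size_t num_threads = 4;
  local_segmenter_values.resize(num_threads);

  std::vector<std::thread> threads;
  for (std::size_t i = 0; i < num_threads; ++i) {
    threads.emplace_back([i] {
      ConsumeBatchRacy(static_cast<int>(i));      // data race across threads
      ConsumeBatchLocal(i, static_cast<int>(i));  // no race: disjoint state
    });
  }
  for (auto& t : threads) t.join();
  return 0;
}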


// If the segment closes the current segment group, we can output segment group
// aggregation.
@@ -292,7 +292,7 @@ Status ScalarAggregateNode::OutputResult(bool is_last) {
batch.values.resize(kernels_.size() + segment_field_ids_.size());

// First, insert segment keys
PlaceFields(batch, /*base=*/0, segmenter_values_);
PlaceFields(batch, /*base=*/0, GetLocalState()->segmenter_values);

// Followed by aggregate values
std::size_t base = segment_field_ids_.size();