@@ -403,6 +403,42 @@ struct GrouperImpl : public Grouper {
403
403
return ConsumeImpl (batch, offset, length, GrouperMode::kLookup );
404
404
}
405
405
406
+ template <typename VisitGroupFunc, typename VisitUnknownGroupFunc>
407
+ void VisitKeys (int64_t length, const int32_t * key_offsets, const uint8_t * key_data,
408
+ bool insert_new_keys, VisitGroupFunc&& visit_group,
409
+ VisitUnknownGroupFunc&& visit_unknown_group) {
410
+ for (int64_t i = 0 ; i < length; ++i) {
411
+ const int32_t key_length = key_offsets[i + 1 ] - key_offsets[i];
412
+ const uint8_t * key_ptr = key_data + key_offsets[i];
413
+ std::string key (reinterpret_cast <const char *>(key_ptr), key_length);
414
+
415
+ uint32_t group_id;
416
+ if (insert_new_keys) {
417
+ const auto [it, inserted] = map_.emplace (std::move (key), num_groups_);
418
+ if (inserted) {
419
+ // New key: update offsets and key_bytes
420
+ ++num_groups_;
421
+ if (key_length > 0 ) {
422
+ const auto next_key_offset = static_cast <int32_t >(key_bytes_.size ());
423
+ key_bytes_.resize (next_key_offset + key_length);
424
+ offsets_.push_back (next_key_offset + key_length);
425
+ memcpy (key_bytes_.data () + next_key_offset, key_ptr, key_length);
426
+ }
427
+ }
428
+ group_id = it->second ;
429
+ } else {
430
+ const auto it = map_.find (std::move (key));
431
+ if (it == map_.end ()) {
432
+ // Key not found
433
+ visit_unknown_group ();
434
+ continue ;
435
+ }
436
+ group_id = it->second ;
437
+ }
438
+ visit_group (group_id);
439
+ }
440
+ }
441
+
406
442
Result<Datum> ConsumeImpl (const ExecSpan& batch, int64_t offset, int64_t length,
407
443
GrouperMode mode) {
408
444
ARROW_RETURN_NOT_OK (CheckAndCapLengthForConsume (batch.length , offset, &length));
@@ -433,56 +469,12 @@ struct GrouperImpl : public Grouper {
433
469
RETURN_NOT_OK (encoders_[i]->Encode (batch[i], batch.length , key_buf_ptrs.data ()));
434
470
}
435
471
436
- using MapIterator = typename decltype (map_)::iterator;
437
-
438
- struct LookupResult {
439
- bool inserted;
440
- bool found;
441
- MapIterator it;
442
- };
443
-
444
- auto generate_keys = [&](auto && lookup_key, auto && visit_group,
445
- auto && visit_unknown_group) {
446
- for (int64_t i = 0 ; i < batch.length ; ++i) {
447
- int32_t key_length = offsets_batch[i + 1 ] - offsets_batch[i];
448
- std::string key (
449
- reinterpret_cast <const char *>(key_bytes_batch.data () + offsets_batch[i]),
450
- key_length);
451
-
452
- LookupResult res = lookup_key (std::move (key), num_groups_);
453
-
454
- if (res.inserted ) {
455
- // new key; update offsets and key_bytes
456
- ++num_groups_;
457
- // Skip if there are no keys
458
- if (key_length > 0 ) {
459
- auto next_key_offset = static_cast <int32_t >(key_bytes_.size ());
460
- key_bytes_.resize (next_key_offset + key_length);
461
- offsets_.push_back (next_key_offset + key_length);
462
- memcpy (key_bytes_.data () + next_key_offset, key.c_str (), key_length);
463
- }
464
- }
465
-
466
- if (res.found ) {
467
- visit_group (res.it ->second );
468
- } else {
469
- visit_unknown_group ();
470
- }
471
- }
472
- };
473
-
474
- auto lookup_or_insert_key = [&](auto && key, uint32_t new_group_id) -> LookupResult {
475
- auto [it, inserted] = map_.emplace (key, new_group_id);
476
- return {inserted, /* found=*/ true , it};
477
- };
478
- auto lookup_key = [&](auto && key, uint32_t new_group_id) -> LookupResult {
479
- auto it = map_.find (key);
480
- return {/* inserted=*/ false , /* found=*/ it != map_.end (), it};
481
- };
482
-
483
472
if (mode == GrouperMode::kPopulate ) {
484
- generate_keys (
485
- lookup_or_insert_key, [](uint32_t group_id) {}, [] {});
473
+ VisitKeys (
474
+ batch.length , offsets_batch.data (), key_bytes_batch.data (),
475
+ /* insert_new_keys=*/ true ,
476
+ /* visit_group=*/ [](...) {},
477
+ /* visit_unknown_group=*/ [] {});
486
478
return Datum ();
487
479
}
488
480
@@ -496,10 +488,12 @@ struct GrouperImpl : public Grouper {
496
488
};
497
489
auto visit_unknown_group = [] {};
498
490
499
- generate_keys (lookup_or_insert_key, visit_group, visit_unknown_group);
491
+ VisitKeys (batch.length , offsets_batch.data (), key_bytes_batch.data (),
492
+ /* insert_new_keys=*/ true , visit_group, visit_unknown_group);
500
493
} else {
501
494
DCHECK_EQ (mode, GrouperMode::kLookup );
502
495
496
+ // Create a null bitmap to indicate which keys were found.
503
497
TypedBufferBuilder<bool > null_bitmap_builder (ctx_->memory_pool ());
504
498
RETURN_NOT_OK (null_bitmap_builder.Resize (batch.length ));
505
499
@@ -512,7 +506,8 @@ struct GrouperImpl : public Grouper {
512
506
null_bitmap_builder.UnsafeAppend (false );
513
507
};
514
508
515
- generate_keys (lookup_key, visit_group, visit_unknown_group);
509
+ VisitKeys (batch.length , offsets_batch.data (), key_bytes_batch.data (),
510
+ /* insert_new_keys=*/ false , visit_group, visit_unknown_group);
516
511
517
512
ARROW_ASSIGN_OR_RAISE (null_bitmap, null_bitmap_builder.Finish ());
518
513
}
0 commit comments