@@ -434,7 +434,8 @@ void DecideAdaptation(RadixHTGlobalSinkState &gstate, RadixHTLocalSinkState &lst
434434 }
435435}
436436
437- void MaybeRepartition (ClientContext &context, RadixHTGlobalSinkState &gstate, RadixHTLocalSinkState &lstate) {
437+ void MaybeRepartition (ClientContext &context, RadixHTGlobalSinkState &gstate, RadixHTLocalSinkState &lstate,
438+ const bool combine) {
438439 auto &config = gstate.config ;
439440 auto &ht = *lstate.ht ;
440441
@@ -476,7 +477,7 @@ void MaybeRepartition(ClientContext &context, RadixHTGlobalSinkState &gstate, Ra
476477 }
477478
478479 // We can go external when there are few threads, but we shouldn't repartition here
479- if (gstate.number_of_threads <= RadixHTConfig::GROW_STRATEGY_THREAD_THRESHOLD) {
480+ if (!combine && gstate.number_of_threads <= RadixHTConfig::GROW_STRATEGY_THREAD_THRESHOLD) {
480481 return ;
481482 }
482483
@@ -545,7 +546,7 @@ void RadixPartitionedHashTable::Sink(ExecutionContext &context, DataChunk &chunk
545546
546547 // Check if we need to repartition
547548 const auto radix_bits_before = ht.GetRadixBits ();
548- MaybeRepartition (context.client , gstate, lstate);
549+ MaybeRepartition (context.client , gstate, lstate, false );
549550 const auto repartitioned = radix_bits_before != ht.GetRadixBits ();
550551
551552 if (repartitioned && ht.Count () != 0 ) {
@@ -561,8 +562,6 @@ void RadixPartitionedHashTable::Sink(ExecutionContext &context, DataChunk &chunk
561562
562563void RadixPartitionedHashTable::Combine (ExecutionContext &context, GlobalSinkState &gstate_p,
563564 LocalSinkState &lstate_p) const {
564- // There is some defensive programming in here to try to avoid spurious issues
565- // See duckdblabs/duckdb-internal#6818 for more information
566565 auto &gstate = gstate_p.Cast <RadixHTGlobalSinkState>();
567566 auto &lstate = lstate_p.Cast <RadixHTLocalSinkState>();
568567 if (!lstate.ht ) {
@@ -571,7 +570,7 @@ void RadixPartitionedHashTable::Combine(ExecutionContext &context, GlobalSinkSta
571570
572571 // Set any_combined, then check one last time whether we need to repartition
573572 gstate.any_combined = true ;
574- MaybeRepartition (context.client , gstate, lstate);
573+ MaybeRepartition (context.client , gstate, lstate, true );
575574
576575 auto &ht = *lstate.ht ;
577576 auto lstate_data = ht.AcquirePartitionedData ();
@@ -585,29 +584,26 @@ void RadixPartitionedHashTable::Combine(ExecutionContext &context, GlobalSinkSta
585584 lstate.abandoned_data = std::move (lstate_data);
586585 }
587586
588- auto guard = gstate. Lock ();
589- if (gstate. finalized ) {
590- throw InternalException ( " RadixPartitionedHashTable: Combine called after Finalize! " );
591- }
587+ auto aggregate_allocator = ht. GetAggregateAllocator ();
588+
589+ // Eagerly destroy the HT
590+ lstate. ht . reset ();
592591
592+ auto guard = gstate.Lock ();
593+ D_ASSERT (!gstate.finalized );
593594 if (gstate.uncombined_data ) {
594595 gstate.uncombined_data ->Combine (*lstate.abandoned_data );
595596 } else {
596597 gstate.uncombined_data = std::move (lstate.abandoned_data );
597598 }
598- gstate.stored_allocators .emplace_back (ht. GetAggregateAllocator ( ));
599+ gstate.stored_allocators .emplace_back (std::move (aggregate_allocator ));
599600 gstate.stored_allocators_size += gstate.stored_allocators .back ()->AllocationSize ();
600-
601- // Eagerly destroy the HT
602- lstate.ht .reset ();
603601}
604602
605603void RadixPartitionedHashTable::Finalize (ClientContext &context, GlobalSinkState &gstate_p) const {
606604 auto &gstate = gstate_p.Cast <RadixHTGlobalSinkState>();
607605 auto guard = gstate.Lock ();
608- if (gstate.finalized ) {
609- throw InternalException (" RadixPartitionedHashTable: Finalize called again!" );
610- }
606+ D_ASSERT (!gstate.finalized );
611607
612608 if (gstate.uncombined_data ) {
613609 auto &uncombined_data = *gstate.uncombined_data ;
0 commit comments