diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index e4f544a3b05..cf97941c8c2 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -1605,7 +1605,7 @@ void NO_INLINE Aggregator::mergeBucketImpl( /** Combines aggregation states together, turns them into blocks, and outputs streams. - * If the aggregation states are two-level, then it produces blocks strictly in order bucket_num. + * If the aggregation states are two-level, then it produces blocks strictly in order of 'bucket_num'. * (This is important for distributed processing.) * In doing so, it can handle different buckets in parallel, using up to `threads` threads. */ @@ -1636,6 +1636,16 @@ public: return res.str(); } + ~MergingAndConvertingBlockInputStream() + { + LOG_TRACE(&Logger::get(__PRETTY_FUNCTION__), "Waiting for threads to finish"); + + /// We need to wait for threads to finish before destructor of 'parallel_merge_data', + /// because the threads access 'parallel_merge_data'. + if (parallel_merge_data) + parallel_merge_data->pool.wait(); + } + protected: Block readImpl() override { @@ -1732,19 +1742,13 @@ private: struct ParallelMergeData { - ThreadPool pool; std::map ready_blocks; std::exception_ptr exception; std::mutex mutex; std::condition_variable condvar; + ThreadPool pool; explicit ParallelMergeData(size_t threads) : pool(threads) {} - - ~ParallelMergeData() - { - LOG_TRACE(&Logger::get(__PRETTY_FUNCTION__), "Waiting for threads to finish"); - pool.wait(); - } }; std::unique_ptr parallel_merge_data;