Fixed error that reproduced only when using libc++ [#CLICKHOUSE-3174].

This commit is contained in:
Alexey Milovidov 2017-09-09 05:12:53 +03:00
parent a0f6f406c8
commit 92d7d6bd44

View File

@ -1605,7 +1605,7 @@ void NO_INLINE Aggregator::mergeBucketImpl(
/** Combines aggregation states together, turns them into blocks, and outputs streams.
* If the aggregation states are two-level, then it produces blocks strictly in order bucket_num.
* If the aggregation states are two-level, then it produces blocks strictly in order of 'bucket_num'.
* (This is important for distributed processing.)
* In doing so, it can handle different buckets in parallel, using up to `threads` threads.
*/
@ -1636,6 +1636,16 @@ public:
return res.str();
}
~MergingAndConvertingBlockInputStream()
{
LOG_TRACE(&Logger::get(__PRETTY_FUNCTION__), "Waiting for threads to finish");
/// We need to wait for threads to finish before destructor of 'parallel_merge_data',
/// because the threads access 'parallel_merge_data'.
if (parallel_merge_data)
parallel_merge_data->pool.wait();
}
protected:
Block readImpl() override
{
@ -1732,19 +1742,13 @@ private:
struct ParallelMergeData
{
ThreadPool pool;
std::map<Int32, Block> ready_blocks;
std::exception_ptr exception;
std::mutex mutex;
std::condition_variable condvar;
ThreadPool pool;
explicit ParallelMergeData(size_t threads) : pool(threads) {}
~ParallelMergeData()
{
LOG_TRACE(&Logger::get(__PRETTY_FUNCTION__), "Waiting for threads to finish");
pool.wait();
}
};
std::unique_ptr<ParallelMergeData> parallel_merge_data;