mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-10-01 06:00:49 +00:00
95 lines
3.2 KiB
C++
95 lines
3.2 KiB
C++
#pragma once
|
||
|
||
#include <common/threadpool.hpp>
|
||
#include <DB/Interpreters/Aggregator.h>
|
||
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||
#include <DB/Common/ConcurrentBoundedQueue.h>
|
||
|
||
|
||
namespace DB
|
||
{
|
||
|
||
|
||
/** Доагрегирует потоки блоков, держа в оперативной памяти только по одному блоку из каждого потока.
|
||
* Это экономит оперативку в случае использования двухуровневой агрегации, где в каждом потоке будет до 256 блоков с частями результата.
|
||
*
|
||
* Агрегатные функции в блоках не должны быть финализированы, чтобы их состояния можно было объединить.
|
||
*
|
||
* Замечания:
|
||
*
|
||
* На хорошей сети (10Gbit) может работать заметно медленнее, так как чтения блоков с разных
|
||
* удалённых серверов делаются последовательно, при этом, чтение упирается в CPU.
|
||
* Это несложно исправить.
|
||
*
|
||
* Можно держать в памяти не по одному блоку из каждого источника, а по несколько, и распараллелить мердж.
|
||
* При этом будет расходоваться кратно больше оперативки.
|
||
*/
|
||
class MergingAggregatedMemoryEfficientBlockInputStream : public IProfilingBlockInputStream
|
||
{
|
||
public:
|
||
MergingAggregatedMemoryEfficientBlockInputStream(
|
||
BlockInputStreams inputs_, const Aggregator::Params & params, bool final_, size_t threads_);
|
||
|
||
String getName() const override { return "MergingAggregatedMemoryEfficient"; }
|
||
|
||
String getID() const override;
|
||
|
||
protected:
|
||
Block readImpl() override;
|
||
|
||
private:
|
||
Aggregator aggregator;
|
||
bool final;
|
||
size_t threads;
|
||
|
||
bool started = false;
|
||
bool has_two_level = false;
|
||
bool has_overflows = false;
|
||
int current_bucket_num = -1;
|
||
|
||
struct Input
|
||
{
|
||
BlockInputStreamPtr stream;
|
||
Block block;
|
||
Block overflow_block;
|
||
std::vector<Block> splitted_blocks;
|
||
bool is_exhausted = false;
|
||
|
||
Input(BlockInputStreamPtr & stream_) : stream(stream_) {}
|
||
};
|
||
|
||
std::vector<Input> inputs;
|
||
|
||
using BlocksToMerge = Poco::SharedPtr<BlocksList>;
|
||
|
||
/// Получить блоки, которые можно мерджить. Это позволяет мерджить их параллельно в отдельных потоках.
|
||
BlocksToMerge getNextBlocksToMerge();
|
||
|
||
/// Для параллельного мерджа.
|
||
struct OutputData
|
||
{
|
||
Block block;
|
||
std::exception_ptr exception;
|
||
|
||
OutputData() {}
|
||
OutputData(Block && block_) : block(std::move(block_)) {}
|
||
OutputData(std::exception_ptr && exception_) : exception(std::move(exception_)) {}
|
||
};
|
||
|
||
struct ParallelMergeData
|
||
{
|
||
boost::threadpool::pool pool;
|
||
std::mutex get_next_blocks_mutex;
|
||
ConcurrentBoundedQueue<OutputData> result_queue;
|
||
bool exhausted = false;
|
||
|
||
ParallelMergeData(size_t max_threads) : pool(max_threads), result_queue(max_threads) {}
|
||
};
|
||
|
||
std::unique_ptr<ParallelMergeData> parallel_merge_data;
|
||
|
||
void mergeThread(MemoryTracker * memory_tracker);
|
||
};
|
||
|
||
}
|