ClickHouse/dbms/include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h

98 lines
3.3 KiB
C
Raw Normal View History

#pragma once
#include <common/threadpool.hpp>
#include <DB/Interpreters/Aggregator.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Common/ConcurrentBoundedQueue.h>
namespace DB
{
/** Доагрегирует потоки блоков, держа в оперативной памяти только по одному блоку из каждого потока.
* Это экономит оперативку в случае использования двухуровневой агрегации, где в каждом потоке будет до 256 блоков с частями результата.
*
* Агрегатные функции в блоках не должны быть финализированы, чтобы их состояния можно было объединить.
2015-09-07 21:20:28 +00:00
*
* Замечания:
*
* На хорошей сети (10Gbit) может работать заметно медленнее, так как чтения блоков с разных
* удалённых серверов делаются последовательно, при этом, чтение упирается в CPU.
* Это несложно исправить.
*
* Можно держать в памяти не по одному блоку из каждого источника, а по несколько, и распараллелить мердж.
* При этом будет расходоваться кратно больше оперативки.
*/
class MergingAggregatedMemoryEfficientBlockInputStream : public IProfilingBlockInputStream
{
public:
MergingAggregatedMemoryEfficientBlockInputStream(
BlockInputStreams inputs_, const Aggregator::Params & params, bool final_, size_t threads_);
2015-12-05 03:04:13 +00:00
~MergingAggregatedMemoryEfficientBlockInputStream();
String getName() const override { return "MergingAggregatedMemoryEfficient"; }
2015-09-08 19:53:16 +00:00
String getID() const override;
protected:
2015-09-08 19:53:16 +00:00
Block readImpl() override;
private:
Aggregator aggregator;
bool final;
size_t threads;
bool started = false;
bool has_two_level = false;
bool has_overflows = false;
int current_bucket_num = -1;
struct Input
{
BlockInputStreamPtr stream;
Block block;
Block overflow_block;
std::vector<Block> splitted_blocks;
bool is_exhausted = false;
Input(BlockInputStreamPtr & stream_) : stream(stream_) {}
};
std::vector<Input> inputs;
using BlocksToMerge = Poco::SharedPtr<BlocksList>;
/// Получить блоки, которые можно мерджить. Это позволяет мерджить их параллельно в отдельных потоках.
BlocksToMerge getNextBlocksToMerge();
/// Для параллельного мерджа.
struct OutputData
{
Block block;
std::exception_ptr exception;
OutputData() {}
OutputData(Block && block_) : block(std::move(block_)) {}
OutputData(std::exception_ptr && exception_) : exception(std::move(exception_)) {}
};
struct ParallelMergeData
{
boost::threadpool::pool pool;
std::mutex get_next_blocks_mutex;
ConcurrentBoundedQueue<OutputData> result_queue;
bool exhausted = false;
std::atomic<size_t> active_threads;
ParallelMergeData(size_t max_threads) : pool(max_threads), result_queue(max_threads), active_threads(max_threads) {}
};
std::unique_ptr<ParallelMergeData> parallel_merge_data;
void mergeThread(MemoryTracker * memory_tracker);
};
}