2015-07-30 23:41:02 +00:00
|
|
|
|
#pragma once
|
|
|
|
|
|
2015-12-01 22:35:48 +00:00
|
|
|
|
#include <common/threadpool.hpp>
|
2015-07-30 23:41:02 +00:00
|
|
|
|
#include <DB/Interpreters/Aggregator.h>
|
|
|
|
|
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
2015-12-01 22:35:48 +00:00
|
|
|
|
#include <DB/Common/ConcurrentBoundedQueue.h>
|
2015-07-30 23:41:02 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
2015-12-05 04:20:37 +00:00
|
|
|
|
/** Доагрегирует потоки блоков, держа в оперативной памяти только по одному или несколько (до merging_threads) блоков из каждого источника.
|
2015-07-30 23:41:02 +00:00
|
|
|
|
* Это экономит оперативку в случае использования двухуровневой агрегации, где в каждом потоке будет до 256 блоков с частями результата.
|
|
|
|
|
*
|
|
|
|
|
* Агрегатные функции в блоках не должны быть финализированы, чтобы их состояния можно было объединить.
|
|
|
|
|
*/
|
|
|
|
|
class MergingAggregatedMemoryEfficientBlockInputStream : public IProfilingBlockInputStream
|
|
|
|
|
{
|
|
|
|
|
public:
|
2015-12-01 22:35:48 +00:00
|
|
|
|
MergingAggregatedMemoryEfficientBlockInputStream(
|
2015-12-05 04:20:37 +00:00
|
|
|
|
BlockInputStreams inputs_, const Aggregator::Params & params, bool final_,
|
|
|
|
|
size_t reading_threads_, size_t merging_threads_);
|
2015-07-30 23:41:02 +00:00
|
|
|
|
|
2015-12-05 03:04:13 +00:00
|
|
|
|
~MergingAggregatedMemoryEfficientBlockInputStream();
|
|
|
|
|
|
2015-09-07 20:08:02 +00:00
|
|
|
|
String getName() const override { return "MergingAggregatedMemoryEfficient"; }
|
2015-07-30 23:41:02 +00:00
|
|
|
|
|
2015-09-08 19:53:16 +00:00
|
|
|
|
String getID() const override;
|
2015-07-30 23:41:02 +00:00
|
|
|
|
|
2015-12-05 04:20:37 +00:00
|
|
|
|
/// Отправляет запрос (инициирует вычисления) раньше, чем read.
|
|
|
|
|
void readPrefix() override;
|
|
|
|
|
|
2015-07-30 23:41:02 +00:00
|
|
|
|
protected:
|
2015-09-08 19:53:16 +00:00
|
|
|
|
Block readImpl() override;
|
2015-07-30 23:41:02 +00:00
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
Aggregator aggregator;
|
|
|
|
|
bool final;
|
2015-12-05 04:20:37 +00:00
|
|
|
|
size_t reading_threads;
|
|
|
|
|
size_t merging_threads;
|
2015-07-30 23:41:02 +00:00
|
|
|
|
|
2015-09-07 07:40:14 +00:00
|
|
|
|
bool started = false;
|
2015-12-05 04:20:37 +00:00
|
|
|
|
volatile bool has_two_level = false;
|
|
|
|
|
volatile bool has_overflows = false;
|
2015-09-07 07:40:14 +00:00
|
|
|
|
int current_bucket_num = -1;
|
|
|
|
|
|
|
|
|
|
struct Input
|
|
|
|
|
{
|
|
|
|
|
BlockInputStreamPtr stream;
|
|
|
|
|
Block block;
|
|
|
|
|
Block overflow_block;
|
|
|
|
|
std::vector<Block> splitted_blocks;
|
|
|
|
|
bool is_exhausted = false;
|
|
|
|
|
|
|
|
|
|
Input(BlockInputStreamPtr & stream_) : stream(stream_) {}
|
|
|
|
|
};
|
2015-07-30 23:41:02 +00:00
|
|
|
|
|
2015-09-07 07:40:14 +00:00
|
|
|
|
std::vector<Input> inputs;
|
2015-12-01 22:35:48 +00:00
|
|
|
|
|
|
|
|
|
using BlocksToMerge = Poco::SharedPtr<BlocksList>;
|
|
|
|
|
|
2015-12-05 04:20:37 +00:00
|
|
|
|
void start();
|
|
|
|
|
|
2015-12-01 22:35:48 +00:00
|
|
|
|
/// Получить блоки, которые можно мерджить. Это позволяет мерджить их параллельно в отдельных потоках.
|
|
|
|
|
BlocksToMerge getNextBlocksToMerge();
|
|
|
|
|
|
2015-12-05 04:20:37 +00:00
|
|
|
|
std::unique_ptr<boost::threadpool::pool> reading_pool;
|
|
|
|
|
|
2015-12-01 22:35:48 +00:00
|
|
|
|
/// Для параллельного мерджа.
|
|
|
|
|
struct OutputData
|
|
|
|
|
{
|
|
|
|
|
Block block;
|
|
|
|
|
std::exception_ptr exception;
|
|
|
|
|
|
|
|
|
|
OutputData() {}
|
|
|
|
|
OutputData(Block && block_) : block(std::move(block_)) {}
|
|
|
|
|
OutputData(std::exception_ptr && exception_) : exception(std::move(exception_)) {}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
struct ParallelMergeData
|
|
|
|
|
{
|
|
|
|
|
boost::threadpool::pool pool;
|
|
|
|
|
std::mutex get_next_blocks_mutex;
|
|
|
|
|
ConcurrentBoundedQueue<OutputData> result_queue;
|
2015-12-09 04:28:01 +00:00
|
|
|
|
bool exhausted = false; /// Данных больше нет.
|
|
|
|
|
bool finish = false; /// Нужно завершить работу раньше, чем данные закончились.
|
2015-12-03 02:43:40 +00:00
|
|
|
|
std::atomic<size_t> active_threads;
|
2015-12-01 22:35:48 +00:00
|
|
|
|
|
2015-12-03 02:43:40 +00:00
|
|
|
|
ParallelMergeData(size_t max_threads) : pool(max_threads), result_queue(max_threads), active_threads(max_threads) {}
|
2015-12-01 22:35:48 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
std::unique_ptr<ParallelMergeData> parallel_merge_data;
|
|
|
|
|
|
|
|
|
|
void mergeThread(MemoryTracker * memory_tracker);
|
2015-07-30 23:41:02 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
}
|