2015-07-30 23:41:02 +00:00
|
|
|
|
#pragma once
|
|
|
|
|
|
|
|
|
|
#include <DB/Interpreters/Aggregator.h>
|
|
|
|
|
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
2015-12-01 22:35:48 +00:00
|
|
|
|
#include <DB/Common/ConcurrentBoundedQueue.h>
|
2016-08-02 01:46:05 +00:00
|
|
|
|
#include <DB/Common/ThreadPool.h>
|
2015-12-13 15:07:01 +00:00
|
|
|
|
#include <condition_variable>
|
2015-07-30 23:41:02 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
2015-12-05 04:20:37 +00:00
|
|
|
|
/** Доагрегирует потоки блоков, держа в оперативной памяти только по одному или несколько (до merging_threads) блоков из каждого источника.
|
2015-12-13 15:07:01 +00:00
|
|
|
|
* Это экономит оперативку в случае использования двухуровневой агрегации, где в каждом источнике будет до 256 блоков с частями результата.
|
2015-07-30 23:41:02 +00:00
|
|
|
|
*
|
|
|
|
|
* Агрегатные функции в блоках не должны быть финализированы, чтобы их состояния можно было объединить.
|
2015-12-13 15:07:01 +00:00
|
|
|
|
*
|
|
|
|
|
* Используется для решения двух задач:
|
|
|
|
|
*
|
|
|
|
|
* 1. Внешняя агрегация со сбросом данных на диск.
|
|
|
|
|
* Частично агрегированные данные (предварительно разбитые на 256 корзин) сброшены в какое-то количество файлов на диске.
|
|
|
|
|
* Нужно читать их и мерджить по корзинам - держа в оперативке одновременно только несколько корзин из каждого файла.
|
|
|
|
|
*
|
|
|
|
|
* 2. Слияние результатов агрегации при распределённой обработке запроса.
|
|
|
|
|
* С разных серверов приезжают частично агрегированные данные, которые могут быть разбиты, а могут быть не разбиты на 256 корзин,
|
|
|
|
|
* и эти корзины отдаются нам по сети с каждого сервера последовательно, друг за другом.
|
|
|
|
|
* Надо так же читать и мерджить по корзинам.
|
|
|
|
|
*
|
|
|
|
|
* Суть работы:
|
|
|
|
|
*
|
|
|
|
|
* Есть какое-то количество источников. Они отдают блоки с частично агрегированными данными.
|
|
|
|
|
* Каждый источник может отдать одну из следующих последовательностей блоков:
|
|
|
|
|
* 1. "неразрезанный" блок с bucket_num = -1;
|
|
|
|
|
* 2. "разрезанные" (two_level) блоки с bucket_num от 0 до 255;
|
|
|
|
|
* В обоих случаях, может ещё присутствовать блок "переполнений" (overflows) с bucket_num = -1 и is_overflows = true;
|
|
|
|
|
*
|
|
|
|
|
* Исходим из соглашения, что разрезанные блоки всегда передаются в порядке bucket_num.
|
|
|
|
|
* То есть, если a < b, то блок с bucket_num = a идёт раньше bucket_num = b.
|
|
|
|
|
* Это нужно для экономного по памяти слияния
|
|
|
|
|
* - чтобы не надо было читать блоки наперёд, а идти по всем последовательностям по возрастанию bucket_num.
|
|
|
|
|
*
|
|
|
|
|
* При этом, не все bucket_num из диапазона 0..255 могут присутствовать.
|
|
|
|
|
* Блок переполнений может присутствовать в любом порядке относительно других блоков (но он может быть только один).
|
|
|
|
|
*
|
|
|
|
|
* Необходимо объединить эти последовательности блоков и отдать результат в виде последовательности с такими же свойствами.
|
|
|
|
|
* То есть, на выходе, если в последовательности есть "разрезанные" блоки, то они должны идти в порядке bucket_num.
|
|
|
|
|
*
|
|
|
|
|
* Мердж можно осуществлять с использованием нескольких (merging_threads) потоков.
|
|
|
|
|
* Для этого, получение набора блоков для следующего bucket_num надо делать последовательно,
|
|
|
|
|
* а затем, когда мы имеем несколько полученных наборов, их объединение можно делать параллельно.
|
|
|
|
|
*
|
|
|
|
|
* При получении следующих блоков из разных источников,
|
|
|
|
|
* данные из источников можно также читать в несколько потоков (reading_threads)
|
|
|
|
|
* для оптимальной работы при наличии быстрой сети или дисков (откуда эти блоки читаются).
|
2015-07-30 23:41:02 +00:00
|
|
|
|
*/
|
|
|
|
|
class MergingAggregatedMemoryEfficientBlockInputStream : public IProfilingBlockInputStream
|
|
|
|
|
{
|
|
|
|
|
public:
|
2015-12-01 22:35:48 +00:00
|
|
|
|
MergingAggregatedMemoryEfficientBlockInputStream(
|
2015-12-05 04:20:37 +00:00
|
|
|
|
BlockInputStreams inputs_, const Aggregator::Params & params, bool final_,
|
|
|
|
|
size_t reading_threads_, size_t merging_threads_);
|
2015-07-30 23:41:02 +00:00
|
|
|
|
|
2015-12-13 15:07:01 +00:00
|
|
|
|
~MergingAggregatedMemoryEfficientBlockInputStream() override;
|
2015-12-05 03:04:13 +00:00
|
|
|
|
|
2015-09-07 20:08:02 +00:00
|
|
|
|
String getName() const override { return "MergingAggregatedMemoryEfficient"; }
|
2015-07-30 23:41:02 +00:00
|
|
|
|
|
2015-09-08 19:53:16 +00:00
|
|
|
|
String getID() const override;
|
2015-07-30 23:41:02 +00:00
|
|
|
|
|
2015-12-05 04:20:37 +00:00
|
|
|
|
/// Отправляет запрос (инициирует вычисления) раньше, чем read.
|
|
|
|
|
void readPrefix() override;
|
|
|
|
|
|
2015-12-13 15:07:01 +00:00
|
|
|
|
/// Вызывается либо после того, как всё прочитано, либо после cancel-а.
|
|
|
|
|
void readSuffix() override;
|
|
|
|
|
|
|
|
|
|
/** Отличается от реализации по-умолчанию тем, что пытается остановить все источники,
|
|
|
|
|
* пропуская отвалившиеся по эксепшену.
|
|
|
|
|
*/
|
|
|
|
|
void cancel() override;
|
|
|
|
|
|
2015-07-30 23:41:02 +00:00
|
|
|
|
protected:
|
2015-09-08 19:53:16 +00:00
|
|
|
|
Block readImpl() override;
|
2015-07-30 23:41:02 +00:00
|
|
|
|
|
|
|
|
|
private:
|
2015-12-13 15:07:01 +00:00
|
|
|
|
static constexpr size_t NUM_BUCKETS = 256;
|
|
|
|
|
|
2015-07-30 23:41:02 +00:00
|
|
|
|
Aggregator aggregator;
|
|
|
|
|
bool final;
|
2015-12-05 04:20:37 +00:00
|
|
|
|
size_t reading_threads;
|
|
|
|
|
size_t merging_threads;
|
2015-07-30 23:41:02 +00:00
|
|
|
|
|
2015-09-07 07:40:14 +00:00
|
|
|
|
bool started = false;
|
2015-12-13 15:07:01 +00:00
|
|
|
|
bool all_read = false;
|
2016-07-31 03:53:16 +00:00
|
|
|
|
std::atomic<bool> has_two_level {false};
|
|
|
|
|
std::atomic<bool> has_overflows {false};
|
2015-09-07 07:40:14 +00:00
|
|
|
|
int current_bucket_num = -1;
|
|
|
|
|
|
2015-12-13 15:07:01 +00:00
|
|
|
|
Logger * log = &Logger::get("MergingAggregatedMemoryEfficientBlockInputStream");
|
|
|
|
|
|
|
|
|
|
|
2015-09-07 07:40:14 +00:00
|
|
|
|
struct Input
|
|
|
|
|
{
|
|
|
|
|
BlockInputStreamPtr stream;
|
|
|
|
|
Block block;
|
|
|
|
|
Block overflow_block;
|
|
|
|
|
std::vector<Block> splitted_blocks;
|
|
|
|
|
bool is_exhausted = false;
|
|
|
|
|
|
|
|
|
|
Input(BlockInputStreamPtr & stream_) : stream(stream_) {}
|
|
|
|
|
};
|
2015-07-30 23:41:02 +00:00
|
|
|
|
|
2015-09-07 07:40:14 +00:00
|
|
|
|
std::vector<Input> inputs;
|
2015-12-01 22:35:48 +00:00
|
|
|
|
|
2016-05-28 08:23:36 +00:00
|
|
|
|
using BlocksToMerge = std::unique_ptr<BlocksList>;
|
2015-12-01 22:35:48 +00:00
|
|
|
|
|
2015-12-05 04:20:37 +00:00
|
|
|
|
void start();
|
|
|
|
|
|
2015-12-01 22:35:48 +00:00
|
|
|
|
/// Получить блоки, которые можно мерджить. Это позволяет мерджить их параллельно в отдельных потоках.
|
|
|
|
|
BlocksToMerge getNextBlocksToMerge();
|
|
|
|
|
|
2016-08-02 01:46:05 +00:00
|
|
|
|
std::unique_ptr<ThreadPool> reading_pool;
|
2015-12-05 04:20:37 +00:00
|
|
|
|
|
2015-12-01 22:35:48 +00:00
|
|
|
|
/// Для параллельного мерджа.
|
|
|
|
|
|
|
|
|
|
struct ParallelMergeData
|
|
|
|
|
{
|
2016-08-02 01:46:05 +00:00
|
|
|
|
ThreadPool pool;
|
2016-09-19 05:01:32 +00:00
|
|
|
|
|
2015-12-13 15:07:01 +00:00
|
|
|
|
/// Сейчас один из мерджащих потоков получает следующие блоки для мерджа. Эта операция должна делаться последовательно.
|
2015-12-01 22:35:48 +00:00
|
|
|
|
std::mutex get_next_blocks_mutex;
|
2016-09-19 05:01:32 +00:00
|
|
|
|
|
|
|
|
|
std::atomic<bool> exhausted {false}; /// No more source data.
|
|
|
|
|
std::atomic<bool> finish {false}; /// Need to terminate early.
|
2015-12-01 22:35:48 +00:00
|
|
|
|
|
2015-12-13 15:07:01 +00:00
|
|
|
|
std::exception_ptr exception;
|
|
|
|
|
/// Следует отдавать блоки стого в порядке ключа (bucket_num).
|
|
|
|
|
/// Если значение - пустой блок - то нужно дождаться его мерджа.
|
|
|
|
|
/// (Такое значение означает обещание, что здесь будут данные. Это важно, потому что данные нужно отдавать в порядке ключа - bucket_num)
|
|
|
|
|
std::map<int, Block> merged_blocks;
|
|
|
|
|
std::mutex merged_blocks_mutex;
|
|
|
|
|
/// Событие, с помощью которого мерджащие потоки говорят главному потоку, что новый блок готов.
|
|
|
|
|
std::condition_variable merged_blocks_changed;
|
|
|
|
|
/// Событие, с помощью которого главный поток говорят мерджащим потокам, что можно обработать следующую группу блоков.
|
|
|
|
|
std::condition_variable have_space;
|
|
|
|
|
|
|
|
|
|
ParallelMergeData(size_t max_threads) : pool(max_threads) {}
|
2015-12-01 22:35:48 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
std::unique_ptr<ParallelMergeData> parallel_merge_data;
|
|
|
|
|
|
|
|
|
|
void mergeThread(MemoryTracker * memory_tracker);
|
2015-12-13 15:07:01 +00:00
|
|
|
|
|
|
|
|
|
void finalize();
|
2015-07-30 23:41:02 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
}
|