mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 00:22:29 +00:00
dbms: external aggregation: better scaling [#METR-17000].
This commit is contained in:
parent
810edc775d
commit
ccec8e4adc
@ -4,6 +4,7 @@
|
||||
#include <DB/Interpreters/Aggregator.h>
|
||||
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||||
#include <DB/Common/ConcurrentBoundedQueue.h>
|
||||
#include <condition_variable>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -11,9 +12,47 @@ namespace DB
|
||||
|
||||
|
||||
/** Доагрегирует потоки блоков, держа в оперативной памяти только по одному или несколько (до merging_threads) блоков из каждого источника.
|
||||
* Это экономит оперативку в случае использования двухуровневой агрегации, где в каждом потоке будет до 256 блоков с частями результата.
|
||||
* Это экономит оперативку в случае использования двухуровневой агрегации, где в каждом источнике будет до 256 блоков с частями результата.
|
||||
*
|
||||
* Агрегатные функции в блоках не должны быть финализированы, чтобы их состояния можно было объединить.
|
||||
*
|
||||
* Используется для решения двух задач:
|
||||
*
|
||||
* 1. Внешняя агрегация со сбросом данных на диск.
|
||||
* Частично агрегированные данные (предварительно разбитые на 256 корзин) сброшены в какое-то количество файлов на диске.
|
||||
* Нужно читать их и мерджить по корзинам - держа в оперативке одновременно только несколько корзин из каждого файла.
|
||||
*
|
||||
* 2. Слияние результатов агрегации при распределённой обработке запроса.
|
||||
* С разных серверов приезжают частично агрегированные данные, которые могут быть разбиты, а могут быть не разбиты на 256 корзин,
|
||||
* и эти корзины отдаются нам по сети с каждого сервера последовательно, друг за другом.
|
||||
* Надо так же читать и мерджить по корзинам.
|
||||
*
|
||||
* Суть работы:
|
||||
*
|
||||
* Есть какое-то количество источников. Они отдают блоки с частично агрегированными данными.
|
||||
* Каждый источник может отдать одну из следующих последовательностей блоков:
|
||||
* 1. "неразрезанный" блок с bucket_num = -1;
|
||||
* 2. "разрезанные" (two_level) блоки с bucket_num от 0 до 255;
|
||||
* В обоих случаях, может ещё присутствовать блок "переполнений" (overflows) с bucket_num = -1 и is_overflows = true;
|
||||
*
|
||||
* Исходим из соглашения, что разрезанные блоки всегда передаются в порядке bucket_num.
|
||||
* То есть, если a < b, то блок с bucket_num = a идёт раньше bucket_num = b.
|
||||
* Это нужно для экономного по памяти слияния
|
||||
* - чтобы не надо было читать блоки наперёд, а идти по всем последовательностям по возрастанию bucket_num.
|
||||
*
|
||||
* При этом, не все bucket_num из диапазона 0..255 могут присутствовать.
|
||||
* Блок переполнений может присутствовать в любом порядке относительно других блоков (но он может быть только один).
|
||||
*
|
||||
* Необходимо объединить эти последовательности блоков и отдать результат в виде последовательности с такими же свойствами.
|
||||
* То есть, на выходе, если в последовательности есть "разрезанные" блоки, то они должны идти в порядке bucket_num.
|
||||
*
|
||||
* Мердж можно осуществлять с использованием нескольких (merging_threads) потоков.
|
||||
* Для этого, получение набора блоков для следующего bucket_num надо делать последовательно,
|
||||
* а затем, когда мы имеем несколько полученных наборов, их объединение можно делать параллельно.
|
||||
*
|
||||
* При получении следующих блоков из разных источников,
|
||||
* данные из источников можно также читать в несколько потоков (reading_threads)
|
||||
* для оптимальной работы при наличии быстрой сети или дисков (откуда эти блоки читаются).
|
||||
*/
|
||||
class MergingAggregatedMemoryEfficientBlockInputStream : public IProfilingBlockInputStream
|
||||
{
|
||||
@ -22,7 +61,7 @@ public:
|
||||
BlockInputStreams inputs_, const Aggregator::Params & params, bool final_,
|
||||
size_t reading_threads_, size_t merging_threads_);
|
||||
|
||||
~MergingAggregatedMemoryEfficientBlockInputStream();
|
||||
~MergingAggregatedMemoryEfficientBlockInputStream() override;
|
||||
|
||||
String getName() const override { return "MergingAggregatedMemoryEfficient"; }
|
||||
|
||||
@ -31,20 +70,34 @@ public:
|
||||
/// Отправляет запрос (инициирует вычисления) раньше, чем read.
|
||||
void readPrefix() override;
|
||||
|
||||
/// Вызывается либо после того, как всё прочитано, либо после cancel-а.
|
||||
void readSuffix() override;
|
||||
|
||||
/** Отличается от реализации по-умолчанию тем, что пытается остановить все источники,
|
||||
* пропуская отвалившиеся по эксепшену.
|
||||
*/
|
||||
void cancel() override;
|
||||
|
||||
protected:
|
||||
Block readImpl() override;
|
||||
|
||||
private:
|
||||
static constexpr size_t NUM_BUCKETS = 256;
|
||||
|
||||
Aggregator aggregator;
|
||||
bool final;
|
||||
size_t reading_threads;
|
||||
size_t merging_threads;
|
||||
|
||||
bool started = false;
|
||||
bool all_read = false;
|
||||
volatile bool has_two_level = false;
|
||||
volatile bool has_overflows = false;
|
||||
int current_bucket_num = -1;
|
||||
|
||||
Logger * log = &Logger::get("MergingAggregatedMemoryEfficientBlockInputStream");
|
||||
|
||||
|
||||
struct Input
|
||||
{
|
||||
BlockInputStreamPtr stream;
|
||||
@ -68,31 +121,34 @@ private:
|
||||
std::unique_ptr<boost::threadpool::pool> reading_pool;
|
||||
|
||||
/// Для параллельного мерджа.
|
||||
struct OutputData
|
||||
{
|
||||
Block block;
|
||||
std::exception_ptr exception;
|
||||
|
||||
OutputData() {}
|
||||
OutputData(Block && block_) : block(std::move(block_)) {}
|
||||
OutputData(std::exception_ptr && exception_) : exception(std::move(exception_)) {}
|
||||
};
|
||||
|
||||
struct ParallelMergeData
|
||||
{
|
||||
boost::threadpool::pool pool;
|
||||
/// Сейчас один из мерджащих потоков получает следующие блоки для мерджа. Эта операция должна делаться последовательно.
|
||||
std::mutex get_next_blocks_mutex;
|
||||
ConcurrentBoundedQueue<OutputData> result_queue;
|
||||
bool exhausted = false; /// Данных больше нет.
|
||||
bool finish = false; /// Нужно завершить работу раньше, чем данные закончились.
|
||||
std::atomic<size_t> active_threads;
|
||||
|
||||
ParallelMergeData(size_t max_threads) : pool(max_threads), result_queue(max_threads), active_threads(max_threads) {}
|
||||
std::exception_ptr exception;
|
||||
/// Следует отдавать блоки стого в порядке ключа (bucket_num).
|
||||
/// Если значение - пустой блок - то нужно дождаться его мерджа.
|
||||
/// (Такое значение означает обещание, что здесь будут данные. Это важно, потому что данные нужно отдавать в порядке ключа - bucket_num)
|
||||
std::map<int, Block> merged_blocks;
|
||||
std::mutex merged_blocks_mutex;
|
||||
/// Событие, с помощью которого мерджащие потоки говорят главному потоку, что новый блок готов.
|
||||
std::condition_variable merged_blocks_changed;
|
||||
/// Событие, с помощью которого главный поток говорят мерджащим потокам, что можно обработать следующую группу блоков.
|
||||
std::condition_variable have_space;
|
||||
|
||||
ParallelMergeData(size_t max_threads) : pool(max_threads) {}
|
||||
};
|
||||
|
||||
std::unique_ptr<ParallelMergeData> parallel_merge_data;
|
||||
|
||||
void mergeThread(MemoryTracker * memory_tracker);
|
||||
|
||||
void finalize();
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -34,6 +34,53 @@ void MergingAggregatedMemoryEfficientBlockInputStream::readPrefix()
|
||||
}
|
||||
|
||||
|
||||
void MergingAggregatedMemoryEfficientBlockInputStream::readSuffix()
|
||||
{
|
||||
if (!all_read && !is_cancelled.load(std::memory_order_seq_cst))
|
||||
throw Exception("readSuffix called before all data is read", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
finalize();
|
||||
|
||||
for (size_t i = 0; i < children.size(); ++i)
|
||||
children[i]->readSuffix();
|
||||
}
|
||||
|
||||
|
||||
void MergingAggregatedMemoryEfficientBlockInputStream::cancel()
|
||||
{
|
||||
bool old_val = false;
|
||||
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
|
||||
return;
|
||||
|
||||
if (parallel_merge_data)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(parallel_merge_data->merged_blocks_mutex);
|
||||
|
||||
parallel_merge_data->finish = true;
|
||||
parallel_merge_data->merged_blocks_changed.notify_one();
|
||||
}
|
||||
|
||||
for (auto & input : inputs)
|
||||
{
|
||||
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(input.stream.get()))
|
||||
{
|
||||
try
|
||||
{
|
||||
child->cancel();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/** Если не удалось попросить остановиться одного или несколько источников.
|
||||
* (например, разорвано соединение при распределённой обработке запроса)
|
||||
* - то пофиг.
|
||||
*/
|
||||
LOG_ERROR(log, "Exception while cancelling " << child->getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void MergingAggregatedMemoryEfficientBlockInputStream::start()
|
||||
{
|
||||
if (started)
|
||||
@ -73,6 +120,23 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start()
|
||||
for (auto & task : tasks)
|
||||
task.get_future().get();
|
||||
}
|
||||
|
||||
if (merging_threads > 1)
|
||||
{
|
||||
/** Создадим несколько потоков. Каждый из них в цикле будет доставать следующий набор блоков для мерджа,
|
||||
* затем мерджить их и класть результат в очередь, откуда мы будем читать готовые результаты.
|
||||
*/
|
||||
parallel_merge_data.reset(new ParallelMergeData(merging_threads));
|
||||
|
||||
auto & pool = parallel_merge_data->pool;
|
||||
|
||||
/** Создаём потоки, которые будут получать и мерджить данные.
|
||||
*/
|
||||
|
||||
for (size_t i = 0; i < merging_threads; ++i)
|
||||
pool.schedule(std::bind(&MergingAggregatedMemoryEfficientBlockInputStream::mergeThread,
|
||||
this, current_memory_tracker));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -88,55 +152,74 @@ Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
|
||||
}
|
||||
else
|
||||
{
|
||||
/** Создадим несколько потоков. Каждый из них в цикле будет доставать следующий набор блоков для мерджа,
|
||||
* затем мерджить их и класть результат в очередь, откуда мы будем читать готовые результаты.
|
||||
*/
|
||||
Block res;
|
||||
|
||||
if (!parallel_merge_data)
|
||||
while (true)
|
||||
{
|
||||
parallel_merge_data.reset(new ParallelMergeData(merging_threads));
|
||||
std::unique_lock<std::mutex> lock(parallel_merge_data->merged_blocks_mutex);
|
||||
|
||||
auto & pool = parallel_merge_data->pool;
|
||||
if (parallel_merge_data->exception)
|
||||
std::rethrow_exception(parallel_merge_data->exception);
|
||||
|
||||
/** Создаём потоки, которые будут получать и мерджить данные.
|
||||
*/
|
||||
if (parallel_merge_data->finish)
|
||||
break;
|
||||
|
||||
for (size_t i = 0; i < merging_threads; ++i)
|
||||
pool.schedule(std::bind(&MergingAggregatedMemoryEfficientBlockInputStream::mergeThread,
|
||||
this, current_memory_tracker));
|
||||
if (!parallel_merge_data->merged_blocks.empty())
|
||||
{
|
||||
auto it = parallel_merge_data->merged_blocks.begin();
|
||||
|
||||
if (it->second)
|
||||
{
|
||||
res.swap(it->second);
|
||||
parallel_merge_data->merged_blocks.erase(it);
|
||||
parallel_merge_data->have_space.notify_one();
|
||||
break;
|
||||
}
|
||||
}
|
||||
else if (parallel_merge_data->exhausted)
|
||||
break;
|
||||
|
||||
parallel_merge_data->merged_blocks_changed.wait(lock);
|
||||
}
|
||||
|
||||
OutputData res;
|
||||
parallel_merge_data->result_queue.pop(res);
|
||||
if (!res)
|
||||
all_read = true;
|
||||
|
||||
if (res.exception)
|
||||
std::rethrow_exception(res.exception);
|
||||
|
||||
if (!res.block)
|
||||
parallel_merge_data->pool.wait();
|
||||
|
||||
return res.block;
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
MergingAggregatedMemoryEfficientBlockInputStream::~MergingAggregatedMemoryEfficientBlockInputStream()
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!all_read)
|
||||
cancel();
|
||||
|
||||
finalize();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void MergingAggregatedMemoryEfficientBlockInputStream::finalize()
|
||||
{
|
||||
if (!started)
|
||||
return;
|
||||
|
||||
LOG_TRACE(log, "Waiting for threads to finish");
|
||||
|
||||
if (reading_pool)
|
||||
reading_pool->wait();
|
||||
|
||||
if (parallel_merge_data)
|
||||
{
|
||||
LOG_TRACE((&Logger::get("MergingAggregatedMemoryEfficientBlockInputStream")), "Waiting for threads to finish");
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(parallel_merge_data->get_next_blocks_mutex);
|
||||
parallel_merge_data->finish = true;
|
||||
}
|
||||
|
||||
parallel_merge_data->result_queue.clear();
|
||||
parallel_merge_data->pool.wait();
|
||||
}
|
||||
|
||||
LOG_TRACE(log, "Waited for threads to finish");
|
||||
}
|
||||
|
||||
|
||||
@ -147,7 +230,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker
|
||||
|
||||
try
|
||||
{
|
||||
while (true)
|
||||
while (!parallel_merge_data->finish)
|
||||
{
|
||||
/** Получение следующих блоков делается в одном пуле потоков, а мердж - в другом.
|
||||
* Это весьма сложное взаимодействие.
|
||||
@ -157,6 +240,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker
|
||||
* - один из merging_threads выполняет слияние этой группы блоков;
|
||||
*/
|
||||
BlocksToMerge blocks_to_merge;
|
||||
int output_order = -1;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(parallel_merge_data->get_next_blocks_mutex);
|
||||
@ -168,32 +252,53 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker
|
||||
|
||||
if (!blocks_to_merge || blocks_to_merge->empty())
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(parallel_merge_data->merged_blocks_mutex);
|
||||
|
||||
parallel_merge_data->exhausted = true;
|
||||
parallel_merge_data->merged_blocks_changed.notify_one();
|
||||
break;
|
||||
}
|
||||
|
||||
output_order = blocks_to_merge->front().info.is_overflows
|
||||
? NUM_BUCKETS /// Блоки "переполнений" отдаются функцией getNextBlocksToMerge позже всех остальных.
|
||||
: blocks_to_merge->front().info.bucket_num;
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(parallel_merge_data->merged_blocks_mutex);
|
||||
|
||||
while (parallel_merge_data->merged_blocks.size() >= merging_threads)
|
||||
parallel_merge_data->have_space.wait(lock);
|
||||
|
||||
/** Кладём пустой блок, что означает обещание его заполнить.
|
||||
* Основной поток должен возвращать результаты строго в порядке output_order, поэтому это важно.
|
||||
*/
|
||||
parallel_merge_data->merged_blocks[output_order];
|
||||
}
|
||||
}
|
||||
|
||||
Block res = aggregator.mergeBlocks(*blocks_to_merge, final);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(parallel_merge_data->get_next_blocks_mutex);
|
||||
std::lock_guard<std::mutex> lock(parallel_merge_data->merged_blocks_mutex);
|
||||
|
||||
if (parallel_merge_data->finish)
|
||||
break;
|
||||
|
||||
parallel_merge_data->result_queue.push(OutputData(std::move(res)));
|
||||
parallel_merge_data->merged_blocks[output_order] = res;
|
||||
parallel_merge_data->merged_blocks_changed.notify_one();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
parallel_merge_data->result_queue.push(std::current_exception());
|
||||
return;
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(parallel_merge_data->merged_blocks_mutex);
|
||||
parallel_merge_data->exception = std::current_exception();
|
||||
parallel_merge_data->merged_blocks_changed.notify_one();
|
||||
}
|
||||
|
||||
/// Последний поток при выходе сообщает, что данных больше нет.
|
||||
if (0 == --parallel_merge_data->active_threads)
|
||||
parallel_merge_data->result_queue.push(Block());
|
||||
cancel();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -217,9 +322,6 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate
|
||||
* Это дополнительные данные для строк, не прошедших через max_rows_to_group_by.
|
||||
* Они должны объединяться друг с другом отдельно.
|
||||
*/
|
||||
|
||||
constexpr size_t NUM_BUCKETS = 256;
|
||||
|
||||
++current_bucket_num;
|
||||
|
||||
/// Получить из источника следующий блок с номером корзины не больше current_bucket_num.
|
||||
|
Loading…
Reference in New Issue
Block a user