2013-09-15 10:53:10 +00:00
|
|
|
|
#pragma once
|
|
|
|
|
|
|
|
|
|
#include <statdaemons/threadpool.hpp>
|
|
|
|
|
|
|
|
|
|
#include <DB/Common/PODArray.h>
|
|
|
|
|
#include <DB/Interpreters/Aggregator.h>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** Агрегирует источник блоков параллельно в нескольких потоках.
|
|
|
|
|
* Распараллеливание производится для каждого блока.
|
|
|
|
|
* Для этого, сначала параллельно вычисляет хэши от ключей всех строчек блока,
|
2013-09-16 05:55:13 +00:00
|
|
|
|
* затем агрегирует строчки с разными диапазонами хэшей в разные хэш-таблицы
|
|
|
|
|
* (для разделения по хэш-таблицам используются другие биты ключа или другие хэш функции, чем те, что внутри хэш-таблиц).
|
2013-09-15 10:53:10 +00:00
|
|
|
|
* Получится, что эти хэш-таблицы будут содержать разные ключи, и их не потребуется объединять.
|
|
|
|
|
*
|
2013-09-16 05:55:13 +00:00
|
|
|
|
* Хорошо работает при большом размере результата агрегации (количестве уникальных ключей)
|
|
|
|
|
* - линейно масштабируется по количеству потоков.
|
|
|
|
|
*
|
2013-09-15 10:53:10 +00:00
|
|
|
|
* Не работает при числе потоков больше 256.
|
|
|
|
|
* Плохо работает при размере хэш-таблиц больше 2^32 элементов.
|
2013-09-16 05:55:13 +00:00
|
|
|
|
*
|
|
|
|
|
* TODO:
|
|
|
|
|
* - поддержка with_totals;
|
|
|
|
|
* - проверить работу при распределённой обработке запроса;
|
|
|
|
|
* - починить rows_before_limit_at_least;
|
|
|
|
|
* - минимальное количество строк на один поток; если в блоке мало строк - читать и обрабатывать несколько блоков сразу;
|
|
|
|
|
* - определиться, в каких случаях следует использовать этот агрегатор, а в каких - нет.
|
2013-09-15 10:53:10 +00:00
|
|
|
|
*/
|
|
|
|
|
class SplittingAggregator : private Aggregator
|
|
|
|
|
{
|
|
|
|
|
public:
|
2013-11-04 00:49:37 +00:00
|
|
|
|
SplittingAggregator(const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, size_t threads_,
|
2014-02-17 23:56:45 +00:00
|
|
|
|
bool with_totals_, size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW)
|
2013-11-04 00:49:37 +00:00
|
|
|
|
: Aggregator(keys_, aggregates_, with_totals_, max_rows_to_group_by_, group_by_overflow_mode_), threads(threads_), pool(threads),
|
2013-09-15 10:53:10 +00:00
|
|
|
|
log(&Logger::get("SplittingAggregator")), method(AggregatedDataVariants::EMPTY),
|
2013-11-04 00:49:37 +00:00
|
|
|
|
key_columns(keys_size), aggregate_columns(aggregates_size), rows(0), src_rows(0), src_bytes(0), size_of_all_results(0)
|
2013-09-15 10:53:10 +00:00
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
2013-11-04 00:49:37 +00:00
|
|
|
|
SplittingAggregator(const Names & key_names_, const AggregateDescriptions & aggregates_, size_t threads_,
|
2014-02-17 23:56:45 +00:00
|
|
|
|
bool with_totals_, size_t max_rows_to_group_by_ = 0, OverflowMode group_by_overflow_mode_ = OverflowMode::THROW)
|
2013-11-04 00:49:37 +00:00
|
|
|
|
: Aggregator(key_names_, aggregates_, with_totals_, max_rows_to_group_by_, group_by_overflow_mode_), threads(threads_), pool(threads),
|
2013-09-15 10:53:10 +00:00
|
|
|
|
log(&Logger::get("SplittingAggregator")), method(AggregatedDataVariants::EMPTY),
|
2013-11-04 00:49:37 +00:00
|
|
|
|
key_columns(keys_size), aggregate_columns(aggregates_size), rows(0), src_rows(0), src_bytes(0), size_of_all_results(0)
|
2013-09-15 10:53:10 +00:00
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Агрегировать источник. Получить результат в виде одной из структур данных.
|
|
|
|
|
void execute(BlockInputStreamPtr stream, ManyAggregatedDataVariants & results);
|
|
|
|
|
|
2013-11-03 23:35:18 +00:00
|
|
|
|
void convertToBlocks(ManyAggregatedDataVariants & data_variants, Blocks & blocks, bool final);
|
2013-09-15 10:53:10 +00:00
|
|
|
|
|
|
|
|
|
String getID() const { return Aggregator::getID(); }
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
size_t threads;
|
|
|
|
|
boost::threadpool::pool pool;
|
|
|
|
|
|
|
|
|
|
/// Вычисленные значения ключей и хэшей хэш-таблицы.
|
|
|
|
|
PODArray<UInt64> keys64;
|
|
|
|
|
PODArray<UInt64> hashes64;
|
|
|
|
|
PODArray<UInt128> keys128;
|
|
|
|
|
PODArray<UInt128> hashes128;
|
|
|
|
|
PODArray<StringRef> string_refs;
|
|
|
|
|
|
|
|
|
|
PODArray<UInt8> thread_nums;
|
|
|
|
|
|
|
|
|
|
Logger * log;
|
|
|
|
|
|
|
|
|
|
/// Каким способом выполняется агрегация.
|
|
|
|
|
AggregatedDataVariants::Type method;
|
|
|
|
|
|
|
|
|
|
ConstColumnPlainPtrs key_columns;
|
|
|
|
|
|
|
|
|
|
typedef std::vector<ConstColumnPlainPtrs> AggregateColumns;
|
|
|
|
|
AggregateColumns aggregate_columns;
|
|
|
|
|
|
|
|
|
|
size_t rows;
|
|
|
|
|
|
|
|
|
|
size_t src_rows;
|
|
|
|
|
size_t src_bytes;
|
|
|
|
|
|
|
|
|
|
Sizes key_sizes;
|
|
|
|
|
|
|
|
|
|
StringRefHash hash_func_string;
|
|
|
|
|
|
2013-11-04 00:49:37 +00:00
|
|
|
|
/// Для более точного контроля max_rows_to_group_by.
|
|
|
|
|
size_t size_of_all_results;
|
2013-09-15 10:53:10 +00:00
|
|
|
|
|
2014-05-03 22:57:43 +00:00
|
|
|
|
void calculateHashesThread(Block & block, size_t begin, size_t end, ExceptionPtr & exception, MemoryTracker * memory_tracker);
|
|
|
|
|
void aggregateThread(Block & block, AggregatedDataVariants & result, size_t thread_no, ExceptionPtr & exception, MemoryTracker * memory_tracker);
|
|
|
|
|
void convertToBlockThread(AggregatedDataVariants & data_variant, Block & block, bool final, ExceptionPtr & exception, MemoryTracker * memory_tracker);
|
2013-09-15 10:53:10 +00:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|