Vsevolod Orlov 2015-12-02 13:40:50 +03:00
commit 23d34bbbe1
27 changed files with 833 additions and 310 deletions

View File

@ -1,7 +1,11 @@
#pragma once
#include <DB/Interpreters/Aggregator.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/NativeBlockInputStream.h>
#include <common/Revision.h>
namespace DB
@ -22,12 +26,8 @@ public:
* Aggregate functions are searched for everywhere in the expression.
* The columns corresponding to the keys and to the arguments of the aggregate functions must already have been computed.
*/
AggregatingBlockInputStream(BlockInputStreamPtr input_, const Names & key_names, const AggregateDescriptions & aggregates,
bool overflow_row_, bool final_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
Compiler * compiler_, UInt32 min_count_to_compile_, size_t group_by_two_level_threshold_)
: aggregator(key_names, aggregates, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_,
compiler_, min_count_to_compile_, group_by_two_level_threshold_),
final(final_)
AggregatingBlockInputStream(BlockInputStreamPtr input_, const Aggregator::Params & params_, bool final_)
: params(params_), aggregator(params), final(final_)
{
children.push_back(input_);
}
@ -44,12 +44,28 @@ public:
protected:
Block readImpl() override;
Aggregator::Params params;
Aggregator aggregator;
bool final;
bool executed = false;
BlocksList blocks;
BlocksList::iterator it;
/// For reading data dumped into a temporary file.
struct TemporaryFileStream
{
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
TemporaryFileStream(const std::string & path)
: file_in(path), compressed_in(file_in), block_in(new NativeBlockInputStream(compressed_in, Revision::get())) {}
};
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
/** Ready blocks after aggregation will be fetched from here. */
std::unique_ptr<IBlockInputStream> impl;
Logger * log = &Logger::get("AggregatingBlockInputStream");
};
}
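The TemporaryFileStream struct above bundles the three layers needed to read a spilled file back: a raw file reader, a decompressing reader on top of it, and a block deserializer on top of that; declaration order guarantees the layers are constructed bottom-up and destroyed top-down. A minimal standalone sketch of the same layering idea, with hypothetical stand-in types rather than the actual ReadBufferFromFile / CompressedReadBuffer / NativeBlockInputStream classes:

#include <fstream>
#include <string>

// Hypothetical stand-ins for the three real layers.
struct FileReader
{
    std::ifstream in;
    explicit FileReader(const std::string & path) : in(path, std::ios::binary) {}
};

struct DecompressingReader
{
    FileReader & file;
    explicit DecompressingReader(FileReader & file_) : file(file_) {}
};

struct BlockReader
{
    DecompressingReader & source;
    explicit BlockReader(DecompressingReader & source_) : source(source_) {}
};

// Mirrors TemporaryFileStream: members are declared in dependency order,
// so the constructor can wire file -> decompressor -> block reader safely.
struct TemporaryFileStreamSketch
{
    FileReader file_in;
    DecompressingReader compressed_in;
    BlockReader block_in;

    explicit TemporaryFileStreamSketch(const std::string & path)
        : file_in(path), compressed_in(file_in), block_in(compressed_in) {}
};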

View File

@ -0,0 +1,49 @@
#pragma once
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
/** A stream of blocks from which the next block can be read from an explicitly provided list.
* See also OneBlockInputStream.
*/
class BlocksListBlockInputStream : public IProfilingBlockInputStream
{
public:
/// Takes ownership of the list of blocks.
BlocksListBlockInputStream(BlocksList && list_)
: list(std::move(list_)), it(list.begin()), end(list.end()) {}
/// Uses a list of blocks located somewhere else.
BlocksListBlockInputStream(BlocksList::iterator & begin_, BlocksList::iterator & end_)
: it(begin_), end(end_) {}
String getName() const override { return "BlocksList"; }
String getID() const override
{
std::stringstream res;
res << this;
return res.str();
}
protected:
Block readImpl() override
{
if (it == end)
return Block();
Block res = *it;
++it;
return res;
}
private:
BlocksList list;
BlocksList::iterator it;
const BlocksList::iterator end;
};
}
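BlocksListBlockInputStream is just iterator state over a list; an empty Block returned from readImpl is the end-of-data marker. A standalone analogue of the same pattern over std::list, with std::optional standing in for the empty-Block sentinel (not the actual DB API):

#include <list>
#include <optional>
#include <utility>

template <typename T>
class ListStream
{
public:
    /// Takes ownership of the list, like the first constructor above.
    explicit ListStream(std::list<T> && list_)
        : list(std::move(list_)), it(list.begin()), end(list.end()) {}

    /// Analogue of readImpl(): the next element, or an empty optional when exhausted.
    std::optional<T> read()
    {
        if (it == end)
            return std::nullopt;
        return *it++;
    }

private:
    std::list<T> list;
    typename std::list<T>::iterator it;
    const typename std::list<T>::iterator end;
};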

View File

@ -16,10 +16,8 @@ using Poco::SharedPtr;
class MergingAggregatedBlockInputStream : public IProfilingBlockInputStream
{
public:
MergingAggregatedBlockInputStream(BlockInputStreamPtr input_, const Names & keys_names_,
const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_, size_t max_threads_)
: aggregator(keys_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0),
final(final_), max_threads(max_threads_)
MergingAggregatedBlockInputStream(BlockInputStreamPtr input_, const Aggregator::Params & params, bool final_, size_t max_threads_)
: aggregator(params), final(final_), max_threads(max_threads_)
{
children.push_back(input_);
}

View File

@ -1,7 +1,9 @@
#pragma once
#include <common/threadpool.hpp>
#include <DB/Interpreters/Aggregator.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Common/ConcurrentBoundedQueue.h>
namespace DB
@ -19,18 +21,14 @@ namespace DB
* remote servers are made sequentially, and reading is CPU-bound.
* This would not be hard to fix.
*
* Also, reads and computation (merging of states) take turns.
* Reads could be made asynchronous - this would use twice as much memory, which is still not much.
* This can be done with UnionBlockInputStream.
*
* Several blocks from each source could be kept in memory instead of one, and the merge could be parallelized.
* That would use proportionally more RAM.
*/
class MergingAggregatedMemoryEfficientBlockInputStream : public IProfilingBlockInputStream
{
public:
MergingAggregatedMemoryEfficientBlockInputStream(BlockInputStreams inputs_, const Names & keys_names_,
const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_);
MergingAggregatedMemoryEfficientBlockInputStream(
BlockInputStreams inputs_, const Aggregator::Params & params, bool final_, size_t threads_);
String getName() const override { return "MergingAggregatedMemoryEfficient"; }
@ -42,6 +40,7 @@ protected:
private:
Aggregator aggregator;
bool final;
size_t threads;
bool started = false;
bool has_two_level = false;
@ -60,6 +59,36 @@ private:
};
std::vector<Input> inputs;
using BlocksToMerge = Poco::SharedPtr<BlocksList>;
/// Get the blocks that can be merged. This allows merging them in parallel, in separate threads.
BlocksToMerge getNextBlocksToMerge();
/// For the parallel merge.
struct OutputData
{
Block block;
std::exception_ptr exception;
OutputData() {}
OutputData(Block && block_) : block(std::move(block_)) {}
OutputData(std::exception_ptr && exception_) : exception(std::move(exception_)) {}
};
struct ParallelMergeData
{
boost::threadpool::pool pool;
std::mutex get_next_blocks_mutex;
ConcurrentBoundedQueue<OutputData> result_queue;
bool exhausted = false;
ParallelMergeData(size_t max_threads) : pool(max_threads), result_queue(max_threads) {}
};
std::unique_ptr<ParallelMergeData> parallel_merge_data;
void mergeThread(MemoryTracker * memory_tracker);
};
}
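ParallelMergeData couples a worker pool with a bounded result queue: merge threads push finished blocks (or a captured exception) into result_queue, and the reading thread pops them; bounding the queue at max_threads caps memory use when the reader is slower than the workers. A simplified standalone stand-in for ConcurrentBoundedQueue built on std::mutex and condition variables (the real class lives in DB/Common/ConcurrentBoundedQueue.h):

#include <condition_variable>
#include <mutex>
#include <queue>

template <typename T>
class BoundedQueue
{
public:
    explicit BoundedQueue(size_t capacity_) : capacity(capacity_) {}

    /// Blocks while the queue is full, so producers cannot outrun the consumer by more than `capacity` items.
    void push(T value)
    {
        std::unique_lock<std::mutex> lock(mutex);
        not_full.wait(lock, [this] { return queue.size() < capacity; });
        queue.push(std::move(value));
        not_empty.notify_one();
    }

    /// Blocks while the queue is empty.
    void pop(T & value)
    {
        std::unique_lock<std::mutex> lock(mutex);
        not_empty.wait(lock, [this] { return !queue.empty(); });
        value = std::move(queue.front());
        queue.pop();
        not_full.notify_one();
    }

private:
    const size_t capacity;
    std::queue<T> queue;
    std::mutex mutex;
    std::condition_variable not_empty;
    std::condition_variable not_full;
};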

View File

@ -1,17 +1,13 @@
#pragma once
#include <Poco/SharedPtr.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
using Poco::SharedPtr;
/** A stream of blocks from which a single block can be read.
* See also BlocksListBlockInputStream.
*/
class OneBlockInputStream : public IProfilingBlockInputStream
{

View File

@ -1,8 +1,14 @@
#pragma once
#include <DB/Interpreters/Aggregator.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/BlocksListBlockInputStream.h>
#include <DB/DataStreams/NativeBlockInputStream.h>
#include <DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
#include <DB/DataStreams/ParallelInputsProcessor.h>
#include <common/Revision.h>
namespace DB
@ -23,14 +29,10 @@ public:
*/
ParallelAggregatingBlockInputStream(
BlockInputStreams inputs, BlockInputStreamPtr additional_input_at_end,
const Names & key_names, const AggregateDescriptions & aggregates,
bool overflow_row_, bool final_, size_t max_threads_,
size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
Compiler * compiler_, UInt32 min_count_to_compile_, size_t group_by_two_level_threshold_)
: aggregator(key_names, aggregates, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_,
compiler_, min_count_to_compile_, group_by_two_level_threshold_),
const Aggregator::Params & params_, bool final_, size_t max_threads_)
: params(params_), aggregator(params),
final(final_), max_threads(std::min(inputs.size(), max_threads_)),
keys_size(aggregator.getNumberOfKeys()), aggregates_size(aggregator.getNumberOfAggregates()),
keys_size(params.keys_size), aggregates_size(params.aggregates_size),
handler(*this), processor(inputs, additional_input_at_end, max_threads, handler)
{
children = inputs;
@ -78,25 +80,53 @@ protected:
Aggregator::CancellationHook hook = [&]() { return this->isCancelled(); };
aggregator.setCancellationHook(hook);
AggregatedDataVariantsPtr data_variants = executeAndMerge();
execute();
if (data_variants)
blocks = aggregator.convertToBlocks(*data_variants, final, max_threads);
if (isCancelled())
return {};
it = blocks.begin();
if (!aggregator.hasTemporaryFiles())
{
/** If all of the partially-aggregated data is in RAM, merge it in parallel, also in RAM.
* NOTE If more than half of the allowed memory has been used, the merge should be done more economically.
*/
AggregatedDataVariantsPtr data_variants = aggregator.merge(many_data, max_threads);
if (data_variants)
impl.reset(new BlocksListBlockInputStream(
aggregator.convertToBlocks(*data_variants, final, max_threads)));
}
else
{
/** If there are temporary files with partially-aggregated data on disk,
* read and merge them, using a minimal amount of memory.
*/
const auto & files = aggregator.getTemporaryFiles();
BlockInputStreams input_streams;
for (const auto & file : files.files)
{
temporary_inputs.emplace_back(new TemporaryFileStream(file->path()));
input_streams.emplace_back(temporary_inputs.back()->block_in);
}
LOG_TRACE(log, "Will merge " << files.files.size() << " temporary files of size "
<< (files.sum_size_compressed / 1048576.0) << " MiB compressed, "
<< (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed.");
impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(input_streams, params, final, max_threads));
}
}
Block res;
if (isCancelled() || it == blocks.end())
if (isCancelled() || !impl)
return res;
res = *it;
++it;
return res;
return impl->read();
}
private:
Aggregator::Params params;
Aggregator aggregator;
bool final;
size_t max_threads;
@ -112,8 +142,22 @@ private:
bool no_more_keys = false;
bool executed = false;
BlocksList blocks;
BlocksList::iterator it;
/// For reading data dumped into a temporary file.
struct TemporaryFileStream
{
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
TemporaryFileStream(const std::string & path)
: file_in(path), compressed_in(file_in), block_in(new NativeBlockInputStream(compressed_in, Revision::get())) {}
};
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
/** Ready blocks after aggregation will be fetched from here.
*/
std::unique_ptr<IBlockInputStream> impl;
Logger * log = &Logger::get("ParallelAggregatingBlockInputStream");
@ -159,8 +203,31 @@ private:
parent.threads_data[thread_num].src_bytes += block.bytes();
}
void onFinishThread(size_t thread_num)
{
if (parent.aggregator.hasTemporaryFiles())
{
/// Dump the data held in RAM to disk as well. That makes it simpler to combine later.
auto & data = *parent.many_data[thread_num];
size_t rows = data.sizeWithoutOverflowRow();
if (rows)
parent.aggregator.writeToTemporaryFile(data, rows);
}
}
void onFinish()
{
if (parent.aggregator.hasTemporaryFiles())
{
/// It can happen that some data has not yet been dumped to disk,
/// because when onFinishThread was called no data had been dumped to disk yet, and some data was dumped afterwards.
for (auto & data : parent.many_data)
{
size_t rows = data->sizeWithoutOverflowRow();
if (rows)
parent.aggregator.writeToTemporaryFile(*data, rows);
}
}
}
void onException(std::exception_ptr & exception, size_t thread_num)
@ -176,7 +243,7 @@ private:
ParallelInputsProcessor<Handler> processor;
AggregatedDataVariantsPtr executeAndMerge()
void execute()
{
many_data.resize(max_threads);
exceptions.resize(max_threads);
@ -197,7 +264,7 @@ private:
rethrowFirstException(exceptions);
if (isCancelled())
return nullptr;
return;
double elapsed_seconds = watch.elapsedSeconds();
@ -220,11 +287,6 @@ private:
<< "Total aggregated. " << total_src_rows << " rows (from " << total_src_bytes / 1048576.0 << " MiB)"
<< " in " << elapsed_seconds << " sec."
<< " (" << total_src_rows / elapsed_seconds << " rows/sec., " << total_src_bytes / elapsed_seconds / 1048576.0 << " MiB/sec.)");
if (isCancelled())
return nullptr;
return aggregator.merge(many_data, max_threads);
}
};
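After execute() finishes, readImpl() picks one of two merge strategies: if no thread spilled, the per-thread hash tables are merged in RAM and served through a BlocksListBlockInputStream; otherwise every temporary file is wrapped in a read stream and handed to MergingAggregatedMemoryEfficientBlockInputStream. A condensed sketch of that branch with illustrative stand-in types (not the real ClickHouse classes):

#include <memory>
#include <string>
#include <vector>

struct Stream { virtual ~Stream() = default; };
struct InMemoryMergeStream : Stream {};     // stands in for BlocksListBlockInputStream
struct ExternalMergeStream : Stream         // stands in for the memory-efficient merging stream
{
    explicit ExternalMergeStream(std::vector<std::string> files_) : files(std::move(files_)) {}
    std::vector<std::string> files;
};

// The decision made once, at the end of the first readImpl() call.
std::unique_ptr<Stream> chooseMergeStrategy(bool has_temporary_files, std::vector<std::string> temp_paths)
{
    if (!has_temporary_files)
        return std::make_unique<InMemoryMergeStream>();     // everything fits in RAM: parallel in-memory merge
    return std::make_unique<ExternalMergeStream>(std::move(temp_paths));    // stream the spilled files with bounded memory
}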

View File

@ -43,6 +43,11 @@ struct ParallelInputsHandler
/// Processing of a data block plus extra information.
void onBlock(Block & block, BlockExtraInfo & extra_info, size_t thread_num) {}
/// Called for each thread when the thread has nothing left to do,
/// because some of the sources have run dry and fewer sources now remain than threads.
/// Called if the onException method does not throw an exception; called before the onFinish method.
void onFinishThread(size_t thread_num) {}
/// The blocks have run out, either because all sources ran dry or because the work was cancelled.
/// This method is always called exactly once, at the end of the work, if the onException method does not throw an exception.
void onFinish() {}
@ -182,6 +187,8 @@ private:
handler.onException(exception, thread_num);
}
handler.onFinishThread(thread_num);
/// On exit, the last thread reports that there is no more data.
if (0 == --active_threads)
{

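The handler contract is: onBlock per block, onFinishThread once per worker as it drains, and onFinish exactly once, fired by the last worker to exit via the `0 == --active_threads` check. A tiny runnable illustration of that last-thread-out detection with an atomic counter (the real processor uses its own pool and handler types):

#include <atomic>
#include <cstdio>
#include <thread>
#include <vector>

int main()
{
    const size_t num_threads = 4;
    std::atomic<size_t> active_threads{num_threads};

    std::vector<std::thread> pool;
    for (size_t i = 0; i < num_threads; ++i)
        pool.emplace_back([&active_threads, i]
        {
            std::printf("onFinishThread(%zu)\n", i);    // per-thread cleanup, e.g. spilling leftover data
            if (0 == --active_threads)                  // exactly one thread observes the counter hit zero
                std::printf("onFinish()\n");
        });

    for (auto & thread : pool)
        thread.join();
}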
View File

@ -271,6 +271,10 @@ private:
parent.output_queue.push(Payload());
}
void onFinishThread(size_t thread_num)
{
}
void onException(std::exception_ptr & exception, size_t thread_num)
{
//std::cerr << "pushing exception\n";

View File

@ -522,7 +522,10 @@ private:
else
{
const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx];
map[id] = String{string_ref};
if (!cell.isDefault())
map[id] = String{string_ref};
total_length += string_ref.size + 1;
}
}
@ -551,8 +554,6 @@ private:
out->getChars().reserve(total_length);
// const auto & null_value = std::get<String>(attribute.null_values);
for (const auto row : ext::range(0, ext::size(ids)))
{
const auto id = ids[row];

View File

@ -557,7 +557,10 @@ private:
else
{
const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx];
map[key] = String{string_ref};
if (!cell.isDefault())
map[key] = String{string_ref};
total_length += string_ref.size + 1;
}
}

View File

@ -2,6 +2,7 @@
#include <double-conversion/double-conversion.h>
namespace DB
{
@ -15,14 +16,28 @@ template <> struct DoubleToStringConverterFlags<true>
static constexpr auto flags = double_conversion::DoubleToStringConverter::EMIT_TRAILING_DECIMAL_POINT;
};
template <bool emit_decimal_point = true>
const double_conversion::DoubleToStringConverter & getDoubleToStringConverter()
template <bool emit_decimal_point>
class DoubleConverter
{
static const double_conversion::DoubleToStringConverter instance{
DoubleToStringConverterFlags<emit_decimal_point>::flags, "inf", "nan", 'e', -6, 21, 6, 1
};
DoubleConverter(const DoubleConverter &) = delete;
DoubleConverter & operator=(const DoubleConverter &) = delete;
return instance;
}
DoubleConverter() = default;
public:
/** @todo Add commentary on how this constant is deduced.
* e.g. it's a minus sign, an integral zero, a decimal point, up to 5 leading zeros and kBase10MaximalLength digits. */
static constexpr auto MAX_REPRESENTATION_LENGTH = 26;
using BufferType = char[MAX_REPRESENTATION_LENGTH];
static const auto & instance()
{
static const double_conversion::DoubleToStringConverter instance{
DoubleToStringConverterFlags<emit_decimal_point>::flags, "inf", "nan", 'e', -6, 21, 6, 1
};
return instance;
}
};
}
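The class replaces the old function template with a Meyers singleton: the converter is a function-local static inside instance(), constructed once on first use and, since C++11, with thread-safe initialization, while the deleted copy operations keep it unique. A minimal standalone illustration of the idiom:

// Standalone sketch of the function-local-static (Meyers) singleton used above.
class Singleton
{
public:
    static const Singleton & instance()
    {
        static const Singleton singleton;   // constructed once, on first call; thread-safe since C++11
        return singleton;
    }

private:
    Singleton() = default;
    Singleton(const Singleton &) = delete;
    Singleton & operator=(const Singleton &) = delete;
};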

View File

@ -87,28 +87,28 @@ inline void writeBoolText(bool x, WriteBuffer & buf)
inline void writeFloatText(double x, WriteBuffer & buf)
{
char tmp[25];
double_conversion::StringBuilder builder{tmp, sizeof(tmp)};
DoubleConverter<false>::BufferType buffer;
double_conversion::StringBuilder builder{buffer, sizeof(buffer)};
const auto result = getDoubleToStringConverter<false>().ToShortest(x, &builder);
const auto result = DoubleConverter<false>::instance().ToShortest(x, &builder);
if (!result)
throw Exception("Cannot print double number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER);
buf.write(tmp, builder.position());
buf.write(buffer, builder.position());
}
inline void writeFloatText(float x, WriteBuffer & buf)
{
char tmp[25];
double_conversion::StringBuilder builder{tmp, sizeof(tmp)};
DoubleConverter<false>::BufferType buffer;
double_conversion::StringBuilder builder{buffer, sizeof(buffer)};
const auto result = getDoubleToStringConverter<false>().ToShortestSingle(x, &builder);
const auto result = DoubleConverter<false>::instance().ToShortestSingle(x, &builder);
if (!result)
throw Exception("Cannot print float number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER);
buf.write(tmp, builder.position());
buf.write(buffer, builder.position());
}

View File

@ -4,6 +4,8 @@
#include <memory>
#include <functional>
#include <Poco/TemporaryFile.h>
#include <common/logger_useful.h>
#include <common/threadpool.hpp>
@ -602,7 +604,7 @@ struct AggregatedDataVariants : private boost::noncopyable
};
Type type = Type::EMPTY;
AggregatedDataVariants() : aggregates_pools(1, new Arena), aggregates_pool(&*aggregates_pools.back()) {}
AggregatedDataVariants() : aggregates_pools(1, new Arena), aggregates_pool(aggregates_pools.back().get()) {}
bool empty() const { return type == Type::EMPTY; }
void invalidate() { type = Type::EMPTY; }
@ -627,6 +629,7 @@ struct AggregatedDataVariants : private boost::noncopyable
type = type_;
}
/// Number of rows (distinct keys).
size_t size() const
{
switch (type)
@ -761,18 +764,65 @@ typedef std::vector<AggregatedDataVariantsPtr> ManyAggregatedDataVariants;
class Aggregator
{
public:
Aggregator(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_,
size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_, Compiler * compiler_, UInt32 min_count_to_compile_,
size_t group_by_two_level_threshold_)
: key_names(key_names_), aggregates(aggregates_), aggregates_size(aggregates.size()),
overflow_row(overflow_row_),
max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
compiler(compiler_), min_count_to_compile(min_count_to_compile_), group_by_two_level_threshold(group_by_two_level_threshold_),
struct Params
{
/// What to compute.
Names key_names;
ColumnNumbers keys; /// Column numbers - computed later.
AggregateDescriptions aggregates;
size_t keys_size;
size_t aggregates_size;
/// Settings for approximate GROUP BY computation.
const bool overflow_row; /// Whether to put, into AggregatedDataVariants::without_key, aggregates for keys that did not fit into max_rows_to_group_by.
const size_t max_rows_to_group_by;
const OverflowMode group_by_overflow_mode;
/// For dynamic compilation.
Compiler * compiler;
const UInt32 min_count_to_compile;
/// Settings for two-level aggregation (used for large numbers of keys).
/** The number of keys, or the size of the aggregation state in bytes, at which
* two-level aggregation starts to be used. It is enough for at least one of the thresholds to fire.
* 0 - the corresponding threshold is not set.
*/
const size_t group_by_two_level_threshold;
const size_t group_by_two_level_threshold_bytes;
/// Settings for dumping temporary data to the file system (external aggregation).
const size_t max_bytes_before_external_group_by; /// 0 - do not use external aggregation.
const std::string tmp_path;
Params(
const Names & key_names_, const AggregateDescriptions & aggregates_,
bool overflow_row_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
Compiler * compiler_, UInt32 min_count_to_compile_,
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_, const std::string & tmp_path_)
: key_names(key_names_), aggregates(aggregates_), aggregates_size(aggregates.size()),
overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
compiler(compiler_), min_count_to_compile(min_count_to_compile_),
group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
max_bytes_before_external_group_by(max_bytes_before_external_group_by_), tmp_path(tmp_path_)
{
std::sort(key_names.begin(), key_names.end());
key_names.erase(std::unique(key_names.begin(), key_names.end()), key_names.end());
keys_size = key_names.size();
}
/// Only the parameters that matter during a merge.
Params(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_)
: Params(key_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, "") {}
/// Compute the column numbers in keys and aggregates.
void calculateColumnNumbers(const Block & block);
};
Aggregator(const Params & params_)
: params(params_),
isCancelled([]() { return false; })
{
std::sort(key_names.begin(), key_names.end());
key_names.erase(std::unique(key_names.begin(), key_names.end()), key_names.end());
keys_size = key_names.size();
}
/// Aggregate the source. Get the result as one of the data structures.
@ -827,15 +877,32 @@ public:
/// For IBlockInputStream.
String getID() const;
size_t getNumberOfKeys() const { return keys_size; }
size_t getNumberOfAggregates() const { return aggregates_size; }
/// For external aggregation.
void writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t rows);
bool hasTemporaryFiles() const { return !temporary_files.empty(); }
struct TemporaryFiles
{
std::vector<std::unique_ptr<Poco::TemporaryFile>> files;
size_t sum_size_uncompressed = 0;
size_t sum_size_compressed = 0;
mutable std::mutex mutex;
bool empty() const
{
std::lock_guard<std::mutex> lock(mutex);
return files.empty();
}
};
const TemporaryFiles & getTemporaryFiles() const { return temporary_files; }
protected:
friend struct AggregatedDataVariants;
ColumnNumbers keys;
Names key_names;
AggregateDescriptions aggregates;
Params params;
AggregateFunctionsPlainPtrs aggregate_functions;
/** This array serves two purposes.
@ -857,31 +924,21 @@ protected:
using AggregateFunctionInstructions = std::vector<AggregateFunctionInstruction>;
size_t keys_size;
size_t aggregates_size;
/// Whether to put, into AggregatedDataVariants::without_key, aggregates for keys that did not fit into max_rows_to_group_by.
bool overflow_row;
Sizes offsets_of_aggregate_states; /// Offset to the n-th aggregate function in a row of aggregate functions.
size_t total_size_of_aggregate_states = 0; /// Total size of a row of aggregate functions.
bool all_aggregates_has_trivial_destructor = false;
/// How much RAM had been used to process the query before processing of the first block began.
Int64 memory_usage_before_aggregation = 0;
/// For initialization from the first block under concurrent use.
bool initialized = false;
std::mutex mutex;
size_t max_rows_to_group_by;
OverflowMode group_by_overflow_mode;
Block sample;
Logger * log = &Logger::get("Aggregator");
/** For dynamic compilation, when enabled. */
Compiler * compiler = nullptr;
UInt32 min_count_to_compile;
/** The dynamically compiled library for aggregation, if there is one.
* The point of dynamic compilation is to specialize the code
* for a specific list of aggregate functions.
@ -902,14 +959,12 @@ protected:
bool compiled_if_possible = false;
void compileIfPossible(AggregatedDataVariants::Type type);
/** The number of keys at which two-level aggregation starts to be used.
* 0 - never use it.
*/
size_t group_by_two_level_threshold;
/// Returns true if the current task may be aborted.
CancellationHook isCancelled;
/// For external aggregation.
TemporaryFiles temporary_files;
/** If only column names are given (key_names, as well as aggregates[i].column_name), compute the column numbers.
* Form a sample block describing the result.
*/
@ -960,6 +1015,13 @@ protected:
size_t rows,
AggregateFunctionInstruction * aggregate_instructions) const;
template <typename Method>
void writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
Method & method,
IBlockOutputStream & out,
const String & path);
public:
/// Templates instantiated via dynamic compilation of code - see SpecializedAggregator.h

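Call sites construct a Params once and pass it by reference, as the block-input-stream constructors above show. A sketch of the two intended usages, grounded in the constructors in this header and assuming key_names and aggregates are already prepared (the setting values are illustrative placeholders, not recommendations):

// Merge-only parameters: thresholds and compilation do not matter during a merge.
Aggregator::Params merge_params(key_names, aggregates, /*overflow_row=*/false);

// Full parameters for executing an aggregation, including external-aggregation settings.
Aggregator::Params full_params(
    key_names, aggregates,
    /*overflow_row=*/false,
    /*max_rows_to_group_by=*/0, OverflowMode::THROW,
    /*compiler=*/nullptr, /*min_count_to_compile=*/0,
    /*group_by_two_level_threshold=*/100000,
    /*group_by_two_level_threshold_bytes=*/100000000,
    /*max_bytes_before_external_group_by=*/0,
    /*tmp_path=*/"/tmp/");

Aggregator aggregator(full_params);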
View File

@ -32,6 +32,7 @@ struct Limits
\
M(SettingUInt64, max_rows_to_group_by, 0) \
M(SettingOverflowMode<true>, group_by_overflow_mode, OverflowMode::THROW) \
M(SettingUInt64, max_bytes_before_external_group_by, 0) \
\
M(SettingUInt64, max_rows_to_sort, 0) \
M(SettingUInt64, max_bytes_to_sort, 0) \

View File

@ -90,8 +90,11 @@ struct Settings
M(SettingBool, compile, false) \
/** The number of queries with identical structure before their compilation is initiated. */ \
M(SettingUInt64, min_count_to_compile, 3) \
/** The number of keys at which two-level aggregation starts to be used. 0 - never use it. */ \
/** The number of keys at which two-level aggregation starts to be used. 0 - the threshold is not set. */ \
M(SettingUInt64, group_by_two_level_threshold, 100000) \
/** The size of the aggregation state in bytes at which two-level aggregation starts to be used. 0 - the threshold is not set. \
* Two-level aggregation starts to be used when at least one of the thresholds fires. */ \
M(SettingUInt64, group_by_two_level_threshold_bytes, 100000000) \
/** Whether the memory-efficient mode of distributed aggregation is enabled. */ \
M(SettingBool, distributed_aggregation_memory_efficient, false) \
\

View File

@ -14,15 +14,15 @@ static void formatReadable(double size, DB::WriteBuffer & out, int precision, co
for (; i + 1 < units_size && fabs(size) >= delimiter; ++i)
size /= delimiter;
char tmp[25];
double_conversion::StringBuilder builder{tmp, sizeof(tmp)};
DB::DoubleConverter<false>::BufferType buffer;
double_conversion::StringBuilder builder{buffer, sizeof(buffer)};
const auto result = DB::getDoubleToStringConverter<false>().ToFixed(size, precision, &builder);
const auto result = DB::DoubleConverter<false>::instance().ToFixed(size, precision, &builder);
if (!result)
throw DB::Exception("Cannot print float or double number", DB::ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER);
out.write(tmp, builder.position());
out.write(buffer, builder.position());
writeCString(units[i], out);
}

View File

@ -34,15 +34,15 @@ String FieldVisitorDump::operator() (const Array & x) const
String FieldVisitorToString::formatFloat(const Float64 x)
{
char tmp[25];
double_conversion::StringBuilder builder{tmp, sizeof(tmp)};
DoubleConverter<true>::BufferType buffer;
double_conversion::StringBuilder builder{buffer, sizeof(buffer)};
const auto result = getDoubleToStringConverter().ToShortest(x, &builder);
const auto result = DoubleConverter<true>::instance().ToShortest(x, &builder);
if (!result)
throw Exception("Cannot print float or double number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER);
return { tmp, tmp + builder.position() };
return { buffer, buffer + builder.position() };
}
String FieldVisitorToString::operator() (const Array & x) const

View File

@ -1,5 +1,5 @@
#include <DB/Columns/ColumnsNumber.h>
#include <DB/DataStreams/BlocksListBlockInputStream.h>
#include <DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
#include <DB/DataStreams/AggregatingBlockInputStream.h>
@ -18,18 +18,44 @@ Block AggregatingBlockInputStream::readImpl()
aggregator.setCancellationHook(hook);
aggregator.execute(children.back(), data_variants);
blocks = aggregator.convertToBlocks(data_variants, final, 1);
it = blocks.begin();
if (!aggregator.hasTemporaryFiles())
{
impl.reset(new BlocksListBlockInputStream(
aggregator.convertToBlocks(data_variants, final, 1)));
}
else
{
/** If there are temporary files with partially-aggregated data on disk,
* read and merge them, using a minimal amount of memory.
*/
/// Dump the data held in RAM to disk as well. That is simpler.
size_t rows = data_variants.sizeWithoutOverflowRow();
if (rows)
aggregator.writeToTemporaryFile(data_variants, rows);
const auto & files = aggregator.getTemporaryFiles();
BlockInputStreams input_streams;
for (const auto & file : files.files)
{
temporary_inputs.emplace_back(new TemporaryFileStream(file->path()));
input_streams.emplace_back(temporary_inputs.back()->block_in);
}
LOG_TRACE(log, "Will merge " << files.files.size() << " temporary files of size "
<< (files.sum_size_compressed / 1048576.0) << " MiB compressed, "
<< (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed.");
impl.reset(new MergingAggregatedMemoryEfficientBlockInputStream(input_streams, params, final, 1));
}
}
Block res;
if (isCancelled() || it == blocks.end())
if (isCancelled() || !impl)
return res;
res = *it;
++it;
return res;
return impl->read();
}

View File

@ -1,3 +1,5 @@
#include <future>
#include <DB/Common/setThreadName.h>
#include <DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
@ -6,11 +8,8 @@ namespace DB
MergingAggregatedMemoryEfficientBlockInputStream::MergingAggregatedMemoryEfficientBlockInputStream(
BlockInputStreams inputs_, const Names & keys_names_,
const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_)
: aggregator(keys_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0),
final(final_),
inputs(inputs_.begin(), inputs_.end())
BlockInputStreams inputs_, const Aggregator::Params & params, bool final_, size_t threads_)
: aggregator(params), final(final_), threads(threads_), inputs(inputs_.begin(), inputs_.end())
{
children = inputs_;
}
@ -27,17 +26,115 @@ String MergingAggregatedMemoryEfficientBlockInputStream::getID() const
Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
{
/// If a child is a RemoteBlockInputStream, this sends the query to all remote servers, initiating the computation.
/** NOTE: If the connections have not yet been established, they are established sequentially.
* And the query is sent sequentially. This is slow.
*/
if (!started)
if (threads == 1)
{
started = true;
for (auto & child : children)
child->readPrefix();
}
/// If a child is a RemoteBlockInputStream, this sends the query to all remote servers, initiating the computation.
/** NOTE: If the connections have not yet been established, they are established sequentially.
* And the query is sent sequentially. This is slow.
*/
if (!started)
{
started = true;
for (auto & child : children)
child->readPrefix();
}
if (BlocksToMerge blocks_to_merge = getNextBlocksToMerge())
return aggregator.mergeBlocks(*blocks_to_merge, final);
return {};
}
else
{
/** Create several threads. Each of them will, in a loop, fetch the next set of blocks to merge,
* then merge them and put the result into a queue, from which we will read the finished results.
*/
if (!parallel_merge_data)
{
parallel_merge_data.reset(new ParallelMergeData(threads));
auto & pool = parallel_merge_data->pool;
/** If a child is a RemoteBlockInputStream, the connections and the sending of the query are also done in parallel.
*/
started = true;
size_t num_children = children.size();
std::vector<std::packaged_task<void()>> tasks(num_children);
for (size_t i = 0; i < num_children; ++i)
{
auto & child = children[i];
auto & task = tasks[i];
task = std::packaged_task<void()>([&child] { child->readPrefix(); });
pool.schedule([&task] { task(); });
}
pool.wait();
for (auto & task : tasks)
task.get_future().get();
/** Create the threads that will fetch and merge the data.
*/
for (size_t i = 0; i < threads; ++i)
pool.schedule(std::bind(&MergingAggregatedMemoryEfficientBlockInputStream::mergeThread,
this, current_memory_tracker));
}
OutputData res;
parallel_merge_data->result_queue.pop(res);
if (res.exception)
std::rethrow_exception(res.exception);
if (!res.block)
parallel_merge_data->pool.wait();
return res.block;
}
}
void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread(MemoryTracker * memory_tracker)
{
setThreadName("MrgAggMemEffThr");
current_memory_tracker = memory_tracker;
try
{
while (true)
{
/// Fetching the next blocks is done sequentially, while the merge is done in parallel.
BlocksToMerge blocks_to_merge;
{
std::lock_guard<std::mutex> lock(parallel_merge_data->get_next_blocks_mutex);
if (parallel_merge_data->exhausted)
break;
blocks_to_merge = getNextBlocksToMerge();
if (!blocks_to_merge)
{
parallel_merge_data->exhausted = true;
parallel_merge_data->result_queue.push(Block());
break;
}
}
parallel_merge_data->result_queue.push(aggregator.mergeBlocks(*blocks_to_merge, final));
}
}
catch (...)
{
parallel_merge_data->result_queue.push(std::current_exception());
}
}
MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregatedMemoryEfficientBlockInputStream::getNextBlocksToMerge()
{
/** We have several sources.
* From each of them, the following data may arrive:
*
@ -123,13 +220,13 @@ Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
// std::cerr << "merging overflows\n";
has_overflows = false;
BlocksList blocks_to_merge;
BlocksToMerge blocks_to_merge = new BlocksList;
for (auto & input : inputs)
if (input.overflow_block)
blocks_to_merge.emplace_back(std::move(input.overflow_block));
blocks_to_merge->emplace_back(std::move(input.overflow_block));
return aggregator.mergeBlocks(blocks_to_merge, final);
return blocks_to_merge;
}
else
return {};
@ -183,7 +280,7 @@ Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
continue;
/// Now gather the blocks for current_bucket_num in order to merge them.
BlocksList blocks_to_merge;
BlocksToMerge blocks_to_merge = new BlocksList;
for (auto & input : inputs)
{
@ -191,33 +288,33 @@ Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
{
// std::cerr << "having block for current_bucket_num\n";
blocks_to_merge.emplace_back(std::move(input.block));
blocks_to_merge->emplace_back(std::move(input.block));
input.block = Block();
}
else if (!input.splitted_blocks.empty() && input.splitted_blocks[min_bucket_num])
{
// std::cerr << "having splitted data for bucket\n";
blocks_to_merge.emplace_back(std::move(input.splitted_blocks[min_bucket_num]));
blocks_to_merge->emplace_back(std::move(input.splitted_blocks[min_bucket_num]));
input.splitted_blocks[min_bucket_num] = Block();
}
}
return aggregator.mergeBlocks(blocks_to_merge, final);
return blocks_to_merge;
}
else
{
/// There is only single-level data. Simply merge it.
// std::cerr << "don't have two level\n";
BlocksList blocks_to_merge;
BlocksToMerge blocks_to_merge = new BlocksList;
for (auto & input : inputs)
if (input.block)
blocks_to_merge.emplace_back(std::move(input.block));
blocks_to_merge->emplace_back(std::move(input.block));
current_bucket_num = NUM_BUCKETS;
return aggregator.mergeBlocks(blocks_to_merge, final);
return blocks_to_merge;
}
}
}
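mergeThread serializes only the fetch of the next group of blocks under get_next_blocks_mutex; the expensive merge runs without the lock, and the first thread to see the inputs run dry sets the exhausted flag and pushes an empty sentinel so the reader and the remaining workers all shut down exactly once. A standalone skeleton of that loop (fetchNext and mergeAndPublish are placeholders for getNextBlocksToMerge and the mergeBlocks-plus-queue-push step):

#include <mutex>
#include <optional>

struct SharedState
{
    std::mutex get_next_mutex;
    bool exhausted = false;
};

template <typename Fetch, typename Merge>
void mergeWorker(SharedState & shared, Fetch fetchNext, Merge mergeAndPublish)
{
    while (true)
    {
        std::optional<int> work;    // stands in for BlocksToMerge
        {
            std::lock_guard<std::mutex> lock(shared.get_next_mutex);   // fetching is serialized
            if (shared.exhausted)
                break;
            work = fetchNext();
            if (!work)
            {
                shared.exhausted = true;   // first thread out marks the end; the real code pushes the sentinel here
                break;
            }
        }
        mergeAndPublish(*work);            // merging runs in parallel, outside the lock
    }
}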

View File

@ -91,9 +91,10 @@ int main(int argc, char ** argv)
sample.insert(col);
}
Aggregator::Params params(key_column_names, aggregate_descriptions, false);
BlockInputStreamPtr stream = new OneBlockInputStream(block);
stream = new AggregatingBlockInputStream(stream, key_column_names, aggregate_descriptions, false, true,
0, OverflowMode::THROW, nullptr, 0, 0);
stream = new AggregatingBlockInputStream(stream, params, true);
WriteBufferFromOStream ob(std::cout);
RowOutputStreamPtr row_out = new TabSeparatedRowOutputStream(ob, sample);

View File

@ -35,10 +35,10 @@ static void numWidthConstant(T a, UInt64 & c)
inline UInt64 floatWidth(const double x)
{
char tmp[25];
double_conversion::StringBuilder builder{tmp, sizeof(tmp)};
DoubleConverter<false>::BufferType buffer;
double_conversion::StringBuilder builder{buffer, sizeof(buffer)};
const auto result = getDoubleToStringConverter<false>().ToShortest(x, &builder);
const auto result = DoubleConverter<false>::instance().ToShortest(x, &builder);
if (!result)
throw Exception("Cannot print double number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER);
@ -48,10 +48,10 @@ inline UInt64 floatWidth(const double x)
inline UInt64 floatWidth(const float x)
{
char tmp[25];
double_conversion::StringBuilder builder{tmp, sizeof(tmp)};
DoubleConverter<false>::BufferType buffer;
double_conversion::StringBuilder builder{buffer, sizeof(buffer)};
const auto result = getDoubleToStringConverter<false>().ToShortestSingle(x, &builder);
const auto result = DoubleConverter<false>::instance().ToShortestSingle(x, &builder);
if (!result)
throw Exception("Cannot print float number", ErrorCodes::CANNOT_PRINT_FLOAT_OR_DOUBLE_NUMBER);

View File

@ -11,8 +11,12 @@
#include <DB/Columns/ColumnArray.h>
#include <DB/AggregateFunctions/AggregateFunctionCount.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/Interpreters/Aggregator.h>
#include <common/Revision.h>
namespace DB
@ -59,6 +63,19 @@ void AggregatedDataVariants::convertToTwoLevel()
}
void Aggregator::Params::calculateColumnNumbers(const Block & block)
{
if (keys.empty() && !key_names.empty())
for (Names::const_iterator it = key_names.begin(); it != key_names.end(); ++it)
keys.push_back(block.getPositionByName(*it));
for (AggregateDescriptions::iterator it = aggregates.begin(); it != aggregates.end(); ++it)
if (it->arguments.empty() && !it->argument_names.empty())
for (Names::const_iterator jt = it->argument_names.begin(); jt != it->argument_names.end(); ++jt)
it->arguments.push_back(block.getPositionByName(*jt));
}
void Aggregator::initialize(const Block & block)
{
if (isCancelled())
@ -71,21 +88,23 @@ void Aggregator::initialize(const Block & block)
initialized = true;
aggregate_functions.resize(aggregates_size);
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i] = &*aggregates[i].function;
memory_usage_before_aggregation = current_memory_tracker->get();
aggregate_functions.resize(params.aggregates_size);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i] = params.aggregates[i].function.get();
/// Initialize the state sizes and offsets for the aggregate functions.
offsets_of_aggregate_states.resize(aggregates_size);
offsets_of_aggregate_states.resize(params.aggregates_size);
total_size_of_aggregate_states = 0;
all_aggregates_has_trivial_destructor = true;
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
{
offsets_of_aggregate_states[i] = total_size_of_aggregate_states;
total_size_of_aggregate_states += aggregates[i].function->sizeOfData();
total_size_of_aggregate_states += params.aggregates[i].function->sizeOfData();
if (!aggregates[i].function->hasTrivialDestructor())
if (!params.aggregates[i].function->hasTrivialDestructor())
all_aggregates_has_trivial_destructor = false;
}
@ -99,14 +118,7 @@ void Aggregator::initialize(const Block & block)
return;
/// Convert column names to numbers, if the numbers are not set
if (keys.empty() && !key_names.empty())
for (Names::const_iterator it = key_names.begin(); it != key_names.end(); ++it)
keys.push_back(block.getPositionByName(*it));
for (AggregateDescriptions::iterator it = aggregates.begin(); it != aggregates.end(); ++it)
if (it->arguments.empty() && !it->argument_names.empty())
for (Names::const_iterator jt = it->argument_names.begin(); jt != it->argument_names.end(); ++jt)
it->arguments.push_back(block.getPositionByName(*jt));
params.calculateColumnNumbers(block);
if (isCancelled())
return;
@ -114,24 +126,24 @@ void Aggregator::initialize(const Block & block)
/// Create a sample block describing the result
if (!sample)
{
for (size_t i = 0; i < keys_size; ++i)
for (size_t i = 0; i < params.keys_size; ++i)
{
sample.insert(block.getByPosition(keys[i]).cloneEmpty());
sample.insert(block.getByPosition(params.keys[i]).cloneEmpty());
if (auto converted = sample.getByPosition(i).column->convertToFullColumnIfConst())
sample.getByPosition(i).column = converted;
}
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
{
ColumnWithTypeAndName col;
col.name = aggregates[i].column_name;
col.name = params.aggregates[i].column_name;
size_t arguments_size = aggregates[i].arguments.size();
size_t arguments_size = params.aggregates[i].arguments.size();
DataTypes argument_types(arguments_size);
for (size_t j = 0; j < arguments_size; ++j)
argument_types[j] = block.getByPosition(aggregates[i].arguments[j]).type;
argument_types[j] = block.getByPosition(params.aggregates[i].arguments[j]).type;
col.type = new DataTypeAggregateFunction(aggregates[i].function, argument_types, aggregates[i].parameters);
col.type = new DataTypeAggregateFunction(params.aggregates[i].function, argument_types, params.aggregates[i].parameters);
col.column = col.type->createColumn();
sample.insert(col);
@ -175,7 +187,7 @@ void Aggregator::compileIfPossible(AggregatedDataVariants::Type type)
/// The list of aggregate function types.
std::stringstream aggregate_functions_typenames_str;
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
{
IAggregateFunction & func = *aggregate_functions[i];
@ -306,7 +318,7 @@ void Aggregator::compileIfPossible(AggregatedDataVariants::Type type)
* If the counter has reached min_count_to_compile, compilation is started asynchronously (in a separate thread),
* and the on_ready callback is invoked when it finishes.
*/
SharedLibraryPtr lib = compiler->getOrCount(key, min_count_to_compile,
SharedLibraryPtr lib = params.compiler->getOrCount(key, params.min_count_to_compile,
"-include /usr/share/clickhouse/headers/dbms/include/DB/Interpreters/SpecializedAggregator.h",
get_code, on_ready);
@ -329,8 +341,8 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ConstColu
bool has_arrays_of_non_fixed_elems = false;
bool all_non_array_keys_are_fixed = true;
key_sizes.resize(keys_size);
for (size_t j = 0; j < keys_size; ++j)
key_sizes.resize(params.keys_size);
for (size_t j = 0; j < params.keys_size; ++j)
{
if (key_columns[j]->isFixed())
{
@ -354,11 +366,11 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ConstColu
}
/// If there are no keys
if (keys_size == 0)
if (params.keys_size == 0)
return AggregatedDataVariants::Type::without_key;
/// If there is a single numeric key that fits into 64 bits
if (keys_size == 1 && key_columns[0]->isNumeric())
if (params.keys_size == 1 && key_columns[0]->isNumeric())
{
size_t size_of_field = key_columns[0]->sizeOfField();
if (size_of_field == 1)
@ -379,10 +391,10 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ConstColu
return AggregatedDataVariants::Type::keys256;
/// If there is a single string key, use a hash table with it
if (keys_size == 1 && typeid_cast<const ColumnString *>(key_columns[0]))
if (params.keys_size == 1 && typeid_cast<const ColumnString *>(key_columns[0]))
return AggregatedDataVariants::Type::key_string;
if (keys_size == 1 && typeid_cast<const ColumnFixedString *>(key_columns[0]))
if (params.keys_size == 1 && typeid_cast<const ColumnFixedString *>(key_columns[0]))
return AggregatedDataVariants::Type::key_fixed_string;
/** If there are arrays.
@ -401,7 +413,7 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod(const ConstColu
void Aggregator::createAggregateStates(AggregateDataPtr & aggregate_data) const
{
for (size_t j = 0; j < aggregates_size; ++j)
for (size_t j = 0; j < params.aggregates_size; ++j)
{
try
{
@ -475,7 +487,7 @@ void NO_INLINE Aggregator::executeImplCase(
bool overflow = false; /// The new key did not fit into the hash table because of no_more_keys.
/// Get the key to insert into the hash table.
typename Method::Key key = state.getKey(key_columns, keys_size, i, key_sizes, keys, *aggregates_pool);
typename Method::Key key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool);
if (!no_more_keys) /// Insert.
{
@ -522,7 +534,7 @@ void NO_INLINE Aggregator::executeImplCase(
/// exception safety - if memory allocation or state creation fails, the destructors will not be called.
aggregate_data = nullptr;
method.onNewKey(*it, keys_size, i, keys, *aggregates_pool);
method.onNewKey(*it, params.keys_size, i, keys, *aggregates_pool);
AggregateDataPtr place = aggregates_pool->alloc(total_size_of_aggregate_states);
createAggregateStates(place);
@ -549,7 +561,7 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
AggregateFunctionInstruction * aggregate_instructions) const
{
/// An optimization for the case of a single aggregate function: count.
AggregateFunctionCount * agg_count = aggregates_size == 1
AggregateFunctionCount * agg_count = params.aggregates_size == 1
? typeid_cast<AggregateFunctionCount *>(aggregate_functions[0])
: NULL;
@ -580,8 +592,8 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
/// result will destroy the aggregate function states in its destructor
result.aggregator = this;
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_columns[i].resize(aggregates[i].arguments.size());
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i].resize(params.aggregates[i].arguments.size());
/** Constant columns are not supported directly during aggregation.
* To make them work anyway, we materialize them.
@ -589,9 +601,9 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
Columns materialized_columns;
/// Remember the columns we will be working with
for (size_t i = 0; i < keys_size; ++i)
for (size_t i = 0; i < params.keys_size; ++i)
{
key_columns[i] = block.getByPosition(keys[i]).column;
key_columns[i] = block.getByPosition(params.keys[i]).column;
if (auto converted = key_columns[i]->convertToFullColumnIfConst())
{
@ -600,14 +612,14 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
}
}
AggregateFunctionInstructions aggregate_functions_instructions(aggregates_size + 1);
aggregate_functions_instructions[aggregates_size].that = nullptr;
AggregateFunctionInstructions aggregate_functions_instructions(params.aggregates_size + 1);
aggregate_functions_instructions[params.aggregates_size].that = nullptr;
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
{
for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
{
aggregate_columns[i][j] = block.getByPosition(aggregates[i].arguments[j]).column;
aggregate_columns[i][j] = block.getByPosition(params.aggregates[i].arguments[j]).column;
if (auto converted = aggregate_columns[i][j]->convertToFullColumnIfConst())
{
@ -631,18 +643,18 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
if (result.empty())
{
result.init(chooseAggregationMethod(key_columns, key_sizes));
result.keys_size = keys_size;
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
LOG_TRACE(log, "Aggregation method: " << result.getMethodName());
if (compiler)
if (params.compiler)
compileIfPossible(result.type);
}
if (isCancelled())
return true;
if ((overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
if ((params.overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
{
AggregateDataPtr place = result.aggregates_pool->alloc(total_size_of_aggregate_states);
createAggregateStates(place);
@ -667,7 +679,7 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
else
{
/// Data that did not fit into max_rows_to_group_by when group_by_overflow_mode = any is written here.
AggregateDataPtr overflow_row_ptr = overflow_row ? result.without_key : nullptr;
AggregateDataPtr overflow_row_ptr = params.overflow_row ? result.without_key : nullptr;
bool is_two_level = result.isTwoLevel();
@ -718,29 +730,155 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
}
size_t result_size = result.sizeWithoutOverflowRow();
auto current_memory_usage = current_memory_tracker->get();
auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation; /// This accounts for all the results combined, from different threads.
if (group_by_two_level_threshold && result.isConvertibleToTwoLevel() && result_size >= group_by_two_level_threshold)
bool worth_convert_to_two_level
= (params.group_by_two_level_threshold && result_size >= params.group_by_two_level_threshold)
|| (params.group_by_two_level_threshold_bytes && result_size_bytes >= static_cast<Int64>(params.group_by_two_level_threshold_bytes));
/** Conversion to a two-level data structure.
* It enables a subsequent efficient merge - either memory-economical or parallelized.
*/
if (result.isConvertibleToTwoLevel() && worth_convert_to_two_level)
result.convertToTwoLevel();
/// Check the limits.
if (!checkLimits(result_size, no_more_keys))
return false;
/** Dump the data to disk if too much RAM is being consumed.
* The data can be dumped to disk only if a two-level aggregation structure is used.
*/
if (params.max_bytes_before_external_group_by
&& result.isTwoLevel()
&& current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
&& worth_convert_to_two_level)
{
writeToTemporaryFile(result, result_size);
}
return true;
}
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t rows)
{
Stopwatch watch;
auto file = std::make_unique<Poco::TemporaryFile>(params.tmp_path);
const std::string & path = file->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
NativeBlockOutputStream block_out(compressed_buf, Revision::get());
LOG_DEBUG(log, "Writing part of aggregation data into temporary file " << path << ".");
/// Only two-level data is dumped.
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
writeToTemporaryFileImpl(data_variants, *data_variants.NAME, block_out, path);
if (false) {}
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
/// NOTE Instead of freeing the memory and creating new hash tables and an arena, the old ones could be reused.
data_variants.init(data_variants.type);
data_variants.aggregates_pools = Arenas(1, new Arena);
data_variants.aggregates_pool = data_variants.aggregates_pools.back().get();
block_out.flush();
compressed_buf.next();
file_buf.next();
double elapsed_seconds = watch.elapsedSeconds();
double compressed_bytes = file_buf.count();
double uncompressed_bytes = compressed_buf.count();
{
std::lock_guard<std::mutex> lock(temporary_files.mutex);
temporary_files.files.emplace_back(std::move(file));
temporary_files.sum_size_uncompressed += uncompressed_bytes;
temporary_files.sum_size_compressed += compressed_bytes;
}
LOG_TRACE(log, std::fixed << std::setprecision(3)
<< "Written part in " << elapsed_seconds << " sec., "
<< rows << " rows, "
<< (uncompressed_bytes / 1048576.0) << " MiB uncompressed, "
<< (compressed_bytes / 1048576.0) << " MiB compressed, "
<< (uncompressed_bytes / rows) << " uncompressed bytes per row, "
<< (compressed_bytes / rows) << " compressed bytes per row, "
<< "compression rate: " << (uncompressed_bytes / compressed_bytes)
<< " (" << (rows / elapsed_seconds) << " rows/sec., "
<< (uncompressed_bytes / elapsed_seconds / 1048576.0) << " MiB/sec. uncompressed, "
<< (compressed_bytes / elapsed_seconds / 1048576.0) << " MiB/sec. compressed)");
}
template <typename Method>
void Aggregator::writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
Method & method,
IBlockOutputStream & out,
const String & path)
{
size_t max_temporary_block_size_rows = 0;
size_t max_temporary_block_size_bytes = 0;
for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
{
if (method.data.impls[bucket].empty())
continue;
Block block = prepareBlockAndFill(data_variants, false, method.data.impls[bucket].size(),
[bucket, &method, this] (
ColumnPlainPtrs & key_columns,
AggregateColumnsData & aggregate_columns,
ColumnPlainPtrs & final_aggregate_columns,
const Sizes & key_sizes,
bool final)
{
convertToBlockImpl(method, method.data.impls[bucket],
key_columns, aggregate_columns, final_aggregate_columns, key_sizes, final);
});
block.info.bucket_num = bucket;
out.write(block);
size_t block_size_rows = block.rowsInFirstColumn();
size_t block_size_bytes = block.bytes();
if (block_size_rows > max_temporary_block_size_rows)
max_temporary_block_size_rows = block.rowsInFirstColumn();
if (block_size_bytes > max_temporary_block_size_bytes)
max_temporary_block_size_bytes = block_size_bytes;
}
/// data_variants will not destroy the aggregate function states in its destructor. The states are now owned by ColumnAggregateFunction.
data_variants.aggregator = nullptr;
LOG_TRACE(log, std::fixed << std::setprecision(3)
<< "Max size of temporary block: " << max_temporary_block_size_rows << " rows, "
<< (max_temporary_block_size_bytes / 1048576.0) << " MiB.");
}
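The spill itself is only triggered from executeOnBlock when all four conditions line up: external aggregation is enabled, the data is already two-level (only two-level buckets are written out), memory usage has crossed the threshold, and the result was worth converting to two-level in the first place. A condensed, self-contained restatement of that check from the code above:

// Condensed form of the spill decision in executeOnBlock(); inputs come from
// Params and the memory tracker, exactly as in the code above.
bool shouldSpill(size_t max_bytes_before_external_group_by,
                 bool result_is_two_level,
                 long long current_memory_usage,
                 bool worth_convert_to_two_level)
{
    return max_bytes_before_external_group_by                 // external aggregation enabled (0 disables it)
        && result_is_two_level                                // only two-level data can be dumped
        && current_memory_usage > static_cast<long long>(max_bytes_before_external_group_by)
        && worth_convert_to_two_level;
}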
bool Aggregator::checkLimits(size_t result_size, bool & no_more_keys) const
{
if (!no_more_keys && max_rows_to_group_by && result_size > max_rows_to_group_by)
if (!no_more_keys && params.max_rows_to_group_by && result_size > params.max_rows_to_group_by)
{
if (group_by_overflow_mode == OverflowMode::THROW)
if (params.group_by_overflow_mode == OverflowMode::THROW)
throw Exception("Limit for rows to GROUP BY exceeded: has " + toString(result_size)
+ " rows, maximum: " + toString(max_rows_to_group_by),
+ " rows, maximum: " + toString(params.max_rows_to_group_by),
ErrorCodes::TOO_MUCH_ROWS);
else if (group_by_overflow_mode == OverflowMode::BREAK)
else if (params.group_by_overflow_mode == OverflowMode::BREAK)
return false;
else if (group_by_overflow_mode == OverflowMode::ANY)
else if (params.group_by_overflow_mode == OverflowMode::ANY)
no_more_keys = true;
else
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
@ -750,16 +888,14 @@ bool Aggregator::checkLimits(size_t result_size, bool & no_more_keys) const
}
/** The result is stored in RAM and must fit entirely in RAM.
*/
void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & result)
{
if (isCancelled())
return;
StringRefs key(keys_size);
ConstColumnPlainPtrs key_columns(keys_size);
AggregateColumns aggregate_columns(aggregates_size);
StringRefs key(params.keys_size);
ConstColumnPlainPtrs key_columns(params.keys_size);
AggregateColumns aggregate_columns(params.aggregates_size);
Sizes key_sizes;
/** Used when there is a limit on the maximum number of rows during aggregation,
@ -827,9 +963,9 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
{
for (typename Table::const_iterator it = data.begin(); it != data.end(); ++it)
{
method.insertKeyIntoColumns(*it, key_columns, keys_size, key_sizes);
method.insertKeyIntoColumns(*it, key_columns, params.keys_size, key_sizes);
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->insertResultInto(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i],
*final_aggregate_columns[i]);
@ -847,9 +983,9 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal(
size_t j = 0;
for (typename Table::const_iterator it = data.begin(); it != data.end(); ++it, ++j)
{
method.insertKeyIntoColumns(*it, key_columns, keys_size, key_sizes);
method.insertKeyIntoColumns(*it, key_columns, params.keys_size, key_sizes);
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
(*aggregate_columns[i])[j] = Method::getAggregateData(it->second) + offsets_of_aggregate_states[i];
}
}
@ -864,11 +1000,11 @@ Block Aggregator::prepareBlockAndFill(
{
Block res = sample.cloneEmpty();
ColumnPlainPtrs key_columns(keys_size);
AggregateColumnsData aggregate_columns(aggregates_size);
ColumnPlainPtrs final_aggregate_columns(aggregates_size);
ColumnPlainPtrs key_columns(params.keys_size);
AggregateColumnsData aggregate_columns(params.aggregates_size);
ColumnPlainPtrs final_aggregate_columns(params.aggregates_size);
for (size_t i = 0; i < keys_size; ++i)
for (size_t i = 0; i < params.keys_size; ++i)
{
key_columns[i] = res.getByPosition(i).column;
key_columns[i]->reserve(rows);
@ -876,12 +1012,12 @@ Block Aggregator::prepareBlockAndFill(
try
{
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
{
if (!final)
{
/// The ColumnAggregateFunction column takes shared ownership of the arena containing the aggregate function states.
ColumnAggregateFunction & column_aggregate_func = static_cast<ColumnAggregateFunction &>(*res.getByPosition(i + keys_size).column);
ColumnAggregateFunction & column_aggregate_func = static_cast<ColumnAggregateFunction &>(*res.getByPosition(i + params.keys_size).column);
for (size_t j = 0; j < data_variants.aggregates_pools.size(); ++j)
column_aggregate_func.addArena(data_variants.aggregates_pools[j]);
@ -891,7 +1027,7 @@ Block Aggregator::prepareBlockAndFill(
}
else
{
ColumnWithTypeAndName & column = res.getByPosition(i + keys_size);
ColumnWithTypeAndName & column = res.getByPosition(i + params.keys_size);
column.type = aggregate_functions[i]->getReturnType();
column.column = column.type->createColumn();
column.column->reserve(rows);
@ -926,7 +1062,7 @@ Block Aggregator::prepareBlockAndFill(
* and the destructors will also be invoked on AggregatedDataVariants.
* Therefore, we "roll them back" manually.
*/
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
if (aggregate_columns[i])
aggregate_columns[i]->clear();
@ -948,18 +1084,18 @@ BlocksList Aggregator::prepareBlocksAndFillWithoutKey(AggregatedDataVariants & d
const Sizes & key_sizes,
bool final)
{
if (data_variants.type == AggregatedDataVariants::Type::without_key || overflow_row)
if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row)
{
AggregatedDataWithoutKey & data = data_variants.without_key;
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
if (!final)
(*aggregate_columns[i])[0] = data + offsets_of_aggregate_states[i];
else
aggregate_functions[i]->insertResultInto(data + offsets_of_aggregate_states[i], *final_aggregate_columns[i]);
if (overflow_row)
for (size_t i = 0; i < keys_size; ++i)
if (params.overflow_row)
for (size_t i = 0; i < params.keys_size; ++i)
key_columns[i]->insertDefault();
}
};
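
For context on the hunk above: overflow_row presumably denotes the single extra row that accumulates aggregate states for keys dropped once max_rows_to_group_by is exceeded; since it corresponds to no concrete key, its key columns are filled with default values via insertDefault().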
@ -1112,7 +1248,7 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
{
for (auto & block : blocks)
{
for (size_t column_num = keys_size; column_num < keys_size + aggregates_size; ++column_num)
for (size_t column_num = params.keys_size; column_num < params.keys_size + params.aggregates_size; ++column_num)
{
IColumn & col = *block.getByPosition(column_num).column;
if (ColumnAggregateFunction * col_aggregate = typeid_cast<ColumnAggregateFunction *>(&col))
@ -1151,7 +1287,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
if (isCancelled())
return BlocksList();
if (data_variants.type == AggregatedDataVariants::Type::without_key || overflow_row)
if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row)
blocks.splice(blocks.end(), prepareBlocksAndFillWithoutKey(
data_variants, final, data_variants.type != AggregatedDataVariants::Type::without_key));
@ -1173,7 +1309,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
*/
for (auto & block : blocks)
{
for (size_t column_num = keys_size; column_num < keys_size + aggregates_size; ++column_num)
for (size_t column_num = params.keys_size; column_num < params.keys_size + params.aggregates_size; ++column_num)
{
IColumn & col = *block.getByPosition(column_num).column;
if (ColumnAggregateFunction * col_aggregate = typeid_cast<ColumnAggregateFunction *>(&col))
@ -1226,12 +1362,12 @@ void NO_INLINE Aggregator::mergeDataImpl(
if (!inserted)
{
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(
Method::getAggregateData(res_it->second) + offsets_of_aggregate_states[i],
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
}
@ -1259,12 +1395,12 @@ void NO_INLINE Aggregator::mergeDataNoMoreKeysImpl(
? overflows
: Method::getAggregateData(res_it->second);
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(
res_data + offsets_of_aggregate_states[i],
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
@ -1286,12 +1422,12 @@ void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl(
AggregateDataPtr res_data = Method::getAggregateData(res_it->second);
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(
res_data + offsets_of_aggregate_states[i],
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
@ -1311,12 +1447,12 @@ void NO_INLINE Aggregator::mergeDataRemainingKeysToOverflowsImpl(
AggregateDataPtr res_data = overflows;
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(
res_data + offsets_of_aggregate_states[i],
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i]);
@ -1336,10 +1472,10 @@ void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(
AggregatedDataWithoutKey & res_data = res->without_key;
AggregatedDataWithoutKey & current_data = non_empty_data[i]->without_key;
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(res_data + offsets_of_aggregate_states[i], current_data + offsets_of_aggregate_states[i]);
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(current_data + offsets_of_aggregate_states[i]);
current_data = nullptr;
@ -1451,7 +1587,7 @@ void NO_INLINE Aggregator::mergeTwoLevelDataImpl(
if (task.valid())
task.get_future().get();
if (no_more_keys && overflow_row)
if (no_more_keys && params.overflow_row)
{
for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
{
@ -1533,7 +1669,7 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
}
/// In which data structure is the data aggregated?
if (res->type == AggregatedDataVariants::Type::without_key || overflow_row)
if (res->type == AggregatedDataVariants::Type::without_key || params.overflow_row)
mergeWithoutKeyDataImpl(non_empty_data);
std::unique_ptr<boost::threadpool::pool> thread_pool;
@ -1607,21 +1743,21 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
Table & data,
AggregateDataPtr overflow_row) const
{
ConstColumnPlainPtrs key_columns(keys_size);
AggregateColumnsData aggregate_columns(aggregates_size);
ConstColumnPlainPtrs key_columns(params.keys_size);
AggregateColumnsData aggregate_columns(params.aggregates_size);
/// Remember the columns we will work with
for (size_t i = 0; i < keys_size; ++i)
for (size_t i = 0; i < params.keys_size; ++i)
key_columns[i] = block.getByPosition(i).column;
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_columns[i] = &typeid_cast<ColumnAggregateFunction &>(*block.getByPosition(keys_size + i).column).getData();
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i] = &typeid_cast<ColumnAggregateFunction &>(*block.getByPosition(params.keys_size + i).column).getData();
typename Method::State state;
state.init(key_columns);
/// For all rows.
StringRefs keys(keys_size);
StringRefs keys(params.keys_size);
size_t rows = block.rowsInFirstColumn();
for (size_t i = 0; i < rows; ++i)
{
@ -1631,7 +1767,7 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
bool overflow = false; /// The new key did not fit into the hash table because of no_more_keys.
/// Obtain the key to insert into the hash table.
auto key = state.getKey(key_columns, keys_size, i, key_sizes, keys, *aggregates_pool);
auto key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *aggregates_pool);
if (!no_more_keys)
{
@ -1658,7 +1794,7 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
AggregateDataPtr & aggregate_data = Method::getAggregateData(it->second);
aggregate_data = nullptr;
method.onNewKey(*it, keys_size, i, keys, *aggregates_pool);
method.onNewKey(*it, params.keys_size, i, keys, *aggregates_pool);
AggregateDataPtr place = aggregates_pool->alloc(total_size_of_aggregate_states);
createAggregateStates(place);
@ -1670,7 +1806,7 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
AggregateDataPtr value = (!no_more_keys || !overflow) ? Method::getAggregateData(it->second) : overflow_row;
/// Merge the states of the aggregate functions.
for (size_t j = 0; j < aggregates_size; ++j)
for (size_t j = 0; j < params.aggregates_size; ++j)
aggregate_functions[j]->merge(
value + offsets_of_aggregate_states[j],
(*aggregate_columns[j])[i]);
@ -1701,11 +1837,11 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
Block & block,
AggregatedDataVariants & result) const
{
AggregateColumnsData aggregate_columns(aggregates_size);
AggregateColumnsData aggregate_columns(params.aggregates_size);
/// Remember the columns we will work with
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_columns[i] = &typeid_cast<ColumnAggregateFunction &>(*block.getByPosition(keys_size + i).column).getData();
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i] = &typeid_cast<ColumnAggregateFunction &>(*block.getByPosition(params.keys_size + i).column).getData();
AggregatedDataWithoutKey & res = result.without_key;
if (!res)
@ -1716,7 +1852,7 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
}
/// Add the values
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns[i])[0]);
/// Release memory early.
@ -1729,10 +1865,10 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
if (isCancelled())
return;
StringRefs key(keys_size);
ConstColumnPlainPtrs key_columns(keys_size);
StringRefs key(params.keys_size);
ConstColumnPlainPtrs key_columns(params.keys_size);
AggregateColumnsData aggregate_columns(aggregates_size);
AggregateColumnsData aggregate_columns(params.aggregates_size);
Block empty_block;
initialize(empty_block);
@ -1748,7 +1884,7 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
using BucketToBlocks = std::map<Int32, BlocksList>;
BucketToBlocks bucket_to_blocks;
/// Read all the data. TODO a memory-savvy mode in which only one bucket is processed at a time.
/// Read all the data.
LOG_TRACE(log, "Reading blocks of partially aggregated data.");
size_t total_input_rows = 0;
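
A sketch of the grouping structure used above, assuming bucket_num is -1 for single-level blocks and 0..N-1 for two-level buckets (simplified stand-in types):

#include <list>
#include <map>

struct Block { int bucket_num; }; /// stands in for DB::Block with its BlockInfo
using BlocksList = std::list<Block>;
using BucketToBlocks = std::map<int, BlocksList>;

int main()
{
    BucketToBlocks bucket_to_blocks;

    /// Group incoming partially aggregated blocks by bucket number.
    for (Block b : {Block{0}, Block{1}, Block{0}, Block{-1}})
        bucket_to_blocks[b.bucket_num].push_back(b);

    /// Each bucket can now be merged independently, which is what would make
    /// the bucket-at-a-time (memory-savvy) mode from the removed TODO possible.
    return bucket_to_blocks.size() == 3 ? 0 : 1;
}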
@ -1772,7 +1908,7 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
sample = bucket_to_blocks.begin()->second.front().cloneEmpty();
/// How should the aggregation be performed?
for (size_t i = 0; i < keys_size; ++i)
for (size_t i = 0; i < params.keys_size; ++i)
key_columns[i] = sample.getByPosition(i).column;
Sizes key_sizes;
@ -1803,7 +1939,7 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
result.aggregator = this;
result.init(method);
result.keys_size = keys_size;
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
bool has_blocks_with_unknown_bucket = bucket_to_blocks.count(-1);
@ -1923,10 +2059,10 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
if (blocks.empty())
return {};
StringRefs key(keys_size);
ConstColumnPlainPtrs key_columns(keys_size);
StringRefs key(params.keys_size);
ConstColumnPlainPtrs key_columns(params.keys_size);
AggregateColumnsData aggregate_columns(aggregates_size);
AggregateColumnsData aggregate_columns(params.aggregates_size);
Block empty_block;
initialize(empty_block);
@ -1935,7 +2071,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
sample = blocks.front().cloneEmpty();
/// How should the aggregation be performed?
for (size_t i = 0; i < keys_size; ++i)
for (size_t i = 0; i < params.keys_size; ++i)
key_columns[i] = sample.getByPosition(i).column;
Sizes key_sizes;
@ -1948,10 +2084,10 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
result.aggregator = this;
result.init(method);
result.keys_size = keys_size;
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
LOG_TRACE(log, "Merging partially aggregated blocks.");
LOG_TRACE(log, "Merging partially aggregated blocks (bucket = " << blocks.front().info.bucket_num << ").");
for (Block & block : blocks)
{
@ -2050,7 +2186,7 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
for (size_t i = 0; i < rows; ++i)
{
/// Obtain the key. Compute the bucket number from it.
typename Method::Key key = state.getKey(key_columns, keys_size, i, key_sizes, keys, *pool);
typename Method::Key key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *pool);
auto hash = method.data.hash(key);
auto bucket = method.data.getBucketFromHash(hash);
@ -2102,16 +2238,16 @@ std::vector<Block> Aggregator::convertBlockToTwoLevel(const Block & block)
AggregatedDataVariants data;
StringRefs key(keys_size);
ConstColumnPlainPtrs key_columns(keys_size);
StringRefs key(params.keys_size);
ConstColumnPlainPtrs key_columns(params.keys_size);
Sizes key_sizes;
/// Remember the columns we will work with
for (size_t i = 0; i < keys_size; ++i)
for (size_t i = 0; i < params.keys_size; ++i)
key_columns[i] = block.getByPosition(i).column;
AggregatedDataVariants::Type type = chooseAggregationMethod(key_columns, key_sizes);
data.keys_size = keys_size;
data.keys_size = params.keys_size;
data.key_sizes = key_sizes;
#define M(NAME) \
@ -2170,7 +2306,7 @@ void NO_INLINE Aggregator::destroyImpl(
if (nullptr == data)
continue;
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
if (!aggregate_functions[i]->isState())
aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]);
}
@ -2185,12 +2321,12 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
LOG_TRACE(log, "Destroying aggregate states");
/// In which data structure is the data aggregated?
if (result.type == AggregatedDataVariants::Type::without_key || overflow_row)
if (result.type == AggregatedDataVariants::Type::without_key || params.overflow_row)
{
AggregatedDataWithoutKey & res_data = result.without_key;
if (nullptr != res_data)
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < params.aggregates_size; ++i)
if (!aggregate_functions[i]->isState())
aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);
}
@ -2211,22 +2347,22 @@ String Aggregator::getID() const
{
std::stringstream res;
if (keys.empty())
if (params.keys.empty())
{
res << "key_names";
for (size_t i = 0; i < key_names.size(); ++i)
res << ", " << key_names[i];
for (size_t i = 0; i < params.key_names.size(); ++i)
res << ", " << params.key_names[i];
}
else
{
res << "keys";
for (size_t i = 0; i < keys.size(); ++i)
res << ", " << keys[i];
for (size_t i = 0; i < params.keys.size(); ++i)
res << ", " << params.keys[i];
}
res << ", aggregates";
for (size_t i = 0; i < aggregates.size(); ++i)
res << ", " << aggregates[i].column_name;
for (size_t i = 0; i < params.aggregates_size; ++i)
res << ", " << params.aggregates[i].column_name;
return res.str();
}

View File

@ -852,12 +852,23 @@ void InterpreterSelectQuery::executeAggregation(ExpressionActionsPtr expression,
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
/** Two-level aggregation is useful in two cases:
* 1. Aggregation is performed in parallel, and the results have to be merged in parallel.
* 2. Aggregation spills temporary data to disk, and it has to be merged in a memory-efficient way.
*/
bool allow_to_use_two_level_group_by = streams.size() > 1 || settings.limits.max_bytes_before_external_group_by != 0;
Aggregator::Params params(key_names, aggregates,
overflow_row, settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode,
settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile,
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.limits.max_bytes_before_external_group_by, context.getTemporaryPath());
/// If there are several sources, perform parallel aggregation
if (streams.size() > 1)
{
streams[0] = new ParallelAggregatingBlockInputStream(streams, stream_with_non_joined_data, key_names, aggregates, overflow_row, final,
settings.max_threads, settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode,
settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile, settings.group_by_two_level_threshold);
streams[0] = new ParallelAggregatingBlockInputStream(streams, stream_with_non_joined_data, params, final, settings.max_threads);
stream_with_non_joined_data = nullptr;
streams.resize(1);
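
A compact restatement of the decision encoded above, under the assumed semantics of the settings (the helper name is hypothetical):

#include <cstddef>
#include <cstdint>

/// Returns the effective two-level-grouping threshold: the configured value
/// when two-level hash tables can pay off, 0 (disabled) otherwise.
uint64_t effectiveTwoLevelThreshold(
    size_t num_streams,                          /// number of parallel aggregation sources
    uint64_t max_bytes_before_external_group_by, /// 0 means no spilling to disk
    uint64_t configured_threshold)
{
    bool allow_two_level = num_streams > 1 || max_bytes_before_external_group_by != 0;
    return allow_two_level ? configured_threshold : 0;
}

int main()
{
    /// A single stream with no external aggregation keeps two-level grouping off.
    return effectiveTwoLevelThreshold(1, 0, 100000) == 0 ? 0 : 1;
}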
@ -873,9 +884,7 @@ void InterpreterSelectQuery::executeAggregation(ExpressionActionsPtr expression,
if (stream_with_non_joined_data)
inputs.push_back(stream_with_non_joined_data);
streams[0] = new AggregatingBlockInputStream(new ConcatBlockInputStream(inputs), key_names, aggregates, overflow_row, final,
settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode,
settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile, 0);
streams[0] = new AggregatingBlockInputStream(new ConcatBlockInputStream(inputs), params, final);
stream_with_non_joined_data = nullptr;
}
@ -903,17 +912,19 @@ void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool fina
* but it may work slower.
*/
Aggregator::Params params(key_names, aggregates, overflow_row);
if (!settings.distributed_aggregation_memory_efficient)
{
/// Combine several sources into one, parallelizing the work.
executeUnion();
/// Now merge the aggregated blocks
streams[0] = new MergingAggregatedBlockInputStream(streams[0], key_names, aggregates, overflow_row, final, original_max_threads);
streams[0] = new MergingAggregatedBlockInputStream(streams[0], params, final, original_max_threads);
}
else
{
streams[0] = new MergingAggregatedMemoryEfficientBlockInputStream(streams, key_names, aggregates, overflow_row, final);
streams[0] = new MergingAggregatedMemoryEfficientBlockInputStream(streams, params, final, original_max_threads);
streams.resize(1);
}
}
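
The branch above trades speed for memory: judging by the bucket-oriented merge code earlier in this commit, the memory-efficient stream merges the sources piece by piece instead of materializing all partially aggregated data at once, which bounds memory usage but, as the comment notes, may work slower.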

View File

@ -18,16 +18,18 @@
int main(int argc, char ** argv)
{
using namespace DB;
try
{
size_t n = argc == 2 ? atoi(argv[1]) : 10;
DB::Block block;
Block block;
DB::ColumnWithTypeAndName column_x;
ColumnWithTypeAndName column_x;
column_x.name = "x";
column_x.type = new DB::DataTypeInt16;
DB::ColumnInt16 * x = new DB::ColumnInt16;
column_x.type = new DataTypeInt16;
ColumnInt16 * x = new ColumnInt16;
column_x.column = x;
auto & vec_x = x->getData();
@ -39,41 +41,42 @@ int main(int argc, char ** argv)
const char * strings[] = {"abc", "def", "abcd", "defg", "ac"};
DB::ColumnWithTypeAndName column_s1;
ColumnWithTypeAndName column_s1;
column_s1.name = "s1";
column_s1.type = new DB::DataTypeString;
column_s1.column = new DB::ColumnString;
column_s1.type = new DataTypeString;
column_s1.column = new ColumnString;
for (size_t i = 0; i < n; ++i)
column_s1.column->insert(std::string(strings[i % 5]));
block.insert(column_s1);
DB::ColumnWithTypeAndName column_s2;
ColumnWithTypeAndName column_s2;
column_s2.name = "s2";
column_s2.type = new DB::DataTypeString;
column_s2.column = new DB::ColumnString;
column_s2.type = new DataTypeString;
column_s2.column = new ColumnString;
for (size_t i = 0; i < n; ++i)
column_s2.column->insert(std::string(strings[i % 3]));
block.insert(column_s2);
DB::BlockInputStreamPtr stream = new DB::OneBlockInputStream(block);
DB::AggregatedDataVariants aggregated_data_variants;
BlockInputStreamPtr stream = new OneBlockInputStream(block);
AggregatedDataVariants aggregated_data_variants;
DB::Names key_column_names;
Names key_column_names;
key_column_names.emplace_back("x");
key_column_names.emplace_back("s1");
DB::AggregateFunctionFactory factory;
AggregateFunctionFactory factory;
DB::AggregateDescriptions aggregate_descriptions(1);
AggregateDescriptions aggregate_descriptions(1);
DB::DataTypes empty_list_of_types;
DataTypes empty_list_of_types;
aggregate_descriptions[0].function = factory.get("count", empty_list_of_types);
DB::Aggregator aggregator(key_column_names, aggregate_descriptions, false, 0, DB::OverflowMode::THROW, nullptr, 0, 0);
Aggregator::Params params(key_column_names, aggregate_descriptions, false);
Aggregator aggregator(params);
{
Poco::Stopwatch stopwatch;
@ -88,7 +91,7 @@ int main(int argc, char ** argv)
<< std::endl;
}
}
catch (const DB::Exception & e)
catch (const Exception & e)
{
std::cerr << e.displayText() << std::endl;
}

View File

@ -151,11 +151,7 @@ static void appendBlock(const Block & from, Block & to)
throw Exception("Cannot append block to another: different type of columns at index " + toString(column_no)
+ ". Block 1: " + from.dumpStructure() + ". Block 2: " + to.dumpStructure(), ErrorCodes::BLOCKS_HAS_DIFFERENT_STRUCTURE);
if (col_to.empty())
to.getByPosition(column_no).column = col_from.clone();
else
for (size_t row_no = 0; row_no < rows; ++row_no)
col_to.insertFrom(col_from, row_no);
col_to.insertRangeFrom(col_from, 0, rows);
}
}
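
A toy illustration of why the replacement above helps (SimpleColumn is a simplified stand-in for IColumn, not the real interface):

#include <cstddef>
#include <vector>

struct SimpleColumn
{
    std::vector<int> data;

    /// Per-row path: one call (virtual, in the real IColumn) per element.
    void insertFrom(const SimpleColumn & src, size_t row) { data.push_back(src.data[row]); }

    /// Bulk path: a single call that can reserve once and copy contiguously.
    void insertRangeFrom(const SimpleColumn & src, size_t start, size_t length)
    {
        data.insert(data.end(), src.data.begin() + start, src.data.begin() + start + length);
    }
};

int main()
{
    SimpleColumn from{{1, 2, 3, 4}};
    SimpleColumn to;
    to.insertRangeFrom(from, 0, from.data.size()); /// replaces the row-by-row insertFrom loop
    return to.data.size() == 4 ? 0 : 1;
}

The bulk call also subsumes the removed col_to.empty() special case: copying the whole range into an empty column no longer needs a separate clone path.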

View File

@ -0,0 +1,2 @@
49999995000000 10000000
499999500000 1000000 15

View File

@ -0,0 +1,5 @@
SET max_bytes_before_external_group_by = 100000000;
SET max_memory_usage = 200000000;
SELECT sum(k), sum(c) FROM (SELECT number AS k, count() AS c FROM (SELECT * FROM system.numbers LIMIT 10000000) GROUP BY k);
SELECT sum(k), sum(c), max(u) FROM (SELECT number AS k, count() AS c, uniqArray(range(number % 16)) AS u FROM (SELECT * FROM system.numbers LIMIT 1000000) GROUP BY k);
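
The new test pairs a 100 MB external-aggregation threshold with a 200 MB hard memory limit, so the GROUP BY over 10 million distinct keys can only pass if partially aggregated data is actually spilled to disk and merged back; the reference file above holds the expected sums.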