This commit is contained in:
Alexey Arno 2015-08-03 18:54:38 +03:00
commit a012ab00a8
7 changed files with 257 additions and 29 deletions

View File

@ -0,0 +1,110 @@
#pragma once
#include <DB/Interpreters/Aggregator.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
/** Доагрегирует потоки блоков, держа в оперативной памяти только по одному блоку из каждого потока.
* Это экономит оперативку в случае использования двухуровневой агрегации, где в каждом потоке будет до 256 блоков с частями результата.
*
* Агрегатные функции в блоках не должны быть финализированы, чтобы их состояния можно было объединить.
*/
class MergingAggregatedMemoryEfficientBlockInputStream : public IProfilingBlockInputStream
{
public:
MergingAggregatedMemoryEfficientBlockInputStream(BlockInputStreams inputs_, const Names & keys_names_,
const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_)
: aggregator(keys_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0),
final(final_)
{
children = inputs_;
current_blocks.resize(children.size());
overflow_blocks.resize(children.size());
is_exhausted.resize(children.size());
}
String getName() const override { return "MergingAggregatedMemorySavvy"; }
String getID() const override
{
std::stringstream res;
res << "MergingAggregatedMemorySavvy(" << aggregator.getID();
for (size_t i = 0, size = children.size(); i < size; ++i)
res << ", " << children.back()->getID();
res << ")";
return res.str();
}
protected:
Block readImpl() override
{
/// Если child - RemoteBlockInputStream, то отправляет запрос на все удалённые серверы, инициируя вычисления.
if (current_bucket_num == -1)
for (auto & child : children)
child->readPrefix();
/// Всё прочитали.
if (current_bucket_num > 255)
return {};
/// Читаем следующие блоки для current_bucket_num
for (size_t i = 0, size = children.size(); i < size; ++i)
{
while (!is_exhausted[i] && (!current_blocks[i] || current_blocks[i].info.bucket_num < current_bucket_num))
{
current_blocks[i] = children[i]->read();
if (!current_blocks[i])
{
is_exhausted[i] = true;
}
else if (current_blocks[i].info.is_overflows)
{
overflow_blocks[i].swap(current_blocks[i]);
}
}
}
/// Может быть, нет блоков для current_bucket_num, а все блоки имеют больший bucket_num.
Int32 min_bucket_num = 256;
for (size_t i = 0, size = children.size(); i < size; ++i)
if (!is_exhausted[i] && current_blocks[i].info.bucket_num < min_bucket_num)
min_bucket_num = current_blocks[i].info.bucket_num;
current_bucket_num = min_bucket_num;
/// Все потоки исчерпаны.
if (current_bucket_num > 255)
return {}; /// TODO overflow_blocks.
/// TODO Если есть single_level и two_level блоки.
/// Объединяем все блоки с current_bucket_num.
BlocksList blocks_to_merge;
for (size_t i = 0, size = children.size(); i < size; ++i)
if (current_blocks[i].info.bucket_num == current_bucket_num)
blocks_to_merge.emplace_back(std::move(current_blocks[i]));
Block res = aggregator.mergeBlocks(blocks_to_merge, final);
++current_bucket_num;
return res;
}
private:
Aggregator aggregator;
bool final;
Int32 current_bucket_num = -1;
std::vector<Block> current_blocks;
std::vector<UInt8> is_exhausted;
std::vector<Block> overflow_blocks;
};
}

View File

@ -111,6 +111,14 @@ public:
parallel_replicas->disconnect();
}
/// Отправляет запрос (инициирует вычисления) раньше, чем read.
void readPrefix() override
{
if (!sent_query)
sendQuery();
}
protected:
/// Отправить на удаленные реплики все временные таблицы
void sendExternalTables()
@ -147,19 +155,10 @@ protected:
{
if (!sent_query)
{
createParallelReplicas();
sendQuery();
if (settings.skip_unavailable_shards && 0 == parallel_replicas->size())
return Block();
established = true;
parallel_replicas->sendQuery(query, "", stage, true);
established = false;
sent_query = true;
sendExternalTables();
return {};
}
while (true)
@ -280,6 +279,23 @@ protected:
}
private:
void sendQuery()
{
createParallelReplicas();
if (settings.skip_unavailable_shards && 0 == parallel_replicas->size())
return;
established = true;
parallel_replicas->sendQuery(query, "", stage, true);
established = false;
sent_query = true;
sendExternalTables();
}
/// ITable::read requires a Context, therefore we should create one if the user can't supply it
static Context & getDefaultContext()
{

View File

@ -674,17 +674,6 @@ typedef SharedPtr<AggregatedDataVariants> AggregatedDataVariantsPtr;
typedef std::vector<AggregatedDataVariantsPtr> ManyAggregatedDataVariants;
/** Достать вариант агрегации по его типу. */
template <typename Method> Method & getDataVariant(AggregatedDataVariants & variants);
#define M(NAME, IS_TWO_LEVEL) \
template <> inline decltype(AggregatedDataVariants::NAME)::element_type & getDataVariant<decltype(AggregatedDataVariants::NAME)::element_type>(AggregatedDataVariants & variants) { return *variants.NAME; }
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
/** Агрегирует источник блоков.
*/
class Aggregator
@ -733,11 +722,15 @@ public:
*/
AggregatedDataVariantsPtr merge(ManyAggregatedDataVariants & data_variants, size_t max_threads);
/** Объединить несколько агрегированных блоков в одну структуру данных.
/** Объединить поток частично агрегированных блоков в одну структуру данных.
* (Доагрегировать несколько блоков, которые представляют собой результат независимых агрегаций с удалённых серверов.)
*/
void mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants & result, size_t max_threads);
/** Объединить несколько частично агрегированных блоков в один.
*/
Block mergeBlocks(BlocksList & blocks, bool final);
using CancellationHook = std::function<bool()>;
/** Установить функцию, которая проверяет, можно ли прервать текущую задачу.
@ -974,4 +967,15 @@ protected:
};
/** Достать вариант агрегации по его типу. */
template <typename Method> Method & getDataVariant(AggregatedDataVariants & variants);
#define M(NAME, IS_TWO_LEVEL) \
template <> inline decltype(AggregatedDataVariants::NAME)::element_type & getDataVariant<decltype(AggregatedDataVariants::NAME)::element_type>(AggregatedDataVariants & variants) { return *variants.NAME; }
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
}

View File

@ -91,6 +91,8 @@ struct Settings
M(SettingUInt64, min_count_to_compile, 3) \
/** При каком количестве ключей, начинает использоваться двухуровневая агрегация. 0 - никогда не использовать. */ \
M(SettingUInt64, group_by_two_level_threshold, 100000) \
/** Включён ли экономный по памяти режим распределённой агрегации. */ \
M(SettingBool, distributed_aggregation_memory_efficient, false) \
\
/** Максимальное количество используемых реплик каждого шарда при выполнении запроса */ \
M(SettingUInt64, max_parallel_replicas, 1) \

View File

@ -141,9 +141,12 @@ void Block::insertUnique(const ColumnWithTypeAndName & elem)
void Block::erase(size_t position)
{
if (index_by_position.empty())
throw Exception("Block is empty", ErrorCodes::POSITION_OUT_OF_BOUND);
if (position >= index_by_position.size())
throw Exception("Position out of bound in Block::erase(), max position = "
+ toString(index_by_position.size()), ErrorCodes::POSITION_OUT_OF_BOUND);
+ toString(index_by_position.size() - 1), ErrorCodes::POSITION_OUT_OF_BOUND);
Container_t::iterator it = index_by_position[position];
index_by_name.erase(index_by_name.find(it->name));
@ -177,6 +180,9 @@ void Block::erase(const String & name)
ColumnWithTypeAndName & Block::getByPosition(size_t position)
{
if (index_by_position.empty())
throw Exception("Block is empty", ErrorCodes::POSITION_OUT_OF_BOUND);
if (position >= index_by_position.size())
throw Exception("Position " + toString(position)
+ " is out of bound in Block::getByPosition(), max position = "
@ -189,6 +195,9 @@ ColumnWithTypeAndName & Block::getByPosition(size_t position)
const ColumnWithTypeAndName & Block::getByPosition(size_t position) const
{
if (index_by_position.empty())
throw Exception("Block is empty", ErrorCodes::POSITION_OUT_OF_BOUND);
if (position >= index_by_position.size())
throw Exception("Position " + toString(position)
+ " is out of bound in Block::getByPosition(), max position = "

View File

@ -9,6 +9,7 @@
#include <DB/DataTypes/DataTypeAggregateFunction.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/AggregateFunctions/AggregateFunctionCount.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Interpreters/Aggregator.h>
@ -1688,6 +1689,66 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
}
Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
{
if (blocks.empty())
return {};
StringRefs key(keys_size);
ConstColumnPlainPtrs key_columns(keys_size);
AggregateColumnsData aggregate_columns(aggregates_size);
initialize(blocks.front());
/// Каким способом выполнять агрегацию?
for (size_t i = 0; i < keys_size; ++i)
key_columns[i] = sample.getByPosition(i).column;
Sizes key_sizes;
AggregatedDataVariants::Type method = chooseAggregationMethod(key_columns, key_sizes);
/// Временные данные для агрегации.
AggregatedDataVariants result;
/// result будет уничтожать состояния агрегатных функций в деструкторе
result.aggregator = this;
result.init(method);
result.keys_size = keys_size;
result.key_sizes = key_sizes;
LOG_TRACE(log, "Merging partially aggregated blocks.");
for (Block & block : blocks)
{
if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows)
mergeWithoutKeyStreamsImpl(block, result);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, key_sizes, result.aggregates_pool, *result.NAME, result.NAME->data);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
else if (result.type != AggregatedDataVariants::Type::without_key)
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}
BlocksList merged_block = convertToBlocks(result, final, 1);
if (merged_block.size() > 1) /// TODO overflows
throw Exception("Logical error: temporary result is not single-level", ErrorCodes::LOGICAL_ERROR);
LOG_TRACE(log, "Merged partially aggregated blocks.");
if (merged_block.empty())
return {};
return merged_block.front();
}
template <typename Method>
void NO_INLINE Aggregator::destroyImpl(
Method & method) const
@ -1769,4 +1830,5 @@ void Aggregator::setCancellationHook(const CancellationHook cancellation_hook)
isCancelled = cancellation_hook;
}
}

View File

@ -6,6 +6,7 @@
#include <DB/DataStreams/MergingSortedBlockInputStream.h>
#include <DB/DataStreams/AggregatingBlockInputStream.h>
#include <DB/DataStreams/MergingAggregatedBlockInputStream.h>
#include <DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
#include <DB/DataStreams/AsynchronousBlockInputStream.h>
#include <DB/DataStreams/UnionBlockInputStream.h>
#include <DB/DataStreams/ParallelAggregatingBlockInputStream.h>
@ -856,14 +857,38 @@ void InterpreterSelectQuery::executeAggregation(ExpressionActionsPtr expression,
void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool final)
{
/// Склеим несколько источников в один
executeUnion();
/// Теперь объединим агрегированные блоки
Names key_names;
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
streams[0] = new MergingAggregatedBlockInputStream(streams[0], key_names, aggregates, overflow_row, final, original_max_threads);
/** Есть два режима распределённой агрегации.
*
* 1. В разных потоках читать из удалённых серверов блоки.
* Сохранить все блоки в оперативку. Объединить блоки.
* Если агрегация двухуровневая - распараллелить по номерам корзин.
*
* 2. В одном потоке читать по очереди блоки с разных серверов.
* В оперативке хранится только по одному блоку с каждого сервера.
* Если агрегация двухуровневая - последовательно объединяем блоки каждого следующего уровня.
*
* Второй вариант расходует меньше памяти (до 256 раз меньше)
* в случае двухуровневой агрегации, которая используется для больших результатов после GROUP BY,
* но при этом может работать медленнее.
*/
if (!settings.distributed_aggregation_memory_efficient)
{
/// Склеим несколько источников в один, распараллеливая работу.
executeUnion();
/// Теперь объединим агрегированные блоки
streams[0] = new MergingAggregatedBlockInputStream(streams[0], key_names, aggregates, overflow_row, final, original_max_threads);
}
else
{
streams[0] = new MergingAggregatedMemoryEfficientBlockInputStream(streams, key_names, aggregates, overflow_row, final);
streams.resize(1);
}
}