mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
dbms: added memory-efficient mode of distributed aggregation [#METR-17536].
This commit is contained in:
parent
5ee37fd3db
commit
5b0b5dc9cf
@ -0,0 +1,110 @@
|
||||
#pragma once
|
||||
|
||||
#include <DB/Interpreters/Aggregator.h>
|
||||
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
||||
/** Доагрегирует потоки блоков, держа в оперативной памяти только по одному блоку из каждого потока.
|
||||
* Это экономит оперативку в случае использования двухуровневой агрегации, где в каждом потоке будет до 256 блоков с частями результата.
|
||||
*
|
||||
* Агрегатные функции в блоках не должны быть финализированы, чтобы их состояния можно было объединить.
|
||||
*/
|
||||
class MergingAggregatedMemoryEfficientBlockInputStream : public IProfilingBlockInputStream
|
||||
{
|
||||
public:
|
||||
MergingAggregatedMemoryEfficientBlockInputStream(BlockInputStreams inputs_, const Names & keys_names_,
|
||||
const AggregateDescriptions & aggregates_, bool overflow_row_, bool final_)
|
||||
: aggregator(keys_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0),
|
||||
final(final_)
|
||||
{
|
||||
children = inputs_;
|
||||
current_blocks.resize(children.size());
|
||||
overflow_blocks.resize(children.size());
|
||||
is_exhausted.resize(children.size());
|
||||
}
|
||||
|
||||
String getName() const override { return "MergingAggregatedMemorySavvy"; }
|
||||
|
||||
String getID() const override
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "MergingAggregatedMemorySavvy(" << aggregator.getID();
|
||||
for (size_t i = 0, size = children.size(); i < size; ++i)
|
||||
res << ", " << children.back()->getID();
|
||||
res << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
protected:
|
||||
Block readImpl() override
|
||||
{
|
||||
/// Если child - RemoteBlockInputStream, то отправляет запрос на все удалённые серверы, инициируя вычисления.
|
||||
if (current_bucket_num == -1)
|
||||
for (auto & child : children)
|
||||
child->readPrefix();
|
||||
|
||||
/// Всё прочитали.
|
||||
if (current_bucket_num > 255)
|
||||
return {};
|
||||
|
||||
/// Читаем следующие блоки для current_bucket_num
|
||||
for (size_t i = 0, size = children.size(); i < size; ++i)
|
||||
{
|
||||
while (!is_exhausted[i] && (!current_blocks[i] || current_blocks[i].info.bucket_num < current_bucket_num))
|
||||
{
|
||||
current_blocks[i] = children[i]->read();
|
||||
|
||||
if (!current_blocks[i])
|
||||
{
|
||||
is_exhausted[i] = true;
|
||||
}
|
||||
else if (current_blocks[i].info.is_overflows)
|
||||
{
|
||||
overflow_blocks[i].swap(current_blocks[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Может быть, нет блоков для current_bucket_num, а все блоки имеют больший bucket_num.
|
||||
Int32 min_bucket_num = 256;
|
||||
for (size_t i = 0, size = children.size(); i < size; ++i)
|
||||
if (!is_exhausted[i] && current_blocks[i].info.bucket_num < min_bucket_num)
|
||||
min_bucket_num = current_blocks[i].info.bucket_num;
|
||||
|
||||
current_bucket_num = min_bucket_num;
|
||||
|
||||
/// Все потоки исчерпаны.
|
||||
if (current_bucket_num > 255)
|
||||
return {}; /// TODO overflow_blocks.
|
||||
|
||||
/// TODO Если есть single_level и two_level блоки.
|
||||
|
||||
/// Объединяем все блоки с current_bucket_num.
|
||||
|
||||
BlocksList blocks_to_merge;
|
||||
for (size_t i = 0, size = children.size(); i < size; ++i)
|
||||
if (current_blocks[i].info.bucket_num == current_bucket_num)
|
||||
blocks_to_merge.emplace_back(std::move(current_blocks[i]));
|
||||
|
||||
Block res = aggregator.mergeBlocks(blocks_to_merge, final);
|
||||
|
||||
++current_bucket_num;
|
||||
return res;
|
||||
}
|
||||
|
||||
private:
|
||||
Aggregator aggregator;
|
||||
bool final;
|
||||
|
||||
Int32 current_bucket_num = -1;
|
||||
std::vector<Block> current_blocks;
|
||||
std::vector<UInt8> is_exhausted;
|
||||
|
||||
std::vector<Block> overflow_blocks;
|
||||
};
|
||||
|
||||
}
|
@ -111,6 +111,14 @@ public:
|
||||
parallel_replicas->disconnect();
|
||||
}
|
||||
|
||||
|
||||
/// Отправляет запрос (инициирует вычисления) раньше, чем read.
|
||||
void readPrefix() override
|
||||
{
|
||||
if (!sent_query)
|
||||
sendQuery();
|
||||
}
|
||||
|
||||
protected:
|
||||
/// Отправить на удаленные реплики все временные таблицы
|
||||
void sendExternalTables()
|
||||
@ -147,19 +155,10 @@ protected:
|
||||
{
|
||||
if (!sent_query)
|
||||
{
|
||||
createParallelReplicas();
|
||||
sendQuery();
|
||||
|
||||
if (settings.skip_unavailable_shards && 0 == parallel_replicas->size())
|
||||
return Block();
|
||||
|
||||
established = true;
|
||||
|
||||
parallel_replicas->sendQuery(query, "", stage, true);
|
||||
|
||||
established = false;
|
||||
sent_query = true;
|
||||
|
||||
sendExternalTables();
|
||||
return {};
|
||||
}
|
||||
|
||||
while (true)
|
||||
@ -280,6 +279,23 @@ protected:
|
||||
}
|
||||
|
||||
private:
|
||||
void sendQuery()
|
||||
{
|
||||
createParallelReplicas();
|
||||
|
||||
if (settings.skip_unavailable_shards && 0 == parallel_replicas->size())
|
||||
return;
|
||||
|
||||
established = true;
|
||||
|
||||
parallel_replicas->sendQuery(query, "", stage, true);
|
||||
|
||||
established = false;
|
||||
sent_query = true;
|
||||
|
||||
sendExternalTables();
|
||||
}
|
||||
|
||||
/// ITable::read requires a Context, therefore we should create one if the user can't supply it
|
||||
static Context & getDefaultContext()
|
||||
{
|
||||
|
@ -674,17 +674,6 @@ typedef SharedPtr<AggregatedDataVariants> AggregatedDataVariantsPtr;
|
||||
typedef std::vector<AggregatedDataVariantsPtr> ManyAggregatedDataVariants;
|
||||
|
||||
|
||||
/** Достать вариант агрегации по его типу. */
|
||||
template <typename Method> Method & getDataVariant(AggregatedDataVariants & variants);
|
||||
|
||||
#define M(NAME, IS_TWO_LEVEL) \
|
||||
template <> inline decltype(AggregatedDataVariants::NAME)::element_type & getDataVariant<decltype(AggregatedDataVariants::NAME)::element_type>(AggregatedDataVariants & variants) { return *variants.NAME; }
|
||||
|
||||
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
||||
|
||||
#undef M
|
||||
|
||||
|
||||
/** Агрегирует источник блоков.
|
||||
*/
|
||||
class Aggregator
|
||||
@ -733,11 +722,15 @@ public:
|
||||
*/
|
||||
AggregatedDataVariantsPtr merge(ManyAggregatedDataVariants & data_variants, size_t max_threads);
|
||||
|
||||
/** Объединить несколько агрегированных блоков в одну структуру данных.
|
||||
/** Объединить поток частично агрегированных блоков в одну структуру данных.
|
||||
* (Доагрегировать несколько блоков, которые представляют собой результат независимых агрегаций с удалённых серверов.)
|
||||
*/
|
||||
void mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants & result, size_t max_threads);
|
||||
|
||||
/** Объединить несколько частично агрегированных блоков в один.
|
||||
*/
|
||||
Block mergeBlocks(BlocksList & blocks, bool final);
|
||||
|
||||
using CancellationHook = std::function<bool()>;
|
||||
|
||||
/** Установить функцию, которая проверяет, можно ли прервать текущую задачу.
|
||||
@ -974,4 +967,15 @@ protected:
|
||||
};
|
||||
|
||||
|
||||
/** Достать вариант агрегации по его типу. */
|
||||
template <typename Method> Method & getDataVariant(AggregatedDataVariants & variants);
|
||||
|
||||
#define M(NAME, IS_TWO_LEVEL) \
|
||||
template <> inline decltype(AggregatedDataVariants::NAME)::element_type & getDataVariant<decltype(AggregatedDataVariants::NAME)::element_type>(AggregatedDataVariants & variants) { return *variants.NAME; }
|
||||
|
||||
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
||||
|
||||
#undef M
|
||||
|
||||
|
||||
}
|
||||
|
@ -91,6 +91,8 @@ struct Settings
|
||||
M(SettingUInt64, min_count_to_compile, 3) \
|
||||
/** При каком количестве ключей, начинает использоваться двухуровневая агрегация. 0 - никогда не использовать. */ \
|
||||
M(SettingUInt64, group_by_two_level_threshold, 100000) \
|
||||
/** Включён ли экономный по памяти режим распределённой агрегации. */ \
|
||||
M(SettingBool, distributed_aggregation_memory_efficient, false) \
|
||||
\
|
||||
/** Максимальное количество используемых реплик каждого шарда при выполнении запроса */ \
|
||||
M(SettingUInt64, max_parallel_replicas, 1) \
|
||||
|
@ -141,9 +141,12 @@ void Block::insertUnique(const ColumnWithTypeAndName & elem)
|
||||
|
||||
void Block::erase(size_t position)
|
||||
{
|
||||
if (index_by_position.empty())
|
||||
throw Exception("Block is empty", ErrorCodes::POSITION_OUT_OF_BOUND);
|
||||
|
||||
if (position >= index_by_position.size())
|
||||
throw Exception("Position out of bound in Block::erase(), max position = "
|
||||
+ toString(index_by_position.size()), ErrorCodes::POSITION_OUT_OF_BOUND);
|
||||
+ toString(index_by_position.size() - 1), ErrorCodes::POSITION_OUT_OF_BOUND);
|
||||
|
||||
Container_t::iterator it = index_by_position[position];
|
||||
index_by_name.erase(index_by_name.find(it->name));
|
||||
@ -177,6 +180,9 @@ void Block::erase(const String & name)
|
||||
|
||||
ColumnWithTypeAndName & Block::getByPosition(size_t position)
|
||||
{
|
||||
if (index_by_position.empty())
|
||||
throw Exception("Block is empty", ErrorCodes::POSITION_OUT_OF_BOUND);
|
||||
|
||||
if (position >= index_by_position.size())
|
||||
throw Exception("Position " + toString(position)
|
||||
+ " is out of bound in Block::getByPosition(), max position = "
|
||||
@ -189,6 +195,9 @@ ColumnWithTypeAndName & Block::getByPosition(size_t position)
|
||||
|
||||
const ColumnWithTypeAndName & Block::getByPosition(size_t position) const
|
||||
{
|
||||
if (index_by_position.empty())
|
||||
throw Exception("Block is empty", ErrorCodes::POSITION_OUT_OF_BOUND);
|
||||
|
||||
if (position >= index_by_position.size())
|
||||
throw Exception("Position " + toString(position)
|
||||
+ " is out of bound in Block::getByPosition(), max position = "
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include <DB/DataTypes/DataTypeAggregateFunction.h>
|
||||
#include <DB/Columns/ColumnsNumber.h>
|
||||
#include <DB/AggregateFunctions/AggregateFunctionCount.h>
|
||||
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||||
|
||||
#include <DB/Interpreters/Aggregator.h>
|
||||
|
||||
@ -1688,6 +1689,66 @@ void Aggregator::mergeStream(BlockInputStreamPtr stream, AggregatedDataVariants
|
||||
}
|
||||
|
||||
|
||||
Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
|
||||
{
|
||||
if (blocks.empty())
|
||||
return {};
|
||||
|
||||
StringRefs key(keys_size);
|
||||
ConstColumnPlainPtrs key_columns(keys_size);
|
||||
|
||||
AggregateColumnsData aggregate_columns(aggregates_size);
|
||||
|
||||
initialize(blocks.front());
|
||||
|
||||
/// Каким способом выполнять агрегацию?
|
||||
for (size_t i = 0; i < keys_size; ++i)
|
||||
key_columns[i] = sample.getByPosition(i).column;
|
||||
|
||||
Sizes key_sizes;
|
||||
AggregatedDataVariants::Type method = chooseAggregationMethod(key_columns, key_sizes);
|
||||
|
||||
/// Временные данные для агрегации.
|
||||
AggregatedDataVariants result;
|
||||
|
||||
/// result будет уничтожать состояния агрегатных функций в деструкторе
|
||||
result.aggregator = this;
|
||||
|
||||
result.init(method);
|
||||
result.keys_size = keys_size;
|
||||
result.key_sizes = key_sizes;
|
||||
|
||||
LOG_TRACE(log, "Merging partially aggregated blocks.");
|
||||
|
||||
for (Block & block : blocks)
|
||||
{
|
||||
if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows)
|
||||
mergeWithoutKeyStreamsImpl(block, result);
|
||||
|
||||
#define M(NAME, IS_TWO_LEVEL) \
|
||||
else if (result.type == AggregatedDataVariants::Type::NAME) \
|
||||
mergeStreamsImpl(block, key_sizes, result.aggregates_pool, *result.NAME, result.NAME->data);
|
||||
|
||||
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
||||
#undef M
|
||||
else if (result.type != AggregatedDataVariants::Type::without_key)
|
||||
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
|
||||
}
|
||||
|
||||
BlocksList merged_block = convertToBlocks(result, final, 1);
|
||||
|
||||
if (merged_block.size() > 1) /// TODO overflows
|
||||
throw Exception("Logical error: temporary result is not single-level", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
LOG_TRACE(log, "Merged partially aggregated blocks.");
|
||||
|
||||
if (merged_block.empty())
|
||||
return {};
|
||||
|
||||
return merged_block.front();
|
||||
}
|
||||
|
||||
|
||||
template <typename Method>
|
||||
void NO_INLINE Aggregator::destroyImpl(
|
||||
Method & method) const
|
||||
@ -1769,4 +1830,5 @@ void Aggregator::setCancellationHook(const CancellationHook cancellation_hook)
|
||||
isCancelled = cancellation_hook;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <DB/DataStreams/MergingSortedBlockInputStream.h>
|
||||
#include <DB/DataStreams/AggregatingBlockInputStream.h>
|
||||
#include <DB/DataStreams/MergingAggregatedBlockInputStream.h>
|
||||
#include <DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
|
||||
#include <DB/DataStreams/AsynchronousBlockInputStream.h>
|
||||
#include <DB/DataStreams/UnionBlockInputStream.h>
|
||||
#include <DB/DataStreams/ParallelAggregatingBlockInputStream.h>
|
||||
@ -856,14 +857,38 @@ void InterpreterSelectQuery::executeAggregation(ExpressionActionsPtr expression,
|
||||
|
||||
void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool final)
|
||||
{
|
||||
/// Склеим несколько источников в один
|
||||
executeUnion();
|
||||
|
||||
/// Теперь объединим агрегированные блоки
|
||||
Names key_names;
|
||||
AggregateDescriptions aggregates;
|
||||
query_analyzer->getAggregateInfo(key_names, aggregates);
|
||||
streams[0] = new MergingAggregatedBlockInputStream(streams[0], key_names, aggregates, overflow_row, final, original_max_threads);
|
||||
|
||||
/** Есть два режима распределённой агрегации.
|
||||
*
|
||||
* 1. В разных потоках читать из удалённых серверов блоки.
|
||||
* Сохранить все блоки в оперативку. Объединить блоки.
|
||||
* Если агрегация двухуровневая - распараллелить по номерам корзин.
|
||||
*
|
||||
* 2. В одном потоке читать по очереди блоки с разных серверов.
|
||||
* В оперативке хранится только по одному блоку с каждого сервера.
|
||||
* Если агрегация двухуровневая - последовательно объединяем блоки каждого следующего уровня.
|
||||
*
|
||||
* Второй вариант расходует меньше памяти (до 256 раз меньше)
|
||||
* в случае двухуровневой агрегации, которая используется для больших результатов после GROUP BY,
|
||||
* но при этом может работать медленнее.
|
||||
*/
|
||||
|
||||
if (!settings.distributed_aggregation_memory_efficient)
|
||||
{
|
||||
/// Склеим несколько источников в один, распараллеливая работу.
|
||||
executeUnion();
|
||||
|
||||
/// Теперь объединим агрегированные блоки
|
||||
streams[0] = new MergingAggregatedBlockInputStream(streams[0], key_names, aggregates, overflow_row, final, original_max_threads);
|
||||
}
|
||||
else
|
||||
{
|
||||
streams[0] = new MergingAggregatedMemoryEfficientBlockInputStream(streams, key_names, aggregates, overflow_row, final);
|
||||
streams.resize(1);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user