dbms: SplittingAggregator: added support for max_rows_to_group_by [#METR-2944].

Alexey Milovidov 2013-11-04 00:49:37 +00:00
parent b3a5b32f50
commit 3f2ee82cdc
4 changed files with 84 additions and 17 deletions
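max_rows_to_group_by limits the number of distinct GROUP BY keys, and group_by_overflow_mode selects what happens once the limit is exceeded: THROW aborts the query with an exception, BREAK stops consuming further input and returns a partial result, and ANY keeps aggregating but only for keys that are already in the result set. A minimal sketch of that decision logic, with illustrative names rather than the actual ClickHouse code:

#include <cstddef>
#include <stdexcept>
#include <string>

enum class OverflowMode { THROW, BREAK, ANY };

/// Returns true when the caller should stop reading input (BREAK mode).
/// Throws for THROW; for ANY it only marks that no new keys may be added.
bool checkGroupByLimit(size_t total_result_rows, size_t max_rows_to_group_by,
    OverflowMode mode, bool & no_more_keys)
{
    if (!max_rows_to_group_by || total_result_rows <= max_rows_to_group_by)
        return false;

    if (mode == OverflowMode::THROW)
        throw std::runtime_error("Limit for rows to GROUP BY exceeded: has "
            + std::to_string(total_result_rows) + " rows, maximum: "
            + std::to_string(max_rows_to_group_by));

    if (mode == OverflowMode::BREAK)
        return true;        /// stop consuming input, return a partial result

    no_more_keys = true;    /// ANY: aggregate only keys already in the set
    return false;
}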

View File

@@ -14,8 +14,11 @@ class SplittingAggregatingBlockInputStream : public IProfilingBlockInputStream
{
public:
SplittingAggregatingBlockInputStream(
BlockInputStreamPtr input_, const ColumnNumbers & keys_, AggregateDescriptions & aggregates_, size_t threads_, bool final_)
: started(false), final(final_), aggregator(new SplittingAggregator(keys_, aggregates_, threads_)), current_result(results.end())
BlockInputStreamPtr input_, const ColumnNumbers & keys_, AggregateDescriptions & aggregates_, size_t threads_,
bool with_totals_, bool separate_totals_, bool final_, size_t max_rows_to_group_by_, Limits::OverflowMode group_by_overflow_mode_)
: started(false), separate_totals(separate_totals_), final(final_),
aggregator(new SplittingAggregator(keys_, aggregates_, threads_, with_totals_, max_rows_to_group_by_, group_by_overflow_mode_)),
current_result(results.end())
{
children.push_back(input_);
}
@@ -25,8 +28,11 @@ public:
* Columns corresponding to keys and to the arguments of aggregate functions must already be computed.
*/
SplittingAggregatingBlockInputStream(
BlockInputStreamPtr input_, const Names & key_names, const AggregateDescriptions & aggregates, size_t threads_, bool final_)
: started(false), final(final_), aggregator(new SplittingAggregator(key_names, aggregates, threads_)), current_result(results.end())
BlockInputStreamPtr input_, const Names & key_names, const AggregateDescriptions & aggregates, size_t threads_,
bool with_totals_, bool separate_totals_, bool final_, size_t max_rows_to_group_by_, Limits::OverflowMode group_by_overflow_mode_)
: started(false), separate_totals(separate_totals_), final(final_),
aggregator(new SplittingAggregator(key_names, aggregates, threads_, with_totals_, max_rows_to_group_by_, group_by_overflow_mode_)),
current_result(results.end())
{
children.push_back(input_);
}
@@ -67,6 +73,7 @@ protected:
}
bool started;
bool separate_totals; /// TODO
bool final;
SharedPtr<SplittingAggregator> aggregator;
Blocks results;

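Both constructors now take the totals flags and the GROUP BY limit in addition to the thread count, and forward them to the underlying SplittingAggregator. A hedged sketch of a call site, modeled on the interpreter change further below (the local variable names are assumptions, not taken from this header):

BlockInputStreamPtr stream = new SplittingAggregatingBlockInputStream(
    input, key_names, aggregates, settings.max_threads,
    query.group_by_with_totals,                  /// with_totals
    separate_totals,                             /// separate_totals
    final,                                       /// final
    settings.limits.max_rows_to_group_by,        /// max_rows_to_group_by
    settings.limits.group_by_overflow_mode);     /// group_by_overflow_mode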
View File

@@ -24,7 +24,6 @@ namespace DB
* Works poorly when the hash table size exceeds 2^32 elements.
*
* TODO:
* - support for max_rows_to_group_by;
* - support for with_totals;
* - check behaviour with distributed query processing;
* - fix rows_before_limit_at_least;
@@ -34,17 +33,19 @@ namespace DB
class SplittingAggregator : private Aggregator
{
public:
SplittingAggregator(const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, size_t threads_)
: Aggregator(keys_, aggregates_, false), threads(threads_), pool(threads),
SplittingAggregator(const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, size_t threads_,
bool with_totals_, size_t max_rows_to_group_by_ = 0, Limits::OverflowMode group_by_overflow_mode_ = Limits::THROW)
: Aggregator(keys_, aggregates_, with_totals_, max_rows_to_group_by_, group_by_overflow_mode_), threads(threads_), pool(threads),
log(&Logger::get("SplittingAggregator")), method(AggregatedDataVariants::EMPTY),
key_columns(keys_size), aggregate_columns(aggregates_size), rows(0), src_rows(0), src_bytes(0)
key_columns(keys_size), aggregate_columns(aggregates_size), rows(0), src_rows(0), src_bytes(0), size_of_all_results(0)
{
}
SplittingAggregator(const Names & key_names_, const AggregateDescriptions & aggregates_, size_t threads_)
: Aggregator(key_names_, aggregates_, false), threads(threads_), pool(threads),
SplittingAggregator(const Names & key_names_, const AggregateDescriptions & aggregates_, size_t threads_,
bool with_totals_, size_t max_rows_to_group_by_ = 0, Limits::OverflowMode group_by_overflow_mode_ = Limits::THROW)
: Aggregator(key_names_, aggregates_, with_totals_, max_rows_to_group_by_, group_by_overflow_mode_), threads(threads_), pool(threads),
log(&Logger::get("SplittingAggregator")), method(AggregatedDataVariants::EMPTY),
key_columns(keys_size), aggregate_columns(aggregates_size), rows(0), src_rows(0), src_bytes(0)
key_columns(keys_size), aggregate_columns(aggregates_size), rows(0), src_rows(0), src_bytes(0), size_of_all_results(0)
{
}
@@ -88,6 +89,8 @@ private:
UInt128Hash hash_func_128;
StringRefHash hash_func_string;
/// For more precise control over max_rows_to_group_by.
size_t size_of_all_results;
void calculateHashesThread(Block & block, size_t begin, size_t end, ExceptionPtr & exception);
void aggregateThread(Block & block, AggregatedDataVariants & result, size_t thread_no, ExceptionPtr & exception);

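The new size_of_all_results member accumulates the combined size of all per-thread results, so that max_rows_to_group_by is checked against the total number of keys rather than a single shard. The .cpp file below updates it with GCC's pre-C++11 atomic builtin; a minimal sketch of that accumulation pattern, with illustrative names:

#include <cstddef>

size_t size_of_all_results = 0;    /// shared across aggregation threads

/// Each thread adds the number of keys it created in this pass and
/// reads back the new total atomically (GCC builtin, as used below).
size_t addToTotal(size_t newly_created_keys)
{
    return __sync_add_and_fetch(&size_of_all_results, newly_created_keys);
}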
View File

@@ -500,12 +500,15 @@ void InterpreterSelectQuery::executeAggregation(BlockInputStreams & streams, Exp
if (streams.size() > 1)
{
if (!settings.use_splitting_aggregator || key_names.empty())
stream = maybeAsynchronous(new ParallelAggregatingBlockInputStream(streams, key_names, aggregates, query.group_by_with_totals, separate_totals, final,
stream = maybeAsynchronous(
new ParallelAggregatingBlockInputStream(streams, key_names, aggregates, query.group_by_with_totals, separate_totals, final,
settings.max_threads, settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode), settings.asynchronous);
else
stream = maybeAsynchronous(
new SplittingAggregatingBlockInputStream(
new UnionBlockInputStream(streams, settings.max_threads), key_names, aggregates, settings.max_threads, final), settings.asynchronous);
new UnionBlockInputStream(streams, settings.max_threads),
key_names, aggregates, settings.max_threads, query.group_by_with_totals, separate_totals, final,
settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode), settings.asynchronous);
streams.resize(1);
}

View File

@@ -127,6 +127,11 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData
rethrowFirstException(exceptions);
LOG_TRACE(log, "Parallel aggregated in " << std::fixed << std::setprecision(2) << watch.elapsedSeconds() << " sec.");
/// Check limits
if (max_rows_to_group_by && size_of_all_results > max_rows_to_group_by && group_by_overflow_mode == Limits::BREAK)
break;
}
/* double elapsed_seconds = watch.elapsedSeconds();
@@ -238,6 +243,14 @@ void SplittingAggregator::aggregateThread(Block & block, AggregatedDataVariants
{
result.aggregator = this;
/** Used when there is a limit on the maximum number of rows during aggregation,
* and group_by_overflow_mode == ANY.
* In that case, new keys are not added to the set; aggregation is done only over
* keys that have already made it into the set.
*/
bool no_more_keys = max_rows_to_group_by && size_of_all_results > max_rows_to_group_by;
size_t old_result_size = result.size();
if (method == AggregatedDataVariants::KEY_64)
{
AggregatedDataWithUInt64Key & res = result.key64;
@@ -253,7 +266,15 @@ void SplittingAggregator::aggregateThread(Block & block, AggregatedDataVariants
AggregatedDataWithUInt64Key::iterator it;
bool inserted;
res.emplace(key, it, inserted);
if (!no_more_keys)
res.emplace(key, it, inserted);
else
{
inserted = false;
it = res.find(key);
if (res.end() == it)
continue;
}
if (inserted)
{
@@ -281,8 +302,17 @@ void SplittingAggregator::aggregateThread(Block & block, AggregatedDataVariants
bool inserted;
StringRef ref = string_refs[i];
res.emplace(ref, it, inserted, hashes64[i]);
if (!no_more_keys)
res.emplace(ref, it, inserted, hashes64[i]);
else
{
inserted = false;
it = res.find(ref);
if (res.end() == it)
continue;
}
if (inserted)
{
it->first.data = result.string_pool.insert(ref.data, ref.size);
@@ -310,7 +340,15 @@ void SplittingAggregator::aggregateThread(Block & block, AggregatedDataVariants
bool inserted;
UInt128 key128 = keys128[i];
res.emplace(key128, it, inserted);
if (!no_more_keys)
res.emplace(key128, it, inserted);
else
{
inserted = false;
it = res.find(key128);
if (res.end() == it)
continue;
}
if (inserted)
{
@@ -339,7 +377,15 @@ void SplittingAggregator::aggregateThread(Block & block, AggregatedDataVariants
bool inserted;
UInt128 key128 = hashes128[i];
res.emplace(key128, it, inserted);
if (!no_more_keys)
res.emplace(key128, it, inserted);
else
{
inserted = false;
it = res.find(key128);
if (res.end() == it)
continue;
}
if (inserted)
{
@@ -357,6 +403,14 @@ void SplittingAggregator::aggregateThread(Block & block, AggregatedDataVariants
}
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
/// Check limits.
size_t current_size_of_all_results = __sync_add_and_fetch(&size_of_all_results, result.size() - old_result_size);
if (max_rows_to_group_by && current_size_of_all_results > max_rows_to_group_by && group_by_overflow_mode == Limits::THROW)
throw Exception("Limit for rows to GROUP BY exceeded: has " + toString(current_size_of_all_results)
+ " rows, maximum: " + toString(max_rows_to_group_by),
ErrorCodes::TOO_MUCH_ROWS);
}
catch (...)
{
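Each of the four key-type branches above follows the same pattern: while new keys are still allowed, the key is emplaced into the hash map; once no_more_keys is set, only existing keys are looked up and unknown keys are skipped. A condensed sketch of that pattern over a plain std::unordered_map (the real code uses the aggregator's specialized hash tables and string pool):

#include <cstdint>
#include <unordered_map>

struct AggregateState {};    /// per-key aggregation state (illustrative)
typedef std::unordered_map<uint64_t, AggregateState> Map;

/// Returns the state for `key`, or nullptr if the key must be skipped
/// because the GROUP BY limit was reached and group_by_overflow_mode == ANY.
AggregateState * findOrCreate(Map & map, uint64_t key, bool no_more_keys)
{
    if (!no_more_keys)
        return &map[key];              /// insert the key if it is new

    Map::iterator it = map.find(key);  /// only aggregate keys seen before
    return it == map.end() ? nullptr : &it->second;
}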