From 7cf0bca8af682ca1b7dd8a1cf45ad03a991108dd Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 2 Jan 2015 08:28:21 +0300 Subject: [PATCH] dbms: more scalable aggregator: development [#METR-2944]. --- .../TotalsHavingBlockInputStream.h | 22 +++--- .../TotalsHavingBlockInputStream.cpp | 69 ++++++++----------- dbms/src/Interpreters/Aggregator.cpp | 18 ++--- .../00104_totals_having_mode.reference | 12 ++++ .../0_stateless/00104_totals_having_mode.sql | 17 +++++ 5 files changed, 76 insertions(+), 62 deletions(-) create mode 100644 dbms/tests/queries/0_stateless/00104_totals_having_mode.reference create mode 100644 dbms/tests/queries/0_stateless/00104_totals_having_mode.sql diff --git a/dbms/include/DB/DataStreams/TotalsHavingBlockInputStream.h b/dbms/include/DB/DataStreams/TotalsHavingBlockInputStream.h index 7c740b6ecb5..1b3ee309c9d 100644 --- a/dbms/include/DB/DataStreams/TotalsHavingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/TotalsHavingBlockInputStream.h @@ -20,8 +20,8 @@ class TotalsHavingBlockInputStream : public IProfilingBlockInputStream public: TotalsHavingBlockInputStream(BlockInputStreamPtr input_, const Names & keys_names_, const AggregateDescriptions & aggregates_, bool overflow_row_, ExpressionActionsPtr expression_, - const std::string & filter_column_, TotalsMode totals_mode_, float auto_include_threshold_) - : aggregator(new Aggregator(keys_names_, aggregates_, overflow_row_)), overflow_row(overflow_row_), + const std::string & filter_column_, TotalsMode totals_mode_, double auto_include_threshold_) + : overflow_row(overflow_row_), expression(expression_), filter_column_name(filter_column_), totals_mode(totals_mode_), auto_include_threshold(auto_include_threshold_), passed_keys(0), total_keys(0) { @@ -33,7 +33,7 @@ public: String getID() const override { std::stringstream res; - res << "TotalsHavingBlockInputStream(" << children.back()->getID() << ", " << aggregator->getID() + res << "TotalsHavingBlockInputStream(" << children.back()->getID() << "," << filter_column_name << ")"; return res.str(); } @@ -44,24 +44,24 @@ protected: Block readImpl() override; private: - SharedPtr aggregator; bool overflow_row; ExpressionActionsPtr expression; String filter_column_name; TotalsMode totals_mode; - float auto_include_threshold; + double auto_include_threshold; size_t passed_keys; size_t total_keys; - Block current_totals; + /** Здесь находятся значения, не прошедшие max_rows_to_group_by. + * Они прибавляются или не прибавляются к current_totals в зависимости от totals_mode. + */ Block overflow_aggregates; - void addToTotals(Block & totals, Block & block, const IColumn::Filter * filter, size_t rows); + /// Здесь накапливаются тотальные значения. После окончания работы, они будут помещены в IProfilingBlockInputStream::totals. + Block current_totals; - void addToTotals(Block & totals, Block & block, const IColumn::Filter * filter) - { - addToTotals(totals, block, filter, block.rows()); - } + /// Если filter == nullptr - прибавлять все строки. Иначе - только строки, проходящие фильтр (HAVING). + void addToTotals(Block & totals, Block & block, const IColumn::Filter * filter); }; } diff --git a/dbms/src/DataStreams/TotalsHavingBlockInputStream.cpp b/dbms/src/DataStreams/TotalsHavingBlockInputStream.cpp index 8aa2fd69c9b..c1da94b36cd 100644 --- a/dbms/src/DataStreams/TotalsHavingBlockInputStream.cpp +++ b/dbms/src/DataStreams/TotalsHavingBlockInputStream.cpp @@ -27,8 +27,14 @@ const Block & TotalsHavingBlockInputStream::getTotals() /** Если totals_mode == AFTER_HAVING_AUTO, нужно решить, добавлять ли в TOTALS агрегаты для строк, * не прошедших max_rows_to_group_by. */ - if (overflow_aggregates && static_cast(passed_keys) / total_keys >= auto_include_threshold) - addToTotals(current_totals, overflow_aggregates, nullptr); + if (overflow_aggregates) + { + if (totals_mode == TotalsMode::BEFORE_HAVING + || totals_mode == TotalsMode::AFTER_HAVING_INCLUSIVE + || (totals_mode == TotalsMode::AFTER_HAVING_AUTO + && static_cast(passed_keys) / total_keys >= auto_include_threshold)) + addToTotals(current_totals, overflow_aggregates, nullptr); + } finalize(current_totals); totals = current_totals; @@ -50,24 +56,28 @@ Block TotalsHavingBlockInputStream::readImpl() { block = children[0]->read(); + /// В этом случае, первый блок - блок со значениями, не вошедшими в max_rows_to_group_by. Отложим его. + if (overflow_row && !overflow_aggregates && block) + { + overflow_aggregates = block; + continue; + } + if (!block) return finalized; finalized = block; finalize(finalized); - total_keys += finalized.rows() - (overflow_row ? 1 : 0); + total_keys += finalized.rows(); - if (filter_column_name.empty() || totals_mode == TotalsMode::BEFORE_HAVING) + if (filter_column_name.empty()) { - /** Включая особую нулевую строку, если overflow_row == true. - * Предполагается, что если totals_mode == AFTER_HAVING_EXCLUSIVE, нам эту строку не дадут. - */ addToTotals(current_totals, block, nullptr); } - - if (!filter_column_name.empty()) + else { + /// Вычисляем выражение в HAVING. expression->execute(finalized); size_t filter_column_pos = finalized.getPositionByName(filter_column_name); @@ -85,25 +95,13 @@ Block TotalsHavingBlockInputStream::readImpl() IColumn::Filter & filter = filter_column->getData(); - if (totals_mode != TotalsMode::BEFORE_HAVING) - { - if (overflow_row) - { - filter[0] = totals_mode == TotalsMode::AFTER_HAVING_INCLUSIVE; - addToTotals(current_totals, block, &filter); - - if (totals_mode == TotalsMode::AFTER_HAVING_AUTO) - addToTotals(overflow_aggregates, block, nullptr, 1); - } - else - { - addToTotals(current_totals, block, &filter); - } - } - - if (overflow_row) - filter[0] = 0; + /// Прибавляем значения в totals (если это не было сделано ранее). + if (totals_mode == TotalsMode::BEFORE_HAVING) + addToTotals(current_totals, block, nullptr); + else + addToTotals(current_totals, block, &filter); + /// Фильтруем блок по выражению в HAVING. size_t columns = finalized.columns(); for (size_t i = 0; i < columns; ++i) @@ -117,19 +115,6 @@ Block TotalsHavingBlockInputStream::readImpl() } } } - else - { - if (overflow_row) - { - /// Придется выбросить одну строку из начала всех столбцов. - size_t columns = finalized.columns(); - for (size_t i = 0; i < columns; ++i) - { - ColumnWithNameAndType & current_column = finalized.getByPosition(i); - current_column.column = current_column.column->cut(1, current_column.column->size() - 1); - } - } - } if (!finalized) continue; @@ -139,7 +124,7 @@ Block TotalsHavingBlockInputStream::readImpl() } } -void TotalsHavingBlockInputStream::addToTotals(Block & totals, Block & block, const IColumn::Filter * filter, size_t rows) +void TotalsHavingBlockInputStream::addToTotals(Block & totals, Block & block, const IColumn::Filter * filter) { bool init = !totals; @@ -188,7 +173,7 @@ void TotalsHavingBlockInputStream::addToTotals(Block & totals, Block & block, co } const ColumnAggregateFunction::Container_t & vec = column->getData(); - size_t size = std::min(vec.size(), rows); + size_t size = vec.size(); if (filter) { diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 69836918d5b..65a7536a6e8 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -611,7 +611,7 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result, LOG_TRACE(log, "Aggregation method: " << result.getMethodName()); } - if (overflow_row && !result.without_key) + if ((overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key) { result.without_key = result.aggregates_pool->alloc(total_size_of_aggregate_states); createAggregateStates(result.without_key); @@ -620,11 +620,6 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result, if (result.type == AggregatedDataVariants::Type::without_key) { AggregatedDataWithoutKey & res = result.without_key; - if (!res) - { - res = result.aggregates_pool->alloc(total_size_of_aggregate_states); - createAggregateStates(res); - } /// Оптимизация в случае единственной агрегатной функции count. AggregateFunctionCount * agg_count = aggregates_size == 1 @@ -1001,15 +996,20 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b && data_variants.isTwoLevel()) /// TODO Использовать общий тред-пул с функцией merge. thread_pool.reset(new boost::threadpool::pool(max_threads)); + /** Если требуется выдать overflow_row + * (то есть, блок со значениями, не поместившимися в max_rows_to_group_by), + * то этот блок должен идти первым (на это рассчитывает TotalsHavingBlockInputStream). + */ + if (data_variants.type == AggregatedDataVariants::Type::without_key || overflow_row) - blocks = prepareBlocksAndFillWithoutKey(data_variants, final); + blocks.splice(blocks.end(), prepareBlocksAndFillWithoutKey(data_variants, final)); if (data_variants.type != AggregatedDataVariants::Type::without_key) { if (!data_variants.isTwoLevel()) - blocks = prepareBlocksAndFillSingleLevel(data_variants, final); + blocks.splice(blocks.end(), prepareBlocksAndFillSingleLevel(data_variants, final)); else - blocks = prepareBlocksAndFillTwoLevel(data_variants, final, thread_pool.get()); + blocks.splice(blocks.end(), prepareBlocksAndFillTwoLevel(data_variants, final, thread_pool.get())); } if (!final) diff --git a/dbms/tests/queries/0_stateless/00104_totals_having_mode.reference b/dbms/tests/queries/0_stateless/00104_totals_having_mode.reference new file mode 100644 index 00000000000..4777d66c114 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00104_totals_having_mode.reference @@ -0,0 +1,12 @@ +0 1 + +0 100000 +0 1 + +0 56310 +0 1 + +0 21846 +0 1 + +0 21846 diff --git a/dbms/tests/queries/0_stateless/00104_totals_having_mode.sql b/dbms/tests/queries/0_stateless/00104_totals_having_mode.sql new file mode 100644 index 00000000000..96e83b263c3 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00104_totals_having_mode.sql @@ -0,0 +1,17 @@ +SET max_threads = 1; +SET max_block_size = 65536; +SET max_rows_to_group_by = 65535; +SET group_by_overflow_mode = 'any'; + +SET totals_mode = 'before_having'; +SELECT number, count() FROM (SELECT * FROM system.numbers LIMIT 100000) GROUP BY number WITH TOTALS HAVING number % 3 = 0 ORDER BY number LIMIT 1; + +SET totals_mode = 'after_having_inclusive'; +SELECT number, count() FROM (SELECT * FROM system.numbers LIMIT 100000) GROUP BY number WITH TOTALS HAVING number % 3 = 0 ORDER BY number LIMIT 1; + +SET totals_mode = 'after_having_exclusive'; +SELECT number, count() FROM (SELECT * FROM system.numbers LIMIT 100000) GROUP BY number WITH TOTALS HAVING number % 3 = 0 ORDER BY number LIMIT 1; + +SET totals_mode = 'after_having_auto'; +SET totals_auto_threshold = 0.5; +SELECT number, count() FROM (SELECT * FROM system.numbers LIMIT 100000) GROUP BY number WITH TOTALS HAVING number % 3 = 0 ORDER BY number LIMIT 1;