dbms: more scalable aggregator: development [#METR-2944].

This commit is contained in:
Alexey Milovidov 2015-01-02 08:28:21 +03:00
parent a7eacd1269
commit 7cf0bca8af
5 changed files with 76 additions and 62 deletions

View File

@ -20,8 +20,8 @@ class TotalsHavingBlockInputStream : public IProfilingBlockInputStream
public: public:
TotalsHavingBlockInputStream(BlockInputStreamPtr input_, const Names & keys_names_, TotalsHavingBlockInputStream(BlockInputStreamPtr input_, const Names & keys_names_,
const AggregateDescriptions & aggregates_, bool overflow_row_, ExpressionActionsPtr expression_, const AggregateDescriptions & aggregates_, bool overflow_row_, ExpressionActionsPtr expression_,
const std::string & filter_column_, TotalsMode totals_mode_, float auto_include_threshold_) const std::string & filter_column_, TotalsMode totals_mode_, double auto_include_threshold_)
: aggregator(new Aggregator(keys_names_, aggregates_, overflow_row_)), overflow_row(overflow_row_), : overflow_row(overflow_row_),
expression(expression_), filter_column_name(filter_column_), totals_mode(totals_mode_), expression(expression_), filter_column_name(filter_column_), totals_mode(totals_mode_),
auto_include_threshold(auto_include_threshold_), passed_keys(0), total_keys(0) auto_include_threshold(auto_include_threshold_), passed_keys(0), total_keys(0)
{ {
@ -33,7 +33,7 @@ public:
String getID() const override String getID() const override
{ {
std::stringstream res; std::stringstream res;
res << "TotalsHavingBlockInputStream(" << children.back()->getID() << ", " << aggregator->getID() res << "TotalsHavingBlockInputStream(" << children.back()->getID()
<< "," << filter_column_name << ")"; << "," << filter_column_name << ")";
return res.str(); return res.str();
} }
@ -44,24 +44,24 @@ protected:
Block readImpl() override; Block readImpl() override;
private: private:
SharedPtr<Aggregator> aggregator;
bool overflow_row; bool overflow_row;
ExpressionActionsPtr expression; ExpressionActionsPtr expression;
String filter_column_name; String filter_column_name;
TotalsMode totals_mode; TotalsMode totals_mode;
float auto_include_threshold; double auto_include_threshold;
size_t passed_keys; size_t passed_keys;
size_t total_keys; size_t total_keys;
Block current_totals; /** Здесь находятся значения, не прошедшие max_rows_to_group_by.
* Они прибавляются или не прибавляются к current_totals в зависимости от totals_mode.
*/
Block overflow_aggregates; Block overflow_aggregates;
void addToTotals(Block & totals, Block & block, const IColumn::Filter * filter, size_t rows); /// Здесь накапливаются тотальные значения. После окончания работы, они будут помещены в IProfilingBlockInputStream::totals.
Block current_totals;
void addToTotals(Block & totals, Block & block, const IColumn::Filter * filter) /// Если filter == nullptr - прибавлять все строки. Иначе - только строки, проходящие фильтр (HAVING).
{ void addToTotals(Block & totals, Block & block, const IColumn::Filter * filter);
addToTotals(totals, block, filter, block.rows());
}
}; };
} }

View File

@ -27,8 +27,14 @@ const Block & TotalsHavingBlockInputStream::getTotals()
/** Если totals_mode == AFTER_HAVING_AUTO, нужно решить, добавлять ли в TOTALS агрегаты для строк, /** Если totals_mode == AFTER_HAVING_AUTO, нужно решить, добавлять ли в TOTALS агрегаты для строк,
* не прошедших max_rows_to_group_by. * не прошедших max_rows_to_group_by.
*/ */
if (overflow_aggregates && static_cast<float>(passed_keys) / total_keys >= auto_include_threshold) if (overflow_aggregates)
addToTotals(current_totals, overflow_aggregates, nullptr); {
if (totals_mode == TotalsMode::BEFORE_HAVING
|| totals_mode == TotalsMode::AFTER_HAVING_INCLUSIVE
|| (totals_mode == TotalsMode::AFTER_HAVING_AUTO
&& static_cast<double>(passed_keys) / total_keys >= auto_include_threshold))
addToTotals(current_totals, overflow_aggregates, nullptr);
}
finalize(current_totals); finalize(current_totals);
totals = current_totals; totals = current_totals;
@ -50,24 +56,28 @@ Block TotalsHavingBlockInputStream::readImpl()
{ {
block = children[0]->read(); block = children[0]->read();
/// В этом случае, первый блок - блок со значениями, не вошедшими в max_rows_to_group_by. Отложим его.
if (overflow_row && !overflow_aggregates && block)
{
overflow_aggregates = block;
continue;
}
if (!block) if (!block)
return finalized; return finalized;
finalized = block; finalized = block;
finalize(finalized); finalize(finalized);
total_keys += finalized.rows() - (overflow_row ? 1 : 0); total_keys += finalized.rows();
if (filter_column_name.empty() || totals_mode == TotalsMode::BEFORE_HAVING) if (filter_column_name.empty())
{ {
/** Включая особую нулевую строку, если overflow_row == true.
* Предполагается, что если totals_mode == AFTER_HAVING_EXCLUSIVE, нам эту строку не дадут.
*/
addToTotals(current_totals, block, nullptr); addToTotals(current_totals, block, nullptr);
} }
else
if (!filter_column_name.empty())
{ {
/// Вычисляем выражение в HAVING.
expression->execute(finalized); expression->execute(finalized);
size_t filter_column_pos = finalized.getPositionByName(filter_column_name); size_t filter_column_pos = finalized.getPositionByName(filter_column_name);
@ -85,25 +95,13 @@ Block TotalsHavingBlockInputStream::readImpl()
IColumn::Filter & filter = filter_column->getData(); IColumn::Filter & filter = filter_column->getData();
if (totals_mode != TotalsMode::BEFORE_HAVING) /// Прибавляем значения в totals (если это не было сделано ранее).
{ if (totals_mode == TotalsMode::BEFORE_HAVING)
if (overflow_row) addToTotals(current_totals, block, nullptr);
{ else
filter[0] = totals_mode == TotalsMode::AFTER_HAVING_INCLUSIVE; addToTotals(current_totals, block, &filter);
addToTotals(current_totals, block, &filter);
if (totals_mode == TotalsMode::AFTER_HAVING_AUTO)
addToTotals(overflow_aggregates, block, nullptr, 1);
}
else
{
addToTotals(current_totals, block, &filter);
}
}
if (overflow_row)
filter[0] = 0;
/// Фильтруем блок по выражению в HAVING.
size_t columns = finalized.columns(); size_t columns = finalized.columns();
for (size_t i = 0; i < columns; ++i) for (size_t i = 0; i < columns; ++i)
@ -117,19 +115,6 @@ Block TotalsHavingBlockInputStream::readImpl()
} }
} }
} }
else
{
if (overflow_row)
{
/// Придется выбросить одну строку из начала всех столбцов.
size_t columns = finalized.columns();
for (size_t i = 0; i < columns; ++i)
{
ColumnWithNameAndType & current_column = finalized.getByPosition(i);
current_column.column = current_column.column->cut(1, current_column.column->size() - 1);
}
}
}
if (!finalized) if (!finalized)
continue; continue;
@ -139,7 +124,7 @@ Block TotalsHavingBlockInputStream::readImpl()
} }
} }
void TotalsHavingBlockInputStream::addToTotals(Block & totals, Block & block, const IColumn::Filter * filter, size_t rows) void TotalsHavingBlockInputStream::addToTotals(Block & totals, Block & block, const IColumn::Filter * filter)
{ {
bool init = !totals; bool init = !totals;
@ -188,7 +173,7 @@ void TotalsHavingBlockInputStream::addToTotals(Block & totals, Block & block, co
} }
const ColumnAggregateFunction::Container_t & vec = column->getData(); const ColumnAggregateFunction::Container_t & vec = column->getData();
size_t size = std::min(vec.size(), rows); size_t size = vec.size();
if (filter) if (filter)
{ {

View File

@ -611,7 +611,7 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
LOG_TRACE(log, "Aggregation method: " << result.getMethodName()); LOG_TRACE(log, "Aggregation method: " << result.getMethodName());
} }
if (overflow_row && !result.without_key) if ((overflow_row || result.type == AggregatedDataVariants::Type::without_key) && !result.without_key)
{ {
result.without_key = result.aggregates_pool->alloc(total_size_of_aggregate_states); result.without_key = result.aggregates_pool->alloc(total_size_of_aggregate_states);
createAggregateStates(result.without_key); createAggregateStates(result.without_key);
@ -620,11 +620,6 @@ bool Aggregator::executeOnBlock(Block & block, AggregatedDataVariants & result,
if (result.type == AggregatedDataVariants::Type::without_key) if (result.type == AggregatedDataVariants::Type::without_key)
{ {
AggregatedDataWithoutKey & res = result.without_key; AggregatedDataWithoutKey & res = result.without_key;
if (!res)
{
res = result.aggregates_pool->alloc(total_size_of_aggregate_states);
createAggregateStates(res);
}
/// Оптимизация в случае единственной агрегатной функции count. /// Оптимизация в случае единственной агрегатной функции count.
AggregateFunctionCount * agg_count = aggregates_size == 1 AggregateFunctionCount * agg_count = aggregates_size == 1
@ -1001,15 +996,20 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
&& data_variants.isTwoLevel()) /// TODO Использовать общий тред-пул с функцией merge. && data_variants.isTwoLevel()) /// TODO Использовать общий тред-пул с функцией merge.
thread_pool.reset(new boost::threadpool::pool(max_threads)); thread_pool.reset(new boost::threadpool::pool(max_threads));
/** Если требуется выдать overflow_row
* (то есть, блок со значениями, не поместившимися в max_rows_to_group_by),
* то этот блок должен идти первым (на это рассчитывает TotalsHavingBlockInputStream).
*/
if (data_variants.type == AggregatedDataVariants::Type::without_key || overflow_row) if (data_variants.type == AggregatedDataVariants::Type::without_key || overflow_row)
blocks = prepareBlocksAndFillWithoutKey(data_variants, final); blocks.splice(blocks.end(), prepareBlocksAndFillWithoutKey(data_variants, final));
if (data_variants.type != AggregatedDataVariants::Type::without_key) if (data_variants.type != AggregatedDataVariants::Type::without_key)
{ {
if (!data_variants.isTwoLevel()) if (!data_variants.isTwoLevel())
blocks = prepareBlocksAndFillSingleLevel(data_variants, final); blocks.splice(blocks.end(), prepareBlocksAndFillSingleLevel(data_variants, final));
else else
blocks = prepareBlocksAndFillTwoLevel(data_variants, final, thread_pool.get()); blocks.splice(blocks.end(), prepareBlocksAndFillTwoLevel(data_variants, final, thread_pool.get()));
} }
if (!final) if (!final)

View File

@ -0,0 +1,12 @@
0 1
0 100000
0 1
0 56310
0 1
0 21846
0 1
0 21846

View File

@ -0,0 +1,17 @@
-- Test script: runs one and the same GROUP BY ... WITH TOTALS ... HAVING query
-- once for each of the four totals_mode values set below.
--
-- Shared settings for every case. The subquery feeds 100000 rows while
-- max_rows_to_group_by is 65535.
-- NOTE(review): presumably this is meant to push part of the keys past the
-- GROUP BY limit so that the overflow handling is exercised — confirm against
-- the server's settings documentation.
SET max_threads = 1;
SET max_block_size = 65536;
SET max_rows_to_group_by = 65535;
SET group_by_overflow_mode = 'any';
-- Case 1: totals_mode = 'before_having'.
SET totals_mode = 'before_having';
-- ORDER BY number LIMIT 1 keeps a single ordinary result row per query.
SELECT number, count() FROM (SELECT * FROM system.numbers LIMIT 100000) GROUP BY number WITH TOTALS HAVING number % 3 = 0 ORDER BY number LIMIT 1;
-- Case 2: totals_mode = 'after_having_inclusive'.
SET totals_mode = 'after_having_inclusive';
SELECT number, count() FROM (SELECT * FROM system.numbers LIMIT 100000) GROUP BY number WITH TOTALS HAVING number % 3 = 0 ORDER BY number LIMIT 1;
-- Case 3: totals_mode = 'after_having_exclusive'.
SET totals_mode = 'after_having_exclusive';
SELECT number, count() FROM (SELECT * FROM system.numbers LIMIT 100000) GROUP BY number WITH TOTALS HAVING number % 3 = 0 ORDER BY number LIMIT 1;
-- Case 4: totals_mode = 'after_having_auto' with totals_auto_threshold = 0.5.
-- NOTE(review): presumably the threshold is compared with the share of keys
-- that pass HAVING to decide whether overflow values go into totals — verify.
SET totals_mode = 'after_having_auto';
SET totals_auto_threshold = 0.5;
SELECT number, count() FROM (SELECT * FROM system.numbers LIMIT 100000) GROUP BY number WITH TOTALS HAVING number % 3 = 0 ORDER BY number LIMIT 1;