clickhouse: totals modes. [#METR-9365]

Michael Kolupaev 2014-02-27 16:49:21 +04:00
parent 04f0056b0a
commit d83ff9ceb2
8 changed files with 356 additions and 45 deletions

View File

@ -39,12 +39,32 @@ public:
func = func_;
}
AggregateFunctionPtr getAggregateFunction()
{
return func;
}
/// Take ownership of the arena.
void addArena(ArenaPtr arena_)
{
arenas.push_back(arena_);
}
ColumnPtr convertToValues()
{
IAggregateFunction * function = func;
ColumnPtr res = function->getReturnType()->createColumn();
IColumn & column = *res;
res->reserve(data.size());
for (size_t i = 0; i < data.size(); ++i)
{
function->insertResultInto(data[i], column);
}
return res;
}
~ColumnAggregateFunction()
{
if (!func->hasTrivialDestructor())

View File

@ -0,0 +1,65 @@
#pragma once
#include <DB/Interpreters/Aggregator.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Interpreters/ExpressionActions.h>
namespace DB
{
using Poco::SharedPtr;
/** Takes blocks after grouping, with non-finalized aggregate functions.
* Computes the totals values according to totals_mode.
* If needed, evaluates the HAVING expression and filters rows with it. Returns finalized and filtered blocks.
*/
class TotalsHavingBlockInputStream : public IProfilingBlockInputStream
{
public:
TotalsHavingBlockInputStream(BlockInputStreamPtr input_, const Names & keys_names_,
const AggregateDescriptions & aggregates_, bool overflow_row_, ExpressionActionsPtr expression_,
const std::string & filter_column_, TotalsMode totals_mode_, double auto_include_threshold_)
: aggregator(new Aggregator(keys_names_, aggregates_, overflow_row_)), overflow_row(overflow_row_),
expression(expression_), filter_column_name(filter_column_), totals_mode(totals_mode_),
auto_include_threshold(auto_include_threshold_), passed_keys(0), total_keys(0)
{
children.push_back(input_);
}
String getName() const { return "TotalsHavingBlockInputStream"; }
String getID() const
{
std::stringstream res;
res << "TotalsHavingBlockInputStream(" << children.back()->getID() << ", " << aggregator->getID()
<< "," << filter_column_name << ")";
return res.str();
}
protected:
Block readImpl();
private:
SharedPtr<Aggregator> aggregator;
bool overflow_row;
ExpressionActionsPtr expression;
String filter_column_name;
TotalsMode totals_mode;
double auto_include_threshold;
size_t passed_keys;
size_t total_keys;
Block current_totals;
Block overflow_aggregates;
void addToTotals(Block & totals, Block & block, const IColumn::Filter * filter, size_t rows);
void addToTotals(Block & totals, Block & block, const IColumn::Filter * filter)
{
addToTotals(totals, block, filter, block.rows());
}
};
}
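For orientation, here is a minimal construction sketch for this stream. Only the constructor signature comes from the header above; aggregated_stream, key_names, aggregates, having_actions and the filter column name are assumed placeholders, not part of this commit.
/// Illustrative wiring only; every name except TotalsHavingBlockInputStream and TotalsMode is a placeholder.
BlockInputStreamPtr totals_stream = new TotalsHavingBlockInputStream(
    aggregated_stream,                  /// blocks with non-finalized aggregate columns
    key_names, aggregates,              /// same keys and aggregates the Aggregator was given
    /* overflow_row_ */ true,           /// the special zeroth row carries keys beyond max_rows_to_group_by
    having_actions,                     /// ExpressionActions that compute the HAVING column
    "having_result",                    /// hypothetical name of the UInt8 filter column
    TotalsMode::AFTER_HAVING_AUTO,
    /* auto_include_threshold_ */ 0.5);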

View File

@ -59,8 +59,11 @@ private:
QueryProcessingStage::Enum executeFetchColumns(BlockInputStreams & streams);
void executeWhere( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression, bool final);
void executeMergeAggregated( BlockInputStreams & streams);
void executeAggregation( BlockInputStreams & streams, ExpressionActionsPtr expression,
bool overflow_row, bool final);
void executeMergeAggregated( BlockInputStreams & streams, bool overflow_row, bool final);
void executeTotalsAndHaving( BlockInputStreams & streams, bool has_having, ExpressionActionsPtr expression,
bool overflow_row);
void executeHaving( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeOuterExpression( BlockInputStreams & streams, ExpressionActionsPtr expression);
void executeOrder( BlockInputStreams & streams);

View File

@ -312,7 +312,7 @@ struct SettingTotalsMode
case TotalsMode::AFTER_HAVING_AUTO: return "after_having_auto";
default:
throw Exception("Unknown TotalsMode enum value: " + toString(value), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
throw Exception("Unknown TotalsMode enum value", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
}
}

View File

@ -0,0 +1,200 @@
#include <DB/DataStreams/TotalsHavingBlockInputStream.h>
#include <DB/Columns/ColumnAggregateFunction.h>
#include <DB/Columns/ColumnsNumber.h>
namespace DB
{
static void finalize(Block & block)
{
for (size_t i = 0; i < block.columns(); ++i)
{
ColumnWithNameAndType & current = block.getByPosition(i);
ColumnAggregateFunction * unfinalized_column = dynamic_cast<ColumnAggregateFunction *>(&*current.column);
if (unfinalized_column)
{
current.type = unfinalized_column->getAggregateFunction()->getReturnType();
current.column = unfinalized_column->convertToValues();
}
}
}
Block TotalsHavingBlockInputStream::readImpl()
{
Block finalized;
Block block;
while (1)
{
block = children[0]->read();
if (!block)
{
/** If totals_mode == AFTER_HAVING_AUTO, we need to decide whether to add to TOTALS the aggregates
* for rows that did not pass max_rows_to_group_by.
*/
if (overflow_aggregates && 1. * passed_keys / total_keys >= auto_include_threshold)
addToTotals(current_totals, overflow_aggregates, nullptr);
finalize(current_totals);
totals = current_totals;
return finalized;
}
finalized = block;
finalize(finalized);
total_keys += finalized.rows() - (overflow_row ? 1 : 0);
if (filter_column_name.empty() || totals_mode == TotalsMode::BEFORE_HAVING)
{
/** Including the special zeroth row, if overflow_row = true.
* It is assumed that if totals_mode = AFTER_HAVING_EXCLUSIVE, this row will not be given to us.
*/
addToTotals(current_totals, block, nullptr);
}
if (!filter_column_name.empty())
{
expression->execute(finalized);
size_t filter_column_pos = finalized.getPositionByName(filter_column_name);
ColumnPtr filter_column_ptr = finalized.getByPosition(filter_column_pos).column;
ColumnConstUInt8 * column_const = dynamic_cast<ColumnConstUInt8 *>(&*filter_column_ptr);
if (column_const)
filter_column_ptr = column_const->convertToFullColumn();
ColumnUInt8 * filter_column = dynamic_cast<ColumnUInt8 *>(&*filter_column_ptr);
if (!filter_column)
throw Exception("Filter column must have type UInt8, found " +
finalized.getByPosition(filter_column_pos).type->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
IColumn::Filter & filter = filter_column->getData();
if (totals_mode != TotalsMode::BEFORE_HAVING)
{
if (overflow_row)
{
filter[0] = totals_mode == TotalsMode::AFTER_HAVING_INCLUSIVE;
addToTotals(current_totals, block, &filter);
if (totals_mode == TotalsMode::AFTER_HAVING_AUTO)
addToTotals(overflow_aggregates, block, nullptr, 1);
}
else
{
addToTotals(current_totals, block, &filter);
}
}
if (overflow_row)
filter[0] = 0;
size_t columns = finalized.columns();
for (size_t i = 0; i < columns; ++i)
{
ColumnWithNameAndType & current_column = finalized.getByPosition(i);
current_column.column = current_column.column->filter(filter);
if (current_column.column->empty())
{
finalized.clear();
break;
}
}
}
else
{
if (overflow_row)
{
/// We will have to drop one row from the beginning of every column.
size_t columns = finalized.columns();
for (size_t i = 0; i < columns; ++i)
{
ColumnWithNameAndType & current_column = finalized.getByPosition(i);
current_column.column = current_column.column->cut(1, current_column.column->size() - 1);
}
}
}
if (!finalized)
continue;
passed_keys += finalized.rows();
return finalized;
}
}
void TotalsHavingBlockInputStream::addToTotals(Block & totals, Block & block, const IColumn::Filter * filter,
size_t rows)
{
bool init = !totals;
ArenaPtr arena;
if (init)
arena = new Arena;
for (size_t i = 0; i < block.columns(); ++i)
{
ColumnWithNameAndType & current = block.getByPosition(i);
ColumnAggregateFunction * column =
dynamic_cast<ColumnAggregateFunction *>(&*current.column);
if (!column)
{
if (init)
{
ColumnPtr new_column = current.type->createColumn();
new_column->insertDefault();
totals.insert(ColumnWithNameAndType(new_column, current.type, current.name));
}
continue;
}
ColumnAggregateFunction * target;
IAggregateFunction * function;
AggregateDataPtr data;
if (init)
{
function = column->getAggregateFunction();
target = new ColumnAggregateFunction(column->getAggregateFunction(), Arenas(1, arena));
totals.insert(ColumnWithNameAndType(target, current.type, current.name));
data = arena->alloc(function->sizeOfData());
function->create(data);
target->getData().push_back(data);
}
else
{
target = dynamic_cast<ColumnAggregateFunction *>(&*totals.getByPosition(i).column);
if (!target)
throw Exception("Unexpected type of column: " + totals.getByPosition(i).column->getName(),
ErrorCodes::ILLEGAL_COLUMN);
function = target->getAggregateFunction();
data = target->getData()[0];
}
ColumnAggregateFunction::Container_t & vec = column->getData();
size_t size = std::min(vec.size(), rows);
if (filter)
{
for (size_t j = 0; j < size; ++j)
{
if ((*filter)[j])
function->merge(data, vec[j]);
}
}
else
{
for (size_t j = 0; j < size; ++j)
{
function->merge(data, vec[j]);
}
}
}
}
}
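The AFTER_HAVING_AUTO branch in readImpl() reduces to a single ratio test. A standalone restatement (illustrative helper, not part of this commit) makes the threshold semantics explicit:
#include <cstddef>

/// Overflow aggregates are folded into TOTALS only when at least auto_include_threshold
/// of all keys seen so far passed the HAVING filter (the zero-key guard is added here for clarity).
static bool shouldIncludeOverflowAggregates(size_t passed_keys, size_t total_keys, double auto_include_threshold)
{
    if (total_keys == 0)
        return false;
    return 1. * passed_keys / total_keys >= auto_include_threshold;
}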

View File

@ -208,7 +208,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
LOG_TRACE(log, "Aggregation method: " << result.getMethodName());
}
if (have_overflow_aggregates && !result.without_key)
if (overflow_row && !result.without_key)
{
result.without_key = result.aggregates_pool->alloc(total_size_of_aggregate_states);
@ -270,7 +270,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
overflow = true;
}
if (overflow && !have_overflow_aggregates)
if (overflow && !overflow_row)
continue;
if (inserted)
@ -318,7 +318,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
overflow = true;
}
if (overflow && !have_overflow_aggregates)
if (overflow && !overflow_row)
continue;
if (inserted)
@ -362,7 +362,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
overflow = true;
}
if (overflow && !have_overflow_aggregates)
if (overflow && !overflow_row)
continue;
if (inserted)
@ -406,7 +406,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
overflow = true;
}
if (overflow && !have_overflow_aggregates)
if (overflow && !overflow_row)
continue;
if (inserted)
@ -446,7 +446,7 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
overflow = true;
}
if (overflow && !have_overflow_aggregates)
if (overflow && !overflow_row)
continue;
if (inserted)
@ -744,7 +744,7 @@ AggregatedDataVariantsPtr Aggregator::merge(ManyAggregatedDataVariants & data_va
throw Exception("Cannot merge different aggregated data variants.", ErrorCodes::CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS);
/// In which data structure is the data aggregated?
if (res->type == AggregatedDataVariants::WITHOUT_KEY || have_overflow_aggregates)
if (res->type == AggregatedDataVariants::WITHOUT_KEY || overflow_row)
{
AggregatedDataWithoutKey & res_data = res->without_key;
AggregatedDataWithoutKey & current_data = current.without_key;
@ -916,7 +916,7 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu
result.key_sizes = key_sizes;
}
if (result.type == AggregatedDataVariants::WITHOUT_KEY || have_overflow_aggregates)
if (result.type == AggregatedDataVariants::WITHOUT_KEY || overflow_row)
{
AggregatedDataWithoutKey & res = result.without_key;
if (!res)
@ -932,7 +932,7 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu
aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns[i])[0]);
}
size_t start_row = have_overflow_aggregates ? 1 : 0;
size_t start_row = overflow_row ? 1 : 0;
if (result.type == AggregatedDataVariants::KEY_64)
{

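All of the hunks above replace the old have_overflow_aggregates flag with the constructor parameter overflow_row. A toy model (deliberately not ClickHouse code) of what the flag controls: once max_rows_to_group_by distinct keys exist, further keys are either merged into one shared overflow slot or skipped.
#include <cstdint>
#include <map>
#include <string>

/// Toy illustration only: with overflow_row the extra rows survive in a single shared slot,
/// without it they are dropped, matching "if (overflow && !overflow_row) continue;" above.
static void addRow(std::map<std::string, uint64_t> & counts, uint64_t & overflow_count,
    const std::string & key, size_t max_rows_to_group_by, bool overflow_row)
{
    if (!counts.count(key) && counts.size() >= max_rows_to_group_by)
    {
        if (overflow_row)
            ++overflow_count;   /// aggregated, but only into the special zeroth row
        return;                 /// otherwise the row is skipped (OverflowMode::ANY)
    }
    ++counts[key];
}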
View File

@ -11,6 +11,7 @@
#include <DB/DataStreams/SplittingAggregatingBlockInputStream.h>
#include <DB/DataStreams/DistinctBlockInputStream.h>
#include <DB/DataStreams/NullBlockInputStream.h>
#include <DB/DataStreams/TotalsHavingBlockInputStream.h>
#include <DB/DataStreams/narrowBlockInputStreams.h>
#include <DB/DataStreams/copyData.h>
@ -300,6 +301,19 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
/// Now let's build the block streams that perform the required actions.
/// Whether to aggregate rows that did not pass max_rows_to_group_by into a separate row.
bool aggregate_overflow_row =
need_aggregate &&
query.group_by_with_totals &&
settings.limits.max_rows_to_group_by &&
settings.limits.group_by_overflow_mode == OverflowMode::ANY &&
settings.totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE;
/// Whether to finalize aggregate functions immediately after aggregation.
bool aggregate_final =
need_aggregate &&
to_stage > QueryProcessingStage::WithMergeableState &&
!query.group_by_with_totals;
if (from_stage < QueryProcessingStage::WithMergeableState
&& to_stage >= QueryProcessingStage::WithMergeableState)
{
@ -307,7 +321,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
executeWhere(streams, before_where);
if (need_aggregate)
executeAggregation(streams, before_aggregation, to_stage > QueryProcessingStage::WithMergeableState);
executeAggregation(streams, before_aggregation, aggregate_overflow_row, aggregate_final);
/** Optimization: with distributed query processing,
* if DISTINCT, GROUP, HAVING and ORDER are not specified but LIMIT is,
@ -328,14 +342,16 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
{
/// If we need to merge aggregated results from several servers
if (from_stage == QueryProcessingStage::WithMergeableState)
executeMergeAggregated(streams);
executeMergeAggregated(streams, aggregate_overflow_row, aggregate_final);
}
if (has_having)
if (!aggregate_final)
executeTotalsAndHaving(streams, has_having, before_having, aggregate_overflow_row);
else if (has_having)
executeHaving(streams, before_having);
executeOuterExpression(streams, before_order_and_select);
if (has_order_by)
executeOrder(streams);
@ -541,7 +557,7 @@ void InterpreterSelectQuery::executeWhere(BlockInputStreams & streams, Expressio
}
void InterpreterSelectQuery::executeAggregation(BlockInputStreams & streams, ExpressionActionsPtr expression, bool final)
void InterpreterSelectQuery::executeAggregation(BlockInputStreams & streams, ExpressionActionsPtr expression, bool overflow_row, bool final)
{
bool is_async = settings.asynchronous && streams.size() <= settings.max_threads;
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
@ -556,49 +572,40 @@ void InterpreterSelectQuery::executeAggregation(BlockInputStreams & streams, Exp
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
/** Optimization for the case when there is a LIMIT but no HAVING and no ORDER BY.
* We will aggregate only over the first limit_length + limit_offset keys encountered.
* NOTE: after this, rows_before_limit_at_least is no longer exact (which is acceptable).
* NOTE: may work incorrectly if arrayJoin is applied after GROUP BY.
* NOTE: works incorrectly when there are several sources: one source may discard rows with a key
* that another source will return, so for that key the aggregates will not cover all matching rows.
*/
size_t limit_length = 0;
size_t limit_offset = 0;
getLimitLengthAndOffset(query, limit_length, limit_offset);
if (query.limit_length && !query.having_expression && !query.order_expression_list
&& (!settings.limits.max_rows_to_group_by || limit_length + limit_offset < settings.limits.max_rows_to_group_by))
{
settings.limits.max_rows_to_group_by = limit_length + limit_offset;
settings.limits.group_by_overflow_mode = OverflowMode::ANY;
}
bool separate_totals = to_stage > QueryProcessingStage::WithMergeableState;
/// If there are several sources, perform parallel aggregation
if (streams.size() > 1)
{
if (!settings.use_splitting_aggregator || key_names.empty())
{
stream = maybeAsynchronous(
new ParallelAggregatingBlockInputStream(streams, key_names, aggregates, query.group_by_with_totals, separate_totals, final,
new ParallelAggregatingBlockInputStream(streams, key_names, aggregates, overflow_row, final,
settings.max_threads, settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode), settings.asynchronous);
}
else
{
if (overflow_row)
throw Exception("Splitting aggregator cannot handle queries like this yet. "
"Please change use_splitting_aggregator, remove WITH TOTALS, "
"change group_by_overflow_mode or set totals_mode to AFTER_HAVING_EXCLUSIVE.",
ErrorCodes::NOT_IMPLEMENTED);
stream = maybeAsynchronous(
new SplittingAggregatingBlockInputStream(
new UnionBlockInputStream(streams, settings.max_threads),
key_names, aggregates, settings.max_threads, query.group_by_with_totals, separate_totals, final,
settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode), settings.asynchronous);
}
streams.resize(1);
}
else
stream = maybeAsynchronous(new AggregatingBlockInputStream(stream, key_names, aggregates, query.group_by_with_totals, separate_totals, final,
stream = maybeAsynchronous(new AggregatingBlockInputStream(stream, key_names, aggregates, overflow_row, final,
settings.limits.max_rows_to_group_by, settings.limits.group_by_overflow_mode), settings.asynchronous);
}
void InterpreterSelectQuery::executeMergeAggregated(BlockInputStreams & streams)
void InterpreterSelectQuery::executeMergeAggregated(BlockInputStreams & streams, bool overflow_row, bool final)
{
/// If there is nothing to merge
if (streams.size() == 1)
@ -608,13 +615,11 @@ void InterpreterSelectQuery::executeMergeAggregated(BlockInputStreams & streams)
streams[0] = new UnionBlockInputStream(streams, settings.max_threads);
streams.resize(1);
bool separate_totals = to_stage > QueryProcessingStage::WithMergeableState;
/// Now merge the aggregated blocks
Names key_names;
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
streams[0] = maybeAsynchronous(new MergingAggregatedBlockInputStream(streams[0], key_names, aggregates, query.group_by_with_totals, separate_totals), settings.asynchronous);
streams[0] = maybeAsynchronous(new MergingAggregatedBlockInputStream(streams[0], key_names, aggregates, overflow_row, final), settings.asynchronous);
}
@ -630,6 +635,25 @@ void InterpreterSelectQuery::executeHaving(BlockInputStreams & streams, Expressi
}
void InterpreterSelectQuery::executeTotalsAndHaving(BlockInputStreams & streams, bool has_having,
ExpressionActionsPtr expression, bool overflow_row)
{
if (streams.size() > 1)
{
streams[0] = new UnionBlockInputStream(streams, settings.max_threads);
streams.resize(1);
}
Names key_names;
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
streams[0] = maybeAsynchronous(new TotalsHavingBlockInputStream(
streams[0], key_names, aggregates, overflow_row, expression,
has_having ? query.having_expression->getColumnName() : "", settings.totals_mode, .5),
settings.asynchronous);
}
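The branches above are driven by the two flags computed near the top of execute(). A condensed restatement as free functions, purely illustrative (neither helper exists in the commit):
/// aggregate_overflow_row: keep a special row for keys beyond max_rows_to_group_by
/// only when WITH TOTALS is present and the totals mode may still want those aggregates.
static bool needOverflowRow(bool need_aggregate, bool group_by_with_totals,
    size_t max_rows_to_group_by, OverflowMode group_by_overflow_mode, TotalsMode totals_mode)
{
    return need_aggregate
        && group_by_with_totals
        && max_rows_to_group_by != 0
        && group_by_overflow_mode == OverflowMode::ANY
        && totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE;
}

/// aggregate_final: finalize aggregate functions right after aggregation only when the query
/// runs past WithMergeableState and there is no WITH TOTALS to compute later.
static bool needImmediateFinalization(bool need_aggregate, bool group_by_with_totals, bool past_mergeable_state)
{
    return need_aggregate && past_mergeable_state && !group_by_with_totals;
}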
void InterpreterSelectQuery::executeOuterExpression(BlockInputStreams & streams, ExpressionActionsPtr expression)
{
bool is_async = settings.asynchronous && streams.size() <= settings.max_threads;

View File

@ -423,8 +423,7 @@ void SplittingAggregator::convertToBlockThread(AggregatedDataVariants & data_var
{
try
{
Block totals;
block = convertToBlock(data_variant, false, totals, final);
block = convertToBlock(data_variant, final);
}
catch (...)
{