Fixed parametric aggregate functions in totals. [#METR-10566]

This commit is contained in:
Michael Kolupaev 2014-03-25 22:16:26 +04:00
parent 2d08cf828c
commit 262ffcd74b
13 changed files with 75 additions and 34 deletions

View File

@ -47,7 +47,7 @@ public:
nested_func->setArguments(nested_arguments); nested_func->setArguments(nested_arguments);
} }
void setParameters(const Row & params) void setParameters(const Array & params)
{ {
nested_func->setParameters(params); nested_func->setParameters(params);
} }

View File

@ -56,7 +56,7 @@ public:
type = argument; type = argument;
} }
void setParameters(const Row & params) void setParameters(const Array & params)
{ {
if (params.size() != 1) if (params.size() != 1)
throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -130,7 +130,7 @@ public:
type = argument; type = argument;
} }
void setParameters(const Row & params) void setParameters(const Array & params)
{ {
if (params.empty()) if (params.empty())
throw Exception("Aggregate function " + getName() + " requires at least one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); throw Exception("Aggregate function " + getName() + " requires at least one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

View File

@ -511,7 +511,7 @@ public:
{ {
} }
void setParameters(const Row & params) void setParameters(const Array & params)
{ {
if (params.size() != 1) if (params.size() != 1)
throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -570,7 +570,7 @@ public:
{ {
} }
void setParameters(const Row & params) void setParameters(const Array & params)
{ {
if (params.empty()) if (params.empty())
throw Exception("Aggregate function " + getName() + " requires at least one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); throw Exception("Aggregate function " + getName() + " requires at least one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

View File

@ -39,9 +39,10 @@ public:
* Если параметры не предусмотрены или переданные параметры недопустимы - кинуть исключение. * Если параметры не предусмотрены или переданные параметры недопустимы - кинуть исключение.
* Если параметры есть - необходимо вызывать перед остальными вызовами, иначе - не вызывать. * Если параметры есть - необходимо вызывать перед остальными вызовами, иначе - не вызывать.
*/ */
virtual void setParameters(const Row & params) virtual void setParameters(const Array & params)
{ {
throw Exception("Aggregate function " + getName() + " doesn't allow parameters.", ErrorCodes::AGGREGATE_FUNCTION_DOESNT_ALLOW_PARAMETERS); throw Exception("Aggregate function " + getName() + " doesn't allow parameters.",
ErrorCodes::AGGREGATE_FUNCTION_DOESNT_ALLOW_PARAMETERS);
} }
/// Получить тип результата. /// Получить тип результата.

View File

@ -20,7 +20,7 @@ namespace DB
* sizeof равен размеру одного указателя. * sizeof равен размеру одного указателя.
* *
* Не exception-safe. * Не exception-safe.
* Копирование и присваивание разрушающее: исходный объект становится пустым. * Копирование не поддерживается. Перемещение опустошает исходный объект.
* То есть, использовать этот массив во многих случаях неудобно. * То есть, использовать этот массив во многих случаях неудобно.
* *
* Предназначен для ситуаций, в которых создаётся много массивов одинакового небольшого размера, * Предназначен для ситуаций, в которых создаётся много массивов одинакового небольшого размера,
@ -82,24 +82,24 @@ public:
init(size_, dont_init_elems); init(size_, dont_init_elems);
} }
/** Разрушающее копирование. /** Премещение.
*/ */
AutoArray(const AutoArray & src) AutoArray(AutoArray && src)
{ {
//std::cerr << this << " AutoArray(const AutoArray & src)" << std::endl; if (this == &src)
return;
setEmpty(); setEmpty();
data = src.data; data = src.data;
const_cast<AutoArray<T> &>(src).setEmpty(); src.setEmpty();
} }
AutoArray & operator= (const AutoArray & src) AutoArray & operator= (AutoArray && src)
{ {
//std::cerr << this << " operator=(const AutoArray & src)" << std::endl; if (this == &src)
return;
uninit(); uninit();
data = src.data; data = src.data;
const_cast<AutoArray<T> &>(src).setEmpty(); src.setEmpty();
return *this; return *this;
} }

View File

@ -11,19 +11,19 @@ namespace DB
using Poco::SharedPtr; using Poco::SharedPtr;
/** Тип - состояние агрегатной функции. /** Тип - состояние агрегатной функции.
* Параметры типа - это агрегатная функция и типы её аргументов. * Параметры типа - это агрегатная функция, типы её аргументов и её параметры (для параметрических агрегатных функций).
*/ */
class DataTypeAggregateFunction : public IDataType class DataTypeAggregateFunction : public IDataType
{ {
private: private:
AggregateFunctionPtr function; AggregateFunctionPtr function;
DataTypes argument_types; DataTypes argument_types;
Array parameters;
public: public:
DataTypeAggregateFunction(const AggregateFunctionPtr & function_, const DataTypes & argument_types_) DataTypeAggregateFunction(const AggregateFunctionPtr & function_, const DataTypes & argument_types_, const Array & parameters_)
: function(function_), argument_types(argument_types_) : function(function_), argument_types(argument_types_), parameters(parameters_)
{ {
function->setArguments(argument_types);
} }
std::string getName() const std::string getName() const
@ -31,6 +31,18 @@ public:
std::stringstream stream; std::stringstream stream;
stream << "AggregateFunction(" << function->getName(); stream << "AggregateFunction(" << function->getName();
if (!parameters.empty())
{
stream << "(";
for (size_t i = 0; i < parameters.size(); ++i)
{
if (i)
stream << ", ";
stream << apply_visitor(DB::FieldVisitorToString(), parameters[i]);
}
stream << ")";
}
for (DataTypes::const_iterator it = argument_types.begin(); it != argument_types.end(); ++it) for (DataTypes::const_iterator it = argument_types.begin(); it != argument_types.end(); ++it)
stream << ", " << (*it)->getName(); stream << ", " << (*it)->getName();
@ -38,7 +50,7 @@ public:
return stream.str(); return stream.str();
} }
DataTypePtr clone() const { return new DataTypeAggregateFunction(function, argument_types); } DataTypePtr clone() const { return new DataTypeAggregateFunction(function, argument_types, parameters); }
void serializeBinary(const Field & field, WriteBuffer & ostr) const; void serializeBinary(const Field & field, WriteBuffer & ostr) const;
void deserializeBinary(Field & field, ReadBuffer & istr) const; void deserializeBinary(Field & field, ReadBuffer & istr) const;

View File

@ -27,6 +27,7 @@ namespace DB
struct AggregateDescription struct AggregateDescription
{ {
AggregateFunctionPtr function; AggregateFunctionPtr function;
Array parameters; /// Параметры (параметрической) агрегатной функции.
ColumnNumbers arguments; ColumnNumbers arguments;
Names argument_names; /// Используются, если arguments не заданы. Names argument_names; /// Используются, если arguments не заданы.
String column_name; /// Какое имя использовать для столбца со значениями агрегатной функции String column_name; /// Какое имя использовать для столбца со значениями агрегатной функции

View File

@ -129,7 +129,7 @@ void CollapsingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPt
/// Запишем данные для предыдущего визита. /// Запишем данные для предыдущего визита.
insertRows(merged_columns, merged_rows); insertRows(merged_columns, merged_rows);
current_key = next_key; current_key = std::move(next_key);
next_key.resize(description.size()); next_key.resize(description.size());
count_negative = 0; count_negative = 0;

View File

@ -90,7 +90,7 @@ void SummingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs
insertCurrentRow(merged_columns); insertCurrentRow(merged_columns);
} }
current_key = next_key; current_key = std::move(next_key);
next_key.resize(description.size()); next_key.resize(description.size());
setRow(current_row, current); setRow(current_row, current);

View File

@ -21,14 +21,15 @@
#include <DB/Parsers/ParserCreateQuery.h> #include <DB/Parsers/ParserCreateQuery.h>
#include <DB/Parsers/ASTExpressionList.h> #include <DB/Parsers/ASTExpressionList.h>
#include <DB/Parsers/ASTNameTypePair.h> #include <DB/Parsers/ASTNameTypePair.h>
#include <DB/Parsers/ASTLiteral.h>
namespace DB namespace DB
{ {
DataTypeFactory::DataTypeFactory() DataTypeFactory::DataTypeFactory()
: fixed_string_regexp("^FixedString\\s*\\(\\s*(\\d+)\\s*\\)$"), : fixed_string_regexp(R"--(^FixedString\s*\(\s*(\d+)\s*\)$)--"),
nested_regexp("^(\\w+)\\s*\\(\\s*(.+)\\s*\\)$", Poco::RegularExpression::RE_MULTILINE | Poco::RegularExpression::RE_DOTALL) nested_regexp(R"--(^(\w+)\s*\(\s*(.+)\s*\)$)--", Poco::RegularExpression::RE_MULTILINE | Poco::RegularExpression::RE_DOTALL)
{ {
boost::assign::insert(non_parametric_data_types) boost::assign::insert(non_parametric_data_types)
("UInt8", new DataTypeUInt8) ("UInt8", new DataTypeUInt8)
@ -71,6 +72,7 @@ DataTypePtr DataTypeFactory::get(const String & name) const
String function_name; String function_name;
AggregateFunctionPtr function; AggregateFunctionPtr function;
DataTypes argument_types; DataTypes argument_types;
Array params_row;
ParserExpressionList args_parser; ParserExpressionList args_parser;
ASTPtr args_ast; ASTPtr args_ast;
@ -87,14 +89,38 @@ DataTypePtr DataTypeFactory::get(const String & name) const
throw Exception("Data type AggregateFunction requires parameters: " throw Exception("Data type AggregateFunction requires parameters: "
"name of aggregate function and list of data types for arguments", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); "name of aggregate function and list of data types for arguments", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs::iterator it = args_list.children.begin(); if (ASTFunction * parametric = dynamic_cast<ASTFunction *>(&*args_list.children[0]))
function_name = (*it)->getColumnName(); {
if (parametric->parameters)
throw Exception("Unexpected level of parameters to aggregate function", ErrorCodes::SYNTAX_ERROR);
function_name = parametric->name;
for (++it; it != args_list.children.end(); ++it) ASTs & parameters = dynamic_cast<ASTExpressionList &>(*parametric->arguments).children;
argument_types.push_back(get((*it)->getColumnName())); params_row.resize(parameters.size());
for (size_t i = 0; i < parameters.size(); ++i)
{
ASTLiteral * lit = dynamic_cast<ASTLiteral *>(&*parameters[i]);
if (!lit)
throw Exception("Parameters to aggregate functions must be literals",
ErrorCodes::PARAMETERS_TO_AGGREGATE_FUNCTIONS_MUST_BE_LITERALS);
params_row[i] = lit->value;
}
}
else
{
function_name = args_list.children[0]->getColumnName();
}
for (size_t i = 1; i < args_list.children.size(); ++i)
argument_types.push_back(get(args_list.children[i]->getColumnName()));
function = AggregateFunctionFactory().get(function_name, argument_types); function = AggregateFunctionFactory().get(function_name, argument_types);
return new DataTypeAggregateFunction(function, argument_types); if (!params_row.empty())
function->setParameters(params_row);
function->setArguments(argument_types);
return new DataTypeAggregateFunction(function, argument_types, params_row);
} }
if (base_name == "Nested") if (base_name == "Nested")

View File

@ -86,7 +86,7 @@ void Aggregator::initialize(Block & block)
for (size_t j = 0; j < arguments_size; ++j) for (size_t j = 0; j < arguments_size; ++j)
argument_types[j] = block.getByPosition(aggregates[i].arguments[j]).type; argument_types[j] = block.getByPosition(aggregates[i].arguments[j]).type;
col.type = new DataTypeAggregateFunction(aggregates[i].function, argument_types); col.type = new DataTypeAggregateFunction(aggregates[i].function, argument_types, aggregates[i].parameters);
col.column = new ColumnAggregateFunction(aggregates[i].function); col.column = new ColumnAggregateFunction(aggregates[i].function);
sample.insert(col); sample.insert(col);

View File

@ -1004,7 +1004,7 @@ void ExpressionAnalyzer::getAggregatesImpl(ASTPtr ast, ExpressionActions & actio
if (node->parameters) if (node->parameters)
{ {
ASTs & parameters = dynamic_cast<ASTExpressionList &>(*node->parameters).children; ASTs & parameters = dynamic_cast<ASTExpressionList &>(*node->parameters).children;
Row params_row(parameters.size()); Array params_row(parameters.size());
for (size_t i = 0; i < parameters.size(); ++i) for (size_t i = 0; i < parameters.size(); ++i)
{ {
@ -1015,6 +1015,7 @@ void ExpressionAnalyzer::getAggregatesImpl(ASTPtr ast, ExpressionActions & actio
params_row[i] = lit->value; params_row[i] = lit->value;
} }
aggregate.parameters = params_row;
aggregate.function->setParameters(params_row); aggregate.function->setParameters(params_row);
} }

View File

@ -293,7 +293,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
settings.limits.max_rows_to_group_by && settings.limits.max_rows_to_group_by &&
settings.limits.group_by_overflow_mode == OverflowMode::ANY && settings.limits.group_by_overflow_mode == OverflowMode::ANY &&
settings.totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE; settings.totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE;
/// Нужно ли после агрегации сразу финализироыать агрегатные функции. /// Нужно ли после агрегации сразу финализировать агрегатные функции.
bool aggregate_final = bool aggregate_final =
need_aggregate && need_aggregate &&
to_stage > QueryProcessingStage::WithMergeableState && to_stage > QueryProcessingStage::WithMergeableState &&