From 262ffcd74b85bccdc13bd18c340fc41ed81e99ca Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Tue, 25 Mar 2014 22:16:26 +0400 Subject: [PATCH] Fixed parametric aggregate functions in totals. [#METR-10566] --- .../AggregateFunctions/AggregateFunctionIf.h | 2 +- .../AggregateFunctionQuantile.h | 4 +- .../AggregateFunctionQuantileTiming.h | 4 +- .../AggregateFunctions/IAggregateFunction.h | 5 ++- dbms/include/DB/Common/AutoArray.h | 20 +++++----- .../DB/DataTypes/DataTypeAggregateFunction.h | 22 +++++++--- dbms/include/DB/Interpreters/Aggregator.h | 1 + .../CollapsingSortedBlockInputStream.cpp | 2 +- .../SummingSortedBlockInputStream.cpp | 2 +- dbms/src/DataTypes/DataTypeFactory.cpp | 40 +++++++++++++++---- dbms/src/Interpreters/Aggregator.cpp | 2 +- dbms/src/Interpreters/ExpressionAnalyzer.cpp | 3 +- .../Interpreters/InterpreterSelectQuery.cpp | 2 +- 13 files changed, 75 insertions(+), 34 deletions(-) diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionIf.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionIf.h index 296331f56bb..c9936106426 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionIf.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionIf.h @@ -47,7 +47,7 @@ public: nested_func->setArguments(nested_arguments); } - void setParameters(const Row & params) + void setParameters(const Array & params) { nested_func->setParameters(params); } diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantile.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantile.h index f9eb53d4afa..1c12f2d48bb 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantile.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantile.h @@ -56,7 +56,7 @@ public: type = argument; } - void setParameters(const Row & params) + void setParameters(const Array & params) { if (params.size() != 1) throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); @@ -130,7 +130,7 @@ public: type = argument; } - void setParameters(const Row & params) + void setParameters(const Array & params) { if (params.empty()) throw Exception("Aggregate function " + getName() + " requires at least one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); diff --git a/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantileTiming.h b/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantileTiming.h index 51107b85f83..06ae5222c41 100644 --- a/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantileTiming.h +++ b/dbms/include/DB/AggregateFunctions/AggregateFunctionQuantileTiming.h @@ -511,7 +511,7 @@ public: { } - void setParameters(const Row & params) + void setParameters(const Array & params) { if (params.size() != 1) throw Exception("Aggregate function " + getName() + " requires exactly one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); @@ -570,7 +570,7 @@ public: { } - void setParameters(const Row & params) + void setParameters(const Array & params) { if (params.empty()) throw Exception("Aggregate function " + getName() + " requires at least one parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); diff --git a/dbms/include/DB/AggregateFunctions/IAggregateFunction.h b/dbms/include/DB/AggregateFunctions/IAggregateFunction.h index d0c3653ad83..b6063508d7a 100644 --- a/dbms/include/DB/AggregateFunctions/IAggregateFunction.h +++ b/dbms/include/DB/AggregateFunctions/IAggregateFunction.h @@ -39,9 +39,10 @@ public: * Если параметры не предусмотрены или переданные параметры недопустимы - кинуть исключение. * Если параметры есть - необходимо вызывать перед остальными вызовами, иначе - не вызывать. */ - virtual void setParameters(const Row & params) + virtual void setParameters(const Array & params) { - throw Exception("Aggregate function " + getName() + " doesn't allow parameters.", ErrorCodes::AGGREGATE_FUNCTION_DOESNT_ALLOW_PARAMETERS); + throw Exception("Aggregate function " + getName() + " doesn't allow parameters.", + ErrorCodes::AGGREGATE_FUNCTION_DOESNT_ALLOW_PARAMETERS); } /// Получить тип результата. diff --git a/dbms/include/DB/Common/AutoArray.h b/dbms/include/DB/Common/AutoArray.h index 43f31ad2309..454e430efdd 100644 --- a/dbms/include/DB/Common/AutoArray.h +++ b/dbms/include/DB/Common/AutoArray.h @@ -20,7 +20,7 @@ namespace DB * sizeof равен размеру одного указателя. * * Не exception-safe. - * Копирование и присваивание разрушающее: исходный объект становится пустым. + * Копирование не поддерживается. Перемещение опустошает исходный объект. * То есть, использовать этот массив во многих случаях неудобно. * * Предназначен для ситуаций, в которых создаётся много массивов одинакового небольшого размера, @@ -82,24 +82,24 @@ public: init(size_, dont_init_elems); } - /** Разрушающее копирование. + /** Премещение. */ - AutoArray(const AutoArray & src) + AutoArray(AutoArray && src) { - //std::cerr << this << " AutoArray(const AutoArray & src)" << std::endl; - + if (this == &src) + return; setEmpty(); data = src.data; - const_cast &>(src).setEmpty(); + src.setEmpty(); } - AutoArray & operator= (const AutoArray & src) + AutoArray & operator= (AutoArray && src) { - //std::cerr << this << " operator=(const AutoArray & src)" << std::endl; - + if (this == &src) + return; uninit(); data = src.data; - const_cast &>(src).setEmpty(); + src.setEmpty(); return *this; } diff --git a/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h b/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h index bc7ffcf8d28..e3d87a4c65c 100644 --- a/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h +++ b/dbms/include/DB/DataTypes/DataTypeAggregateFunction.h @@ -11,19 +11,19 @@ namespace DB using Poco::SharedPtr; /** Тип - состояние агрегатной функции. - * Параметры типа - это агрегатная функция и типы её аргументов. + * Параметры типа - это агрегатная функция, типы её аргументов и её параметры (для параметрических агрегатных функций). */ class DataTypeAggregateFunction : public IDataType { private: AggregateFunctionPtr function; DataTypes argument_types; + Array parameters; public: - DataTypeAggregateFunction(const AggregateFunctionPtr & function_, const DataTypes & argument_types_) - : function(function_), argument_types(argument_types_) + DataTypeAggregateFunction(const AggregateFunctionPtr & function_, const DataTypes & argument_types_, const Array & parameters_) + : function(function_), argument_types(argument_types_), parameters(parameters_) { - function->setArguments(argument_types); } std::string getName() const @@ -31,6 +31,18 @@ public: std::stringstream stream; stream << "AggregateFunction(" << function->getName(); + if (!parameters.empty()) + { + stream << "("; + for (size_t i = 0; i < parameters.size(); ++i) + { + if (i) + stream << ", "; + stream << apply_visitor(DB::FieldVisitorToString(), parameters[i]); + } + stream << ")"; + } + for (DataTypes::const_iterator it = argument_types.begin(); it != argument_types.end(); ++it) stream << ", " << (*it)->getName(); @@ -38,7 +50,7 @@ public: return stream.str(); } - DataTypePtr clone() const { return new DataTypeAggregateFunction(function, argument_types); } + DataTypePtr clone() const { return new DataTypeAggregateFunction(function, argument_types, parameters); } void serializeBinary(const Field & field, WriteBuffer & ostr) const; void deserializeBinary(Field & field, ReadBuffer & istr) const; diff --git a/dbms/include/DB/Interpreters/Aggregator.h b/dbms/include/DB/Interpreters/Aggregator.h index 3cd9464fbab..b324bfbbbb3 100644 --- a/dbms/include/DB/Interpreters/Aggregator.h +++ b/dbms/include/DB/Interpreters/Aggregator.h @@ -27,6 +27,7 @@ namespace DB struct AggregateDescription { AggregateFunctionPtr function; + Array parameters; /// Параметры (параметрической) агрегатной функции. ColumnNumbers arguments; Names argument_names; /// Используются, если arguments не заданы. String column_name; /// Какое имя использовать для столбца со значениями агрегатной функции diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp index 028da3692e4..2fbdeddf913 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp @@ -129,7 +129,7 @@ void CollapsingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPt /// Запишем данные для предыдущего визита. insertRows(merged_columns, merged_rows); - current_key = next_key; + current_key = std::move(next_key); next_key.resize(description.size()); count_negative = 0; diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp index c49f05349f4..48b100049e0 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp @@ -90,7 +90,7 @@ void SummingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs insertCurrentRow(merged_columns); } - current_key = next_key; + current_key = std::move(next_key); next_key.resize(description.size()); setRow(current_row, current); diff --git a/dbms/src/DataTypes/DataTypeFactory.cpp b/dbms/src/DataTypes/DataTypeFactory.cpp index b5b812426f5..6fe29be3efe 100644 --- a/dbms/src/DataTypes/DataTypeFactory.cpp +++ b/dbms/src/DataTypes/DataTypeFactory.cpp @@ -21,14 +21,15 @@ #include #include #include +#include namespace DB { DataTypeFactory::DataTypeFactory() - : fixed_string_regexp("^FixedString\\s*\\(\\s*(\\d+)\\s*\\)$"), - nested_regexp("^(\\w+)\\s*\\(\\s*(.+)\\s*\\)$", Poco::RegularExpression::RE_MULTILINE | Poco::RegularExpression::RE_DOTALL) + : fixed_string_regexp(R"--(^FixedString\s*\(\s*(\d+)\s*\)$)--"), + nested_regexp(R"--(^(\w+)\s*\(\s*(.+)\s*\)$)--", Poco::RegularExpression::RE_MULTILINE | Poco::RegularExpression::RE_DOTALL) { boost::assign::insert(non_parametric_data_types) ("UInt8", new DataTypeUInt8) @@ -71,6 +72,7 @@ DataTypePtr DataTypeFactory::get(const String & name) const String function_name; AggregateFunctionPtr function; DataTypes argument_types; + Array params_row; ParserExpressionList args_parser; ASTPtr args_ast; @@ -87,14 +89,38 @@ DataTypePtr DataTypeFactory::get(const String & name) const throw Exception("Data type AggregateFunction requires parameters: " "name of aggregate function and list of data types for arguments", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); - ASTs::iterator it = args_list.children.begin(); - function_name = (*it)->getColumnName(); + if (ASTFunction * parametric = dynamic_cast(&*args_list.children[0])) + { + if (parametric->parameters) + throw Exception("Unexpected level of parameters to aggregate function", ErrorCodes::SYNTAX_ERROR); + function_name = parametric->name; - for (++it; it != args_list.children.end(); ++it) - argument_types.push_back(get((*it)->getColumnName())); + ASTs & parameters = dynamic_cast(*parametric->arguments).children; + params_row.resize(parameters.size()); + + for (size_t i = 0; i < parameters.size(); ++i) + { + ASTLiteral * lit = dynamic_cast(&*parameters[i]); + if (!lit) + throw Exception("Parameters to aggregate functions must be literals", + ErrorCodes::PARAMETERS_TO_AGGREGATE_FUNCTIONS_MUST_BE_LITERALS); + + params_row[i] = lit->value; + } + } + else + { + function_name = args_list.children[0]->getColumnName(); + } + + for (size_t i = 1; i < args_list.children.size(); ++i) + argument_types.push_back(get(args_list.children[i]->getColumnName())); function = AggregateFunctionFactory().get(function_name, argument_types); - return new DataTypeAggregateFunction(function, argument_types); + if (!params_row.empty()) + function->setParameters(params_row); + function->setArguments(argument_types); + return new DataTypeAggregateFunction(function, argument_types, params_row); } if (base_name == "Nested") diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 71a7b14e060..df5c9f58e34 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -86,7 +86,7 @@ void Aggregator::initialize(Block & block) for (size_t j = 0; j < arguments_size; ++j) argument_types[j] = block.getByPosition(aggregates[i].arguments[j]).type; - col.type = new DataTypeAggregateFunction(aggregates[i].function, argument_types); + col.type = new DataTypeAggregateFunction(aggregates[i].function, argument_types, aggregates[i].parameters); col.column = new ColumnAggregateFunction(aggregates[i].function); sample.insert(col); diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index 4d06a422031..38aaeabd661 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -1004,7 +1004,7 @@ void ExpressionAnalyzer::getAggregatesImpl(ASTPtr ast, ExpressionActions & actio if (node->parameters) { ASTs & parameters = dynamic_cast(*node->parameters).children; - Row params_row(parameters.size()); + Array params_row(parameters.size()); for (size_t i = 0; i < parameters.size(); ++i) { @@ -1015,6 +1015,7 @@ void ExpressionAnalyzer::getAggregatesImpl(ASTPtr ast, ExpressionActions & actio params_row[i] = lit->value; } + aggregate.parameters = params_row; aggregate.function->setParameters(params_row); } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 518a42c2f39..009ca1ca9a3 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -293,7 +293,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() settings.limits.max_rows_to_group_by && settings.limits.group_by_overflow_mode == OverflowMode::ANY && settings.totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE; - /// Нужно ли после агрегации сразу финализироыать агрегатные функции. + /// Нужно ли после агрегации сразу финализировать агрегатные функции. bool aggregate_final = need_aggregate && to_stage > QueryProcessingStage::WithMergeableState &&