This commit is contained in:
Sergey Lazarev 2014-05-21 20:14:06 +04:00
commit f4e0b3fd85
8 changed files with 324 additions and 38 deletions

View File

@ -0,0 +1,111 @@
#pragma once
#include <DB/Columns/ColumnArray.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeAggregateFunction.h>
#include <DB/AggregateFunctions/IAggregateFunction.h>
#include <DB/IO/ReadBufferFromString.h>
namespace DB
{
/** Не агрегатная функция, а адаптер агрегатных функций,
* Агрегатные функции с суффиксом Merge принимают в качестве аргумента DataTypeAggregateFunction (состояние агрегатной функции),
* и объединяют их при агрегации.
*/
class AggregateFunctionMerge : public IAggregateFunction
{
private:
AggregateFunctionPtr nested_func_owner;
IAggregateFunction * nested_func;
public:
AggregateFunctionMerge(AggregateFunctionPtr nested_) : nested_func_owner(nested_), nested_func(nested_func_owner.get()) {}
String getName() const
{
return nested_func->getName() + "Merge";
}
DataTypePtr getReturnType() const
{
return nested_func->getReturnType();
}
void setArguments(const DataTypes & arguments)
{
// size_t num_agruments = arguments.size();
if (arguments.size() != 1)
throw Exception("Passed " + toString(arguments.size()) + " arguments to unary aggregate function " + this->getName(),
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const DataTypeAggregateFunction * data_type = dynamic_cast<const DataTypeAggregateFunction *>(&*arguments[0]);
if (!data_type || data_type->getFunctionName() != nested_func->getName())
throw Exception("Illegal type " + arguments[0]->getName() + " of argument for aggregate function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
void setParameters(const Array & params)
{
nested_func->setParameters(params);
}
void create(AggregateDataPtr place) const
{
nested_func->create(place);
}
void destroy(AggregateDataPtr place) const
{
nested_func->destroy(place);
}
bool hasTrivialDestructor() const
{
return nested_func->hasTrivialDestructor();
}
size_t sizeOfData() const
{
return nested_func->sizeOfData();
}
size_t alignOfData() const
{
return nested_func->alignOfData();
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
{
Field field;
columns[0]->get(row_num, field);
ReadBufferFromString read_buffer(field.safeGet<String>());
nested_func->deserializeMerge(place, read_buffer);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
{
nested_func->merge(place, rhs);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
nested_func->serialize(place, buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
{
nested_func->deserializeMerge(place, buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{
nested_func->insertResultInto(place, to);
}
};
}

View File

@ -0,0 +1,108 @@
#pragma once
#include <DB/Columns/ColumnArray.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeAggregateFunction.h>
#include <DB/AggregateFunctions/IAggregateFunction.h>
namespace DB
{
/** Не агрегатная функция, а адаптер агрегатных функций,
* Агрегатные функции с суффиксом State отличаются от соответствующих тем, что их состояния не финализируются.
* Возвращаемый тип - DataTypeAggregateFunction. Функция insertResultInto не используется (реализация будет кидать исключение).
* Aggregator/SplittingAggregator будет проверять, что вычисляется агрегатная функция -State, и не будет вызывать insertResultInto.
*/
class AggregateFunctionState : public IAggregateFunction
{
private:
AggregateFunctionPtr nested_func_owner;
IAggregateFunction * nested_func;
DataTypes arguments;
Array params;
public:
AggregateFunctionState(AggregateFunctionPtr nested_) : nested_func_owner(nested_), nested_func(nested_func_owner.get()) {}
String getName() const
{
return nested_func->getName() + "State";
}
DataTypePtr getReturnType() const
{
return new DataTypeAggregateFunction(nested_func_owner, arguments, params);
}
void setArguments(const DataTypes & arguments_)
{
arguments = arguments_;
nested_func->setArguments(arguments);
}
void setParameters(const Array & params_)
{
params = params_;
nested_func->setParameters(params);
}
void create(AggregateDataPtr place) const
{
nested_func->create(place);
}
void destroy(AggregateDataPtr place) const
{
nested_func->destroy(place);
}
bool hasTrivialDestructor() const
{
return nested_func->hasTrivialDestructor();
}
size_t sizeOfData() const
{
return nested_func->sizeOfData();
}
size_t alignOfData() const
{
return nested_func->alignOfData();
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
{
nested_func->add(place, columns, row_num);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
{
nested_func->merge(place, rhs);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
nested_func->serialize(place, buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
{
nested_func->deserializeMerge(place, buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{
throw Exception("Aggregate function " + getName() + " doesn't support insertResultInto method", ErrorCodes::NOT_IMPLEMENTED);
}
/// Для аггрегатных функции типа state никогда не нужно вызывать insertResultInto
bool isFinal() const { return false; }
};
}

View File

@ -84,6 +84,9 @@ public:
/// Вставить результат в столбец.
virtual void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const = 0;
/// Можно ли вызывать метод insertResultInto, или всегда нужно запоминать состояние.
virtual bool isFinal() const { return true; }
};

View File

@ -78,17 +78,26 @@ public:
Field operator[](size_t n) const
{
throw Exception("Method operator[] is not supported for ColumnAggregateFunction. You must access underlying vector directly.", ErrorCodes::NOT_IMPLEMENTED);;
String buffer_string;
WriteBufferFromString buffer(buffer_string);
func->serialize(data[n], buffer);
return Field(buffer_string);
}
void get(size_t n, Field & res) const
{
throw Exception("Method get is not supported for ColumnAggregateFunction. You must access underlying vector directly.", ErrorCodes::NOT_IMPLEMENTED);;
String buffer_string;
WriteBufferFromString buffer(buffer_string);
func->serialize(data[n], buffer);
res = buffer_string;
}
StringRef getDataAt(size_t n) const
{
throw Exception("Method getDataAt is not supported for ColumnAggregateFunction. You must access underlying vector directly.", ErrorCodes::NOT_IMPLEMENTED);
String buffer_string;
WriteBufferFromString buffer(buffer_string);
func->serialize(data[n], buffer);
return StringRef(buffer_string);
}
void insertData(const char * pos, size_t length)

View File

@ -26,6 +26,8 @@ public:
{
}
std::string getFunctionName() const { return function->getName(); }
std::string getName() const
{
std::stringstream stream;
@ -50,6 +52,9 @@ public:
return stream.str();
}
DataTypePtr getReturnType() const {return function->getReturnType(); };
DataTypes getArgumentsDataTypes() const { return argument_types; }
DataTypePtr clone() const { return new DataTypeAggregateFunction(function, argument_types, parameters); }
void serializeBinary(const Field & field, WriteBuffer & ostr) const;

View File

@ -12,6 +12,8 @@
#include <DB/AggregateFunctions/AggregateFunctionQuantileTiming.h>
#include <DB/AggregateFunctions/AggregateFunctionIf.h>
#include <DB/AggregateFunctions/AggregateFunctionArray.h>
#include <DB/AggregateFunctions/AggregateFunctionState.h>
#include <DB/AggregateFunctions/AggregateFunctionMerge.h>
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
@ -273,7 +275,29 @@ AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const Da
return res;
}
else if (recursion_level == 0 && name.size() >= 3 && name[name.size() - 2] == 'I' && name[name.size() - 1] == 'f')
else if (recursion_level == 0 && name.size() > strlen("State") && !(strcmp(name.data() + name.size() - strlen("State"), "State")))
{
/// Для агрегатных функций вида aggState, где agg - имя другой агрегатной функции.
AggregateFunctionPtr nested = get(String(name.data(), name.size() - strlen("State")), argument_types, recursion_level + 1);
return new AggregateFunctionState(nested);
}
else if (recursion_level == 0 && name.size() > strlen("Merge") && !(strcmp(name.data() + name.size() - strlen("Merge"), "Merge")))
{
/// Для агрегатных функций вида aggMerge, где agg - имя другой агрегатной функции.
if (argument_types.size() != 1)
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const DataTypeAggregateFunction * function = dynamic_cast<const DataTypeAggregateFunction *>(&*argument_types[0]);
if (!function)
throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
AggregateFunctionPtr nested = get(String(name.data(), name.size() - strlen("Merge")), function->getArgumentsDataTypes(), recursion_level + 1);
if (nested->getName() != function->getFunctionName())
throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return new AggregateFunctionMerge(nested);
}
else if (recursion_level <= 1 && name.size() >= 3 && name[name.size() - 2] == 'I' && name[name.size() - 1] == 'f')
{
/// Для агрегатных функций вида aggIf, где agg - имя другой агрегатной функции.
DataTypes nested_dt = argument_types;
@ -281,7 +305,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const Da
AggregateFunctionPtr nested = get(String(name.data(), name.size() - 2), nested_dt, recursion_level + 1);
return new AggregateFunctionIf(nested);
}
else if (recursion_level <= 1 && name.size() > strlen("Array") && !(strcmp(name.data() + name.size() - strlen("Array"), "Array")))
else if (recursion_level <= 2 && name.size() > strlen("Array") && !(strcmp(name.data() + name.size() - strlen("Array"), "Array")))
{
/// Для агрегатных функций вида aggArray, где agg - имя другой агрегатной функции.
size_t num_agruments = argument_types.size();
@ -294,7 +318,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const Da
else
throw Exception("Illegal type " + argument_types[i]->getName() + " of argument #" + toString(i + 1) + " for aggregate function " + name + ". Must be array.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
AggregateFunctionPtr nested = get(String(name.data(), name.size() - strlen("Array")), nested_arguments, recursion_level + 1);
AggregateFunctionPtr nested = get(String(name.data(), name.size() - strlen("Array")), nested_arguments, recursion_level + 2); /// + 2, чтобы ни один другой модификатор не мог идти перед Array
return new AggregateFunctionArray(nested);
}
else
@ -341,12 +365,18 @@ bool AggregateFunctionFactory::isAggregateFunctionName(const String & name, int
if (0 == strcmp(*it, name.data()))
return true;
/// Для агрегатных функций вида aggState, где agg - имя другой агрегатной функции.
if (recursion_level <= 0 && name.size() > strlen("State") && !(strcmp(name.data() + name.size() - strlen("State"), "State")))
return isAggregateFunctionName(String(name.data(), name.size() - strlen("State")), recursion_level + 1);
/// Для агрегатных функций вида aggMerge, где agg - имя другой агрегатной функции.
if (recursion_level <= 0 && name.size() > strlen("Merge") && !(strcmp(name.data() + name.size() - strlen("Merge"), "Merge")))
return isAggregateFunctionName(String(name.data(), name.size() - strlen("Merge")), recursion_level + 1);
/// Для агрегатных функций вида aggIf, где agg - имя другой агрегатной функции.
if (recursion_level == 0 && name.size() >= 3 && name[name.size() - 2] == 'I' && name[name.size() - 1] == 'f')
return isAggregateFunctionName(String(name.data(), name.size() - 2), 1);
if (recursion_level <= 1 && name.size() >= 3 && name[name.size() - 2] == 'I' && name[name.size() - 1] == 'f')
return isAggregateFunctionName(String(name.data(), name.size() - 2), recursion_level + 1);
/// Для агрегатных функций вида aggArray, где agg - имя другой агрегатной функции.
if (recursion_level <= 1 && name.size() > strlen("Array") && !(strcmp(name.data() + name.size() - strlen("Array"), "Array")))
return isAggregateFunctionName(String(name.data(), name.size() - strlen("Array")), 1);
if (recursion_level <= 2 && name.size() > strlen("Array") && !(strcmp(name.data() + name.size() - strlen("Array"), "Array")))
return isAggregateFunctionName(String(name.data(), name.size() - strlen("Array")), recursion_level + 2); /// + 2, чтобы ни один другой модификатор не мог идти перед Array
return false;
}

View File

@ -235,17 +235,21 @@ void Aggregator::convertToBlockImpl(
size_t start_row,
bool final) const
{
if (!final)
{
// if (!final || !aggregate_functions[i]->isFinal()) {
size_t j = start_row;
for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it, ++j)
{
method.insertKeyIntoColumns(it, key_columns, keys_size, key_sizes);
for (size_t i = 0; i < aggregates_size; ++i)
(*aggregate_columns[i])[j] = Method::getAggregateData(it->second) + offsets_of_aggregate_states[i];
if (!final || !aggregate_functions[i]->isFinal())
(*aggregate_columns[i])[j] = Method::getAggregateData(it->second) + offsets_of_aggregate_states[i];
else
aggregate_functions[i]->insertResultInto(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i],
*final_aggregate_columns[i]);
}
}
/* }
else
{
for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it)
@ -257,7 +261,7 @@ void Aggregator::convertToBlockImpl(
Method::getAggregateData(it->second) + offsets_of_aggregate_states[i],
*final_aggregate_columns[i]);
}
}
}*/
}
@ -341,16 +345,17 @@ void Aggregator::destroyImpl(
for (typename Method::const_iterator it = method.data.begin(); it != method.data.end(); ++it)
{
for (size_t i = 0; i < aggregates_size; ++i)
{
char * data = Method::getAggregateData(it->second);
if (aggregate_functions[i]->isFinal())
{
char * data = Method::getAggregateData(it->second);
/** Если исключение (обычно нехватка памяти, кидается MemoryTracker-ом) возникло
* после вставки ключа в хэш-таблицу, но до создания всех состояний агрегатных функций,
* то data будет равен nullptr-у.
*/
if (nullptr != data)
aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]);
}
/** Если исключение (обычно нехватка памяти, кидается MemoryTracker-ом) возникло
* после вставки ключа в хэш-таблицу, но до создания всех состояний агрегатных функций,
* то data будет равен nullptr-у.
*/
if (nullptr != data)
aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]);
}
}
}
@ -538,7 +543,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi
for (size_t i = 0; i < aggregates_size; ++i)
{
if (!final)
if (!final || !aggregate_functions[i]->isFinal())
{
/// Столбец ColumnAggregateFunction захватывает разделяемое владение ареной с состояниями агрегатных функций.
ColumnAggregateFunction & column_aggregate_func = static_cast<ColumnAggregateFunction &>(*res.getByPosition(i + keys_size).column);
@ -564,11 +569,10 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants, bool fi
{
AggregatedDataWithoutKey & data = data_variants.without_key;
if (!final)
for (size_t i = 0; i < aggregates_size; ++i)
for (size_t i = 0; i < aggregates_size; ++i)
if (!final || !aggregate_functions[i]->isFinal())
(*aggregate_columns[i])[0] = data + offsets_of_aggregate_states[i];
else
for (size_t i = 0; i < aggregates_size; ++i)
else
aggregate_functions[i]->insertResultInto(data + offsets_of_aggregate_states[i], *final_aggregate_columns[i]);
if (overflow_row)
@ -783,7 +787,8 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
AggregatedDataWithoutKey & res_data = result.without_key;
for (size_t i = 0; i < aggregates_size; ++i)
aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);
if (aggregate_functions[i]->isFinal())
aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);
}
if (result.type == AggregatedDataVariants::KEY_64)

View File

@ -11,6 +11,7 @@
#include <DB/Parsers/formatAST.h>
#include <DB/Storages/StorageMerge.h>
#include <DB/Storages/StorageMergeTree.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <algorithm>
#include <boost/bind.hpp>
@ -116,6 +117,7 @@ void addColumnToAST1(ASTs & columns, const ASTPtr & add_column_ptr, const ASTPtr
insert_it = std::find_if(columns.begin(), columns.end(), find_functor);
if (insert_it == columns.end())
throw Exception("Wrong column name. Cannot find column " + col_after->name + " to insert after");
++insert_it;
}
columns.insert(insert_it, add_column_ptr);
}
@ -129,18 +131,19 @@ void InterpreterAlterQuery::addColumnToAST(StoragePtr table, ASTs & columns, con
size_t dot_pos = add_column.name.find('.');
bool insert_nested_column = dot_pos != std::string::npos;
const DataTypeFactory & data_type_factory = context.getDataTypeFactory();
StringRange type_range = add_column.type->range;
String type(type_range.first, type_range.second - type_range.first);
DataTypePtr datatype = data_type_factory.get(type);
if (insert_nested_column)
{
const DataTypeFactory & data_type_factory = context.getDataTypeFactory();
StringRange type_range = add_column.type->range;
String type(type_range.first, type_range.second - type_range.first);
if (!dynamic_cast<DataTypeArray *>(data_type_factory.get(type).get()))
if (!dynamic_cast<DataTypeArray *>(datatype.get()))
{
throw Exception("Cannot add column " + add_column.name + ". Because it is not an array. Only arrays could be nested and consist '.' in their names");
}
}
if (dynamic_cast<StorageMergeTree *>(table.get()) && insert_nested_column)
if ((dynamic_cast<StorageMergeTree *>(table.get()) || dynamic_cast<StorageReplicatedMergeTree *>(table.get())) && insert_nested_column)
{
/// специальный случай для вставки nested столбцов в MergeTree
/// в MergeTree таблицах есть ASTFunction "Nested" в аргументах которой записаны столбцы
@ -156,8 +159,9 @@ void InterpreterAlterQuery::addColumnToAST(StoragePtr table, ASTs & columns, con
ASTs & nested_columns = dynamic_cast<ASTExpressionList &>(*nested_func->arguments).children;
ASTPtr new_nested_column = add_column_ptr->clone();
dynamic_cast<ASTNameTypePair &>(*new_nested_column).name = add_column.name.substr(dot_pos + 1);
ASTPtr new_nested_column_ptr = add_column_ptr->clone();
ASTNameTypePair& new_nested_column = dynamic_cast<ASTNameTypePair &>(*new_nested_column_ptr);
new_nested_column.name = add_column.name.substr(dot_pos + 1);
ASTPtr new_after_column = after_column_ptr ? after_column_ptr->clone() : nullptr;
if (new_after_column)
@ -170,7 +174,18 @@ void InterpreterAlterQuery::addColumnToAST(StoragePtr table, ASTs & columns, con
dynamic_cast<ASTIdentifier &>(*new_after_column).name = after_col->name.substr(after_dot_pos + 1);
}
addColumnToAST1(nested_columns, new_nested_column, new_after_column);
{
/// удаляем массив из типа, т.е. Array(String) -> String
ParserIdentifierWithOptionalParameters type_parser;
const char * expected;
const char * begin = new_nested_column.type->range.first + strlen("Array(");
const char * end = new_nested_column.type->range.second - static_cast<int>(strlen(")"));
if (!type_parser.parse(begin, end, new_nested_column.type, expected))
throw Exception("Fail to convert type like Array(SomeType) -> SomeType for type " + type);
}
addColumnToAST1(nested_columns, new_nested_column_ptr, new_after_column);
}
else
{