This commit is contained in:
Evgeniy Gatov 2014-06-04 01:07:27 +04:00
commit ad7449d850
5 changed files with 11 additions and 76 deletions

View File

@ -1,11 +1,7 @@
#pragma once #pragma once
#include <DB/Columns/ColumnArray.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeAggregateFunction.h> #include <DB/DataTypes/DataTypeAggregateFunction.h>
#include <DB/AggregateFunctions/IAggregateFunction.h> #include <DB/AggregateFunctions/IAggregateFunction.h>
#include <DB/IO/ReadBufferFromString.h>
namespace DB namespace DB
@ -13,8 +9,9 @@ namespace DB
/** Не агрегатная функция, а адаптер агрегатных функций, /** Не агрегатная функция, а адаптер агрегатных функций,
* Агрегатные функции с суффиксом Merge принимают в качестве аргумента DataTypeAggregateFunction (состояние агрегатной функции), * Агрегатные функции с суффиксом Merge принимают в качестве аргумента DataTypeAggregateFunction
* и объединяют их при агрегации. * (состояние агрегатной функции, полученное ранее с помощью применения агрегатной функции с суффиксом State)
* и объединяют их при агрегации.
*/ */
class AggregateFunctionMerge : public IAggregateFunction class AggregateFunctionMerge : public IAggregateFunction
@ -38,12 +35,12 @@ public:
void setArguments(const DataTypes & arguments) void setArguments(const DataTypes & arguments)
{ {
// size_t num_agruments = arguments.size();
if (arguments.size() != 1) if (arguments.size() != 1)
throw Exception("Passed " + toString(arguments.size()) + " arguments to unary aggregate function " + this->getName(), throw Exception("Passed " + toString(arguments.size()) + " arguments to unary aggregate function " + this->getName(),
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const DataTypeAggregateFunction * data_type = dynamic_cast<const DataTypeAggregateFunction *>(&*arguments[0]); const DataTypeAggregateFunction * data_type = dynamic_cast<const DataTypeAggregateFunction *>(&*arguments[0]);
if (!data_type || data_type->getFunctionName() != nested_func->getName()) if (!data_type || data_type->getFunctionName() != nested_func->getName())
throw Exception("Illegal type " + arguments[0]->getName() + " of argument for aggregate function " + getName(), throw Exception("Illegal type " + arguments[0]->getName() + " of argument for aggregate function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
@ -81,7 +78,7 @@ public:
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
{ {
nested_func->merge(place, columns[0]->getDataAt(row_num).data); nested_func->merge(place, static_cast<const ColumnAggregateFunction &>(*columns[0]).getData()[row_num]);
} }
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const

View File

@ -1,8 +1,5 @@
#pragma once #pragma once
#include <DB/Columns/ColumnArray.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeAggregateFunction.h> #include <DB/DataTypes/DataTypeAggregateFunction.h>
#include <DB/AggregateFunctions/IAggregateFunction.h> #include <DB/AggregateFunctions/IAggregateFunction.h>
#include <DB/Columns/ColumnAggregateFunction.h> #include <DB/Columns/ColumnAggregateFunction.h>
@ -14,8 +11,7 @@ namespace DB
/** Не агрегатная функция, а адаптер агрегатных функций, /** Не агрегатная функция, а адаптер агрегатных функций,
* Агрегатные функции с суффиксом State отличаются от соответствующих тем, что их состояния не финализируются. * Агрегатные функции с суффиксом State отличаются от соответствующих тем, что их состояния не финализируются.
* Возвращаемый тип - DataTypeAggregateFunction. Функция insertResultInto не используется (реализация будет кидать исключение). * Возвращаемый тип - DataTypeAggregateFunction.
* Aggregator/SplittingAggregator будет проверять, что вычисляется агрегатная функция -State, и не будет вызывать insertResultInto.
*/ */
class AggregateFunctionState : public IAggregateFunction class AggregateFunctionState : public IAggregateFunction

View File

@ -62,18 +62,6 @@ public:
this->data(place).sum += tmp; this->data(place).sum += tmp;
} }
void serializeText(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
writeText(this->data(place).sum, buf);
}
void deserializeMergeText(AggregateDataPtr place, ReadBuffer & buf) const
{
typename NearestFieldType<T>::Type tmp;
readText(tmp, buf);
this->data(place).sum += tmp;
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{ {
static_cast<ColumnVector<typename NearestFieldType<T>::Type> &>(to).getData().push_back(this->data(place).sum); static_cast<ColumnVector<typename NearestFieldType<T>::Type> &>(to).getData().push_back(this->data(place).sum);

View File

@ -116,42 +116,6 @@ public:
} }
} }
void serializeText(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
const Data & d = this->data(place);
if (unlikely(d.value.isNull()))
{
writeText(false, buf);
}
else
{
writeText(true, buf);
type->serializeText(this->data(place).value, buf);
}
}
void deserializeMergeText(AggregateDataPtr place, ReadBuffer & buf) const
{
Data & d = this->data(place);
bool is_not_null = false;
readText(is_not_null, buf);
if (is_not_null)
{
if (!d.value.isNull())
{
Field value_;
type->deserializeText(value_, buf);
if (Traits::better(value_, d.value))
d.value = value_;
}
else
type->deserializeText(d.value, buf);
}
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{ {
if (unlikely(this->data(place).value.isNull())) if (unlikely(this->data(place).value.isNull()))

View File

@ -76,28 +76,18 @@ public:
/// Объединить состояние с другим состоянием. /// Объединить состояние с другим состоянием.
virtual void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const = 0; virtual void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const = 0;
/// Сериализовать состояние (например, для передачи по сети). Нельзя сериализовывать "пустое" состояние. /// Сериализовать состояние (например, для передачи по сети).
virtual void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const = 0; virtual void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const = 0;
/// Десериализовать состояние и объединить своё состояние с ним. /// Десериализовать состояние и объединить своё состояние с ним.
virtual void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const = 0; virtual void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const = 0;
/// Сериализовать состояние в текстовом виде (а не в бинарном, как в функции serialize). Нельзя сериализовывать "пустое" состояние.
virtual void serializeText(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
throw Exception("Method serializeText is not supported for " + getName() + ".", ErrorCodes::NOT_IMPLEMENTED);
}
/// Десериализовать текстовое состояние и объединить своё состояние с ним.
virtual void deserializeMergeText(AggregateDataPtr place, ReadBuffer & buf) const
{
throw Exception("Method deserializeMergeText is not supported for " + getName() + ".", ErrorCodes::NOT_IMPLEMENTED);
}
/// Вставить результат в столбец. /// Вставить результат в столбец.
virtual void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const = 0; virtual void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const = 0;
/// Аггрегатная функция или состояние аггрегатной функции. /** Возвращает true для агрегатных функций типа -State.
* Они выполняются как другие агрегатные функции, но не финализируются (возвращают состояние агрегации, которое может быть объединено с другим).
*/
virtual bool isState() const { return false; } virtual bool isState() const { return false; }
}; };