dbms: improved performance [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2013-06-25 14:16:16 +00:00
parent 878856a62b
commit 0931ac57ae
6 changed files with 227 additions and 227 deletions

View File

@ -12,18 +12,19 @@ namespace DB
{
template <typename T>
struct AggregateFunctionAvgData
{
Float64 sum;
T sum;
UInt64 count;
AggregateFunctionAvgData() : sum(0), count(0) {}
};
/// Считает арифметическое среднее значение чисел. Параметром шаблона может быть UInt64, Int64 или Float64.
/// Считает арифметическое среднее значение чисел.
template <typename T>
class AggregateFunctionAvg : public IUnaryAggregateFunction<AggregateFunctionAvgData>
class AggregateFunctionAvg : public IUnaryAggregateFunction<AggregateFunctionAvgData<typename NearestFieldType<T>::Type> >
{
public:
String getName() const { return "avg"; }
@ -44,35 +45,35 @@ public:
void addOne(AggregateDataPtr place, const IColumn & column, size_t row_num) const
{
data(place).sum += get<const T &>(column[row_num]);
++data(place).count;
this->data(place).sum += static_cast<const ColumnVector<T> &>(column).getData()[row_num];
++this->data(place).count;
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
{
data(place).sum += data(rhs).sum;
data(place).count += data(rhs).count;
this->data(place).sum += this->data(rhs).sum;
this->data(place).count += this->data(rhs).count;
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
writeFloatBinary(data(place).sum, buf);
writeVarUInt(data(place).count, buf);
writeBinary(this->data(place).sum, buf);
writeVarUInt(this->data(place).count, buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
{
Float64 tmp_sum = 0;
typename NearestFieldType<T>::Type tmp_sum = 0;
UInt64 tmp_count = 0;
readFloatBinary(tmp_sum, buf);
readBinary(tmp_sum, buf);
readVarUInt(tmp_count, buf);
data(place).sum += tmp_sum;
data(place).count += tmp_count;
this->data(place).sum += tmp_sum;
this->data(place).count += tmp_count;
}
Field getResult(ConstAggregateDataPtr place) const
{
return data(place).sum / data(place).count;
return static_cast<Float64>(this->data(place).sum) / this->data(place).count;
}
};
@ -82,7 +83,7 @@ public:
* avgIf(x, cond) эквивалентно sum(cond ? x : 0) / sum(cond).
*/
template <typename T>
class AggregateFunctionAvgIf : public IAggregateFunctionHelper<AggregateFunctionAvgData>
class AggregateFunctionAvgIf : public IAggregateFunctionHelper<AggregateFunctionAvgData<typename NearestFieldType<T>::Type> >
{
public:
String getName() const { return "avgIf"; }
@ -106,38 +107,38 @@ public:
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
{
if (columns[1]->getDataAt(row_num).data[0])
if (static_cast<const ColumnUInt8 &>(*columns[1]).getData()[row_num])
{
data(place).sum += get<const T &>((*columns[0])[row_num]);
++data(place).count;
this->data(place).sum += static_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
++this->data(place).count;
}
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
{
data(place).sum += data(rhs).sum;
data(place).count += data(rhs).count;
this->data(place).sum += this->data(rhs).sum;
this->data(place).count += this->data(rhs).count;
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
writeFloatBinary(data(place).sum, buf);
writeVarUInt(data(place).count, buf);
writeBinary(this->data(place).sum, buf);
writeVarUInt(this->data(place).count, buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
{
Float64 tmp_sum = 0;
typename NearestFieldType<T>::Type tmp_sum = 0;
UInt64 tmp_count = 0;
readFloatBinary(tmp_sum, buf);
readBinary(tmp_sum, buf);
readVarUInt(tmp_count, buf);
data(place).sum += tmp_sum;
data(place).count += tmp_count;
this->data(place).sum += tmp_sum;
this->data(place).count += tmp_count;
}
Field getResult(ConstAggregateDataPtr place) const
{
return data(place).sum / data(place).count;
return static_cast<Float64>(this->data(place).sum) / this->data(place).count;
}
};

View File

@ -66,9 +66,7 @@ public:
void addOne(AggregateDataPtr place, const IColumn & column, size_t row_num) const
{
this->data(place).sample.insert(
static_cast<typename NearestFieldType<ArgumentFieldType>::Type>(
*reinterpret_cast<const ArgumentFieldType *>(column.getDataAt(row_num).data)));
this->data(place).sample.insert(static_cast<const ColumnVector<ArgumentFieldType> &>(column).getData()[row_num]);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const

View File

@ -11,30 +11,6 @@
namespace DB
{
template <typename T> struct AggregateFunctionSumTraits;
template <> struct AggregateFunctionSumTraits<UInt64>
{
static DataTypePtr getReturnType() { return new DataTypeUInt64; }
static void write(UInt64 x, WriteBuffer & buf) { writeVarUInt(x, buf); }
static void read(UInt64 & x, ReadBuffer & buf) { readVarUInt(x, buf); }
};
template <> struct AggregateFunctionSumTraits<Int64>
{
static DataTypePtr getReturnType() { return new DataTypeInt64; }
static void write(Int64 x, WriteBuffer & buf) { writeVarInt(x, buf); }
static void read(Int64 & x, ReadBuffer & buf) { readVarInt(x, buf); }
};
template <> struct AggregateFunctionSumTraits<Float64>
{
static DataTypePtr getReturnType() { return new DataTypeFloat64; }
static void write(Float64 x, WriteBuffer & buf) { writeFloatBinary(x, buf); }
static void read(Float64 & x, ReadBuffer & buf) { readFloatBinary(x, buf); }
};
template <typename T>
struct AggregateFunctionSumData
{
@ -44,9 +20,9 @@ struct AggregateFunctionSumData
};
/// Считает сумму чисел. Параметром шаблона может быть UInt64, Int64 или Float64.
/// Считает сумму чисел.
template <typename T>
class AggregateFunctionSum : public IUnaryAggregateFunction<AggregateFunctionSumData<T> >
class AggregateFunctionSum : public IUnaryAggregateFunction<AggregateFunctionSumData<typename NearestFieldType<T>::Type> >
{
public:
String getName() const { return "sum"; }
@ -54,7 +30,7 @@ public:
DataTypePtr getReturnType() const
{
return AggregateFunctionSumTraits<T>::getReturnType();
return new typename DataTypeFromFieldType<typename NearestFieldType<T>::Type>::Type;
}
void setArgument(const DataTypePtr & argument)
@ -67,7 +43,7 @@ public:
void addOne(AggregateDataPtr place, const IColumn & column, size_t row_num) const
{
this->data(place).sum += get<const T &>(column[row_num]);
this->data(place).sum += static_cast<const ColumnVector<T> &>(column).getData()[row_num];
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
@ -77,13 +53,13 @@ public:
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
AggregateFunctionSumTraits<T>::write(this->data(place).sum, buf);
writeBinary(this->data(place).sum, buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
{
T tmp;
AggregateFunctionSumTraits<T>::read(tmp, buf);
typename NearestFieldType<T>::Type tmp;
readBinary(tmp, buf);
this->data(place).sum += tmp;
}
@ -96,7 +72,7 @@ public:
/// Считает сумму чисел при выполнении условия. sumIf(x, cond) эквивалентно sum(cond ? x : 0).
template <typename T>
class AggregateFunctionSumIf : public IAggregateFunctionHelper<AggregateFunctionSumData<T> >
class AggregateFunctionSumIf : public IAggregateFunctionHelper<AggregateFunctionSumData<typename NearestFieldType<T>::Type> >
{
public:
String getName() const { return "sumIf"; }
@ -104,7 +80,7 @@ public:
DataTypePtr getReturnType() const
{
return AggregateFunctionSumTraits<T>::getReturnType();
return new typename DataTypeFromFieldType<typename NearestFieldType<T>::Type>::Type;
}
void setArguments(const DataTypes & arguments)
@ -120,8 +96,8 @@ public:
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
{
if (columns[1]->getDataAt(row_num).data[0])
this->data(place).sum += get<const T &>((*columns[0])[row_num]);
if (static_cast<const ColumnUInt8 &>(*columns[1]).getData()[row_num])
this->data(place).sum += static_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
@ -131,13 +107,13 @@ public:
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
AggregateFunctionSumTraits<T>::write(this->data(place).sum, buf);
writeBinary(this->data(place).sum, buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
{
T tmp;
AggregateFunctionSumTraits<T>::read(tmp, buf);
typename NearestFieldType<T>::Type tmp;
readBinary(tmp, buf);
this->data(place).sum += tmp;
}

View File

@ -18,16 +18,19 @@ namespace DB
{
template <typename T> struct AggregateFunctionUniqTraits;
template <> struct AggregateFunctionUniqTraits<UInt64>
template <typename T> struct AggregateFunctionUniqTraits
{
static UInt64 hash(UInt64 x) { return x; }
static UInt64 hash(T x) { return x; }
};
template <> struct AggregateFunctionUniqTraits<Int64>
template <> struct AggregateFunctionUniqTraits<Float32>
{
static UInt64 hash(Int64 x) { return x; }
static UInt64 hash(Float32 x)
{
UInt64 res = 0;
memcpy(reinterpret_cast<char *>(&res), reinterpret_cast<char *>(&x), sizeof(x));
return res;
}
};
template <> struct AggregateFunctionUniqTraits<Float64>
@ -40,12 +43,6 @@ template <> struct AggregateFunctionUniqTraits<Float64>
}
};
template <> struct AggregateFunctionUniqTraits<String>
{
/// Имейте ввиду, что вычисление приближённое.
static UInt64 hash(const String & x) { return CityHash64(x.data(), x.size()); }
};
struct AggregateFunctionUniqData
{
@ -72,10 +69,9 @@ public:
{
}
void addOne(AggregateDataPtr place, const IColumn & column, size_t row_num) const
{
data(place).set.insert(AggregateFunctionUniqTraits<T>::hash(get<const T &>(column[row_num])));
data(place).set.insert(AggregateFunctionUniqTraits<T>::hash(static_cast<const ColumnVector<T> &>(column).getData()[row_num]));
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
@ -101,6 +97,14 @@ public:
}
};
template <>
inline void AggregateFunctionUniq<String>::addOne(AggregateDataPtr place, const IColumn & column, size_t row_num) const
{
/// Имейте ввиду, что вычисление приближённое.
StringRef value = column.getDataAt(row_num);
data(place).set.insert(CityHash64(value.data, value.size));
}
/** То же самое, но выводит состояние вычислений в строке в текстовом виде.
* Используется, если какой-то внешней программе (сейчас это )
@ -154,8 +158,8 @@ public:
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
{
if (columns[1]->getDataAt(row_num).data[0])
data(place).set.insert(AggregateFunctionUniqTraits<T>::hash(get<const T &>((*columns[0])[row_num])));
if (static_cast<const ColumnUInt8 &>(*columns[1]).getData()[row_num])
data(place).set.insert(AggregateFunctionUniqTraits<T>::hash(static_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num]));
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
@ -181,4 +185,15 @@ public:
}
};
template <>
inline void AggregateFunctionUniqIf<String>::add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
{
if (static_cast<const ColumnUInt8 &>(*columns[1]).getData()[row_num])
{
/// Имейте ввиду, что вычисление приближённое.
StringRef value = columns[0]->getDataAt(row_num);
data(place).set.insert(CityHash64(value.data, value.size));
}
}
}

View File

@ -15,6 +15,8 @@
#include <DB/DataTypes/DataTypeDate.h>
#include <DB/DataTypes/DataTypeDateTime.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypeFixedString.h>
namespace DB
@ -26,6 +28,45 @@ AggregateFunctionFactory::AggregateFunctionFactory()
}
/** Создать агрегатную функцию с числовым типом в параметре шаблона, в зависимости от типа аргумента.
*/
template<template <typename> class AggregateFunctionTemplate>
static IAggregateFunction * createWithNumericType(const IDataType & argument_type)
{
if (dynamic_cast<const DataTypeUInt8 *>(&argument_type)) return new AggregateFunctionTemplate<UInt8>;
else if (dynamic_cast<const DataTypeUInt16 *>(&argument_type)) return new AggregateFunctionTemplate<UInt16>;
else if (dynamic_cast<const DataTypeUInt32 *>(&argument_type)) return new AggregateFunctionTemplate<UInt32>;
else if (dynamic_cast<const DataTypeUInt64 *>(&argument_type)) return new AggregateFunctionTemplate<UInt64>;
else if (dynamic_cast<const DataTypeInt8 *>(&argument_type)) return new AggregateFunctionTemplate<Int8>;
else if (dynamic_cast<const DataTypeInt16 *>(&argument_type)) return new AggregateFunctionTemplate<Int16>;
else if (dynamic_cast<const DataTypeInt32 *>(&argument_type)) return new AggregateFunctionTemplate<Int32>;
else if (dynamic_cast<const DataTypeInt64 *>(&argument_type)) return new AggregateFunctionTemplate<Int64>;
else if (dynamic_cast<const DataTypeFloat32 *>(&argument_type)) return new AggregateFunctionTemplate<Float32>;
else if (dynamic_cast<const DataTypeFloat64 *>(&argument_type)) return new AggregateFunctionTemplate<Float64>;
else
return NULL;
}
/** Создать агрегатную функцию с числовым типом в параметре шаблона, в зависимости от имени типа, расположенном в type_id.
*/
template<template <typename> class AggregateFunctionTemplate>
static IAggregateFunction * createWithNumericType(const String & type_id, size_t prefix_length)
{
if (0 == type_id.compare(prefix_length, strlen("UInt8"), "UInt8")) return new AggregateFunctionTemplate<UInt8>;
else if (0 == type_id.compare(prefix_length, strlen("UInt16"), "UInt16")) return new AggregateFunctionTemplate<UInt16>;
else if (0 == type_id.compare(prefix_length, strlen("UInt32"), "UInt32")) return new AggregateFunctionTemplate<UInt32>;
else if (0 == type_id.compare(prefix_length, strlen("UInt64"), "UInt64")) return new AggregateFunctionTemplate<UInt64>;
else if (0 == type_id.compare(prefix_length, strlen("Int8"), "Int8")) return new AggregateFunctionTemplate<Int8>;
else if (0 == type_id.compare(prefix_length, strlen("Int16"), "Int16")) return new AggregateFunctionTemplate<Int16>;
else if (0 == type_id.compare(prefix_length, strlen("Int32"), "Int32")) return new AggregateFunctionTemplate<Int32>;
else if (0 == type_id.compare(prefix_length, strlen("Int64"), "Int64")) return new AggregateFunctionTemplate<Int64>;
else if (0 == type_id.compare(prefix_length, strlen("Float32"), "Float32")) return new AggregateFunctionTemplate<Float32>;
else if (0 == type_id.compare(prefix_length, strlen("Float64"), "Float64")) return new AggregateFunctionTemplate<Float64>;
else
return NULL;
}
AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const DataTypes & argument_types) const
{
if (name == "count")
@ -45,157 +86,130 @@ AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const Da
if (argument_types.size() != 1)
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String argument_type_name = argument_types[0]->getName();
if (argument_type_name == "UInt8" || argument_type_name == "UInt16"
|| argument_type_name == "UInt32" || argument_type_name == "UInt64")
return new AggregateFunctionSum<UInt64>;
else if (argument_type_name == "Int8" || argument_type_name == "Int16"
|| argument_type_name == "Int32" || argument_type_name == "Int64")
return new AggregateFunctionSum<Int64>;
else if (argument_type_name == "Float32" || argument_type_name == "Float64")
return new AggregateFunctionSum<Float64>;
else
throw Exception("Illegal type " + argument_type_name + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionSum>(*argument_types[0]);
if (!res)
throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return res;
}
else if (name == "sumIf")
{
if (argument_types.size() != 2)
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String argument_type_name = argument_types[0]->getName();
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionSumIf>(*argument_types[0]);
if (argument_type_name == "UInt8" || argument_type_name == "UInt16"
|| argument_type_name == "UInt32" || argument_type_name == "UInt64")
return new AggregateFunctionSumIf<UInt64>;
else if (argument_type_name == "Int8" || argument_type_name == "Int16"
|| argument_type_name == "Int32" || argument_type_name == "Int64")
return new AggregateFunctionSumIf<Int64>;
else if (argument_type_name == "Float32" || argument_type_name == "Float64")
return new AggregateFunctionSumIf<Float64>;
else
throw Exception("Illegal type " + argument_type_name + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!res)
throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return res;
}
else if (name == "avg")
{
if (argument_types.size() != 1)
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String argument_type_name = argument_types[0]->getName();
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionAvg>(*argument_types[0]);
if (argument_type_name == "UInt8" || argument_type_name == "UInt16"
|| argument_type_name == "UInt32" || argument_type_name == "UInt64")
return new AggregateFunctionAvg<UInt64>;
else if (argument_type_name == "Int8" || argument_type_name == "Int16"
|| argument_type_name == "Int32" || argument_type_name == "Int64")
return new AggregateFunctionAvg<Int64>;
else if (argument_type_name == "Float32" || argument_type_name == "Float64")
return new AggregateFunctionAvg<Float64>;
else
throw Exception("Illegal type " + argument_type_name + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!res)
throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return res;
}
else if (name == "avgIf")
{
if (argument_types.size() != 2)
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String argument_type_name = argument_types[0]->getName();
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionAvgIf>(*argument_types[0]);
if (argument_type_name == "UInt8" || argument_type_name == "UInt16"
|| argument_type_name == "UInt32" || argument_type_name == "UInt64")
return new AggregateFunctionAvgIf<UInt64>;
else if (argument_type_name == "Int8" || argument_type_name == "Int16"
|| argument_type_name == "Int32" || argument_type_name == "Int64")
return new AggregateFunctionAvgIf<Int64>;
else if (argument_type_name == "Float32" || argument_type_name == "Float64")
return new AggregateFunctionAvgIf<Float64>;
else
throw Exception("Illegal type " + argument_type_name + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!res)
throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return res;
}
else if (name == "uniq")
{
if (argument_types.size() != 1)
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String argument_type_name = argument_types[0]->getName();
const IDataType & argument_type = *argument_types[0];
if (argument_type_name == "UInt8" || argument_type_name == "UInt16"
|| argument_type_name == "UInt32" || argument_type_name == "UInt64"
|| argument_type_name == "Date" || argument_type_name == "DateTime")
return new AggregateFunctionUniq<UInt64>;
else if (argument_type_name == "Int8" || argument_type_name == "Int16"
|| argument_type_name == "Int32" || argument_type_name == "Int64")
return new AggregateFunctionUniq<Int64>;
else if (argument_type_name == "Float32" || argument_type_name == "Float64")
return new AggregateFunctionUniq<Float64>;
else if (argument_type_name == "String" || 0 == argument_type_name.compare(0, strlen("FixedString"), "FixedString"))
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionUniq>(*argument_types[0]);
if (res)
return res;
else if (dynamic_cast<const DataTypeDate *>(&argument_type))
return new AggregateFunctionUniq<DataTypeDate::FieldType>;
else if (dynamic_cast<const DataTypeDateTime*>(&argument_type))
return new AggregateFunctionUniq<DataTypeDateTime::FieldType>;
else if (dynamic_cast<const DataTypeString*>(&argument_type) || dynamic_cast<const DataTypeFixedString*>(&argument_type))
return new AggregateFunctionUniq<String>;
else
throw Exception("Illegal type " + argument_type_name + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
else if (name == "uniqIf")
{
if (argument_types.size() != 2)
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String argument_type_name = argument_types[0]->getName();
const IDataType & argument_type = *argument_types[0];
if (argument_type_name == "UInt8" || argument_type_name == "UInt16"
|| argument_type_name == "UInt32" || argument_type_name == "UInt64"
|| argument_type_name == "Date" || argument_type_name == "DateTime")
return new AggregateFunctionUniqIf<UInt64>;
else if (argument_type_name == "Int8" || argument_type_name == "Int16"
|| argument_type_name == "Int32" || argument_type_name == "Int64")
return new AggregateFunctionUniqIf<Int64>;
else if (argument_type_name == "Float32" || argument_type_name == "Float64")
return new AggregateFunctionUniqIf<Float64>;
else if (argument_type_name == "String" || 0 == argument_type_name.compare(0, strlen("FixedString"), "FixedString"))
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionUniqIf>(*argument_types[0]);
if (res)
return res;
else if (dynamic_cast<const DataTypeDate *>(&argument_type))
return new AggregateFunctionUniqIf<DataTypeDate::FieldType>;
else if (dynamic_cast<const DataTypeDateTime*>(&argument_type))
return new AggregateFunctionUniqIf<DataTypeDateTime::FieldType>;
else if (dynamic_cast<const DataTypeString*>(&argument_type) || dynamic_cast<const DataTypeFixedString*>(&argument_type))
return new AggregateFunctionUniqIf<String>;
else
throw Exception("Illegal type " + argument_type_name + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
else if (name == "uniqState")
{
if (argument_types.size() != 1)
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String argument_type_name = argument_types[0]->getName();
const IDataType & argument_type = *argument_types[0];
if (argument_type_name == "UInt8" || argument_type_name == "UInt16"
|| argument_type_name == "UInt32" || argument_type_name == "UInt64"
|| argument_type_name == "Date" || argument_type_name == "DateTime")
return new AggregateFunctionUniqState<UInt64>;
else if (argument_type_name == "Int8" || argument_type_name == "Int16"
|| argument_type_name == "Int32" || argument_type_name == "Int64")
return new AggregateFunctionUniqState<Int64>;
else if (argument_type_name == "Float32" || argument_type_name == "Float64")
return new AggregateFunctionUniqState<Float64>;
else if (argument_type_name == "String" || 0 == argument_type_name.compare(0, strlen("FixedString"), "FixedString"))
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionUniqState>(*argument_types[0]);
if (res)
return res;
else if (dynamic_cast<const DataTypeDate *>(&argument_type))
return new AggregateFunctionUniqState<DataTypeDate::FieldType>;
else if (dynamic_cast<const DataTypeDateTime*>(&argument_type))
return new AggregateFunctionUniqState<DataTypeDateTime::FieldType>;
else if (dynamic_cast<const DataTypeString*>(&argument_type) || dynamic_cast<const DataTypeFixedString*>(&argument_type))
return new AggregateFunctionUniqState<String>;
else
throw Exception("Illegal type " + argument_type_name + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
else if (name == "median" || name == "quantile")
{
if (argument_types.size() != 1)
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String argument_type_name = argument_types[0]->getName();
if (argument_type_name == "UInt8") return new AggregateFunctionQuantile<UInt8>;
else if (argument_type_name == "UInt16") return new AggregateFunctionQuantile<UInt16>;
else if (argument_type_name == "UInt32") return new AggregateFunctionQuantile<UInt32>;
else if (argument_type_name == "UInt64") return new AggregateFunctionQuantile<UInt64>;
else if (argument_type_name == "Int8") return new AggregateFunctionQuantile<Int8>;
else if (argument_type_name == "Int16") return new AggregateFunctionQuantile<Int16>;
else if (argument_type_name == "Int32") return new AggregateFunctionQuantile<Int32>;
else if (argument_type_name == "Int64") return new AggregateFunctionQuantile<Int64>;
else if (argument_type_name == "Float32") return new AggregateFunctionQuantile<Float32>;
else if (argument_type_name == "Float64") return new AggregateFunctionQuantile<Float64>;
else if (argument_type_name == "Date") return new AggregateFunctionQuantile<DataTypeDate::FieldType, false>;
else if (argument_type_name == "DateTime") return new AggregateFunctionQuantile<DataTypeDateTime::FieldType, false>;
const IDataType & argument_type = *argument_types[0];
if (dynamic_cast<const DataTypeUInt8 *>(&argument_type)) return new AggregateFunctionQuantile<UInt8>;
else if (dynamic_cast<const DataTypeUInt16 *>(&argument_type)) return new AggregateFunctionQuantile<UInt16>;
else if (dynamic_cast<const DataTypeUInt32 *>(&argument_type)) return new AggregateFunctionQuantile<UInt32>;
else if (dynamic_cast<const DataTypeUInt64 *>(&argument_type)) return new AggregateFunctionQuantile<UInt64>;
else if (dynamic_cast<const DataTypeInt8 *>(&argument_type)) return new AggregateFunctionQuantile<Int8>;
else if (dynamic_cast<const DataTypeInt16 *>(&argument_type)) return new AggregateFunctionQuantile<Int16>;
else if (dynamic_cast<const DataTypeInt32 *>(&argument_type)) return new AggregateFunctionQuantile<Int32>;
else if (dynamic_cast<const DataTypeInt64 *>(&argument_type)) return new AggregateFunctionQuantile<Int64>;
else if (dynamic_cast<const DataTypeFloat32 *>(&argument_type)) return new AggregateFunctionQuantile<Float32>;
else if (dynamic_cast<const DataTypeFloat64 *>(&argument_type)) return new AggregateFunctionQuantile<Float64>;
else if (dynamic_cast<const DataTypeDate *>(&argument_type)) return new AggregateFunctionQuantile<DataTypeDate::FieldType, false>;
else if (dynamic_cast<const DataTypeDateTime*>(&argument_type)) return new AggregateFunctionQuantile<DataTypeDateTime::FieldType, false>;
else
throw Exception("Illegal type " + argument_type_name + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
else
throw Exception("Unknown aggregate function " + name, ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION);
@ -218,56 +232,46 @@ AggregateFunctionPtr AggregateFunctionFactory::getByTypeID(const String & type_i
return new AggregateFunctionGroupArray;
else if (0 == type_id.compare(0, strlen("sum_"), "sum_"))
{
if (0 == type_id.compare(strlen("sum_"), strlen("UInt64"), "UInt64"))
return new AggregateFunctionSum<UInt64>;
else if (0 == type_id.compare(strlen("sum_"), strlen("Int64"), "Int64"))
return new AggregateFunctionSum<Int64>;
else if (0 == type_id.compare(strlen("sum_"), strlen("Float64"), "Float64"))
return new AggregateFunctionSum<Float64>;
else
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionSum>(type_id, strlen("sum_"));
if (!res)
throw Exception("Unknown type id of aggregate function " + type_id, ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION);
return res;
}
else if (0 == type_id.compare(0, strlen("sumIf_"), "sumIf_"))
{
if (0 == type_id.compare(strlen("sumIf_"), strlen("UInt64"), "UInt64"))
return new AggregateFunctionSumIf<UInt64>;
else if (0 == type_id.compare(strlen("sumIf_"), strlen("Int64"), "Int64"))
return new AggregateFunctionSumIf<Int64>;
else if (0 == type_id.compare(strlen("sumIf_"), strlen("Float64"), "Float64"))
return new AggregateFunctionSumIf<Float64>;
else
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionSumIf>(type_id, strlen("sumIf_"));
if (!res)
throw Exception("Unknown type id of aggregate function " + type_id, ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION);
return res;
}
else if (0 == type_id.compare(0, strlen("avg_"), "avg_"))
{
if (0 == type_id.compare(strlen("avg_"), strlen("UInt64"), "UInt64"))
return new AggregateFunctionAvg<UInt64>;
else if (0 == type_id.compare(strlen("avg_"), strlen("Int64"), "Int64"))
return new AggregateFunctionAvg<Int64>;
else if (0 == type_id.compare(strlen("avg_"), strlen("Float64"), "Float64"))
return new AggregateFunctionAvg<Float64>;
else
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionAvg>(type_id, strlen("avg_"));
if (!res)
throw Exception("Unknown type id of aggregate function " + type_id, ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION);
return res;
}
else if (0 == type_id.compare(0, strlen("avgIf_"), "avgIf_"))
{
if (0 == type_id.compare(strlen("avgIf_"), strlen("UInt64"), "UInt64"))
return new AggregateFunctionAvgIf<UInt64>;
else if (0 == type_id.compare(strlen("avgIf_"), strlen("Int64"), "Int64"))
return new AggregateFunctionAvgIf<Int64>;
else if (0 == type_id.compare(strlen("avgIf_"), strlen("Float64"), "Float64"))
return new AggregateFunctionAvgIf<Float64>;
else
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionAvgIf>(type_id, strlen("avgIf_"));
if (!res)
throw Exception("Unknown type id of aggregate function " + type_id, ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION);
return res;
}
else if (0 == type_id.compare(0, strlen("uniq_"), "uniq_"))
{
if (0 == type_id.compare(strlen("uniq_"), strlen("UInt64"), "UInt64"))
return new AggregateFunctionUniq<UInt64>;
else if (0 == type_id.compare(strlen("uniq_"), strlen("Int64"), "Int64"))
return new AggregateFunctionUniq<Int64>;
else if (0 == type_id.compare(strlen("uniq_"), strlen("Float64"), "Float64"))
return new AggregateFunctionUniq<Float64>;
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionUniq>(type_id, strlen("uniq_"));
if (res)
return res;
else if (0 == type_id.compare(strlen("uniq_"), strlen("String"), "String"))
return new AggregateFunctionUniq<String>;
else
@ -275,25 +279,21 @@ AggregateFunctionPtr AggregateFunctionFactory::getByTypeID(const String & type_i
}
else if (0 == type_id.compare(0, strlen("uniqIf_"), "uniqIf_"))
{
if (0 == type_id.compare(strlen("uniqIf_"), strlen("UInt64"), "UInt64"))
return new AggregateFunctionUniqIf<UInt64>;
else if (0 == type_id.compare(strlen("uniqIf_"), strlen("Int64"), "Int64"))
return new AggregateFunctionUniqIf<Int64>;
else if (0 == type_id.compare(strlen("uniqIf_"), strlen("Float64"), "Float64"))
return new AggregateFunctionUniqIf<Float64>;
else if (0 == type_id.compare(strlen("uniqIf_"), strlen("String"), "String"))
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionUniqIf>(type_id, strlen("uniqIf_"));
if (res)
return res;
else if (0 == type_id.compare(strlen("uniq_"), strlen("String"), "String"))
return new AggregateFunctionUniqIf<String>;
else
throw Exception("Unknown type id of aggregate function " + type_id, ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION);
}
else if (0 == type_id.compare(0, strlen("uniqState_"), "uniqState_"))
{
if (0 == type_id.compare(strlen("uniqState_"), strlen("UInt64"), "UInt64"))
return new AggregateFunctionUniqState<UInt64>;
else if (0 == type_id.compare(strlen("uniqState_"), strlen("Int64"), "Int64"))
return new AggregateFunctionUniqState<Int64>;
else if (0 == type_id.compare(strlen("uniqState_"), strlen("Float64"), "Float64"))
return new AggregateFunctionUniqState<Float64>;
AggregateFunctionPtr res = createWithNumericType<AggregateFunctionUniqState>(type_id, strlen("uniqState_"));
if (res)
return res;
else if (0 == type_id.compare(strlen("uniqState_"), strlen("String"), "String"))
return new AggregateFunctionUniqState<String>;
else

View File

@ -185,9 +185,19 @@ void Aggregator::execute(BlockInputStreamPtr stream, AggregatedDataVariants & re
key_columns[i] = block.getByPosition(keys[i]).column;
for (size_t i = 0; i < aggregates_size; ++i)
{
for (size_t j = 0; j < aggregate_columns[i].size(); ++j)
{
aggregate_columns[i][j] = block.getByPosition(aggregates[i].arguments[j]).column;
/** Агрегатные функции рассчитывают, что в них передаются полноценные столбцы.
* Поэтому, стобцы-константы не разрешены в качестве аргументов агрегатных функций.
*/
if (aggregate_columns[i][j]->isConst())
throw Exception("Constants is not allowed as arguments of aggregate functions", ErrorCodes::ILLEGAL_COLUMN);
}
}
size_t rows = block.rows();
/// Каким способом выполнять агрегацию?