dbms: improved performance for queries with large aggregation result [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2013-02-03 23:11:21 +00:00
parent 26a73dae15
commit 3891f4a230
11 changed files with 74 additions and 92 deletions

View File

@ -10,6 +10,10 @@ namespace DB
/** Позволяет создать агрегатную функцию по её имени.
*
* Чтобы создать большое количество экземпляров агрегатных функций
* для агрегации и последующей вставки в ColumnAggregateFunction,
* создайте один объект - "прототип", и затем используйте метод cloneEmpty.
*/
class AggregateFunctionFactory
{

View File

@ -69,7 +69,4 @@ typedef SharedPtr<IAggregateFunction> AggregateFunctionPtr;
typedef IAggregateFunction* AggregateFunctionPlainPtr;
typedef AutoArray<AggregateFunctionPlainPtr> AggregateFunctionsPlainPtrs;
template <> struct TypeName<AggregateFunctionPtr> { static std::string get() { return "AggregateFunctionPtr"; } };
template <> struct NearestFieldType<AggregateFunctionPtr> { typedef AggregateFunctionPtr Type; };
}

View File

@ -9,10 +9,21 @@ namespace DB
{
/** Столбец, хранящий состояния агрегатных функций.
* Для оптимизации, агрегатные функции хранятся по обычным указателям, а не shared_ptr-ам.
* Столбец захватывает владение всеми агрегатными функциями, которые в него переданы
* (уничтожает их в дестркуторе с помощью delete).
* Это значит, что вставляемые агрегатные функции должны быть выделены с помощью new,
* и не могут быть захвачены каком-либо smart-ptr-ом.
*/
class ColumnAggregateFunction : public ColumnVector<AggregateFunctionPtr>
class ColumnAggregateFunction : public ColumnVector<AggregateFunctionPlainPtr>
{
public:
~ColumnAggregateFunction()
{
for (size_t i = 0, s = data.size(); i < s; ++i)
delete data[i];
}
std::string getName() const { return "ColumnAggregateFunction"; }
ColumnPtr cloneEmpty() const { return new ColumnAggregateFunction; };
@ -54,7 +65,7 @@ public:
void insert(const Field & x)
{
data.push_back(DB::get<const AggregateFunctionPtr &>(x));
data.push_back(DB::get<AggregateFunctionPlainPtr>(x));
}
int compareAt(size_t n, size_t m, const IColumn & rhs_) const

View File

@ -11,50 +11,21 @@
namespace DB
{
/** Специализация - заглушка, которая ничего не делает для агрегатных функций.
*/
template <>
class FieldVisitorConvertToNumber<SharedPtr<IAggregateFunction> >
: public StaticVisitor<SharedPtr<IAggregateFunction> >
namespace detail
{
public:
typedef SharedPtr<IAggregateFunction> T;
T operator() (const Null & x) const
{
throw Exception("Cannot convert NULL to Aggregate Function", ErrorCodes::CANNOT_CONVERT_TYPE);
}
T operator() (const String & x) const
{
throw Exception("Cannot convert String to Aggregate Function", ErrorCodes::CANNOT_CONVERT_TYPE);
}
T operator() (const Array & x) const
{
throw Exception("Cannot convert Array to Aggregate Function", ErrorCodes::CANNOT_CONVERT_TYPE);
}
T operator() (const SharedPtr<IAggregateFunction> & x) const
template <typename T>
T convertUInt64To(UInt64 x)
{
return x;
}
T operator() (const UInt64 & x) const
template <>
inline IAggregateFunction * convertUInt64To<IAggregateFunction *>(UInt64 x)
{
throw Exception("Cannot convert UInt64 to Aggregate Function", ErrorCodes::CANNOT_CONVERT_TYPE);
throw Exception("Logical error", ErrorCodes::LOGICAL_ERROR);
}
T operator() (const Int64 & x) const
{
throw Exception("Cannot convert Int64 to Aggregate Function", ErrorCodes::CANNOT_CONVERT_TYPE);
}
T operator() (const Float64 & x) const
{
throw Exception("Cannot convert Float64 to Aggregate Function", ErrorCodes::CANNOT_CONVERT_TYPE);
}
};
}
/** Шаблон столбцов, которые используют для хранения std::vector.
@ -121,7 +92,8 @@ public:
void insert(const Field & x)
{
data.push_back(apply_visitor(FieldVisitorConvertToNumber<typename NearestFieldType<T>::Type>(), x));
/// Это будет работать для всех числовых типов.
data.push_back(detail::convertUInt64To<T>(DB::get<UInt64>(x)));
}
void insertFrom(const IColumn & src, size_t n)

View File

@ -50,12 +50,12 @@ public:
UInt64 = 1,
Int64 = 2,
Float64 = 3,
AggregateFunction = 4, /// Состояние агрегатной функции
/// не POD типы
String = 16,
AggregateFunction = 17, /// Состояние агрегатной функции
Array = 18,
Array = 17,
};
static const int MIN_NON_POD = 16;
@ -68,8 +68,8 @@ public:
case UInt64: return "UInt64";
case Int64: return "Int64";
case Float64: return "Float64";
case AggregateFunction: return "AggregateFunctionPlainPtr";
case String: return "String";
case AggregateFunction: return "AggregateFunctionPtr";
case Array: return "Array";
default:
@ -200,8 +200,8 @@ public:
case Types::UInt64: return get<UInt64>() < rhs.get<UInt64>();
case Types::Int64: return get<Int64>() < rhs.get<Int64>();
case Types::Float64: return get<Float64>() < rhs.get<Float64>();
case Types::AggregateFunction: return get<AggregateFunctionPlainPtr>() < rhs.get<AggregateFunctionPlainPtr>();
case Types::String: return get<String>() < rhs.get<String>();
case Types::AggregateFunction: return get<AggregateFunctionPtr>() < rhs.get<AggregateFunctionPtr>();
case Types::Array: return get<Array>() < rhs.get<Array>();
default:
@ -227,8 +227,8 @@ public:
case Types::UInt64: return get<UInt64>() <= rhs.get<UInt64>();
case Types::Int64: return get<Int64>() <= rhs.get<Int64>();
case Types::Float64: return get<Float64>() <= rhs.get<Float64>();
case Types::AggregateFunction: return get<AggregateFunctionPlainPtr>() <= rhs.get<AggregateFunctionPlainPtr>();
case Types::String: return get<String>() <= rhs.get<String>();
case Types::AggregateFunction: return get<AggregateFunctionPtr>() <= rhs.get<AggregateFunctionPtr>();
case Types::Array: return get<Array>() <= rhs.get<Array>();
default:
@ -252,8 +252,8 @@ public:
case Types::UInt64:
case Types::Int64:
case Types::Float64: return get<UInt64>() == rhs.get<UInt64>();
case Types::AggregateFunction: return get<AggregateFunctionPlainPtr>() == rhs.get<AggregateFunctionPlainPtr>();
case Types::String: return get<String>() == rhs.get<String>();
case Types::AggregateFunction: return get<AggregateFunctionPtr>() == rhs.get<AggregateFunctionPtr>();
case Types::Array: return get<Array>() == rhs.get<Array>();
default:
@ -267,7 +267,7 @@ public:
}
typedef SharedPtr<IAggregateFunction> AggregateFunctionPtr;
typedef IAggregateFunction * AggregateFunctionPlainPtr;
typedef std::vector<Field> Array;
private:
@ -278,8 +278,8 @@ private:
BOOST_STATIC_ASSERT(storage_size >= sizeof(UInt64));
BOOST_STATIC_ASSERT(storage_size >= sizeof(Int64));
BOOST_STATIC_ASSERT(storage_size >= sizeof(Float64));
BOOST_STATIC_ASSERT(storage_size >= sizeof(AggregateFunctionPlainPtr));
BOOST_STATIC_ASSERT(storage_size >= sizeof(String));
BOOST_STATIC_ASSERT(storage_size >= sizeof(AggregateFunctionPtr));
BOOST_STATIC_ASSERT(storage_size >= sizeof(Array));
char storage[storage_size] __attribute__((aligned(8)));
@ -311,8 +311,8 @@ private:
case Types::UInt64: create(x.get<UInt64>()); break;
case Types::Int64: create(x.get<Int64>()); break;
case Types::Float64: create(x.get<Float64>()); break;
case Types::AggregateFunction: create(x.get<AggregateFunctionPlainPtr>()); break;
case Types::String: create(x.get<String>()); break;
case Types::AggregateFunction: create(x.get<AggregateFunctionPtr>()); break;
case Types::Array: create(x.get<Array>()); break;
}
}
@ -342,9 +342,6 @@ private:
case Types::String:
destroy<String>();
break;
case Types::AggregateFunction:
destroy<AggregateFunctionPtr>();
break;
case Types::Array:
destroy<Array>();
break;
@ -362,21 +359,21 @@ private:
};
template <> struct Field::TypeToEnum<Null> { static const Types::Which value = Types::Null; };
template <> struct Field::TypeToEnum<UInt64> { static const Types::Which value = Types::UInt64; };
template <> struct Field::TypeToEnum<Int64> { static const Types::Which value = Types::Int64; };
template <> struct Field::TypeToEnum<Float64> { static const Types::Which value = Types::Float64; };
template <> struct Field::TypeToEnum<String> { static const Types::Which value = Types::String; };
template <> struct Field::TypeToEnum<Field::AggregateFunctionPtr> { static const Types::Which value = Types::AggregateFunction; };
template <> struct Field::TypeToEnum<Field::Array> { static const Types::Which value = Types::Array; };
template <> struct Field::TypeToEnum<Null> { static const Types::Which value = Types::Null; };
template <> struct Field::TypeToEnum<UInt64> { static const Types::Which value = Types::UInt64; };
template <> struct Field::TypeToEnum<Int64> { static const Types::Which value = Types::Int64; };
template <> struct Field::TypeToEnum<Float64> { static const Types::Which value = Types::Float64; };
template <> struct Field::TypeToEnum<String> { static const Types::Which value = Types::String; };
template <> struct Field::TypeToEnum<Field::AggregateFunctionPlainPtr> { static const Types::Which value = Types::AggregateFunction; };
template <> struct Field::TypeToEnum<Field::Array> { static const Types::Which value = Types::Array; };
template <> struct Field::EnumToType<Field::Types::Null> { typedef Null Type; };
template <> struct Field::EnumToType<Field::Types::UInt64> { typedef UInt64 Type; };
template <> struct Field::EnumToType<Field::Types::Int64> { typedef Int64 Type; };
template <> struct Field::EnumToType<Field::Types::Float64> { typedef Float64 Type; };
template <> struct Field::EnumToType<Field::Types::String> { typedef String Type; };
template <> struct Field::EnumToType<Field::Types::AggregateFunction> { typedef AggregateFunctionPtr Type; };
template <> struct Field::EnumToType<Field::Types::Array> { typedef Array Type; };
template <> struct Field::EnumToType<Field::Types::Null> { typedef Null Type; };
template <> struct Field::EnumToType<Field::Types::UInt64> { typedef UInt64 Type; };
template <> struct Field::EnumToType<Field::Types::Int64> { typedef Int64 Type; };
template <> struct Field::EnumToType<Field::Types::Float64> { typedef Float64 Type; };
template <> struct Field::EnumToType<Field::Types::String> { typedef String Type; };
template <> struct Field::EnumToType<Field::Types::AggregateFunction> { typedef AggregateFunctionPlainPtr Type; };
template <> struct Field::EnumToType<Field::Types::Array> { typedef Array Type; };
template <typename T>
@ -424,8 +421,8 @@ typename Visitor::ResultType apply_visitor_impl(Visitor & visitor, F & field)
case Field::Types::UInt64: return visitor(field.template get<UInt64>());
case Field::Types::Int64: return visitor(field.template get<Int64>());
case Field::Types::Float64: return visitor(field.template get<Float64>());
case Field::Types::AggregateFunction: return visitor(field.template get<Field::AggregateFunctionPlainPtr>());
case Field::Types::String: return visitor(field.template get<String>());
case Field::Types::AggregateFunction: return visitor(field.template get<Field::AggregateFunctionPtr>());
case Field::Types::Array: return visitor(field.template get<Field::Array>());
default:
@ -471,8 +468,8 @@ typename Visitor::ResultType apply_binary_visitor_impl2(Visitor & visitor, F1 &
case Field::Types::UInt64: return visitor(field1, field2.template get<UInt64>());
case Field::Types::Int64: return visitor(field1, field2.template get<Int64>());
case Field::Types::Float64: return visitor(field1, field2.template get<Float64>());
case Field::Types::AggregateFunction: return visitor(field1, field2.template get<Field::AggregateFunctionPlainPtr>());
case Field::Types::String: return visitor(field1, field2.template get<String>());
case Field::Types::AggregateFunction: return visitor(field1, field2.template get<Field::AggregateFunctionPtr>());
case Field::Types::Array: return visitor(field1, field2.template get<Field::Array>());
default:
@ -489,8 +486,8 @@ typename Visitor::ResultType apply_binary_visitor_impl1(Visitor & visitor, F1 &
case Field::Types::UInt64: return apply_binary_visitor_impl2(visitor, field1.template get<UInt64>(), field2);
case Field::Types::Int64: return apply_binary_visitor_impl2(visitor, field1.template get<Int64>(), field2);
case Field::Types::Float64: return apply_binary_visitor_impl2(visitor, field1.template get<Float64>(), field2);
case Field::Types::AggregateFunction: return apply_binary_visitor_impl2(visitor, field1.template get<Field::AggregateFunctionPlainPtr>(), field2);
case Field::Types::String: return apply_binary_visitor_impl2(visitor, field1.template get<String>(), field2);
case Field::Types::AggregateFunction: return apply_binary_visitor_impl2(visitor, field1.template get<Field::AggregateFunctionPtr>(), field2);
case Field::Types::Array: return apply_binary_visitor_impl2(visitor, field1.template get<Field::Array>(), field2);
default:
@ -560,7 +557,7 @@ public:
String operator() (const UInt64 & x) const { return "UInt64_" + Poco::NumberFormatter::format(x); }
String operator() (const Int64 & x) const { return "Int64_" + Poco::NumberFormatter::format(x); }
String operator() (const Float64 & x) const { return "Float64_" + Poco::NumberFormatter::format(x); }
String operator() (const SharedPtr<IAggregateFunction> & x) const { return "AggregateFunction"; }
String operator() (const Field::AggregateFunctionPlainPtr & x) const { return "AggregateFunction"; }
String operator() (const String & x) const
{
@ -594,7 +591,7 @@ public:
String operator() (const UInt64 & x) const { return Poco::NumberFormatter::format(x); }
String operator() (const Int64 & x) const { return Poco::NumberFormatter::format(x); }
String operator() (const Float64 & x) const { return Poco::NumberFormatter::format(x); }
String operator() (const SharedPtr<IAggregateFunction> & x) const { return "AggregateFunction"; }
String operator() (const Field::AggregateFunctionPlainPtr & x) const { return "AggregateFunction"; }
String operator() (const String & x) const
{
@ -641,9 +638,9 @@ public:
throw Exception("Cannot convert Array to " + TypeName<T>::get(), ErrorCodes::CANNOT_CONVERT_TYPE);
}
T operator() (const SharedPtr<IAggregateFunction> & x) const
T operator() (const Field::AggregateFunctionPlainPtr & x) const
{
throw Exception("Cannot convert AggregateFunctionPtr to " + TypeName<T>::get(), ErrorCodes::CANNOT_CONVERT_TYPE);
throw Exception("Cannot convert AggregateFunctionPlainPtr to " + TypeName<T>::get(), ErrorCodes::CANNOT_CONVERT_TYPE);
}
T operator() (const UInt64 & x) const { return x; }
@ -667,6 +664,7 @@ template <> struct NearestFieldType<Float64> { typedef Float64 Type; };
template <> struct NearestFieldType<String> { typedef String Type; };
template <> struct NearestFieldType<Array> { typedef Array Type; };
template <> struct NearestFieldType<bool> { typedef UInt64 Type; };
template <> struct NearestFieldType<IAggregateFunction*> { typedef IAggregateFunction* Type; };
}

View File

@ -31,6 +31,8 @@ typedef double Float64;
typedef std::string String;
typedef std::vector<String> Strings;
class IAggregateFunction;
template <typename T> struct IsNumber { static const bool value = false; };
@ -60,5 +62,6 @@ template <> struct TypeName<Int64> { static std::string get() { return "Int64"
template <> struct TypeName<Float32> { static std::string get() { return "Float32"; } };
template <> struct TypeName<Float64> { static std::string get() { return "Float64"; } };
template <> struct TypeName<String> { static std::string get() { return "String"; } };
template <> struct TypeName<IAggregateFunction*> { static std::string get() { return "AggregateFunctionPlainPtr"; } };
}

View File

@ -44,7 +44,7 @@ public:
return new DataTypeString;
}
DataTypePtr operator() (const AggregateFunctionPtr & x) const
DataTypePtr operator() (const AggregateFunctionPlainPtr & x) const
{
throw Exception("Cannot get DataType for AggregateFunction Field", ErrorCodes::NOT_IMPLEMENTED);
}

View File

@ -26,7 +26,7 @@ int main(int argc, char ** argv)
field = DB::Null();
std::cerr << DB::apply_visitor(to_string, field) << std::endl;
field = DB::Field::AggregateFunctionPtr(NULL);
field = DB::Field::AggregateFunctionPlainPtr(NULL);
std::cerr << DB::apply_visitor(to_string, field) << std::endl;
DB::Field field2;
@ -37,7 +37,7 @@ int main(int argc, char ** argv)
array.push_back(DB::UInt64(123));
array.push_back(DB::Int64(-123));
array.push_back(DB::String("Hello"));
array.push_back(DB::Field::AggregateFunctionPtr(NULL));
array.push_back(DB::Field::AggregateFunctionPlainPtr(NULL));
field = array;
std::cerr << DB::apply_visitor(to_string, field) << std::endl;

View File

@ -15,15 +15,12 @@ using Poco::SharedPtr;
void DataTypeAggregateFunction::serializeBinary(const Field & field, WriteBuffer & ostr) const
{
const AggregateFunctionPtr & value = get<const AggregateFunctionPtr &>(field);
value->serialize(ostr);
get<AggregateFunctionPlainPtr>(field)->serialize(ostr);
}
void DataTypeAggregateFunction::deserializeBinary(Field & field, ReadBuffer & istr) const
{
AggregateFunctionPtr value = function->cloneEmpty();
value->deserializeMerge(istr);
field = value;
throw Exception("Deserialization of individual aggregate functions is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
void DataTypeAggregateFunction::serializeBinary(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
@ -95,7 +92,7 @@ ColumnPtr DataTypeAggregateFunction::createColumn() const
ColumnPtr DataTypeAggregateFunction::createConstColumn(size_t size, const Field & field) const
{
return new ColumnConst<AggregateFunctionPtr>(size, get<AggregateFunctionPtr>(field));
throw Exception("Const column with aggregate function is not supported", ErrorCodes::NOT_IMPLEMENTED);
}

View File

@ -455,7 +455,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants)
AggregatedDataWithoutKey & data = data_variants.without_key;
for (size_t i = 0; i < aggregates_size; ++i)
(*aggregate_columns[i])[0] = AggregateFunctionPtr(data[i]);
(*aggregate_columns[i])[0] = data[i];
}
else if (data_variants.type == AggregatedDataVariants::KEY_64)
{
@ -474,7 +474,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants)
first_column.insert(it->first);
for (size_t i = 0; i < aggregates_size; ++i)
(*aggregate_columns[i])[j] = AggregateFunctionPtr(it->second[i]);
(*aggregate_columns[i])[j] = it->second[i];
}
}
else if (data_variants.type == AggregatedDataVariants::KEY_STRING)
@ -488,7 +488,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants)
first_column.insert(String(it->first.data, it->first.size)); /// Здесь можно ускорить, сделав метод insertFrom(const char *, size_t size).
for (size_t i = 0; i < aggregates_size; ++i)
(*aggregate_columns[i])[j] = AggregateFunctionPtr(it->second[i]);
(*aggregate_columns[i])[j] = it->second[i];
}
}
else if (data_variants.type == AggregatedDataVariants::HASHED)
@ -502,7 +502,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants)
key_columns[i]->insert(it->second.first[i]);
for (size_t i = 0; i < aggregates_size; ++i)
(*aggregate_columns[i])[j] = AggregateFunctionPtr(it->second.second[i]);
(*aggregate_columns[i])[j] = it->second.second[i];
}
}
else if (data_variants.type == AggregatedDataVariants::GENERIC)
@ -515,7 +515,7 @@ Block Aggregator::convertToBlock(AggregatedDataVariants & data_variants)
key_columns[i]->insert(it->first[i]);
for (size_t i = 0; i < aggregates_size; ++i)
(*aggregate_columns[i])[j] = AggregateFunctionPtr(it->second[i]);
(*aggregate_columns[i])[j] = it->second[i];
}
}
else

View File

@ -46,7 +46,7 @@
* PS. Измеряйте всё сами, а то я почти запутался.
*/
#define USE_AUTO_ARRAY 0
#define USE_AUTO_ARRAY 1
#define IMITATE_UPDATES 0