dbms: improved performance of aggregate functions min, max, any, anyLast [#METR-2944].

This commit is contained in:
Alexey Milovidov 2014-08-18 09:45:41 +04:00
parent 6a3f204705
commit 4a3d9082f7
6 changed files with 522 additions and 326 deletions

View File

@ -1,97 +0,0 @@
#pragma once
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/AggregateFunctions/IUnaryAggregateFunction.h>
namespace DB
{
struct AggregateFunctionAnyData
{
Field value;
};
/// Берёт первое попавшееся значение
class AggregateFunctionAny final : public IUnaryAggregateFunction<AggregateFunctionAnyData, AggregateFunctionAny>
{
private:
DataTypePtr type;
public:
String getName() const { return "any"; }
DataTypePtr getReturnType() const
{
return type;
}
void setArgument(const DataTypePtr & argument)
{
type = argument;
}
void addOne(AggregateDataPtr place, const IColumn & column, size_t row_num) const
{
Data & d = data(place);
if (!d.value.isNull())
return;
column.get(row_num, d.value);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
{
Data & d = data(place);
if (d.value.isNull())
d.value = data(rhs).value;
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
const Data & d = data(place);
if (unlikely(d.value.isNull()))
{
writeBinary(false, buf);
}
else
{
writeBinary(true, buf);
type->serializeBinary(data(place).value, buf);
}
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
{
Data & d = data(place);
bool is_not_null = false;
readBinary(is_not_null, buf);
if (is_not_null)
{
Field tmp;
type->deserializeBinary(tmp, buf);
if (d.value.isNull())
d.value = tmp;
}
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{
if (unlikely(data(place).value.isNull()))
to.insertDefault();
else
to.insert(data(place).value);
}
};
}

View File

@ -1,83 +0,0 @@
#pragma once
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/AggregateFunctions/IUnaryAggregateFunction.h>
namespace DB
{
struct AggregateFunctionAnyLastData
{
Field value;
};
/// Берёт последнее попавшееся значение
class AggregateFunctionAnyLast final : public IUnaryAggregateFunction<AggregateFunctionAnyLastData, AggregateFunctionAnyLast>
{
private:
DataTypePtr type;
public:
String getName() const { return "anyLast"; }
DataTypePtr getReturnType() const
{
return type;
}
void setArgument(const DataTypePtr & argument)
{
type = argument;
}
void addOne(AggregateDataPtr place, const IColumn & column, size_t row_num) const
{
column.get(row_num, data(place).value);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
{
if (!data(rhs).value.isNull())
data(place).value = data(rhs).value;
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
const Data & d = data(place);
if (unlikely(d.value.isNull()))
{
writeBinary(false, buf);
}
else
{
writeBinary(true, buf);
type->serializeBinary(data(place).value, buf);
}
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
{
bool is_not_null = false;
readBinary(is_not_null, buf);
if (is_not_null)
type->deserializeBinary(data(place).value, buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{
if (unlikely(data(place).value.isNull()))
to.insertDefault();
else
to.insert(data(place).value);
}
};
}

View File

@ -1,132 +0,0 @@
#pragma once
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/AggregateFunctions/IUnaryAggregateFunction.h>
namespace DB
{
struct AggregateFunctionMinTraits
{
static bool better(const Field & lhs, const Field & rhs) { return lhs < rhs; }
static String name() { return "min"; }
};
struct AggregateFunctionMaxTraits
{
static bool better(const Field & lhs, const Field & rhs) { return lhs > rhs; }
static String name() { return "max"; }
};
struct AggregateFunctionsMinMaxData
{
Field value;
};
/// Берёт минимальное (или максимальное) значение. Если таких много - то первое попавшееся из них.
template <typename Traits>
class AggregateFunctionsMinMax final : public IUnaryAggregateFunction<AggregateFunctionsMinMaxData, AggregateFunctionsMinMax<Traits> >
{
private:
typedef typename IAggregateFunctionHelper<AggregateFunctionsMinMaxData>::Data Data;
DataTypePtr type;
public:
String getName() const { return Traits::name(); }
DataTypePtr getReturnType() const
{
return type;
}
void setArgument(const DataTypePtr & argument)
{
type = argument;
}
void addOne(AggregateDataPtr place, const IColumn & column, size_t row_num) const
{
Field value;
column.get(row_num, value);
Data & d = this->data(place);
if (!d.value.isNull())
{
if (Traits::better(value, d.value))
d.value = value;
}
else
d.value = value;
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
{
Data & d = this->data(place);
const Data & d_rhs = this->data(rhs);
if (!d.value.isNull())
{
if (Traits::better(d_rhs.value, d.value))
d.value = d_rhs.value;
}
else
d.value = d_rhs.value;
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
const Data & d = this->data(place);
if (unlikely(d.value.isNull()))
{
writeBinary(false, buf);
}
else
{
writeBinary(true, buf);
type->serializeBinary(this->data(place).value, buf);
}
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
{
Data & d = this->data(place);
bool is_not_null = false;
readBinary(is_not_null, buf);
if (is_not_null)
{
if (!d.value.isNull())
{
Field value_;
type->deserializeBinary(value_, buf);
if (Traits::better(value_, d.value))
d.value = value_;
}
else
type->deserializeBinary(d.value, buf);
}
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{
if (unlikely(this->data(place).value.isNull()))
to.insertDefault();
else
to.insert(this->data(place).value);
}
};
typedef AggregateFunctionsMinMax<AggregateFunctionMinTraits> AggregateFunctionMin;
typedef AggregateFunctionsMinMax<AggregateFunctionMaxTraits> AggregateFunctionMax;
}

View File

@ -0,0 +1,476 @@
#pragma once
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/Columns/ColumnVector.h>
#include <DB/Columns/ColumnString.h>
#include <DB/AggregateFunctions/IUnaryAggregateFunction.h>
namespace DB
{
/** Агрегатные функции, запоминающие одно какое-либо переданное значение.
* Например, min, max, any, anyLast.
*/
/// Для числовых значений.
template <typename T>
struct SingleValueDataFixed
{
typedef SingleValueDataFixed<T> Self;
bool has_value = false; /// Надо запомнить, было ли передано хотя бы одно значение. Это нужно для AggregateFunctionIf.
T value;
bool has() const
{
return has_value;
}
void insertResultInto(IColumn & to) const
{
if (has())
static_cast<ColumnVector<T> &>(to).getData().push_back(value);
else
static_cast<ColumnVector<T> &>(to).insertDefault();
}
void write(WriteBuffer & buf, const IDataType & data_type) const
{
writeBinary(has(), buf);
if (has())
writeBinary(value, buf);
}
void read(ReadBuffer & buf, const IDataType & data_type)
{
readBinary(has_value, buf);
if (has())
readBinary(value, buf);
}
void change(const IColumn & column, size_t row_num)
{
has_value = true;
value = static_cast<const ColumnVector<T> &>(column).getData()[row_num];
}
void change(const Self & to)
{
has_value = true;
value = to.value;
}
void changeFirstTime(const IColumn & column, size_t row_num)
{
if (!has())
change(column, row_num);
}
void changeFirstTime(const Self & to)
{
if (!has())
change(to);
}
void changeIfLess(const IColumn & column, size_t row_num)
{
if (!has() || static_cast<const ColumnVector<T> &>(column).getData()[row_num] < value)
change(column, row_num);
}
void changeIfLess(const Self & to)
{
if (!has() || to.value < value)
change(to);
}
void changeIfGreater(const IColumn & column, size_t row_num)
{
if (!has() || static_cast<const ColumnVector<T> &>(column).getData()[row_num] > value)
change(column, row_num);
}
void changeIfGreater(const Self & to)
{
if (!has() || to.value > value)
change(to);
}
};
/** Для строк. Короткие строки хранятся в самой структуре, а длинные выделяются отдельно.
* NOTE Могло бы подойти также для массивов чисел.
*/
struct __attribute__((__packed__)) SingleValueDataString
{
typedef SingleValueDataString Self;
Int32 size = -1; /// -1 обозначает, что значения нет.
static constexpr Int32 AUTOMATIC_STORAGE_SIZE = 64;
static constexpr Int32 MAX_SMALL_STRING_SIZE = AUTOMATIC_STORAGE_SIZE - sizeof(size);
union
{
char small_data[MAX_SMALL_STRING_SIZE]; /// Включая завершающий ноль.
char * large_data;
};
~SingleValueDataString()
{
if (size > MAX_SMALL_STRING_SIZE)
free(large_data);
}
bool has() const
{
return size >= 0;
}
const char * getData() const
{
return size <= MAX_SMALL_STRING_SIZE ? small_data : large_data;
}
StringRef getStringRef() const
{
return StringRef(getData(), size);
}
void insertResultInto(IColumn & to) const
{
if (has())
static_cast<ColumnString &>(to).insertDataWithTerminatingZero(getData(), size);
else
static_cast<ColumnString &>(to).insertDefault();
}
void write(WriteBuffer & buf, const IDataType & data_type) const
{
writeBinary(size, buf);
if (has())
buf.write(getData(), size);
}
void read(ReadBuffer & buf, const IDataType & data_type)
{
Int32 rhs_size;
readBinary(rhs_size, buf);
if (rhs_size >= 0)
{
if (rhs_size <= MAX_SMALL_STRING_SIZE)
{
if (size > MAX_SMALL_STRING_SIZE)
free(large_data);
size = rhs_size;
buf.read(small_data, size);
}
else
{
if (size < rhs_size)
{
if (size > MAX_SMALL_STRING_SIZE)
free(large_data);
large_data = reinterpret_cast<char *>(malloc(rhs_size));
}
size = rhs_size;
buf.read(large_data, size);
}
}
else
{
if (size > MAX_SMALL_STRING_SIZE)
free(large_data);
size = rhs_size;
}
}
void changeImpl(StringRef value)
{
Int32 value_size = value.size;
if (value_size <= MAX_SMALL_STRING_SIZE)
{
if (size > MAX_SMALL_STRING_SIZE)
free(large_data);
size = value_size;
memcpy(small_data, value.data, size);
}
else
{
if (size < value_size)
{
if (size > MAX_SMALL_STRING_SIZE)
free(large_data);
large_data = reinterpret_cast<char *>(malloc(value.size));
}
size = value_size;
memcpy(large_data, value.data, size);
}
}
void change(const IColumn & column, size_t row_num)
{
changeImpl(static_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num));
}
void change(const Self & to)
{
changeImpl(to.getStringRef());
}
void changeFirstTime(const IColumn & column, size_t row_num)
{
if (!has())
change(column, row_num);
}
void changeFirstTime(const Self & to)
{
if (!has())
change(to);
}
void changeIfLess(const IColumn & column, size_t row_num)
{
if (!has() || static_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num) < getStringRef())
change(column, row_num);
}
void changeIfLess(const Self & to)
{
if (!has() || to.getStringRef() < getStringRef())
change(to);
}
void changeIfGreater(const IColumn & column, size_t row_num)
{
if (!has() || static_cast<const ColumnString &>(column).getDataAtWithTerminatingZero(row_num) > getStringRef())
change(column, row_num);
}
void changeIfGreater(const Self & to)
{
if (!has() || to.getStringRef() > getStringRef())
change(to);
}
};
/// Для любых других типов значений.
struct SingleValueDataGeneric
{
typedef SingleValueDataGeneric Self;
Field value;
bool has() const
{
return !value.isNull();
}
void insertResultInto(IColumn & to) const
{
if (has())
to.insert(value);
else
to.insertDefault();
}
void write(WriteBuffer & buf, const IDataType & data_type) const
{
if (!value.isNull())
{
writeBinary(true, buf);
data_type.serializeBinary(value, buf);
}
else
writeBinary(false, buf);
}
void read(ReadBuffer & buf, const IDataType & data_type)
{
bool is_not_null;
readBinary(is_not_null, buf);
if (is_not_null)
data_type.deserializeBinary(value, buf);
}
void change(const IColumn & column, size_t row_num)
{
column.get(row_num, value);
}
void change(const Self & to)
{
value = to.value;
}
void changeFirstTime(const IColumn & column, size_t row_num)
{
if (!has())
change(column, row_num);
}
void changeFirstTime(const Self & to)
{
if (!has())
change(to);
}
void changeIfLess(const IColumn & column, size_t row_num)
{
if (!has())
change(column, row_num);
else
{
Field new_value;
column.get(row_num, new_value);
if (new_value < value)
value = new_value;
}
}
void changeIfLess(const Self & to)
{
if (!has() || to.value < value)
change(to);
}
void changeIfGreater(const IColumn & column, size_t row_num)
{
if (!has())
change(column, row_num);
else
{
Field new_value;
column.get(row_num, new_value);
if (new_value > value)
value = new_value;
}
}
void changeIfGreater(const Self & to)
{
if (!has() || to.value > value)
change(to);
}
};
/** То, чем отличаются друг от другая агрегатные функции min, max, any, anyLast
* (условием, при котором сохранённое значение заменяется на новое,
* а также, конечно, именем).
*/
template <typename Data>
struct AggregateFunctionMinData : Data
{
typedef AggregateFunctionMinData<Data> Self;
void changeIfBetter(const IColumn & column, size_t row_num) { this->changeIfLess(column, row_num); }
void changeIfBetter(const Self & to) { this->changeIfLess(to); }
static const char * name() { return "min"; }
};
template <typename Data>
struct AggregateFunctionMaxData : Data
{
typedef AggregateFunctionMaxData<Data> Self;
void changeIfBetter(const IColumn & column, size_t row_num) { this->changeIfGreater(column, row_num); }
void changeIfBetter(const Self & to) { this->changeIfGreater(to); }
static const char * name() { return "max"; }
};
template <typename Data>
struct AggregateFunctionAnyData : Data
{
typedef AggregateFunctionAnyData<Data> Self;
void changeIfBetter(const IColumn & column, size_t row_num) { this->changeFirstTime(column, row_num); }
void changeIfBetter(const Self & to) { this->changeFirstTime(to); }
static const char * name() { return "any"; }
};
template <typename Data>
struct AggregateFunctionAnyLastData : Data
{
typedef AggregateFunctionAnyLastData<Data> Self;
void changeIfBetter(const IColumn & column, size_t row_num) { this->change(column, row_num); }
void changeIfBetter(const Self & to) { this->change(to); }
static const char * name() { return "anyLast"; }
};
template <typename Data>
class AggregateFunctionsSingleValue final : public IUnaryAggregateFunction<Data, AggregateFunctionsSingleValue<Data> >
{
private:
DataTypePtr type;
public:
String getName() const { return Data::name(); }
DataTypePtr getReturnType() const
{
return type;
}
void setArgument(const DataTypePtr & argument)
{
type = argument;
}
void addOne(AggregateDataPtr place, const IColumn & column, size_t row_num) const
{
this->data(place).changeIfBetter(column, row_num);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
{
this->data(place).changeIfBetter(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
this->data(place).write(buf, *type.get());
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
{
Data rhs; /// Для строчек не очень оптимально, так как может делаться одна лишняя аллокация.
rhs.read(buf, *type.get());
this->data(place).changeIfBetter(rhs);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{
this->data(place).insertResultInto(to);
}
};
}

View File

@ -26,7 +26,7 @@ struct StringRef
typedef std::vector<StringRef> StringRefs;
inline bool operator==(StringRef lhs, StringRef rhs)
inline bool operator== (StringRef lhs, StringRef rhs)
{
/// Так почему-то быстрее, чем return lhs.size == rhs.size && 0 == memcmp(lhs.data, rhs.data, lhs.size);
@ -40,18 +40,21 @@ inline bool operator==(StringRef lhs, StringRef rhs)
return true;
}
inline bool operator!=(StringRef lhs, StringRef rhs)
inline bool operator!= (StringRef lhs, StringRef rhs)
{
return !(lhs == rhs);
}
inline bool operator<(StringRef lhs, StringRef rhs)
inline bool operator< (StringRef lhs, StringRef rhs)
{
int cmp = memcmp(lhs.data, rhs.data, std::min(lhs.size, rhs.size));
if (cmp == 0)
return lhs.size < rhs.size;
else
return cmp < 0;
return cmp < 0 || (cmp == 0 && lhs.size < rhs.size);
}
inline bool operator> (StringRef lhs, StringRef rhs)
{
int cmp = memcmp(lhs.data, rhs.data, std::min(lhs.size, rhs.size));
return cmp > 0 || (cmp == 0 && lhs.size > rhs.size);
}

View File

@ -1,9 +1,7 @@
#include <DB/AggregateFunctions/AggregateFunctionCount.h>
#include <DB/AggregateFunctions/AggregateFunctionSum.h>
#include <DB/AggregateFunctions/AggregateFunctionAvg.h>
#include <DB/AggregateFunctions/AggregateFunctionAny.h>
#include <DB/AggregateFunctions/AggregateFunctionAnyLast.h>
#include <DB/AggregateFunctions/AggregateFunctionsMinMax.h>
#include <DB/AggregateFunctions/AggregateFunctionsMinMaxAny.h>
#include <DB/AggregateFunctions/AggregateFunctionsArgMinMax.h>
#include <DB/AggregateFunctions/AggregateFunctionUniq.h>
#include <DB/AggregateFunctions/AggregateFunctionUniqUpTo.h>
@ -69,6 +67,7 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ
return nullptr;
}
template<template <typename, typename> class AggregateFunctionTemplate, template <typename> class Data>
static IAggregateFunction * createWithNumericType(const IDataType & argument_type)
{
@ -87,18 +86,48 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ
}
/// min, max, any, anyLast
template<template <typename> class AggregateFunctionTemplate, template <typename> class Data>
static IAggregateFunction * createAggregateFunctionSingleValue(const String & name, const DataTypes & argument_types)
{
if (argument_types.size() != 1)
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const IDataType & argument_type = *argument_types[0];
if (typeid_cast<const DataTypeUInt8 *>(&argument_type)) return new AggregateFunctionTemplate<Data<SingleValueDataFixed<UInt8>>>;
else if (typeid_cast<const DataTypeUInt16 *>(&argument_type)) return new AggregateFunctionTemplate<Data<SingleValueDataFixed<UInt16>>>;
else if (typeid_cast<const DataTypeUInt32 *>(&argument_type)) return new AggregateFunctionTemplate<Data<SingleValueDataFixed<UInt32>>>;
else if (typeid_cast<const DataTypeUInt64 *>(&argument_type)) return new AggregateFunctionTemplate<Data<SingleValueDataFixed<UInt64>>>;
else if (typeid_cast<const DataTypeInt8 *>(&argument_type)) return new AggregateFunctionTemplate<Data<SingleValueDataFixed<Int8>>>;
else if (typeid_cast<const DataTypeInt16 *>(&argument_type)) return new AggregateFunctionTemplate<Data<SingleValueDataFixed<Int16>>>;
else if (typeid_cast<const DataTypeInt32 *>(&argument_type)) return new AggregateFunctionTemplate<Data<SingleValueDataFixed<Int32>>>;
else if (typeid_cast<const DataTypeInt64 *>(&argument_type)) return new AggregateFunctionTemplate<Data<SingleValueDataFixed<Int64>>>;
else if (typeid_cast<const DataTypeFloat32 *>(&argument_type)) return new AggregateFunctionTemplate<Data<SingleValueDataFixed<Float32>>>;
else if (typeid_cast<const DataTypeFloat64 *>(&argument_type)) return new AggregateFunctionTemplate<Data<SingleValueDataFixed<Float64>>>;
else if (typeid_cast<const DataTypeDate *>(&argument_type))
return new AggregateFunctionTemplate<Data<SingleValueDataFixed<DataTypeDate::FieldType>>>;
else if (typeid_cast<const DataTypeDateTime*>(&argument_type))
return new AggregateFunctionTemplate<Data<SingleValueDataFixed<DataTypeDateTime::FieldType>>>;
else if (typeid_cast<const DataTypeString*>(&argument_type))
return new AggregateFunctionTemplate<Data<SingleValueDataString>>;
else
return new AggregateFunctionTemplate<Data<SingleValueDataGeneric>>;
}
AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const DataTypes & argument_types, int recursion_level) const
{
if (name == "count")
return new AggregateFunctionCount;
else if (name == "any")
return new AggregateFunctionAny;
return createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionAnyData>(name, argument_types);
else if (name == "anyLast")
return new AggregateFunctionAnyLast;
return createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionAnyLastData>(name, argument_types);
else if (name == "min")
return new AggregateFunctionMin;
return createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionMinData>(name, argument_types);
else if (name == "max")
return new AggregateFunctionMax;
return createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionMaxData>(name, argument_types);
else if (name == "argMin")
return new AggregateFunctionArgMin;
else if (name == "argMax")