Merge remote-tracking branch 'upstream/master' into fix26

This commit is contained in:
proller 2019-04-24 16:51:11 +03:00
commit 6170ccb50c
9 changed files with 177 additions and 38 deletions

View File

@ -9,55 +9,97 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}
namespace namespace
{ {
/// Substitute return type for Date and DateTime /// Substitute return type for Date and DateTime
class AggregateFunctionGroupUniqArrayDate : public AggregateFunctionGroupUniqArray<DataTypeDate::FieldType> template <typename has_limit>
class AggregateFunctionGroupUniqArrayDate : public AggregateFunctionGroupUniqArray<DataTypeDate::FieldType, has_limit>
{ {
public: public:
AggregateFunctionGroupUniqArrayDate(const DataTypePtr & argument_type) : AggregateFunctionGroupUniqArray<DataTypeDate::FieldType>(argument_type) {} AggregateFunctionGroupUniqArrayDate(const DataTypePtr & argument_type, UInt64 max_elems_ = std::numeric_limits<UInt64>::max()) : AggregateFunctionGroupUniqArray<DataTypeDate::FieldType, has_limit>(argument_type, max_elems_) {}
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeArray>(std::make_shared<DataTypeDate>()); } DataTypePtr getReturnType() const override { return std::make_shared<DataTypeArray>(std::make_shared<DataTypeDate>()); }
}; };
class AggregateFunctionGroupUniqArrayDateTime : public AggregateFunctionGroupUniqArray<DataTypeDateTime::FieldType> template <typename has_limit>
class AggregateFunctionGroupUniqArrayDateTime : public AggregateFunctionGroupUniqArray<DataTypeDateTime::FieldType, has_limit>
{ {
public: public:
AggregateFunctionGroupUniqArrayDateTime(const DataTypePtr & argument_type) : AggregateFunctionGroupUniqArray<DataTypeDateTime::FieldType>(argument_type) {} AggregateFunctionGroupUniqArrayDateTime(const DataTypePtr & argument_type, UInt64 max_elems_ = std::numeric_limits<UInt64>::max()) : AggregateFunctionGroupUniqArray<DataTypeDateTime::FieldType, has_limit>(argument_type, max_elems_) {}
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>()); } DataTypePtr getReturnType() const override { return std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>()); }
}; };
template <typename has_limit, typename ... TArgs>
static IAggregateFunction * createWithExtraTypes(const DataTypePtr & argument_type) static IAggregateFunction * createWithExtraTypes(const DataTypePtr & argument_type, TArgs && ... args)
{ {
WhichDataType which(argument_type); WhichDataType which(argument_type);
if (which.idx == TypeIndex::Date) return new AggregateFunctionGroupUniqArrayDate(argument_type); if (which.idx == TypeIndex::Date) return new AggregateFunctionGroupUniqArrayDate<has_limit>(argument_type, std::forward<TArgs>(args)...);
else if (which.idx == TypeIndex::DateTime) return new AggregateFunctionGroupUniqArrayDateTime(argument_type); else if (which.idx == TypeIndex::DateTime) return new AggregateFunctionGroupUniqArrayDateTime<has_limit>(argument_type, std::forward<TArgs>(args)...);
else else
{ {
/// Check that we can use plain version of AggreagteFunctionGroupUniqArrayGeneric /// Check that we can use plain version of AggreagteFunctionGroupUniqArrayGeneric
if (argument_type->isValueUnambiguouslyRepresentedInContiguousMemoryRegion()) if (argument_type->isValueUnambiguouslyRepresentedInContiguousMemoryRegion())
return new AggreagteFunctionGroupUniqArrayGeneric<true>(argument_type); return new AggreagteFunctionGroupUniqArrayGeneric<true, has_limit>(argument_type, std::forward<TArgs>(args)...);
else else
return new AggreagteFunctionGroupUniqArrayGeneric<false>(argument_type); return new AggreagteFunctionGroupUniqArrayGeneric<false, has_limit>(argument_type, std::forward<TArgs>(args)...);
} }
} }
template <typename has_limit, typename ... TArgs>
inline AggregateFunctionPtr createAggregateFunctionGroupUniqArrayImpl(const std::string & name, const DataTypePtr & argument_type, TArgs ... args)
{
AggregateFunctionPtr res(createWithNumericType<AggregateFunctionGroupUniqArray, has_limit, const DataTypePtr &, TArgs...>(*argument_type, argument_type, std::forward<TArgs>(args)...));
if (!res)
res = AggregateFunctionPtr(createWithExtraTypes<has_limit>(argument_type, std::forward<TArgs>(args)...));
if (!res)
throw Exception("Illegal type " + argument_type->getName() +
" of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return res;
}
AggregateFunctionPtr createAggregateFunctionGroupUniqArray(const std::string & name, const DataTypes & argument_types, const Array & parameters) AggregateFunctionPtr createAggregateFunctionGroupUniqArray(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{ {
assertNoParameters(name, parameters);
assertUnary(name, argument_types); assertUnary(name, argument_types);
AggregateFunctionPtr res(createWithNumericType<AggregateFunctionGroupUniqArray>(*argument_types[0], argument_types[0])); bool limit_size = false;
UInt64 max_elems = std::numeric_limits<UInt64>::max();
if (!res) if (parameters.empty())
res = AggregateFunctionPtr(createWithExtraTypes(argument_types[0])); {
// no limit
}
else if (parameters.size() == 1)
{
auto type = parameters[0].getType();
if (type != Field::Types::Int64 && type != Field::Types::UInt64)
throw Exception("Parameter for aggregate function " + name + " should be positive number", ErrorCodes::BAD_ARGUMENTS);
if (!res) if ((type == Field::Types::Int64 && parameters[0].get<Int64>() < 0) ||
throw Exception("Illegal type " + argument_types[0]->getName() + (type == Field::Types::UInt64 && parameters[0].get<UInt64>() == 0))
" of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); throw Exception("Parameter for aggregate function " + name + " should be positive number", ErrorCodes::BAD_ARGUMENTS);
return res; limit_size = true;
max_elems = parameters[0].get<UInt64>();
}
else
throw Exception("Incorrect number of parameters for aggregate function " + name + ", should be 0 or 1",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (!limit_size)
return createAggregateFunctionGroupUniqArrayImpl<std::false_type>(name, argument_types[0]);
else
return createAggregateFunctionGroupUniqArrayImpl<std::true_type>(name, argument_types[0], max_elems);
} }
} }

View File

@ -36,16 +36,21 @@ struct AggregateFunctionGroupUniqArrayData
/// Puts all values to the hash set. Returns an array of unique values. Implemented for numeric types. /// Puts all values to the hash set. Returns an array of unique values. Implemented for numeric types.
template <typename T> template <typename T, typename Tlimit_num_elem>
class AggregateFunctionGroupUniqArray class AggregateFunctionGroupUniqArray
: public IAggregateFunctionDataHelper<AggregateFunctionGroupUniqArrayData<T>, AggregateFunctionGroupUniqArray<T>> : public IAggregateFunctionDataHelper<AggregateFunctionGroupUniqArrayData<T>, AggregateFunctionGroupUniqArray<T, Tlimit_num_elem>>
{ {
static constexpr bool limit_num_elems = Tlimit_num_elem::value;
UInt64 max_elems;
private: private:
using State = AggregateFunctionGroupUniqArrayData<T>; using State = AggregateFunctionGroupUniqArrayData<T>;
public: public:
AggregateFunctionGroupUniqArray(const DataTypePtr & argument_type) AggregateFunctionGroupUniqArray(const DataTypePtr & argument_type, UInt64 max_elems_ = std::numeric_limits<UInt64>::max())
: IAggregateFunctionDataHelper<AggregateFunctionGroupUniqArrayData<T>, AggregateFunctionGroupUniqArray<T>>({argument_type}, {}) {} : IAggregateFunctionDataHelper<AggregateFunctionGroupUniqArrayData<T>,
AggregateFunctionGroupUniqArray<T, Tlimit_num_elem>>({argument_type}, {}),
max_elems(max_elems_) {}
String getName() const override { return "groupUniqArray"; } String getName() const override { return "groupUniqArray"; }
@ -56,12 +61,27 @@ public:
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{ {
if (limit_num_elems && this->data(place).value.size() >= max_elems)
return;
this->data(place).value.insert(static_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num]); this->data(place).value.insert(static_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num]);
} }
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{ {
if (!limit_num_elems)
this->data(place).value.merge(this->data(rhs).value); this->data(place).value.merge(this->data(rhs).value);
else
{
auto & cur_set = this->data(place).value;
auto & rhs_set = this->data(rhs).value;
for (auto & rhs_elem : rhs_set)
{
if (cur_set.size() >= max_elems)
return;
cur_set.insert(rhs_elem.getValue());
}
}
} }
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
@ -111,25 +131,43 @@ struct AggreagteFunctionGroupUniqArrayGenericData
Set value; Set value;
}; };
/// Helper function for deserialize and insert for the class AggreagteFunctionGroupUniqArrayGeneric
template <bool is_plain_column>
static StringRef getSerializationImpl(const IColumn & column, size_t row_num, Arena & arena);
template <bool is_plain_column>
static void deserializeAndInsertImpl(StringRef str, IColumn & data_to);
/** Template parameter with true value should be used for columns that store their elements in memory continuously. /** Template parameter with true value should be used for columns that store their elements in memory continuously.
* For such columns groupUniqArray() can be implemented more efficiently (especially for small numeric arrays). * For such columns groupUniqArray() can be implemented more efficiently (especially for small numeric arrays).
*/ */
template <bool is_plain_column = false> template <bool is_plain_column = false, typename Tlimit_num_elem = std::false_type>
class AggreagteFunctionGroupUniqArrayGeneric class AggreagteFunctionGroupUniqArrayGeneric
: public IAggregateFunctionDataHelper<AggreagteFunctionGroupUniqArrayGenericData, AggreagteFunctionGroupUniqArrayGeneric<is_plain_column>> : public IAggregateFunctionDataHelper<AggreagteFunctionGroupUniqArrayGenericData, AggreagteFunctionGroupUniqArrayGeneric<is_plain_column, Tlimit_num_elem>>
{ {
DataTypePtr & input_data_type; DataTypePtr & input_data_type;
static constexpr bool limit_num_elems = Tlimit_num_elem::value;
UInt64 max_elems;
using State = AggreagteFunctionGroupUniqArrayGenericData; using State = AggreagteFunctionGroupUniqArrayGenericData;
static StringRef getSerialization(const IColumn & column, size_t row_num, Arena & arena); static StringRef getSerialization(const IColumn & column, size_t row_num, Arena & arena)
{
return getSerializationImpl<is_plain_column>(column, row_num, arena);
}
static void deserializeAndInsert(StringRef str, IColumn & data_to); static void deserializeAndInsert(StringRef str, IColumn & data_to)
{
return deserializeAndInsertImpl<is_plain_column>(str, data_to);
}
public: public:
AggreagteFunctionGroupUniqArrayGeneric(const DataTypePtr & input_data_type) AggreagteFunctionGroupUniqArrayGeneric(const DataTypePtr & input_data_type, UInt64 max_elems_ = std::numeric_limits<UInt64>::max())
: IAggregateFunctionDataHelper<AggreagteFunctionGroupUniqArrayGenericData, AggreagteFunctionGroupUniqArrayGeneric<is_plain_column>>({input_data_type}, {}) : IAggregateFunctionDataHelper<AggreagteFunctionGroupUniqArrayGenericData, AggreagteFunctionGroupUniqArrayGeneric<is_plain_column, Tlimit_num_elem>>({input_data_type}, {})
, input_data_type(this->argument_types[0]) {} , input_data_type(this->argument_types[0])
, max_elems(max_elems_) {}
String getName() const override { return "groupUniqArray"; } String getName() const override { return "groupUniqArray"; }
@ -174,7 +212,10 @@ public:
bool inserted; bool inserted;
State::Set::iterator it; State::Set::iterator it;
if (limit_num_elems && set.size() >= max_elems)
return;
StringRef str_serialized = getSerialization(*columns[0], row_num, *arena); StringRef str_serialized = getSerialization(*columns[0], row_num, *arena);
set.emplace(str_serialized, it, inserted); set.emplace(str_serialized, it, inserted);
if constexpr (!is_plain_column) if constexpr (!is_plain_column)
@ -198,6 +239,8 @@ public:
State::Set::iterator it; State::Set::iterator it;
for (auto & rhs_elem : rhs_set) for (auto & rhs_elem : rhs_set)
{ {
if (limit_num_elems && cur_set.size() >= max_elems)
return ;
cur_set.emplace(rhs_elem.getValue(), it, inserted); cur_set.emplace(rhs_elem.getValue(), it, inserted);
if (inserted) if (inserted)
{ {
@ -229,31 +272,30 @@ public:
template <> template <>
inline StringRef AggreagteFunctionGroupUniqArrayGeneric<false>::getSerialization(const IColumn & column, size_t row_num, Arena & arena) inline StringRef getSerializationImpl<false>(const IColumn & column, size_t row_num, Arena & arena)
{ {
const char * begin = nullptr; const char * begin = nullptr;
return column.serializeValueIntoArena(row_num, arena, begin); return column.serializeValueIntoArena(row_num, arena, begin);
} }
template <> template <>
inline StringRef AggreagteFunctionGroupUniqArrayGeneric<true>::getSerialization(const IColumn & column, size_t row_num, Arena &) inline StringRef getSerializationImpl<true>(const IColumn & column, size_t row_num, Arena &)
{ {
return column.getDataAt(row_num); return column.getDataAt(row_num);
} }
template <> template <>
inline void AggreagteFunctionGroupUniqArrayGeneric<false>::deserializeAndInsert(StringRef str, IColumn & data_to) inline void deserializeAndInsertImpl<false>(StringRef str, IColumn & data_to)
{ {
data_to.deserializeAndInsertFromArena(str.data); data_to.deserializeAndInsertFromArena(str.data);
} }
template <> template <>
inline void AggreagteFunctionGroupUniqArrayGeneric<true>::deserializeAndInsert(StringRef str, IColumn & data_to) inline void deserializeAndInsertImpl<true>(StringRef str, IColumn & data_to)
{ {
data_to.insertData(str.data, str.size); data_to.insertData(str.data, str.size);
} }
#undef AGGREGATE_FUNCTION_GROUP_ARRAY_UNIQ_MAX_SIZE #undef AGGREGATE_FUNCTION_GROUP_ARRAY_UNIQ_MAX_SIZE
} }

View File

@ -18,3 +18,23 @@
1000 1000
1000 1000
1000 1000
10
10
10
10
10
10
10
10
10
10
1000
1000
1000
1000
1000
1000
1000
1000
1000
1000

View File

@ -6,5 +6,7 @@ INSERT INTO test.group_uniq_str SELECT 5 as id, toString(number % 100) as v FROM
SELECT length(groupUniqArray(v)) FROM test.group_uniq_str GROUP BY id ORDER BY id; SELECT length(groupUniqArray(v)) FROM test.group_uniq_str GROUP BY id ORDER BY id;
SELECT length(groupUniqArray(v)) FROM remote('127.0.0.{2,3,4,5}', 'test', 'group_uniq_str') GROUP BY id ORDER BY id; SELECT length(groupUniqArray(v)) FROM remote('127.0.0.{2,3,4,5}', 'test', 'group_uniq_str') GROUP BY id ORDER BY id;
SELECT length(groupUniqArray(10)(v)) FROM test.group_uniq_str GROUP BY id ORDER BY id;
SELECT length(groupUniqArray(10000)(v)) FROM test.group_uniq_str GROUP BY id ORDER BY id;
DROP TABLE IF EXISTS test.group_uniq_str; DROP TABLE IF EXISTS test.group_uniq_str;

View File

@ -18,3 +18,23 @@
20001 20001
20001 20001
20001 20001
10
10
10
10
10
10
10
10
10
10
20001
20001
20001
20001
20001
20001
20001
20001
20001
20001

View File

@ -5,5 +5,8 @@ CREATE TABLE test.group_uniq_arr_int ENGINE = Memory AS
SELECT length(groupUniqArray(v)) FROM test.group_uniq_arr_int GROUP BY id ORDER BY id; SELECT length(groupUniqArray(v)) FROM test.group_uniq_arr_int GROUP BY id ORDER BY id;
SELECT length(groupUniqArray(v)) FROM remote('127.0.0.{2,3,4,5}', 'test', 'group_uniq_arr_int') GROUP BY id ORDER BY id; SELECT length(groupUniqArray(v)) FROM remote('127.0.0.{2,3,4,5}', 'test', 'group_uniq_arr_int') GROUP BY id ORDER BY id;
SELECT length(groupUniqArray(10)(v)) FROM test.group_uniq_arr_int GROUP BY id ORDER BY id;
SELECT length(groupUniqArray(100000)(v)) FROM test.group_uniq_arr_int GROUP BY id ORDER BY id;
DROP TABLE IF EXISTS test.group_uniq_arr_int; DROP TABLE IF EXISTS test.group_uniq_arr_int;

View File

@ -1 +1,5 @@
['2016-12-16'] ['2016-12-16 12:00:00'] ['2016-12-16'] ['2016-12-16 12:00:00']
2 2 3 3
1 1 1 1
3 3
1 1

View File

@ -1,5 +1,8 @@
DROP TABLE IF EXISTS grop_uniq_array_date; DROP TABLE IF EXISTS grop_uniq_array_date;
CREATE TABLE grop_uniq_array_date (d Date, dt DateTime) ENGINE = Memory; CREATE TABLE grop_uniq_array_date (d Date, dt DateTime, id Integer) ENGINE = Memory;
INSERT INTO grop_uniq_array_date VALUES (toDate('2016-12-16'), toDateTime('2016-12-16 12:00:00')) (toDate('2016-12-16'), toDateTime('2016-12-16 12:00:00')); INSERT INTO grop_uniq_array_date VALUES (toDate('2016-12-16'), toDateTime('2016-12-16 12:00:00'), 1) (toDate('2016-12-16'), toDateTime('2016-12-16 12:00:00'), 1);
SELECT groupUniqArray(d), groupUniqArray(dt) FROM grop_uniq_array_date; SELECT groupUniqArray(d), groupUniqArray(dt) FROM grop_uniq_array_date;
INSERT INTO grop_uniq_array_date VALUES (toDate('2016-12-17'), toDateTime('2016-12-17 12:00:00'), 1), (toDate('2016-12-18'), toDateTime('2016-12-18 12:00:00'), 1), (toDate('2016-12-16'), toDateTime('2016-12-16 12:00:00'), 2);
SELECT length(groupUniqArray(2)(d)), length(groupUniqArray(2)(dt)), length(groupUniqArray(d)), length(groupUniqArray(dt)) FROM grop_uniq_array_date GROUP BY id ORDER BY id;
SELECT length(groupUniqArray(10000)(d)), length(groupUniqArray(10000)(dt)) FROM grop_uniq_array_date GROUP BY id ORDER BY id;
DROP TABLE IF EXISTS grop_uniq_array_date; DROP TABLE IF EXISTS grop_uniq_array_date;

View File

@ -369,10 +369,13 @@ Optional parameters:
- The default value for substituting in empty positions. - The default value for substituting in empty positions.
- The length of the resulting array. This allows you to receive arrays of the same size for all the aggregate keys. When using this parameter, the default value must be specified. - The length of the resulting array. This allows you to receive arrays of the same size for all the aggregate keys. When using this parameter, the default value must be specified.
## groupUniqArray(x) ## groupUniqArray(x), groupUniqArray(max_size)(x)
Creates an array from different argument values. Memory consumption is the same as for the `uniqExact` function. Creates an array from different argument values. Memory consumption is the same as for the `uniqExact` function.
The second version (with the `max_size` parameter) limits the size of the resulting array to `max_size` elements.
For example, `groupUniqArray (1) (x)` is equivalent to `[any (x)]`.
## quantile(level)(x) ## quantile(level)(x)
Approximates the `level` quantile. `level` is a constant, a floating-point number from 0 to 1. Approximates the `level` quantile. `level` is a constant, a floating-point number from 0 to 1.