Merge branch 'master' of github.com:yandex/ClickHouse

This commit is contained in:
alesapin 2019-06-25 15:26:45 +03:00
commit 60960f3340
7 changed files with 334 additions and 3 deletions

View File

@ -77,6 +77,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
throw Exception("Logical error: cannot find aggregate function combinator to apply a function to Nullable arguments.", ErrorCodes::LOGICAL_ERROR);
DataTypes nested_types = combinator->transformArguments(type_without_low_cardinality);
Array nested_parameters = combinator->transformParameters(parameters);
AggregateFunctionPtr nested_function;
@ -84,7 +85,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
/// Combinator will check if nested_function was created.
if (name == "count" || std::none_of(argument_types.begin(), argument_types.end(),
[](const auto & type) { return type->onlyNull(); }))
nested_function = getImpl(name, nested_types, parameters, recursion_level);
nested_function = getImpl(name, nested_types, nested_parameters, recursion_level);
return combinator->transformAggregateFunction(nested_function, argument_types, parameters);
}
@ -126,7 +127,10 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl(
String nested_name = name.substr(0, name.size() - combinator->getName().size());
DataTypes nested_types = combinator->transformArguments(argument_types);
AggregateFunctionPtr nested_function = get(nested_name, nested_types, parameters, recursion_level + 1);
Array nested_parameters = combinator->transformParameters(parameters);
AggregateFunctionPtr nested_function = get(nested_name, nested_types, nested_parameters, recursion_level + 1);
return combinator->transformAggregateFunction(nested_function, argument_types, parameters);
}

View File

@ -0,0 +1,99 @@
#include <AggregateFunctions/AggregateFunctionResample.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
/** Combinator with the "Resample" suffix.
  *
  * The last argument of the combined function is a key, and the last three
  * parameters are `begin`, `end` and `step`. Everything in front of them is
  * forwarded to the nested aggregate function. Depending on the key type the
  * combinator instantiates AggregateFunctionResample<UInt64> or
  * AggregateFunctionResample<Int64>.
  */
class AggregateFunctionCombinatorResample final : public IAggregateFunctionCombinator
{
public:
    /// Suffix that selects this combinator.
    String getName() const override
    {
        return "Resample";
    }

    /// Argument types of the nested function: all of `arguments` except the trailing key.
    /// Throws NUMBER_OF_ARGUMENTS_DOESNT_MATCH when there is no argument at all.
    DataTypes transformArguments(const DataTypes & arguments) const override
    {
        if (arguments.empty())
        {
            const String message = "Incorrect number of arguments for aggregate function with "
                + getName() + " suffix";
            throw Exception(message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
        }

        DataTypes nested_arguments = arguments;
        nested_arguments.pop_back();
        return nested_arguments;
    }

    /// Parameters of the nested function: all of `params` except the trailing begin, end, step.
    /// Throws NUMBER_OF_ARGUMENTS_DOESNT_MATCH when fewer than three parameters are given.
    Array transformParameters(const Array & params) const override
    {
        if (params.size() < 3)
        {
            const String message = "Incorrect number of parameters for aggregate function with "
                + getName() + " suffix";
            throw Exception(message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
        }

        return Array(params.begin(), params.begin() + (params.size() - 3));
    }

    /// Wraps `nested_function` into AggregateFunctionResample, choosing the key
    /// representation from the type of the last argument.
    /// Throws ILLEGAL_TYPE_OF_ARGUMENT when the key type is not integer-like.
    AggregateFunctionPtr transformAggregateFunction(
        const AggregateFunctionPtr & nested_function,
        const DataTypes & arguments,
        const Array & params) const override
    {
        /// Parameters are addressed from the end: 3 -> begin, 2 -> end, 1 -> step.
        /// Indexing stays lazy so that nothing is touched before the key type is accepted.
        auto param_from_end = [&params](size_t offset) -> const auto &
        {
            return params[params.size() - offset];
        };

        WhichDataType key_type{arguments.back()};

        if (key_type.isNativeUInt() || key_type.isDateOrDateTime())
        {
            UInt64 range_begin = param_from_end(3).safeGet<UInt64>();
            UInt64 range_end = param_from_end(2).safeGet<UInt64>();
            UInt64 range_step = param_from_end(1).safeGet<UInt64>();

            return std::make_shared<AggregateFunctionResample<UInt64>>(
                nested_function, range_begin, range_end, range_step, arguments, params);
        }

        if (key_type.isNativeInt() || key_type.isEnum() || key_type.isInterval())
        {
            Int64 range_begin;
            Int64 range_end;

            /// A bound may be stored either as Int64 or as UInt64.
            /// Note: the UInt64 -> Int64 conversion may overflow.
            if (!param_from_end(3).tryGet<Int64>(range_begin))
                range_begin = param_from_end(3).safeGet<UInt64>();

            if (!param_from_end(2).tryGet<Int64>(range_end))
                range_end = param_from_end(2).safeGet<UInt64>();

            UInt64 range_step = param_from_end(1).safeGet<UInt64>();

            return std::make_shared<AggregateFunctionResample<Int64>>(
                nested_function, range_begin, range_end, range_step, arguments, params);
        }

        throw Exception(
            "Illegal types of argument for aggregate function " + getName()
                + ", the type of the last argument should be native integer or integer-like",
            ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
    }
};
/// Adds the "Resample" combinator to the given combinator factory.
void registerAggregateFunctionCombinatorResample(AggregateFunctionCombinatorFactory & factory)
{
    auto resample_combinator = std::make_shared<AggregateFunctionCombinatorResample>();
    factory.registerCombinator(resample_combinator);
}
}

View File

@ -0,0 +1,182 @@
#pragma once
#include <AggregateFunctions/IAggregateFunction.h>
#include <Columns/ColumnArray.h>
#include <DataTypes/DataTypeArray.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
}
/** Aggregate function created by the "Resample" combinator.
  *
  * The key range [begin, end) is cut into `total` consecutive intervals of width
  * `step` (the last one may be shorter). One state of the nested function is kept
  * per interval. A row is routed to the interval its key (the last argument)
  * falls into; rows whose key lies outside [begin, end) are ignored.
  * The result is an array with one nested result per interval.
  *
  * State layout: `total` nested states placed one after another, each occupying
  * `sod` bytes (the nested state size rounded up to the nested alignment).
  *
  * Key is the integer type used for the bounds and the key (Int64 or UInt64 in
  * the combinator that creates this class).
  */
template <typename Key>
class AggregateFunctionResample final : public IAggregateFunctionHelper<AggregateFunctionResample<Key>>
{
private:
    /// Upper limit for the number of intervals, i.e. nested states per aggregation state.
    static constexpr size_t MAX_ELEMENTS = 4096;

    /// Distances between keys are computed in size_t, so it must be able to hold any of them.
    static_assert(sizeof(size_t) >= sizeof(Key), "size_t must be at least as wide as the key type");

    AggregateFunctionPtr nested_function;

    /// Index of the key column (the last argument).
    size_t last_col;

    Key begin;
    Key end;
    size_t step;

    /// Number of intervals.
    size_t total;

    /// Alignment of a nested state and its size rounded up to that alignment.
    size_t aod;
    size_t sod;

    /// Returns `to - from` for from <= to.
    /// Done in unsigned (modular) arithmetic: a signed subtraction could overflow
    /// when the bounds are far apart, while the true distance always fits into size_t.
    static size_t unsignedDistance(Key from, Key to)
    {
        return static_cast<size_t>(to) - static_cast<size_t>(from);
    }

public:
    /** @param nested_function  function applied inside every interval
      * @param begin            inclusive lower bound of the key range
      * @param end              exclusive upper bound of the key range
      * @param step             interval width, must not be zero
      * @param arguments        argument types of the combined function (the key is the last one)
      * @param params           parameters of the combined function
      *
      * Throws ARGUMENT_OUT_OF_BOUND if `step` is zero or if the range would need
      * more than MAX_ELEMENTS intervals.
      */
    AggregateFunctionResample(
        AggregateFunctionPtr nested_function,
        Key begin,
        Key end,
        size_t step,
        const DataTypes & arguments,
        const Array & params)
        : IAggregateFunctionHelper<AggregateFunctionResample<Key>>{arguments, params}
        , nested_function{nested_function}
        , last_col{arguments.size() - 1}
        , begin{begin}
        , end{end}
        , step{step}
        , total{0}
        , aod{nested_function->alignOfData()}
        , sod{(nested_function->sizeOfData() + aod - 1) / aod * aod}
    {
        // notice: argument types has been checked before
        if (step == 0)
            throw Exception("The step given in function "
                    + getName() + " should not be zero",
                ErrorCodes::ARGUMENT_OUT_OF_BOUND);

        if (end < begin)
        {
            total = 0;
        }
        else
        {
            /// total = ceil((end - begin) / step).
            /// The usual form (width + step - 1) / step is avoided on purpose: it wraps
            /// around for a very large step or a very wide range, which would make
            /// `total` too small and let add() write outside of the allocated state.
            const size_t width = unsignedDistance(begin, end);
            total = width / step + (width % step != 0 ? 1 : 0);
        }

        if (total > MAX_ELEMENTS)
            throw Exception("The range given in function "
                    + getName() + " contains too many elements",
                ErrorCodes::ARGUMENT_OUT_OF_BOUND);
    }

    /// Name of the nested function followed by the combinator suffix.
    String getName() const override
    {
        return nested_function->getName() + "Resample";
    }

    const char * getHeaderFilePath() const override
    {
        return __FILE__;
    }

    /// The following properties are taken from the nested function as is.
    bool isState() const override
    {
        return nested_function->isState();
    }

    bool allocatesMemoryInArena() const override
    {
        return nested_function->allocatesMemoryInArena();
    }

    bool hasTrivialDestructor() const override
    {
        return nested_function->hasTrivialDestructor();
    }

    /// Room for `total` padded nested states.
    size_t sizeOfData() const override
    {
        return total * sod;
    }

    size_t alignOfData() const override
    {
        return aod;
    }

    /// Creates the nested state of every interval.
    void create(AggregateDataPtr place) const override
    {
        for (size_t i = 0; i < total; ++i)
            nested_function->create(place + i * sod);
    }

    /// Destroys the nested state of every interval.
    void destroy(AggregateDataPtr place) const noexcept override
    {
        for (size_t i = 0; i < total; ++i)
            nested_function->destroy(place + i * sod);
    }

    /// Adds row `row_num` to the state of the interval that contains its key.
    /// Rows with a key outside of [begin, end) are skipped.
    void add(
        AggregateDataPtr place,
        const IColumn ** columns,
        size_t row_num,
        Arena * arena) const override
    {
        Key key;

        /// Read the key with the accessor matching the signedness of Key.
        if constexpr (static_cast<Key>(-1) < 0)
            key = columns[last_col]->getInt(row_num);
        else
            key = columns[last_col]->getUInt(row_num);

        if (key < begin || key >= end)
            return;

        /// key is inside [begin, end), hence pos < total.
        /// Unsigned distance: `key - begin` could overflow for a signed Key.
        size_t pos = unsignedDistance(begin, key) / step;

        nested_function->add(place + pos * sod, columns, row_num, arena);
    }

    /// Merges the states interval by interval.
    void merge(
        AggregateDataPtr place,
        ConstAggregateDataPtr rhs,
        Arena * arena) const override
    {
        for (size_t i = 0; i < total; ++i)
            nested_function->merge(place + i * sod, rhs + i * sod, arena);
    }

    /// Writes the nested states one after another, in interval order.
    void serialize(
        ConstAggregateDataPtr place,
        WriteBuffer & buf) const override
    {
        for (size_t i = 0; i < total; ++i)
            nested_function->serialize(place + i * sod, buf);
    }

    /// Reads the nested states in the same order serialize() writes them.
    void deserialize(
        AggregateDataPtr place,
        ReadBuffer & buf,
        Arena * arena) const override
    {
        for (size_t i = 0; i < total; ++i)
            nested_function->deserialize(place + i * sod, buf, arena);
    }

    /// Array of the nested function's return type.
    DataTypePtr getReturnType() const override
    {
        return std::make_shared<DataTypeArray>(nested_function->getReturnType());
    }

    /// Appends one array row to `to`: the nested results of all intervals, in order.
    void insertResultInto(
        ConstAggregateDataPtr place,
        IColumn & to) const override
    {
        auto & col = static_cast<ColumnArray &>(to);
        auto & col_offsets = static_cast<ColumnArray::ColumnOffsets &>(col.getOffsetsColumn());

        for (size_t i = 0; i < total; ++i)
            nested_function->insertResultInto(place + i * sod, col.getData());

        /// Close the row: its end offset is the current size of the nested data column.
        col_offsets.getData().push_back(col.getData().size());
    }
};
}

View File

@ -38,7 +38,19 @@ public:
* get the arguments for nested function (ex: UInt64 for sum).
* If arguments are not suitable for combined function, throw an exception.
*/
virtual DataTypes transformArguments(const DataTypes & arguments) const = 0;
virtual DataTypes transformArguments(const DataTypes & arguments) const
{
return arguments;
}
/** From the parameters for combined function,
* get the parameters for nested function.
* If parameters are not suitable for combined function, throw an exception.
*/
virtual Array transformParameters(const Array & parameters) const
{
return parameters;
}
/** Create combined aggregate function (ex: sumIf)
* from nested function (ex: sum)

View File

@ -39,6 +39,7 @@ void registerAggregateFunctionCombinatorForEach(AggregateFunctionCombinatorFacto
void registerAggregateFunctionCombinatorState(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionCombinatorMerge(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionCombinatorNull(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionCombinatorResample(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionHistogram(AggregateFunctionFactory & factory);
void registerAggregateFunctionRetention(AggregateFunctionFactory & factory);
@ -87,6 +88,7 @@ void registerAggregateFunctions()
registerAggregateFunctionCombinatorState(factory);
registerAggregateFunctionCombinatorMerge(factory);
registerAggregateFunctionCombinatorNull(factory);
registerAggregateFunctionCombinatorResample(factory);
}
}

View File

@ -0,0 +1,16 @@
[11,12,13,14,15,16]
[27,31,17]
[39,48,18]
[39,48,18]
[0,0,0,0,0,0]
[0.5,0.5,0]
[0.816496580927726,0.816496580927726,0]
[0.816496580927726,0.816496580927726,0]
[[11],[12],[13],[14],[15],[16]]
[[13,14],[15,16],[17]]
[[12,13,14],[15,16,17],[18]]
[[12,13,14],[15,16,17],[18]]
[1,1,1,1,1,1]
[2,2,1]
[3,3,1]
[3,3,1]

View File

@ -0,0 +1,16 @@
-- Tests for the -Resample aggregate function combinator, run through arrayReduce.
-- The combinator parameters are (begin, end, step). In every query the first array
-- holds the values passed to the nested function and the second array holds the keys.
-- Each nested function is tried with three (begin, end, step) settings; the fourth query
-- repeats the third one with `-0` as the first key
-- (presumably to make the key array signed -- TODO confirm).

-- Nested function: sum.
select arrayReduce('sumResample(1, 7, 1)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
select arrayReduce('sumResample(3, 8, 2)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
select arrayReduce('sumResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
select arrayReduce('sumResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [-0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
-- Nested function: stddevPop.
select arrayReduce('stddevPopResample(1, 7, 1)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
select arrayReduce('stddevPopResample(3, 8, 2)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
select arrayReduce('stddevPopResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
select arrayReduce('stddevPopResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [-0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
-- Nested function: groupArray.
select arrayReduce('groupArrayResample(1, 7, 1)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
select arrayReduce('groupArrayResample(3, 8, 2)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
select arrayReduce('groupArrayResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
select arrayReduce('groupArrayResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [-0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
-- Nested function: uniq.
select arrayReduce('uniqResample(1, 7, 1)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
select arrayReduce('uniqResample(3, 8, 2)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
select arrayReduce('uniqResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
select arrayReduce('uniqResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [-0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);