mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-01 12:01:58 +00:00
Merge branch 'patch-6' of https://github.com/hczhcz/ClickHouse into hczhcz-patch-6
This commit is contained in:
commit
88b9bc6254
@ -77,6 +77,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
|
|||||||
throw Exception("Logical error: cannot find aggregate function combinator to apply a function to Nullable arguments.", ErrorCodes::LOGICAL_ERROR);
|
throw Exception("Logical error: cannot find aggregate function combinator to apply a function to Nullable arguments.", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
DataTypes nested_types = combinator->transformArguments(type_without_low_cardinality);
|
DataTypes nested_types = combinator->transformArguments(type_without_low_cardinality);
|
||||||
|
Array nested_parameters = combinator->transformParameters(parameters);
|
||||||
|
|
||||||
AggregateFunctionPtr nested_function;
|
AggregateFunctionPtr nested_function;
|
||||||
|
|
||||||
@ -84,7 +85,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
|
|||||||
/// Combinator will check if nested_function was created.
|
/// Combinator will check if nested_function was created.
|
||||||
if (name == "count" || std::none_of(argument_types.begin(), argument_types.end(),
|
if (name == "count" || std::none_of(argument_types.begin(), argument_types.end(),
|
||||||
[](const auto & type) { return type->onlyNull(); }))
|
[](const auto & type) { return type->onlyNull(); }))
|
||||||
nested_function = getImpl(name, nested_types, parameters, recursion_level);
|
nested_function = getImpl(name, nested_types, nested_parameters, recursion_level);
|
||||||
|
|
||||||
return combinator->transformAggregateFunction(nested_function, argument_types, parameters);
|
return combinator->transformAggregateFunction(nested_function, argument_types, parameters);
|
||||||
}
|
}
|
||||||
@ -126,7 +127,10 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl(
|
|||||||
|
|
||||||
String nested_name = name.substr(0, name.size() - combinator->getName().size());
|
String nested_name = name.substr(0, name.size() - combinator->getName().size());
|
||||||
DataTypes nested_types = combinator->transformArguments(argument_types);
|
DataTypes nested_types = combinator->transformArguments(argument_types);
|
||||||
AggregateFunctionPtr nested_function = get(nested_name, nested_types, parameters, recursion_level + 1);
|
Array nested_parameters = combinator->transformParameters(parameters);
|
||||||
|
|
||||||
|
AggregateFunctionPtr nested_function = get(nested_name, nested_types, nested_parameters, recursion_level + 1);
|
||||||
|
|
||||||
return combinator->transformAggregateFunction(nested_function, argument_types, parameters);
|
return combinator->transformAggregateFunction(nested_function, argument_types, parameters);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
107
dbms/src/AggregateFunctions/AggregateFunctionResample.cpp
Normal file
107
dbms/src/AggregateFunctions/AggregateFunctionResample.cpp
Normal file
@ -0,0 +1,107 @@
|
|||||||
|
#include <AggregateFunctions/AggregateFunctionResample.h>
|
||||||
|
|
||||||
|
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
|
||||||
|
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||||
|
}
|
||||||
|
|
||||||
|
class AggregateFunctionCombinatorResample final : public
|
||||||
|
IAggregateFunctionCombinator
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
String getName() const override
|
||||||
|
{
|
||||||
|
return "Resample";
|
||||||
|
}
|
||||||
|
|
||||||
|
DataTypes transformArguments(const DataTypes & arguments) const override
|
||||||
|
{
|
||||||
|
if (arguments.empty())
|
||||||
|
throw Exception(
|
||||||
|
"Incorrect number of arguments for aggregate function with "
|
||||||
|
+ getName() + " suffix",
|
||||||
|
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH
|
||||||
|
);
|
||||||
|
|
||||||
|
return DataTypes(arguments.begin(), arguments.end() - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
Array transformParameters(const Array & params) const override
|
||||||
|
{
|
||||||
|
if (params.size() < 3)
|
||||||
|
throw Exception(
|
||||||
|
"Incorrect number of parameters for aggregate function with "
|
||||||
|
+ getName() + " suffix",
|
||||||
|
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH
|
||||||
|
);
|
||||||
|
|
||||||
|
return Array(params.begin(), params.end() - 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
AggregateFunctionPtr transformAggregateFunction(
|
||||||
|
const AggregateFunctionPtr & nested_function,
|
||||||
|
const DataTypes & arguments,
|
||||||
|
const Array & params
|
||||||
|
) const override
|
||||||
|
{
|
||||||
|
for (const Field & param : params)
|
||||||
|
{
|
||||||
|
if (
|
||||||
|
param.getType() != Field::Types::UInt64
|
||||||
|
&& param.getType() != Field::Types::Int64
|
||||||
|
)
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
WhichDataType which {
|
||||||
|
arguments.back()
|
||||||
|
};
|
||||||
|
|
||||||
|
if (
|
||||||
|
which.isNativeUInt()
|
||||||
|
|| which.isDateOrDateTime()
|
||||||
|
)
|
||||||
|
return std::make_shared<AggregateFunctionResample<UInt64>>(
|
||||||
|
nested_function,
|
||||||
|
params.front().get<UInt64>(),
|
||||||
|
params[1].get<UInt64>(),
|
||||||
|
params.back().get<UInt64>(),
|
||||||
|
arguments,
|
||||||
|
params
|
||||||
|
);
|
||||||
|
|
||||||
|
if (
|
||||||
|
which.isNativeInt()
|
||||||
|
|| which.isEnum()
|
||||||
|
|| which.isInterval()
|
||||||
|
)
|
||||||
|
return std::make_shared<AggregateFunctionResample<Int64>>(
|
||||||
|
nested_function,
|
||||||
|
params.front().get<Int64>(),
|
||||||
|
params[1].get<Int64>(),
|
||||||
|
params.back().get<Int64>(),
|
||||||
|
arguments,
|
||||||
|
params
|
||||||
|
);
|
||||||
|
|
||||||
|
// TODO
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
void registerAggregateFunctionCombinatorResample(
|
||||||
|
AggregateFunctionCombinatorFactory & factory
|
||||||
|
)
|
||||||
|
{
|
||||||
|
factory.registerCombinator(
|
||||||
|
std::make_shared<AggregateFunctionCombinatorResample>()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
208
dbms/src/AggregateFunctions/AggregateFunctionResample.h
Normal file
208
dbms/src/AggregateFunctions/AggregateFunctionResample.h
Normal file
@ -0,0 +1,208 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <AggregateFunctions/IAggregateFunction.h>
|
||||||
|
#include <Columns/ColumnArray.h>
|
||||||
|
#include <DataTypes/DataTypeArray.h>
|
||||||
|
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int BAD_ARGUMENTS;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename Key>
|
||||||
|
class AggregateFunctionResample final : public IAggregateFunctionHelper<
|
||||||
|
AggregateFunctionResample<Key>
|
||||||
|
>
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
const size_t MAX_ELEMENTS = 4096;
|
||||||
|
|
||||||
|
AggregateFunctionPtr nested_function;
|
||||||
|
|
||||||
|
size_t last_col;
|
||||||
|
|
||||||
|
Key begin;
|
||||||
|
Key end;
|
||||||
|
Key step;
|
||||||
|
|
||||||
|
size_t total;
|
||||||
|
size_t aod;
|
||||||
|
size_t sod;
|
||||||
|
|
||||||
|
public:
|
||||||
|
AggregateFunctionResample(
|
||||||
|
AggregateFunctionPtr nested_function,
|
||||||
|
Key begin,
|
||||||
|
Key end,
|
||||||
|
Key step,
|
||||||
|
const DataTypes & arguments,
|
||||||
|
const Array & params
|
||||||
|
) :
|
||||||
|
IAggregateFunctionHelper<
|
||||||
|
AggregateFunctionResample<Key>
|
||||||
|
> {arguments, params},
|
||||||
|
nested_function {nested_function},
|
||||||
|
last_col {arguments.size() - 1},
|
||||||
|
begin {begin},
|
||||||
|
end {end},
|
||||||
|
step {step},
|
||||||
|
total {
|
||||||
|
static_cast<size_t>(
|
||||||
|
(end - begin + step - (step >= 0 ? 1 : -1)) / step
|
||||||
|
)
|
||||||
|
},
|
||||||
|
aod {nested_function->alignOfData()},
|
||||||
|
sod {(nested_function->sizeOfData() + aod - 1) / aod * aod}
|
||||||
|
{
|
||||||
|
// notice: argument types has been checked before
|
||||||
|
if (step == 0)
|
||||||
|
throw Exception(
|
||||||
|
"The step given in function "
|
||||||
|
+ getName() + " should not be zero",
|
||||||
|
ErrorCodes::BAD_ARGUMENTS
|
||||||
|
);
|
||||||
|
|
||||||
|
if (total > MAX_ELEMENTS)
|
||||||
|
throw Exception(
|
||||||
|
"The range given in function "
|
||||||
|
+ getName() + " contains too many elements",
|
||||||
|
ErrorCodes::BAD_ARGUMENTS
|
||||||
|
);
|
||||||
|
|
||||||
|
if ((step > 0 && end < begin) || (step < 0 && end > begin))
|
||||||
|
{
|
||||||
|
end = begin;
|
||||||
|
total = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
String getName() const override
|
||||||
|
{
|
||||||
|
return nested_function->getName() + "Resample";
|
||||||
|
}
|
||||||
|
|
||||||
|
const char * getHeaderFilePath() const override
|
||||||
|
{
|
||||||
|
return __FILE__;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool isState() const override
|
||||||
|
{
|
||||||
|
return nested_function->isState();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool allocatesMemoryInArena() const override
|
||||||
|
{
|
||||||
|
return nested_function->allocatesMemoryInArena();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool hasTrivialDestructor() const override
|
||||||
|
{
|
||||||
|
return nested_function->hasTrivialDestructor();
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t sizeOfData() const override
|
||||||
|
{
|
||||||
|
return total * sod;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t alignOfData() const override
|
||||||
|
{
|
||||||
|
return aod;
|
||||||
|
}
|
||||||
|
|
||||||
|
void create(AggregateDataPtr place) const override
|
||||||
|
{
|
||||||
|
for (size_t i = 0; i < total; ++i)
|
||||||
|
nested_function->create(place + i * sod);
|
||||||
|
}
|
||||||
|
|
||||||
|
void destroy(AggregateDataPtr place) const noexcept override
|
||||||
|
{
|
||||||
|
for (size_t i = 0; i < total; ++i)
|
||||||
|
nested_function->destroy(place + i * sod);
|
||||||
|
}
|
||||||
|
|
||||||
|
void add(
|
||||||
|
AggregateDataPtr place,
|
||||||
|
const IColumn ** columns,
|
||||||
|
size_t row_num,
|
||||||
|
Arena * arena
|
||||||
|
) const override
|
||||||
|
{
|
||||||
|
Key key;
|
||||||
|
|
||||||
|
if constexpr (static_cast<Key>(-1) < 0)
|
||||||
|
key = columns[last_col]->getInt(row_num);
|
||||||
|
else
|
||||||
|
key = columns[last_col]->getUInt(row_num);
|
||||||
|
|
||||||
|
if (step > 0 && (key < begin || key >= end))
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (step < 0 && (key > begin || key <= end))
|
||||||
|
return;
|
||||||
|
|
||||||
|
size_t pos = (key - begin) / step;
|
||||||
|
|
||||||
|
nested_function->add(place + pos * sod, columns, row_num, arena);
|
||||||
|
}
|
||||||
|
|
||||||
|
void merge(
|
||||||
|
AggregateDataPtr place,
|
||||||
|
ConstAggregateDataPtr rhs,
|
||||||
|
Arena * arena
|
||||||
|
) const override
|
||||||
|
{
|
||||||
|
for (size_t i = 0; i < total; ++i)
|
||||||
|
nested_function->merge(place + i * sod, rhs + i * sod, arena);
|
||||||
|
}
|
||||||
|
|
||||||
|
void serialize(
|
||||||
|
ConstAggregateDataPtr place,
|
||||||
|
WriteBuffer & buf
|
||||||
|
) const override
|
||||||
|
{
|
||||||
|
for (size_t i = 0; i < total; ++i)
|
||||||
|
nested_function->serialize(place + i * sod, buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
void deserialize(
|
||||||
|
AggregateDataPtr place,
|
||||||
|
ReadBuffer & buf,
|
||||||
|
Arena * arena
|
||||||
|
) const override
|
||||||
|
{
|
||||||
|
for (size_t i = 0; i < total; ++i)
|
||||||
|
nested_function->deserialize(place + i * sod, buf, arena);
|
||||||
|
}
|
||||||
|
|
||||||
|
DataTypePtr getReturnType() const override
|
||||||
|
{
|
||||||
|
return std::make_shared<DataTypeArray>(
|
||||||
|
nested_function->getReturnType()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
void insertResultInto(
|
||||||
|
ConstAggregateDataPtr place,
|
||||||
|
IColumn & to
|
||||||
|
) const override
|
||||||
|
{
|
||||||
|
auto & col = static_cast<ColumnArray &>(to);
|
||||||
|
auto & col_offsets = static_cast<ColumnArray::ColumnOffsets &>(
|
||||||
|
col.getOffsetsColumn()
|
||||||
|
);
|
||||||
|
|
||||||
|
for (size_t i = 0; i < total; ++i)
|
||||||
|
nested_function->insertResultInto(place + i * sod, col.getData());
|
||||||
|
|
||||||
|
col_offsets.getData().push_back(col.getData().size());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
@ -38,7 +38,19 @@ public:
|
|||||||
* get the arguments for nested function (ex: UInt64 for sum).
|
* get the arguments for nested function (ex: UInt64 for sum).
|
||||||
* If arguments are not suitable for combined function, throw an exception.
|
* If arguments are not suitable for combined function, throw an exception.
|
||||||
*/
|
*/
|
||||||
virtual DataTypes transformArguments(const DataTypes & arguments) const = 0;
|
virtual DataTypes transformArguments(const DataTypes & arguments) const
|
||||||
|
{
|
||||||
|
return arguments;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** From the parameters for combined function,
|
||||||
|
* get the parameters for nested function.
|
||||||
|
* If arguments are not suitable for combined function, throw an exception.
|
||||||
|
*/
|
||||||
|
virtual Array transformParameters(const Array & parameters) const
|
||||||
|
{
|
||||||
|
return parameters;
|
||||||
|
}
|
||||||
|
|
||||||
/** Create combined aggregate function (ex: sumIf)
|
/** Create combined aggregate function (ex: sumIf)
|
||||||
* from nested function (ex: sum)
|
* from nested function (ex: sum)
|
||||||
|
@ -38,6 +38,7 @@ void registerAggregateFunctionCombinatorForEach(AggregateFunctionCombinatorFacto
|
|||||||
void registerAggregateFunctionCombinatorState(AggregateFunctionCombinatorFactory &);
|
void registerAggregateFunctionCombinatorState(AggregateFunctionCombinatorFactory &);
|
||||||
void registerAggregateFunctionCombinatorMerge(AggregateFunctionCombinatorFactory &);
|
void registerAggregateFunctionCombinatorMerge(AggregateFunctionCombinatorFactory &);
|
||||||
void registerAggregateFunctionCombinatorNull(AggregateFunctionCombinatorFactory &);
|
void registerAggregateFunctionCombinatorNull(AggregateFunctionCombinatorFactory &);
|
||||||
|
void registerAggregateFunctionCombinatorResample(AggregateFunctionCombinatorFactory &);
|
||||||
|
|
||||||
void registerAggregateFunctionHistogram(AggregateFunctionFactory & factory);
|
void registerAggregateFunctionHistogram(AggregateFunctionFactory & factory);
|
||||||
void registerAggregateFunctionRetention(AggregateFunctionFactory & factory);
|
void registerAggregateFunctionRetention(AggregateFunctionFactory & factory);
|
||||||
@ -85,6 +86,7 @@ void registerAggregateFunctions()
|
|||||||
registerAggregateFunctionCombinatorState(factory);
|
registerAggregateFunctionCombinatorState(factory);
|
||||||
registerAggregateFunctionCombinatorMerge(factory);
|
registerAggregateFunctionCombinatorMerge(factory);
|
||||||
registerAggregateFunctionCombinatorNull(factory);
|
registerAggregateFunctionCombinatorNull(factory);
|
||||||
|
registerAggregateFunctionCombinatorResample(factory);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,16 @@
|
|||||||
|
[11,12,13,14,15,16]
|
||||||
|
[27,31,17]
|
||||||
|
[39,48,18]
|
||||||
|
[19,35,31,27,23,10]
|
||||||
|
[0,0,0,0,0,0]
|
||||||
|
[0.5,0.5,0]
|
||||||
|
[0.816496580927726,0.816496580927726,0]
|
||||||
|
[0,0.5,0.5,0.5,0.5,0]
|
||||||
|
[[11],[12],[13],[14],[15],[16]]
|
||||||
|
[[13,14],[15,16],[17]]
|
||||||
|
[[12,13,14],[15,16,17],[18]]
|
||||||
|
[[19],[17,18],[15,16],[13,14],[11,12],[10]]
|
||||||
|
[1,1,1,1,1,1]
|
||||||
|
[2,2,1]
|
||||||
|
[3,3,1]
|
||||||
|
[1,2,2,2,2,1]
|
16
dbms/tests/queries/0_stateless/00954_resample_combinator.sql
Normal file
16
dbms/tests/queries/0_stateless/00954_resample_combinator.sql
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
select arrayReduce('sumResample(1, 7, 1)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||||
|
select arrayReduce('sumResample(3, 8, 2)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||||
|
select arrayReduce('sumResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||||
|
select arrayReduce('sumResample(10, -1, -2)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [-0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||||
|
select arrayReduce('stddevPopResample(1, 7, 1)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||||
|
select arrayReduce('stddevPopResample(3, 8, 2)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||||
|
select arrayReduce('stddevPopResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||||
|
select arrayReduce('stddevPopResample(10, -1, -2)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [-0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||||
|
select arrayReduce('groupArrayResample(1, 7, 1)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||||
|
select arrayReduce('groupArrayResample(3, 8, 2)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||||
|
select arrayReduce('groupArrayResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||||
|
select arrayReduce('groupArrayResample(10, -1, -2)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [-0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||||
|
select arrayReduce('uniqResample(1, 7, 1)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||||
|
select arrayReduce('uniqResample(3, 8, 2)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||||
|
select arrayReduce('uniqResample(2, 9, 3)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||||
|
select arrayReduce('uniqResample(10, -1, -2)', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [-0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
Loading…
Reference in New Issue
Block a user