mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 09:02:00 +00:00
Merge pull request #9598 from hczhcz/patch-0310
Add function arrayReduceInRanges
This commit is contained in:
commit
719f060d6b
@ -5,12 +5,10 @@
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnAggregateFunction.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <AggregateFunctions/AggregateFunctionFactory.h>
|
||||
#include <AggregateFunctions/AggregateFunctionState.h>
|
||||
#include <AggregateFunctions/IAggregateFunction.h>
|
||||
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
|
||||
#include <Common/AlignedBuffer.h>
|
||||
#include <Common/Arena.h>
|
||||
|
||||
#include <ext/scope_guard.h>
|
||||
@ -108,7 +106,7 @@ DataTypePtr FunctionArrayReduce::getReturnTypeImpl(const ColumnsWithTypeAndName
|
||||
|
||||
void FunctionArrayReduce::executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count)
|
||||
{
|
||||
IAggregateFunction & agg_func = *aggregate_function.get();
|
||||
IAggregateFunction & agg_func = *aggregate_function;
|
||||
std::unique_ptr<Arena> arena = std::make_unique<Arena>();
|
||||
|
||||
/// Aggregate functions do not support constant columns. Therefore, we materialize them.
|
||||
@ -132,7 +130,7 @@ void FunctionArrayReduce::executeImpl(Block & block, const ColumnNumbers & argum
|
||||
else if (const ColumnConst * const_arr = checkAndGetColumnConst<ColumnArray>(col))
|
||||
{
|
||||
materialized_columns.emplace_back(const_arr->convertToFullColumn());
|
||||
const auto & materialized_arr = typeid_cast<const ColumnArray &>(*materialized_columns.back().get());
|
||||
const auto & materialized_arr = typeid_cast<const ColumnArray &>(*materialized_columns.back());
|
||||
aggregate_arguments_vec[i] = &materialized_arr.getData();
|
||||
offsets_i = &materialized_arr.getOffsets();
|
||||
}
|
||||
|
394
dbms/src/Functions/array/arrayReduceInRanges.cpp
Normal file
394
dbms/src/Functions/array/arrayReduceInRanges.cpp
Normal file
@ -0,0 +1,394 @@
|
||||
#include <Functions/IFunctionImpl.h>
|
||||
#include <Functions/FunctionFactory.h>
|
||||
#include <Functions/FunctionHelpers.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/DataTypeTuple.h>
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnTuple.h>
|
||||
#include <Columns/ColumnAggregateFunction.h>
|
||||
#include <AggregateFunctions/AggregateFunctionFactory.h>
|
||||
#include <AggregateFunctions/AggregateFunctionState.h>
|
||||
#include <AggregateFunctions/IAggregateFunction.h>
|
||||
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
|
||||
#include <Common/Arena.h>
|
||||
|
||||
#include <ext/scope_guard.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int SIZES_OF_ARRAYS_DOESNT_MATCH;
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
|
||||
/** Applies an aggregate function to value ranges in the array.
|
||||
* The function does what arrayReduce do on a structure similar to segment tree.
|
||||
* Space complexity: n * log(n)
|
||||
*
|
||||
* arrayReduceInRanges('agg', indices, lengths, arr1, ...)
|
||||
*/
|
||||
class FunctionArrayReduceInRanges : public IFunction
|
||||
{
|
||||
public:
|
||||
static const size_t minimum_step = 64;
|
||||
static constexpr auto name = "arrayReduceInRanges";
|
||||
static FunctionPtr create(const Context &) { return std::make_shared<FunctionArrayReduceInRanges>(); }
|
||||
|
||||
String getName() const override { return name; }
|
||||
|
||||
bool isVariadic() const override { return true; }
|
||||
size_t getNumberOfArguments() const override { return 0; }
|
||||
|
||||
bool useDefaultImplementationForConstants() const override { return true; }
|
||||
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {0}; }
|
||||
|
||||
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override;
|
||||
|
||||
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override;
|
||||
|
||||
private:
|
||||
/// lazy initialization in getReturnTypeImpl
|
||||
/// TODO: init in OverloadResolver
|
||||
mutable AggregateFunctionPtr aggregate_function;
|
||||
};
|
||||
|
||||
|
||||
DataTypePtr FunctionArrayReduceInRanges::getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const
|
||||
{
|
||||
/// The first argument is a constant string with the name of the aggregate function
|
||||
/// (possibly with parameters in parentheses, for example: "quantile(0.99)").
|
||||
|
||||
if (arguments.size() < 3)
|
||||
throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
|
||||
+ toString(arguments.size()) + ", should be at least 3.",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
const ColumnConst * aggregate_function_name_column = checkAndGetColumnConst<ColumnString>(arguments[0].column.get());
|
||||
if (!aggregate_function_name_column)
|
||||
throw Exception("First argument for function " + getName() + " must be constant string: name of aggregate function.",
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
|
||||
const DataTypeArray * ranges_type_array = checkAndGetDataType<DataTypeArray>(arguments[1].type.get());
|
||||
if (!ranges_type_array)
|
||||
throw Exception("Second argument for function " + getName() + " must be an array of ranges.",
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
const DataTypeTuple * ranges_type_tuple = checkAndGetDataType<DataTypeTuple>(ranges_type_array->getNestedType().get());
|
||||
if (!ranges_type_tuple || ranges_type_tuple->getElements().size() != 2)
|
||||
throw Exception("Each array element in the second argument for function " + getName() + " must be a tuple (index, length).",
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
if (!isNativeInteger(ranges_type_tuple->getElements()[0]))
|
||||
throw Exception("First tuple member in the second argument for function " + getName() + " must be ints or uints.",
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
if (!WhichDataType(ranges_type_tuple->getElements()[1]).isNativeUInt())
|
||||
throw Exception("Second tuple member in the second argument for function " + getName() + " must be uints.",
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
|
||||
DataTypes argument_types(arguments.size() - 2);
|
||||
for (size_t i = 2, size = arguments.size(); i < size; ++i)
|
||||
{
|
||||
const DataTypeArray * arg = checkAndGetDataType<DataTypeArray>(arguments[i].type.get());
|
||||
if (!arg)
|
||||
throw Exception("Argument " + toString(i) + " for function " + getName() + " must be an array but it has type "
|
||||
+ arguments[i].type->getName() + ".", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
|
||||
argument_types[i - 2] = arg->getNestedType();
|
||||
}
|
||||
|
||||
if (!aggregate_function)
|
||||
{
|
||||
String aggregate_function_name_with_params = aggregate_function_name_column->getValue<String>();
|
||||
|
||||
if (aggregate_function_name_with_params.empty())
|
||||
throw Exception("First argument for function " + getName() + " (name of aggregate function) cannot be empty.",
|
||||
ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
String aggregate_function_name;
|
||||
Array params_row;
|
||||
getAggregateFunctionNameAndParametersArray(aggregate_function_name_with_params,
|
||||
aggregate_function_name, params_row, "function " + getName());
|
||||
|
||||
aggregate_function = AggregateFunctionFactory::instance().get(aggregate_function_name, argument_types, params_row);
|
||||
}
|
||||
|
||||
return std::make_shared<DataTypeArray>(aggregate_function->getReturnType());
|
||||
}
|
||||
|
||||
|
||||
void FunctionArrayReduceInRanges::executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count)
|
||||
{
|
||||
IAggregateFunction & agg_func = *aggregate_function;
|
||||
std::unique_ptr<Arena> arena = std::make_unique<Arena>();
|
||||
|
||||
/// Aggregate functions do not support constant columns. Therefore, we materialize them.
|
||||
std::vector<ColumnPtr> materialized_columns;
|
||||
|
||||
/// Handling ranges
|
||||
|
||||
const IColumn * ranges_col_array = block.getByPosition(arguments[1]).column.get();
|
||||
const IColumn * ranges_col_tuple = nullptr;
|
||||
const ColumnArray::Offsets * ranges_offsets = nullptr;
|
||||
if (const ColumnArray * arr = checkAndGetColumn<ColumnArray>(ranges_col_array))
|
||||
{
|
||||
ranges_col_tuple = &arr->getData();
|
||||
ranges_offsets = &arr->getOffsets();
|
||||
}
|
||||
else if (const ColumnConst * const_arr = checkAndGetColumnConst<ColumnArray>(ranges_col_array))
|
||||
{
|
||||
materialized_columns.emplace_back(const_arr->convertToFullColumn());
|
||||
const auto & materialized_arr = typeid_cast<const ColumnArray &>(*materialized_columns.back());
|
||||
ranges_col_tuple = &materialized_arr.getData();
|
||||
ranges_offsets = &materialized_arr.getOffsets();
|
||||
}
|
||||
else
|
||||
throw Exception("Illegal column " + ranges_col_array->getName() + " as argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
|
||||
|
||||
const IColumn & indices_col = static_cast<const ColumnTuple *>(ranges_col_tuple)->getColumn(0);
|
||||
const IColumn & lengths_col = static_cast<const ColumnTuple *>(ranges_col_tuple)->getColumn(1);
|
||||
|
||||
/// Handling arguments
|
||||
/// The code is mostly copied from `arrayReduce`. Maybe create a utility header?
|
||||
|
||||
const size_t num_arguments_columns = arguments.size() - 2;
|
||||
|
||||
std::vector<const IColumn *> aggregate_arguments_vec(num_arguments_columns);
|
||||
const ColumnArray::Offsets * offsets = nullptr;
|
||||
|
||||
for (size_t i = 0; i < num_arguments_columns; ++i)
|
||||
{
|
||||
const IColumn * col = block.getByPosition(arguments[i + 2]).column.get();
|
||||
|
||||
const ColumnArray::Offsets * offsets_i = nullptr;
|
||||
if (const ColumnArray * arr = checkAndGetColumn<ColumnArray>(col))
|
||||
{
|
||||
aggregate_arguments_vec[i] = &arr->getData();
|
||||
offsets_i = &arr->getOffsets();
|
||||
}
|
||||
else if (const ColumnConst * const_arr = checkAndGetColumnConst<ColumnArray>(col))
|
||||
{
|
||||
materialized_columns.emplace_back(const_arr->convertToFullColumn());
|
||||
const auto & materialized_arr = typeid_cast<const ColumnArray &>(*materialized_columns.back());
|
||||
aggregate_arguments_vec[i] = &materialized_arr.getData();
|
||||
offsets_i = &materialized_arr.getOffsets();
|
||||
}
|
||||
else
|
||||
throw Exception("Illegal column " + col->getName() + " as argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
|
||||
|
||||
if (i == 0)
|
||||
offsets = offsets_i;
|
||||
else if (*offsets_i != *offsets)
|
||||
throw Exception("Lengths of all arrays passed to " + getName() + " must be equal.",
|
||||
ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
|
||||
}
|
||||
const IColumn ** aggregate_arguments = aggregate_arguments_vec.data();
|
||||
|
||||
/// Handling results
|
||||
|
||||
MutableColumnPtr result_holder = block.getByPosition(result).type->createColumn();
|
||||
ColumnArray * result_arr = static_cast<ColumnArray *>(result_holder.get());
|
||||
IColumn & result_data = result_arr->getData();
|
||||
|
||||
result_arr->getOffsets().insert(ranges_offsets->begin(), ranges_offsets->end());
|
||||
|
||||
/// AggregateFunction's states should be inserted into column using specific way
|
||||
auto res_col_aggregate_function = typeid_cast<ColumnAggregateFunction *>(&result_data);
|
||||
|
||||
if (!res_col_aggregate_function && agg_func.isState())
|
||||
throw Exception("State function " + agg_func.getName() + " inserts results into non-state column "
|
||||
+ block.getByPosition(result).type->getName(), ErrorCodes::ILLEGAL_COLUMN);
|
||||
|
||||
/// Perform the aggregation
|
||||
|
||||
size_t begin = 0;
|
||||
size_t end = 0;
|
||||
size_t ranges_begin = 0;
|
||||
size_t ranges_end = 0;
|
||||
|
||||
for (size_t i = 0; i < input_rows_count; ++i)
|
||||
{
|
||||
begin = end;
|
||||
end = (*offsets)[i];
|
||||
ranges_begin = ranges_end;
|
||||
ranges_end = (*ranges_offsets)[i];
|
||||
|
||||
/// We will allocate pre-aggregation places for each `minimum_place << level` rows.
|
||||
/// The value of `level` starts from 0, and it will never exceed the number of bits in a `size_t`.
|
||||
/// We calculate the offset (and thus size) of those places in each level.
|
||||
size_t place_offsets[sizeof(size_t) * 8];
|
||||
size_t place_total = 0;
|
||||
{
|
||||
size_t place_in_level = (end - begin) / minimum_step;
|
||||
|
||||
place_offsets[0] = place_in_level;
|
||||
for (size_t level = 0; place_in_level; ++level)
|
||||
{
|
||||
place_in_level >>= 1;
|
||||
place_total = place_offsets[level] + place_in_level;
|
||||
place_offsets[level + 1] = place_total;
|
||||
}
|
||||
}
|
||||
|
||||
PODArray<AggregateDataPtr> places(place_total);
|
||||
for (size_t j = 0; j < place_total; ++j)
|
||||
{
|
||||
places[j] = arena->alignedAlloc(agg_func.sizeOfData(), agg_func.alignOfData());
|
||||
try
|
||||
{
|
||||
agg_func.create(places[j]);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
for (size_t k = 0; k < j; ++k)
|
||||
agg_func.destroy(places[k]);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
SCOPE_EXIT({
|
||||
for (size_t j = 0; j < place_total; ++j)
|
||||
agg_func.destroy(places[j]);
|
||||
});
|
||||
|
||||
auto true_func = &agg_func;
|
||||
/// Unnest consecutive trailing -State combinators
|
||||
while (auto func = typeid_cast<AggregateFunctionState *>(true_func))
|
||||
true_func = func->getNestedFunction().get();
|
||||
|
||||
/// Pre-aggregate to the initial level
|
||||
for (size_t j = 0; j < place_offsets[0]; ++j)
|
||||
{
|
||||
size_t local_begin = j * minimum_step;
|
||||
size_t local_end = (j + 1) * minimum_step;
|
||||
|
||||
for (size_t k = local_begin; k < local_end; ++k)
|
||||
true_func->add(places[j], aggregate_arguments, begin + k, arena.get());
|
||||
}
|
||||
|
||||
/// Pre-aggregate to the higher levels by merging
|
||||
{
|
||||
size_t place_in_level = place_offsets[0] >> 1;
|
||||
size_t place_begin = 0;
|
||||
|
||||
for (size_t level = 0; place_in_level; ++level)
|
||||
{
|
||||
size_t next_place_begin = place_offsets[level];
|
||||
|
||||
for (size_t j = 0; j < place_in_level; ++j)
|
||||
{
|
||||
true_func->merge(places[next_place_begin + j], places[place_begin + (j << 1)], arena.get());
|
||||
true_func->merge(places[next_place_begin + j], places[place_begin + (j << 1) + 1], arena.get());
|
||||
}
|
||||
|
||||
place_in_level >>= 1;
|
||||
place_begin = next_place_begin;
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t j = ranges_begin; j < ranges_end; ++j)
|
||||
{
|
||||
size_t local_begin = 0;
|
||||
size_t local_end = 0;
|
||||
|
||||
{
|
||||
Int64 index = indices_col.getInt(j);
|
||||
UInt64 length = lengths_col.getUInt(j);
|
||||
|
||||
/// Keep the same as in arraySlice
|
||||
|
||||
if (index > 0)
|
||||
{
|
||||
local_begin = index - 1;
|
||||
if (local_begin + length < end - begin)
|
||||
local_end = local_begin + length;
|
||||
else
|
||||
local_end = end - begin;
|
||||
}
|
||||
else if (index < 0)
|
||||
{
|
||||
if (end - begin + index > 0)
|
||||
local_begin = end - begin + index;
|
||||
else
|
||||
local_begin = 0;
|
||||
|
||||
if (local_begin + length < end - begin)
|
||||
local_end = local_begin + length;
|
||||
else
|
||||
local_end = end - begin;
|
||||
}
|
||||
}
|
||||
|
||||
size_t place_begin = (local_begin + minimum_step - 1) / minimum_step;
|
||||
size_t place_end = local_end / minimum_step;
|
||||
|
||||
AggregateDataPtr place = arena->alignedAlloc(agg_func.sizeOfData(), agg_func.alignOfData());
|
||||
agg_func.create(place);
|
||||
|
||||
SCOPE_EXIT({
|
||||
agg_func.destroy(place);
|
||||
});
|
||||
|
||||
if (place_begin < place_end)
|
||||
{
|
||||
/// In this case, we can use pre-aggregated data.
|
||||
|
||||
/// Aggregate rows before
|
||||
for (size_t k = local_begin; k < place_begin * minimum_step; ++k)
|
||||
true_func->add(place, aggregate_arguments, begin + k, arena.get());
|
||||
|
||||
/// Aggregate using pre-aggretated data
|
||||
{
|
||||
size_t level = 0;
|
||||
size_t place_curr = place_begin;
|
||||
|
||||
while (place_curr < place_end)
|
||||
{
|
||||
while (((place_curr >> level) & 1) == 0 && place_curr + (2 << level) <= place_end)
|
||||
level += 1;
|
||||
while (place_curr + (1 << level) > place_end)
|
||||
level -= 1;
|
||||
|
||||
size_t place_offset = 0;
|
||||
if (level)
|
||||
place_offset = place_offsets[level - 1];
|
||||
|
||||
true_func->merge(place, places[place_offset + (place_curr >> level)], arena.get());
|
||||
place_curr += 1 << level;
|
||||
}
|
||||
}
|
||||
|
||||
/// Aggregate rows after
|
||||
for (size_t k = place_end * minimum_step; k < local_end; ++k)
|
||||
true_func->add(place, aggregate_arguments, begin + k, arena.get());
|
||||
}
|
||||
else
|
||||
{
|
||||
/// In this case, we can not use pre-aggregated data.
|
||||
|
||||
for (size_t k = local_begin; k < local_end; ++k)
|
||||
true_func->add(place, aggregate_arguments, begin + k, arena.get());
|
||||
}
|
||||
|
||||
if (!res_col_aggregate_function)
|
||||
agg_func.insertResultInto(place, result_data);
|
||||
else
|
||||
res_col_aggregate_function->insertFrom(place);
|
||||
}
|
||||
}
|
||||
|
||||
block.getByPosition(result).column = std::move(result_holder);
|
||||
}
|
||||
|
||||
|
||||
void registerFunctionArrayReduceInRanges(FunctionFactory & factory)
|
||||
{
|
||||
factory.registerFunction<FunctionArrayReduceInRanges>();
|
||||
}
|
||||
|
||||
}
|
@ -33,6 +33,7 @@ void registerFunctionArrayFlatten(FunctionFactory &);
|
||||
void registerFunctionArrayWithConstant(FunctionFactory &);
|
||||
void registerFunctionArrayZip(FunctionFactory &);
|
||||
void registerFunctionArrayAUC(FunctionFactory &);
|
||||
void registerFunctionArrayReduceInRanges(FunctionFactory &);
|
||||
|
||||
void registerFunctionsArray(FunctionFactory & factory)
|
||||
{
|
||||
@ -53,6 +54,7 @@ void registerFunctionsArray(FunctionFactory & factory)
|
||||
registerFunctionArraySlice(factory);
|
||||
registerFunctionArrayReverse(factory);
|
||||
registerFunctionArrayReduce(factory);
|
||||
registerFunctionArrayReduceInRanges(factory);
|
||||
registerFunctionRange(factory);
|
||||
registerFunctionsEmptyArray(factory);
|
||||
registerFunctionEmptyArrayToSingle(factory);
|
||||
|
16
dbms/tests/performance/array_reduce.xml
Normal file
16
dbms/tests/performance/array_reduce.xml
Normal file
@ -0,0 +1,16 @@
|
||||
<test>
|
||||
|
||||
<stop_conditions>
|
||||
<all_of>
|
||||
<total_time_ms>10000</total_time_ms>
|
||||
</all_of>
|
||||
</stop_conditions>
|
||||
|
||||
|
||||
<query>SELECT arrayReduce('count', range(100000000))</query>
|
||||
<query>SELECT arrayReduce('sum', range(100000000))</query>
|
||||
<query>SELECT arrayReduceInRanges('count', [(1, 100000000)], range(100000000))</query>
|
||||
<query>SELECT arrayReduceInRanges('sum', [(1, 100000000)], range(100000000))</query>
|
||||
<query>SELECT arrayReduceInRanges('count', arrayZip(range(1000000), range(1000000)), range(100000000))[123456]</query>
|
||||
<query>SELECT arrayReduceInRanges('sum', arrayZip(range(1000000), range(1000000)), range(100000000))[123456]</query>
|
||||
</test>
|
@ -0,0 +1,22 @@
|
||||
[['a','b','c'],['b','c','d'],['c','d','e']]
|
||||
[0,0,0,0,0,0,0,100,300,0,200,400,0,300,700,0,500,400,0,600,700,0,900,400]
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
@ -0,0 +1,32 @@
|
||||
SELECT
|
||||
arrayReduceInRanges(
|
||||
'groupArray',
|
||||
[(1, 3), (2, 3), (3, 3)],
|
||||
['a', 'b', 'c', 'd', 'e']
|
||||
);
|
||||
|
||||
SELECT
|
||||
arrayReduceInRanges(
|
||||
'sum',
|
||||
[
|
||||
(-6, 0), (-4, 0), (-2, 0), (0, 0), (2, 0), (4, 0),
|
||||
(-6, 1), (-4, 1), (-2, 1), (0, 1), (2, 1), (4, 1),
|
||||
(-6, 2), (-4, 2), (-2, 2), (0, 2), (2, 2), (4, 2),
|
||||
(-6, 3), (-4, 3), (-2, 3), (0, 3), (2, 3), (4, 3)
|
||||
],
|
||||
[100, 200, 300, 400]
|
||||
);
|
||||
|
||||
WITH
|
||||
arrayMap(x -> x + 1, range(50)) as data
|
||||
SELECT
|
||||
arrayReduceInRanges('groupArray', [(a, c), (b, d)], data) =
|
||||
[arraySlice(data, a, c), arraySlice(data, b, d)]
|
||||
FROM (
|
||||
SELECT
|
||||
cityHash64(number + 100) % 40 as a,
|
||||
cityHash64(number + 200) % 60 as b,
|
||||
cityHash64(number + 300) % 20 as c,
|
||||
cityHash64(number + 400) % 30 as d
|
||||
FROM numbers(20)
|
||||
);
|
@ -804,17 +804,30 @@ SELECT
|
||||
└──────────────┴───────────┘
|
||||
```
|
||||
|
||||
## arrayReduce(agg\_func, arr1, …) {#array-functions-arrayreduce}
|
||||
## arrayReduce {#arrayreduce}
|
||||
|
||||
Applies an aggregate function to array elements and returns its result. The name of the aggregation function is passed as a string in single quotes `'max'`, `'sum'`. When using parametric aggregate functions, the parameter is indicated after the function name in parentheses `'uniqUpTo(6)'`.
|
||||
|
||||
Example:
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
```sql
|
||||
arrayReduce(agg_func, arr1, arr2, ..., arrN)
|
||||
```
|
||||
|
||||
**Parameters**
|
||||
|
||||
* `agg_func` — The name of an aggregate function which should be a constant [string](../../data_types/string.md).
|
||||
* `arr` — Any number of [array](../../data_types/array.md) type columns as the parameters of the aggregation function.
|
||||
|
||||
**Returned value**
|
||||
|
||||
**Example**
|
||||
|
||||
```sql
|
||||
SELECT arrayReduce('max', [1, 2, 3])
|
||||
```
|
||||
|
||||
``` text
|
||||
```text
|
||||
┌─arrayReduce('max', [1, 2, 3])─┐
|
||||
│ 3 │
|
||||
└───────────────────────────────┘
|
||||
@ -822,13 +835,11 @@ SELECT arrayReduce('max', [1, 2, 3])
|
||||
|
||||
If an aggregate function takes multiple arguments, then this function must be applied to multiple arrays of the same size.
|
||||
|
||||
Example:
|
||||
|
||||
``` sql
|
||||
```sql
|
||||
SELECT arrayReduce('maxIf', [3, 5], [1, 0])
|
||||
```
|
||||
|
||||
``` text
|
||||
```text
|
||||
┌─arrayReduce('maxIf', [3, 5], [1, 0])─┐
|
||||
│ 3 │
|
||||
└──────────────────────────────────────┘
|
||||
@ -836,17 +847,51 @@ SELECT arrayReduce('maxIf', [3, 5], [1, 0])
|
||||
|
||||
Example with a parametric aggregate function:
|
||||
|
||||
``` sql
|
||||
```sql
|
||||
SELECT arrayReduce('uniqUpTo(3)', [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
|
||||
```
|
||||
|
||||
``` text
|
||||
```text
|
||||
┌─arrayReduce('uniqUpTo(3)', [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])─┐
|
||||
│ 4 │
|
||||
└─────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## arrayReverse(arr) {#array_functions-arrayreverse}
|
||||
## arrayReduceInRanges {#arrayreduceinranges}
|
||||
|
||||
Applies an aggregate function to array elements in given ranges and returns an array containing the result corresponding to each range. The function will return the same result as multiple `arrayReduce(agg_func, arraySlice(arr1, index, length), ...)`.
|
||||
|
||||
**Syntax**
|
||||
|
||||
```sql
|
||||
arrayReduceInRanges(agg_func, ranges, arr1, arr2, ..., arrN)
|
||||
```
|
||||
|
||||
**Parameters**
|
||||
|
||||
* `agg_func` — The name of an aggregate function which should be a constant [string](../../data_types/string.md).
|
||||
* `ranges` — The ranges to aggretate which should be an [array](../../data_types/array.md) of [tuples](../../data_types/tuple.md) which containing the index and the length of each range.
|
||||
* `arr` — Any number of [array](../../data_types/array.md) type columns as the parameters of the aggregation function.
|
||||
|
||||
**Returned value**
|
||||
|
||||
**Example**
|
||||
|
||||
```sql
|
||||
SELECT arrayReduceInRanges(
|
||||
'sum',
|
||||
[(1, 5), (2, 3), (3, 4), (4, 4)],
|
||||
[1000000, 200000, 30000, 4000, 500, 60, 7]
|
||||
) AS res
|
||||
```
|
||||
|
||||
```text
|
||||
┌─res─────────────────────────┐
|
||||
│ [1234500,234000,34560,4567] │
|
||||
└─────────────────────────────┘
|
||||
```
|
||||
|
||||
## arrayReverse(arr) {#arrayreverse}
|
||||
|
||||
Returns an array of the same size as the original array containing the elements in reverse order.
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user