mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Merge pull request #34055 from palegre-tiny/groupSortedArray
Add groupSortedArray() function
This commit is contained in:
commit
f055d7b692
@ -0,0 +1,48 @@
|
||||
---
|
||||
toc_priority: 108
|
||||
---
|
||||
|
||||
# groupArraySorted {#groupArraySorted}
|
||||
|
||||
Returns an array with the first N items in ascending order.
|
||||
|
||||
``` sql
|
||||
groupArraySorted(N)(column)
|
||||
```
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `N` – The number of elements to return.
|
||||
|
||||
If the parameter is omitted, default value 10 is used.
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `column` – The value.
|
||||
- `expr` — Optional. The field or expresion to sort by. If not set values are sorted by themselves.
|
||||
|
||||
**Example**
|
||||
|
||||
Gets the first 10 numbers:
|
||||
|
||||
``` sql
|
||||
SELECT groupArraySorted(10)(number) FROM numbers(100)
|
||||
```
|
||||
|
||||
``` text
|
||||
┌─groupArraySorted(10)(number)─┐
|
||||
│ [0,1,2,3,4,5,6,7,8,9] │
|
||||
└──────────────────────────────┘
|
||||
```
|
||||
|
||||
Or the last 10:
|
||||
|
||||
``` sql
|
||||
SELECT groupArraySorted(10)(number, -number) FROM numbers(100)
|
||||
```
|
||||
|
||||
``` text
|
||||
┌─groupArraySorted(10)(number, negate(number))─┐
|
||||
│ [99,98,97,96,95,94,93,92,91,90] │
|
||||
└──────────────────────────────────────────────┘
|
||||
```
|
@ -35,6 +35,7 @@ ClickHouse-specific aggregate functions:
|
||||
- [groupArrayInsertAt](../../../sql-reference/aggregate-functions/reference/grouparrayinsertat.md)
|
||||
- [groupArrayMovingAvg](../../../sql-reference/aggregate-functions/reference/grouparraymovingavg.md)
|
||||
- [groupArrayMovingSum](../../../sql-reference/aggregate-functions/reference/grouparraymovingsum.md)
|
||||
- [groupArraySorted](../../../sql-reference/aggregate-functions/reference/grouparraysorted.md)
|
||||
- [groupBitAnd](../../../sql-reference/aggregate-functions/reference/groupbitand.md)
|
||||
- [groupBitOr](../../../sql-reference/aggregate-functions/reference/groupbitor.md)
|
||||
- [groupBitXor](../../../sql-reference/aggregate-functions/reference/groupbitxor.md)
|
||||
|
147
src/AggregateFunctions/AggregateFunctionGroupArraySorted.cpp
Normal file
147
src/AggregateFunctions/AggregateFunctionGroupArraySorted.cpp
Normal file
@ -0,0 +1,147 @@
|
||||
#include <AggregateFunctions/AggregateFunctionFactory.h>
|
||||
#include <AggregateFunctions/AggregateFunctionGroupArraySorted.h>
|
||||
#include <AggregateFunctions/FactoryHelpers.h>
|
||||
#include <AggregateFunctions/Helpers.h>
|
||||
#include <DataTypes/DataTypeDate.h>
|
||||
#include <DataTypes/DataTypeDateTime.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <Common/FieldVisitorConvertToNumber.h>
|
||||
|
||||
|
||||
static inline constexpr UInt64 GROUP_SORTED_ARRAY_MAX_SIZE = 0xFFFFFF;
|
||||
static inline constexpr UInt64 GROUP_SORTED_ARRAY_DEFAULT_THRESHOLD = 10;
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
struct Settings;
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ARGUMENT_OUT_OF_BOUND;
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
}
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
template <typename T, bool expr_sorted, typename TColumnB, bool is_plain_b>
|
||||
class AggregateFunctionGroupArraySortedNumeric : public AggregateFunctionGroupArraySorted<T, false, expr_sorted, TColumnB, is_plain_b>
|
||||
{
|
||||
using AggregateFunctionGroupArraySorted<T, false, expr_sorted, TColumnB, is_plain_b>::AggregateFunctionGroupArraySorted;
|
||||
};
|
||||
|
||||
template <typename T, bool expr_sorted, typename TColumnB, bool is_plain_b>
|
||||
class AggregateFunctionGroupArraySortedFieldType
|
||||
: public AggregateFunctionGroupArraySorted<typename T::FieldType, false, expr_sorted, TColumnB, is_plain_b>
|
||||
{
|
||||
using AggregateFunctionGroupArraySorted<typename T::FieldType, false, expr_sorted, TColumnB, is_plain_b>::
|
||||
AggregateFunctionGroupArraySorted;
|
||||
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeArray>(std::make_shared<T>()); }
|
||||
};
|
||||
|
||||
template <template <typename, bool, typename, bool> class AggregateFunctionTemplate, typename TColumnA, bool expr_sorted, typename TColumnB, bool is_plain_b, typename... TArgs>
|
||||
AggregateFunctionPtr
|
||||
createAggregateFunctionGroupArraySortedTypedFinal(TArgs && ... args)
|
||||
{
|
||||
return AggregateFunctionPtr(new AggregateFunctionTemplate<TColumnA, expr_sorted, TColumnB, is_plain_b>(std::forward<TArgs>(args)...));
|
||||
}
|
||||
|
||||
template <bool expr_sorted = false, typename TColumnB = UInt64, bool is_plain_b = false>
|
||||
AggregateFunctionPtr
|
||||
createAggregateFunctionGroupArraySortedTyped(const DataTypes & argument_types, const Array & params, UInt64 threshold)
|
||||
{
|
||||
#define DISPATCH(A, C, B) \
|
||||
if (which.idx == TypeIndex::A) \
|
||||
return createAggregateFunctionGroupArraySortedTypedFinal<C, B, expr_sorted, TColumnB, is_plain_b>(threshold, argument_types, params);
|
||||
#define DISPATCH_NUMERIC(A) DISPATCH(A, AggregateFunctionGroupArraySortedNumeric, A)
|
||||
WhichDataType which(argument_types[0]);
|
||||
FOR_NUMERIC_TYPES(DISPATCH_NUMERIC)
|
||||
DISPATCH(Enum8, AggregateFunctionGroupArraySortedNumeric, Int8)
|
||||
DISPATCH(Enum16, AggregateFunctionGroupArraySortedNumeric, Int16)
|
||||
DISPATCH(Date, AggregateFunctionGroupArraySortedFieldType, DataTypeDate)
|
||||
DISPATCH(DateTime, AggregateFunctionGroupArraySortedFieldType, DataTypeDateTime)
|
||||
#undef DISPATCH
|
||||
#undef DISPATCH_NUMERIC
|
||||
|
||||
if (argument_types[0]->isValueUnambiguouslyRepresentedInContiguousMemoryRegion())
|
||||
{
|
||||
return AggregateFunctionPtr(new AggregateFunctionGroupArraySorted<StringRef, true, expr_sorted, TColumnB, is_plain_b>(
|
||||
threshold, argument_types, params));
|
||||
}
|
||||
else
|
||||
{
|
||||
return AggregateFunctionPtr(new AggregateFunctionGroupArraySorted<StringRef, false, expr_sorted, TColumnB, is_plain_b>(
|
||||
threshold, argument_types, params));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
AggregateFunctionPtr createAggregateFunctionGroupArraySorted(
|
||||
const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
|
||||
{
|
||||
UInt64 threshold = GROUP_SORTED_ARRAY_DEFAULT_THRESHOLD;
|
||||
|
||||
if (params.size() == 1)
|
||||
{
|
||||
UInt64 k = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), params[0]);
|
||||
|
||||
if (k > GROUP_SORTED_ARRAY_MAX_SIZE)
|
||||
throw Exception(
|
||||
"Too large parameter(s) for aggregate function " + name + ". Maximum: " + toString(GROUP_SORTED_ARRAY_MAX_SIZE),
|
||||
ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
||||
|
||||
if (k == 0)
|
||||
throw Exception("Parameter 0 is illegal for aggregate function " + name, ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
||||
|
||||
threshold = k;
|
||||
}
|
||||
else if (!params.empty())
|
||||
{
|
||||
throw Exception("Aggregate function " + name + " only supports 1 parameter.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
}
|
||||
|
||||
if (argument_types.size() == 2)
|
||||
{
|
||||
if (isNumber(argument_types[1]))
|
||||
{
|
||||
#define DISPATCH2(A, B) \
|
||||
if (which.idx == TypeIndex::A) \
|
||||
return createAggregateFunctionGroupArraySortedTyped<true, B>(argument_types, params, threshold);
|
||||
#define DISPATCH(A) DISPATCH2(A, A)
|
||||
WhichDataType which(argument_types[1]);
|
||||
FOR_NUMERIC_TYPES(DISPATCH)
|
||||
DISPATCH2(Enum8, Int8)
|
||||
DISPATCH2(Enum16, Int16)
|
||||
#undef DISPATCH
|
||||
#undef DISPATCH2
|
||||
throw Exception("Invalid parameter type.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
}
|
||||
else if (argument_types[1]->isValueUnambiguouslyRepresentedInContiguousMemoryRegion())
|
||||
{
|
||||
return createAggregateFunctionGroupArraySortedTyped<true, StringRef, true>(argument_types, params, threshold);
|
||||
}
|
||||
else
|
||||
{
|
||||
return createAggregateFunctionGroupArraySortedTyped<true, StringRef, false>(argument_types, params, threshold);
|
||||
}
|
||||
}
|
||||
else if (argument_types.size() == 1)
|
||||
{
|
||||
return createAggregateFunctionGroupArraySortedTyped<>(argument_types, params, threshold);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw Exception(
|
||||
"Aggregate function " + name + " requires one or two parameters.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void registerAggregateFunctionGroupArraySorted(AggregateFunctionFactory & factory)
|
||||
{
|
||||
AggregateFunctionProperties properties = {.returns_default_when_only_null = false, .is_order_dependent = true};
|
||||
factory.registerFunction("groupArraySorted", {createAggregateFunctionGroupArraySorted, properties});
|
||||
}
|
||||
}
|
310
src/AggregateFunctions/AggregateFunctionGroupArraySorted.h
Normal file
310
src/AggregateFunctions/AggregateFunctionGroupArraySorted.h
Normal file
@ -0,0 +1,310 @@
|
||||
#pragma once
|
||||
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
|
||||
#include <AggregateFunctions/AggregateFunctionGroupArraySortedData.h>
|
||||
#include <AggregateFunctions/IAggregateFunction.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
template <typename TColumn, bool is_plain>
|
||||
inline TColumn readItem(const IColumn * column, Arena * arena, size_t row)
|
||||
{
|
||||
if constexpr (std::is_same_v<TColumn, StringRef>)
|
||||
{
|
||||
if constexpr (is_plain)
|
||||
{
|
||||
StringRef str = column->getDataAt(row);
|
||||
auto ptr = arena->alloc(str.size);
|
||||
std::copy(str.data, str.data + str.size, ptr);
|
||||
return StringRef(ptr, str.size);
|
||||
}
|
||||
else
|
||||
{
|
||||
const char * begin = nullptr;
|
||||
return column->serializeValueIntoArena(row, *arena, begin);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if constexpr (std::is_same_v<TColumn, UInt64>)
|
||||
return column->getUInt(row);
|
||||
else
|
||||
return column->getInt(row);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename TColumn, typename TFilter = void>
|
||||
size_t
|
||||
getFirstNElements_low_threshold(const TColumn * data, int num_elements, int threshold, size_t * results, const TFilter * filter = nullptr)
|
||||
{
|
||||
for (int i = 0; i < threshold; i++)
|
||||
{
|
||||
results[i] = 0;
|
||||
}
|
||||
|
||||
threshold = std::min(num_elements, threshold);
|
||||
int current_max = 0;
|
||||
int cur;
|
||||
int z;
|
||||
for (int i = 0; i < num_elements; i++)
|
||||
{
|
||||
if constexpr (!std::is_same_v<TFilter, void>)
|
||||
{
|
||||
if (filter[i] == 0)
|
||||
continue;
|
||||
}
|
||||
|
||||
//Starting from the highest values and we look for the immediately lower than the given one
|
||||
for (cur = current_max; cur > 0; cur--)
|
||||
{
|
||||
if (data[i] > data[results[cur - 1]])
|
||||
break;
|
||||
}
|
||||
|
||||
if (cur < threshold)
|
||||
{
|
||||
//Move all the higher values 1 position to the right
|
||||
for (z = std::min(threshold - 1, current_max); z > cur; z--)
|
||||
results[z] = results[z - 1];
|
||||
|
||||
if (current_max < threshold)
|
||||
++current_max;
|
||||
|
||||
//insert element into the given position
|
||||
results[cur] = i;
|
||||
}
|
||||
}
|
||||
|
||||
return current_max;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
struct SortableItem
|
||||
{
|
||||
T a;
|
||||
size_t b;
|
||||
bool operator<(const SortableItem & other) const { return (this->a < other.a); }
|
||||
};
|
||||
|
||||
template <typename TColumn, typename TFilter = void>
|
||||
size_t getFirstNElements_high_threshold(
|
||||
const TColumn * data, size_t num_elements, size_t threshold, size_t * results, const TFilter * filter = nullptr)
|
||||
{
|
||||
std::vector<SortableItem<TColumn>> dataIndexed(num_elements);
|
||||
size_t num_elements_filtered = 0;
|
||||
|
||||
for (size_t i = 0; i < num_elements; i++)
|
||||
{
|
||||
if constexpr (!std::is_same_v<TFilter, void>)
|
||||
{
|
||||
if (filter[i] == 0)
|
||||
continue;
|
||||
}
|
||||
|
||||
dataIndexed.data()[num_elements_filtered].a = data[i];
|
||||
dataIndexed.data()[num_elements_filtered].b = i;
|
||||
num_elements_filtered++;
|
||||
}
|
||||
|
||||
threshold = std::min(num_elements_filtered, threshold);
|
||||
|
||||
std::nth_element(dataIndexed.data(), dataIndexed.data() + threshold, dataIndexed.data() + num_elements_filtered);
|
||||
std::sort(dataIndexed.data(), dataIndexed.data() + threshold);
|
||||
|
||||
for (size_t i = 0; i < threshold; i++)
|
||||
{
|
||||
results[i] = dataIndexed[i].b;
|
||||
}
|
||||
|
||||
return threshold;
|
||||
}
|
||||
|
||||
static const size_t THRESHOLD_MAX_CUSTOM_FUNCTION = 1000;
|
||||
|
||||
template <typename TColumn>
|
||||
size_t getFirstNElements(const TColumn * data, size_t num_elements, size_t threshold, size_t * results, const UInt8 * filter = nullptr)
|
||||
{
|
||||
if (threshold < THRESHOLD_MAX_CUSTOM_FUNCTION)
|
||||
{
|
||||
if (filter != nullptr)
|
||||
return getFirstNElements_low_threshold(data, num_elements, threshold, results, filter);
|
||||
else
|
||||
return getFirstNElements_low_threshold(data, num_elements, threshold, results);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (filter != nullptr)
|
||||
return getFirstNElements_high_threshold(data, num_elements, threshold, results, filter);
|
||||
else
|
||||
return getFirstNElements_high_threshold(data, num_elements, threshold, results);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename TColumnA, bool is_plain_a, bool use_column_b, typename TColumnB, bool is_plain_b>
|
||||
class AggregateFunctionGroupArraySorted : public IAggregateFunctionDataHelper<
|
||||
AggregateFunctionGroupArraySortedData<TColumnA, use_column_b, TColumnB>,
|
||||
AggregateFunctionGroupArraySorted<TColumnA, is_plain_a, use_column_b, TColumnB, is_plain_b>>
|
||||
{
|
||||
protected:
|
||||
using State = AggregateFunctionGroupArraySortedData<TColumnA, use_column_b, TColumnB>;
|
||||
using Base = IAggregateFunctionDataHelper<
|
||||
AggregateFunctionGroupArraySortedData<TColumnA, use_column_b, TColumnB>,
|
||||
AggregateFunctionGroupArraySorted>;
|
||||
|
||||
UInt64 threshold;
|
||||
DataTypePtr & input_data_type;
|
||||
mutable std::mutex mutex;
|
||||
|
||||
static void deserializeAndInsert(StringRef str, IColumn & data_to);
|
||||
|
||||
public:
|
||||
AggregateFunctionGroupArraySorted(UInt64 threshold_, const DataTypes & argument_types_, const Array & params)
|
||||
: IAggregateFunctionDataHelper<
|
||||
AggregateFunctionGroupArraySortedData<TColumnA, use_column_b, TColumnB>,
|
||||
AggregateFunctionGroupArraySorted>(argument_types_, params)
|
||||
, threshold(threshold_)
|
||||
, input_data_type(this->argument_types[0])
|
||||
{
|
||||
}
|
||||
|
||||
void create(AggregateDataPtr place) const override
|
||||
{
|
||||
Base::create(place);
|
||||
this->data(place).threshold = threshold;
|
||||
}
|
||||
|
||||
String getName() const override { return "groupArraySorted"; }
|
||||
|
||||
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeArray>(input_data_type); }
|
||||
|
||||
bool allocatesMemoryInArena() const override
|
||||
{
|
||||
if constexpr (std::is_same_v<TColumnA, StringRef>)
|
||||
return true;
|
||||
else
|
||||
return false;
|
||||
}
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
|
||||
{
|
||||
State & data = this->data(place);
|
||||
if constexpr (use_column_b)
|
||||
{
|
||||
data.add(
|
||||
readItem<TColumnA, is_plain_a>(columns[0], arena, row_num), readItem<TColumnB, is_plain_b>(columns[1], arena, row_num));
|
||||
}
|
||||
else
|
||||
{
|
||||
data.add(readItem<TColumnA, is_plain_a>(columns[0], arena, row_num));
|
||||
}
|
||||
}
|
||||
|
||||
template <typename TColumn, bool is_plain, typename TFunc>
|
||||
void
|
||||
forFirstRows(size_t batch_size, const IColumn ** columns, size_t data_column, Arena * arena, ssize_t if_argument_pos, TFunc func) const
|
||||
{
|
||||
const TColumn * values = nullptr;
|
||||
std::unique_ptr<std::vector<TColumn>> values_vector;
|
||||
std::vector<size_t> best_rows(threshold);
|
||||
|
||||
if constexpr (std::is_same_v<TColumn, StringRef>)
|
||||
{
|
||||
values_vector.reset(new std::vector<TColumn>(batch_size));
|
||||
for (size_t i = 0; i < batch_size; i++)
|
||||
(*values_vector)[i] = readItem<TColumn, is_plain>(columns[data_column], arena, i);
|
||||
values = (*values_vector).data();
|
||||
}
|
||||
else
|
||||
{
|
||||
const auto & column = assert_cast<const ColumnVector<TColumn> &>(*columns[data_column]);
|
||||
values = column.getData().data();
|
||||
}
|
||||
|
||||
const UInt8 * filter = nullptr;
|
||||
StringRef refFilter;
|
||||
|
||||
if (if_argument_pos >= 0)
|
||||
{
|
||||
refFilter = columns[if_argument_pos]->getRawData();
|
||||
filter = reinterpret_cast<const UInt8 *>(refFilter.data);
|
||||
}
|
||||
|
||||
size_t num_elements = getFirstNElements(values, batch_size, threshold, best_rows.data(), filter);
|
||||
for (size_t i = 0; i < num_elements; i++)
|
||||
{
|
||||
func(best_rows[i], values);
|
||||
}
|
||||
}
|
||||
|
||||
void addBatchSinglePlace(
|
||||
size_t batch_size, AggregateDataPtr place, const IColumn ** columns, Arena * arena, ssize_t if_argument_pos) const override
|
||||
{
|
||||
State & data = this->data(place);
|
||||
|
||||
if constexpr (use_column_b)
|
||||
{
|
||||
forFirstRows<TColumnB, is_plain_b>(
|
||||
batch_size, columns, 1, arena, if_argument_pos, [columns, &arena, &data](size_t row, const TColumnB * values)
|
||||
{
|
||||
data.add(readItem<TColumnA, is_plain_a>(columns[0], arena, row), values[row]);
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
forFirstRows<TColumnA, is_plain_a>(
|
||||
batch_size, columns, 0, arena, if_argument_pos, [&data](size_t row, const TColumnA * values)
|
||||
{
|
||||
data.add(values[row]);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
|
||||
{
|
||||
this->data(place).merge(this->data(rhs));
|
||||
}
|
||||
|
||||
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
|
||||
{
|
||||
this->data(place).serialize(buf);
|
||||
}
|
||||
|
||||
void
|
||||
deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
|
||||
{
|
||||
this->data(place).deserialize(buf, arena);
|
||||
}
|
||||
|
||||
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * /*arena*/) const override
|
||||
{
|
||||
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
|
||||
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
|
||||
|
||||
auto & values = this->data(place).values;
|
||||
offsets_to.push_back(offsets_to.back() + values.size());
|
||||
|
||||
IColumn & data_to = arr_to.getData();
|
||||
for (auto value : values)
|
||||
{
|
||||
if constexpr (std::is_same_v<TColumnA, StringRef>)
|
||||
{
|
||||
auto str = State::itemValue(value);
|
||||
if constexpr (is_plain_a)
|
||||
{
|
||||
data_to.insertData(str.data, str.size);
|
||||
}
|
||||
else
|
||||
{
|
||||
data_to.deserializeAndInsertFromArena(str.data);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
data_to.insert(State::itemValue(value));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
162
src/AggregateFunctions/AggregateFunctionGroupArraySortedData.h
Normal file
162
src/AggregateFunctions/AggregateFunctionGroupArraySortedData.h
Normal file
@ -0,0 +1,162 @@
|
||||
#pragma once
|
||||
|
||||
#include <IO/ReadBuffer.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/VarInt.h>
|
||||
#include <IO/WriteBuffer.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
||||
|
||||
static inline constexpr UInt64 GROUP_SORTED_DEFAULT_THRESHOLD = 0xFFFFFF;
|
||||
|
||||
namespace DB
|
||||
{
|
||||
template <typename T>
|
||||
static void writeOneItem(WriteBuffer & buf, T item)
|
||||
{
|
||||
if constexpr (std::numeric_limits<T>::is_signed)
|
||||
{
|
||||
writeVarInt(item, buf);
|
||||
}
|
||||
else
|
||||
{
|
||||
writeVarUInt(item, buf);
|
||||
}
|
||||
}
|
||||
|
||||
static void writeOneItem(WriteBuffer & buf, const StringRef & item)
|
||||
{
|
||||
writeBinary(item, buf);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
static void readOneItem(ReadBuffer & buf, Arena * /*arena*/, T & item)
|
||||
{
|
||||
if constexpr (std::numeric_limits<T>::is_signed)
|
||||
{
|
||||
DB::Int64 val;
|
||||
readVarT(val, buf);
|
||||
item = val;
|
||||
}
|
||||
else
|
||||
{
|
||||
DB::UInt64 val;
|
||||
readVarT(val, buf);
|
||||
item = val;
|
||||
}
|
||||
}
|
||||
|
||||
static void readOneItem(ReadBuffer & buf, Arena * arena, StringRef & item)
|
||||
{
|
||||
item = readStringBinaryInto(*arena, buf);
|
||||
}
|
||||
|
||||
template <typename Storage>
|
||||
struct AggregateFunctionGroupArraySortedDataBase
|
||||
{
|
||||
typedef typename Storage::value_type ValueType;
|
||||
AggregateFunctionGroupArraySortedDataBase(UInt64 threshold_ = GROUP_SORTED_DEFAULT_THRESHOLD) : threshold(threshold_) { }
|
||||
|
||||
virtual ~AggregateFunctionGroupArraySortedDataBase() { }
|
||||
inline void narrowDown()
|
||||
{
|
||||
while (values.size() > threshold)
|
||||
values.erase(--values.end());
|
||||
}
|
||||
|
||||
void merge(const AggregateFunctionGroupArraySortedDataBase & other)
|
||||
{
|
||||
values.merge(Storage(other.values));
|
||||
narrowDown();
|
||||
}
|
||||
|
||||
void serialize(WriteBuffer & buf) const
|
||||
{
|
||||
writeOneItem(buf, UInt64(values.size()));
|
||||
for (auto value : values)
|
||||
{
|
||||
serializeItem(buf, value);
|
||||
}
|
||||
}
|
||||
|
||||
virtual void serializeItem(WriteBuffer & buf, ValueType & val) const = 0;
|
||||
virtual ValueType deserializeItem(ReadBuffer & buf, Arena * arena) const = 0;
|
||||
|
||||
void deserialize(ReadBuffer & buf, Arena * arena)
|
||||
{
|
||||
values.clear();
|
||||
UInt64 length;
|
||||
readOneItem(buf, nullptr, length);
|
||||
|
||||
while (length--)
|
||||
{
|
||||
values.insert(deserializeItem(buf, arena));
|
||||
}
|
||||
|
||||
narrowDown();
|
||||
}
|
||||
|
||||
UInt64 threshold;
|
||||
Storage values;
|
||||
};
|
||||
|
||||
template <typename T, bool expr_sorted, typename TIndex>
|
||||
struct AggregateFunctionGroupArraySortedData
|
||||
{
|
||||
};
|
||||
|
||||
template <typename T, typename TIndex>
|
||||
struct AggregateFunctionGroupArraySortedData<T, true, TIndex> : public AggregateFunctionGroupArraySortedDataBase<std::multimap<TIndex, T>>
|
||||
{
|
||||
using Base = AggregateFunctionGroupArraySortedDataBase<std::multimap<TIndex, T>>;
|
||||
using Base::Base;
|
||||
|
||||
void add(T item, TIndex weight)
|
||||
{
|
||||
Base::values.insert({weight, item});
|
||||
Base::narrowDown();
|
||||
}
|
||||
|
||||
void serializeItem(WriteBuffer & buf, typename Base::ValueType & value) const override
|
||||
{
|
||||
writeOneItem(buf, value.first);
|
||||
writeOneItem(buf, value.second);
|
||||
}
|
||||
|
||||
virtual typename Base::ValueType deserializeItem(ReadBuffer & buf, Arena * arena) const override
|
||||
{
|
||||
TIndex first;
|
||||
T second;
|
||||
readOneItem(buf, arena, first);
|
||||
readOneItem(buf, arena, second);
|
||||
|
||||
return {first, second};
|
||||
}
|
||||
|
||||
static T itemValue(typename Base::ValueType & value) { return value.second; }
|
||||
};
|
||||
|
||||
template <typename T, typename TIndex>
|
||||
struct AggregateFunctionGroupArraySortedData<T, false, TIndex> : public AggregateFunctionGroupArraySortedDataBase<std::multiset<T>>
|
||||
{
|
||||
using Base = AggregateFunctionGroupArraySortedDataBase<std::multiset<T>>;
|
||||
using Base::Base;
|
||||
|
||||
void add(T item)
|
||||
{
|
||||
Base::values.insert(item);
|
||||
Base::narrowDown();
|
||||
}
|
||||
|
||||
void serializeItem(WriteBuffer & buf, typename Base::ValueType & value) const override { writeOneItem(buf, value); }
|
||||
|
||||
typename Base::ValueType deserializeItem(ReadBuffer & buf, Arena * arena) const override
|
||||
{
|
||||
T value;
|
||||
readOneItem(buf, arena, value);
|
||||
return value;
|
||||
}
|
||||
|
||||
static T itemValue(typename Base::ValueType & value) { return value; }
|
||||
};
|
||||
}
|
@ -59,6 +59,7 @@ void registerAggregateFunctionNothing(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionExponentialMovingAverage(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionSparkbar(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionIntervalLengthSum(AggregateFunctionFactory &);
|
||||
void registerAggregateFunctionGroupArraySorted(AggregateFunctionFactory & factory);
|
||||
|
||||
class AggregateFunctionCombinatorFactory;
|
||||
void registerAggregateFunctionCombinatorIf(AggregateFunctionCombinatorFactory &);
|
||||
@ -130,6 +131,7 @@ void registerAggregateFunctions()
|
||||
registerAggregateFunctionIntervalLengthSum(factory);
|
||||
registerAggregateFunctionExponentialMovingAverage(factory);
|
||||
registerAggregateFunctionSparkbar(factory);
|
||||
registerAggregateFunctionGroupArraySorted(factory);
|
||||
|
||||
registerWindowFunctions(factory);
|
||||
}
|
||||
|
27
tests/performance/group_array_sorted.xml
Normal file
27
tests/performance/group_array_sorted.xml
Normal file
@ -0,0 +1,27 @@
|
||||
<test>
|
||||
<settings>
|
||||
<max_threads>10</max_threads>
|
||||
</settings>
|
||||
<substitutions>
|
||||
<substitution>
|
||||
<name>items</name>
|
||||
<values>
|
||||
<value>1000</value>
|
||||
<value>100000</value>
|
||||
<value>10000000</value>
|
||||
</values>
|
||||
</substitution>
|
||||
</substitutions>
|
||||
|
||||
<create_query>CREATE TABLE test ( `id` UInt64, `value` UInt64, `text` String ) ENGINE = Memory</create_query>
|
||||
<fill_query>INSERT INTO test SELECT number as id, rand64() as value, toString(number) as text FROM numbers({items})</fill_query>
|
||||
<query>SELECT groupArraySorted(10)(id, value) FROM test</query>
|
||||
<query>SELECT groupArraySorted(10)(text, value) FROM test</query>
|
||||
<query>SELECT groupArraySorted(10)((id, text), value) FROM test</query>
|
||||
<query>SELECT groupArraySorted(10)(text) FROM test</query>
|
||||
<query>SELECT groupArraySorted(10000)(id, value) FROM test</query>
|
||||
<query>SELECT groupArraySorted(10000)(text, value) FROM test</query>
|
||||
<query>SELECT groupArraySorted(10000)((id, text), value) FROM test</query>
|
||||
<query>SELECT groupArraySorted(10000)(text) FROM test</query>
|
||||
<drop_query>DROP TABLE IF EXISTS test</drop_query>
|
||||
</test>
|
18
tests/queries/0_stateless/02158_grouparraysorted.reference
Normal file
18
tests/queries/0_stateless/02158_grouparraysorted.reference
Normal file
@ -0,0 +1,18 @@
|
||||
[0,1,2,3,4]
|
||||
[0,1,2,3,4,5,6,7,8,9]
|
||||
[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99]
|
||||
[999,998,997,996,995,994,993,992,991,990,989,988,987,986,985,984,983,982,981,980,979,978,977,976,975,974,973,972,971,970,969,968,967,966,965,964,963,962,961,960,959,958,957,956,955,954,953,952,951,950,949,948,947,946,945,944,943,942,941,940,939,938,937,936,935,934,933,932,931,930,929,928,927,926,925,924,923,922,921,920,919,918,917,916,915,914,913,912,911,910,909,908,907,906,905,904,903,902,901,900]
|
||||
['0','1','2','3','4']
|
||||
['0','1','2','3','4']
|
||||
['9','8','7','6','5']
|
||||
[(0,'0'),(1,'1'),(2,'2'),(3,'3'),(4,'4')]
|
||||
['0','1','10','11','12']
|
||||
['0','1','2','3','4','5','6','7','8','9','10','11','12','13','14','15','16','17','18','19','20','21','22','23','24','25','26','27','28','29','30','31','32','33','34','35','36','37','38','39','40','41','42','43','44','45','46','47','48','49']
|
||||
[0,0,1,1,2,2,3,3,4,4]
|
||||
pablo [1,2]
|
||||
luis [1,3]
|
||||
pablo [1,2]
|
||||
luis [1,3]
|
||||
[4,5,6,7,8]
|
||||
[10,11,12,13,14]
|
||||
['10','11','12','13','14']
|
43
tests/queries/0_stateless/02158_grouparraysorted.sql
Normal file
43
tests/queries/0_stateless/02158_grouparraysorted.sql
Normal file
@ -0,0 +1,43 @@
|
||||
|
||||
SELECT groupArraySorted(5)(number) from numbers(100);
|
||||
|
||||
SELECT groupArraySorted(number, number) from numbers(100);
|
||||
|
||||
SELECT groupArraySorted(100)(number, number) from numbers(1000);
|
||||
|
||||
SELECT groupArraySorted(100)(number, -number) from numbers(1000);
|
||||
|
||||
SELECT groupArraySorted(5)(str, number) FROM (SELECT toString(number) as str, number FROM numbers(10));
|
||||
|
||||
SELECT groupArraySorted(5)(text) FROM (select toString(number) as text from numbers(10));
|
||||
|
||||
SELECT groupArraySorted(5)(text, -number) FROM (select toString(number) as text, number from numbers(10));
|
||||
|
||||
SELECT groupArraySorted(5)((number,text)) from (SELECT toString(number) as text, number FROM numbers(100));
|
||||
|
||||
SELECT groupArraySorted(5)(text,text) from (SELECT toString(number) as text FROM numbers(100));
|
||||
|
||||
SELECT groupArraySorted(50)(text,(number,text)) from (SELECT toString(number) as text, number FROM numbers(100));
|
||||
|
||||
SELECT groupArraySorted(10)(toInt64(number/2)) FROM numbers(100);
|
||||
|
||||
|
||||
DROP TABLE IF EXISTS test;
|
||||
DROP VIEW IF EXISTS mv_test;
|
||||
CREATE TABLE test (`n` String, `h` Int64) ENGINE = MergeTree ORDER BY n;
|
||||
CREATE MATERIALIZED VIEW mv_test (`n` String, `h` AggregateFunction(groupArraySorted(2), Int64, Int64)) ENGINE = AggregatingMergeTree ORDER BY n AS SELECT n, groupArraySortedState(2)(h, h) as h FROM test GROUP BY n;
|
||||
INSERT INTO test VALUES ('pablo',1)('pablo', 2)('luis', 1)('luis', 3)('pablo', 5)('pablo',4)('pablo', 5)('luis', 6)('luis', 7)('pablo', 8)('pablo',9)('pablo',10)('luis',11)('luis',12)('pablo',13);
|
||||
SELECT n, groupArraySortedMerge(2)(h) from mv_test GROUP BY n;
|
||||
|
||||
DROP TABLE IF EXISTS test;
|
||||
DROP VIEW IF EXISTS mv_test;
|
||||
CREATE TABLE test (`n` String, `h` Int64) ENGINE = MergeTree ORDER BY n;
|
||||
CREATE MATERIALIZED VIEW mv_test (`n` String, `h` AggregateFunction(groupArraySorted(2), Int64)) ENGINE = AggregatingMergeTree ORDER BY n AS SELECT n, groupArraySortedState(2)(h) as h FROM test GROUP BY n;
|
||||
INSERT INTO test VALUES ('pablo',1)('pablo', 2)('luis', 1)('luis', 3)('pablo', 5)('pablo',4)('pablo', 5)('luis', 6)('luis', 7)('pablo', 8)('pablo',9)('pablo',10)('luis',11)('luis',12)('pablo',13);
|
||||
SELECT n, groupArraySortedMerge(2)(h) from mv_test GROUP BY n;
|
||||
DROP TABLE test;
|
||||
DROP VIEW mv_test;
|
||||
|
||||
SELECT groupArraySortedIf(5)(number, number, number>3) from numbers(100);
|
||||
SELECT groupArraySortedIf(5)(number, toString(number), number>3) from numbers(100);
|
||||
SELECT groupArraySortedIf(5)(toString(number), number>3) from numbers(100);
|
Loading…
Reference in New Issue
Block a user