ClickHouse/src/AggregateFunctions/AggregateFunctionMaxIntersections.h

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

187 lines
6.5 KiB
C++
Raw Normal View History

2018-03-14 04:36:41 +00:00
#pragma once
2022-01-30 19:49:48 +00:00
#include <base/sort.h>
2018-03-14 04:36:41 +00:00
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/ArenaAllocator.h>
#include <Common/NaNUtils.h>
#include <Common/assert_cast.h>
2018-03-14 04:36:41 +00:00
#include <AggregateFunctions/IAggregateFunction.h>
#define AGGREGATE_FUNCTION_MAX_INTERSECTIONS_MAX_ARRAY_SIZE 0xFFFFFF
namespace DB
{
struct Settings;
2018-03-14 04:36:41 +00:00
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int TOO_LARGE_ARRAY_SIZE;
}
/** maxIntersections: returns the maximum number of simultaneously intersecting intervals defined by the start_column and end_column values;
  * maxIntersectionsPosition: returns the leftmost position at which that maximum number of intersections is reached.
  */
/// Similar to GroupArrayNumericData.
template <typename T>
struct MaxIntersectionsData
{
/// Left or right end of the interval and signed weight; with positive sign for begin of interval and negative sign for end of interval.
using Value = std::pair<T, Int64>;
// Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
using Allocator = MixedAlignedArenaAllocator<alignof(Value), 4096>;
2018-03-14 04:36:41 +00:00
using Array = PODArray<Value, 32, Allocator>;
Array value;
};
/// Selects which of the two results the aggregate function produces.
enum class AggregateFunctionIntersectionsKind
{
    /// Report the maximum number of simultaneously intersecting intervals.
    Count,

    /// Report the position at which that maximum is first reached.
    Position
};
/** Implementation of the maxIntersections and maxIntersectionsPosition aggregate functions.
  *
  * Takes two arguments of the same native numeric type: the left and the right end of an interval.
  * Every end is stored in the state as a (position, weight) pair, with weight +1 for a left end
  * and -1 for a right end. The result is obtained by sorting the pairs and scanning the running
  * sum of weights for its maximum.
  *
  * `kind` selects whether the maximum itself (Count) or the position where it is first
  * reached (Position) is returned.
  */
template <typename PointType>
class AggregateFunctionIntersectionsMax final
    : public IAggregateFunctionDataHelper<MaxIntersectionsData<PointType>, AggregateFunctionIntersectionsMax<PointType>>
{
private:
    AggregateFunctionIntersectionsKind kind;

public:
    /// Validates the argument types: both must be native numbers and both must have the same type.
    /// Throws ILLEGAL_TYPE_OF_ARGUMENT otherwise.
    AggregateFunctionIntersectionsMax(AggregateFunctionIntersectionsKind kind_, const DataTypes & arguments)
        : IAggregateFunctionDataHelper<MaxIntersectionsData<PointType>, AggregateFunctionIntersectionsMax<PointType>>(arguments, {}, createResultType(kind_))
        , kind(kind_)
    {
        /// NOTE(review): the check is isNativeNumber while the message says "integer";
        /// add() filters NaN, so floating point arguments appear to be expected - confirm before rewording.
        if (!isNativeNumber(arguments[0]))
            throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "{}: first argument must be represented by integer", getName());

        if (!isNativeNumber(arguments[1]))
            throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "{}: second argument must be represented by integer", getName());

        if (!arguments[0]->equals(*arguments[1]))
            throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "{}: arguments must have the same type", getName());
    }

    String getName() const override
    {
        return kind == AggregateFunctionIntersectionsKind::Count
            ? "maxIntersections"
            : "maxIntersectionsPosition";
    }

    /// Count is reported as UInt64; Position is reported in the type of the interval ends.
    static DataTypePtr createResultType(AggregateFunctionIntersectionsKind kind_)
    {
        if (kind_ == AggregateFunctionIntersectionsKind::Count)
            return std::make_shared<DataTypeUInt64>();
        else
            return std::make_shared<DataTypeNumber<PointType>>();
    }

    /// MaxIntersectionsData::Allocator is a MixedAlignedArenaAllocator, and add / merge / deserialize
    /// pass the arena to the array on every growth, so the state does allocate memory in the Arena.
    bool allocatesMemoryInArena() const override { return true; }

    /// Appends both ends of the interval in row `row_num` to the state.
    /// An end that is NaN is skipped; the other end of the same row is still added.
    void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
    {
        PointType left = assert_cast<const ColumnVector<PointType> &>(*columns[0]).getData()[row_num];
        PointType right = assert_cast<const ColumnVector<PointType> &>(*columns[1]).getData()[row_num];

        if (!isNaN(left))
            this->data(place).value.push_back(std::make_pair(left, Int64(1)), arena);

        if (!isNaN(right))
            this->data(place).value.push_back(std::make_pair(right, Int64(-1)), arena);
    }

    /// Appends all points of `rhs` to the state at `place`. No ordering is maintained here;
    /// sorting happens once, in insertResultInto.
    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
    {
        auto & cur_elems = this->data(place);
        auto & rhs_elems = this->data(rhs);

        cur_elems.value.insert(rhs_elems.value.begin(), rhs_elems.value.end(), arena);
    }

    /// Format: the number of points as VarUInt, then for every point: position, zero padding, weight.
    void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
    {
        const auto & value = this->data(place).value;
        size_t size = value.size();
        writeVarUInt(size, buf);

        /// In this version, pairs were serialized with padding.
        /// We must ensure that padding bytes are zero-filled.

        static_assert(offsetof(typename MaxIntersectionsData<PointType>::Value, first) == 0);
        static_assert(offsetof(typename MaxIntersectionsData<PointType>::Value, second) > 0);

        /// As many zero bytes as there are between the end of `first` and the start of `second`.
        char zero_padding[offsetof(typename MaxIntersectionsData<PointType>::Value, second) - sizeof(value[0].first)]{};

        for (size_t i = 0; i < size; ++i)
        {
            writePODBinary(value[i].first, buf);
            writePODBinary(zero_padding, buf);
            writePODBinary(value[i].second, buf);
        }
    }

    /// Reads the state written by serialize: the pairs are read back as raw bytes, padding included.
    /// Throws TOO_LARGE_ARRAY_SIZE if the declared number of points exceeds
    /// AGGREGATE_FUNCTION_MAX_INTERSECTIONS_MAX_ARRAY_SIZE.
    void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
    {
        size_t size = 0;
        readVarUInt(size, buf);

        /// Reject the size before allocating anything for it.
        if (unlikely(size > AGGREGATE_FUNCTION_MAX_INTERSECTIONS_MAX_ARRAY_SIZE))
            throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE,
                            "Too large array size (maximum: {})", AGGREGATE_FUNCTION_MAX_INTERSECTIONS_MAX_ARRAY_SIZE);

        auto & value = this->data(place).value;

        value.resize(size, arena);
        buf.readStrict(reinterpret_cast<char *>(value.data()), size * sizeof(value[0]));
    }

    /// Computes the result and appends it to `to`.
    /// With an empty state the result is 0 for both kinds.
    void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
    {
        Int64 current_intersections = 0;
        Int64 max_intersections = 0;
        PointType position_of_max_intersections = 0;

        /// The state is sorted in place: `place` is non-const here, so no copy is needed.
        auto & array = this->data(place).value;

        /// Sort by position; for equal position, sort by weight to get deterministic result.
        ::sort(array.begin(), array.end());

        /// Running sum of weights; the strict comparison keeps the leftmost position of the maximum.
        for (const auto & [position, weight] : array)
        {
            current_intersections += weight;
            if (current_intersections > max_intersections)
            {
                max_intersections = current_intersections;
                position_of_max_intersections = position;
            }
        }

        if (kind == AggregateFunctionIntersectionsKind::Count)
        {
            auto & result_column = assert_cast<ColumnUInt64 &>(to).getData();
            result_column.push_back(max_intersections);
        }
        else
        {
            auto & result_column = assert_cast<ColumnVector<PointType> &>(to).getData();
            result_column.push_back(position_of_max_intersections);
        }
    }
};
}