mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Destructive IAggregateFunction::insertResultInto and ColumnAggregateFunction::convertToValues (#10890)
* Destructive IAggregateFunction::insertResultInto and ColumnAggregateFunction::convertToValues * Try fix build. * Try fix build. * Fix build. * Make convertToValues static. * fix build. * Remove const casts. * Added comment. * Fix build. * Fix build. * Add test. * Fix test.
This commit is contained in:
parent
f8195a577c
commit
f65305878b
@ -93,7 +93,7 @@ public:
|
||||
buf.read(c);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr, IColumn & to) const override
|
||||
{
|
||||
to.insertDefault();
|
||||
}
|
||||
|
@ -85,7 +85,7 @@ public:
|
||||
return Data::allocatesMemoryInArena();
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
this->data(place).result.insertResultInto(to);
|
||||
}
|
||||
|
@ -119,7 +119,7 @@ public:
|
||||
nested_func->deserialize(place, buf, arena);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
nested_func->insertResultInto(place, to);
|
||||
}
|
||||
|
@ -80,7 +80,7 @@ public:
|
||||
readBinary(this->data(place).denominator, buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
auto & column = static_cast<ColVecResult &>(to);
|
||||
column.getData().push_back(this->data(place).template result<ResultType>());
|
||||
|
@ -74,7 +74,7 @@ public:
|
||||
readBinary(this->data(place).value, buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
assert_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).value);
|
||||
}
|
||||
|
@ -150,7 +150,7 @@ public:
|
||||
data(place).deserialize(buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
assert_cast<ColumnFloat64 &>(to).getData().push_back(getBoundingRatio(data(place)));
|
||||
}
|
||||
|
@ -118,7 +118,7 @@ public:
|
||||
}
|
||||
|
||||
void insertResultInto(
|
||||
ConstAggregateDataPtr place,
|
||||
AggregateDataPtr place,
|
||||
IColumn & to
|
||||
) const override
|
||||
{
|
||||
|
@ -57,7 +57,7 @@ public:
|
||||
readVarUInt(data(place).count, buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
assert_cast<ColumnUInt64 &>(to).getData().push_back(data(place).count);
|
||||
}
|
||||
@ -108,7 +108,7 @@ public:
|
||||
readVarUInt(data(place).count, buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
assert_cast<ColumnUInt64 &>(to).getData().push_back(data(place).count);
|
||||
}
|
||||
|
@ -140,7 +140,7 @@ public:
|
||||
this->data(place).deserialize(buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
auto & column = assert_cast<ColumnVector<Float64> &>(to);
|
||||
column.getData().push_back(this->data(place).get());
|
||||
|
@ -225,15 +225,15 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
const AggregateFunctionForEachData & state = data(place);
|
||||
AggregateFunctionForEachData & state = data(place);
|
||||
|
||||
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
|
||||
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
|
||||
IColumn & elems_to = arr_to.getData();
|
||||
|
||||
const char * nested_state = state.array_of_aggregate_datas;
|
||||
char * nested_state = state.array_of_aggregate_datas;
|
||||
for (size_t i = 0; i < state.dynamic_array_size; ++i)
|
||||
{
|
||||
nested_func->insertResultInto(nested_state, elems_to);
|
||||
|
@ -282,7 +282,7 @@ public:
|
||||
// if constexpr (Trait::sampler == Sampler::DETERMINATOR)
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
const auto & value = this->data(place).value;
|
||||
size_t size = value.size();
|
||||
@ -600,7 +600,7 @@ public:
|
||||
// if constexpr (Trait::sampler == Sampler::DETERMINATOR)
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
auto & column_array = assert_cast<ColumnArray &>(to);
|
||||
|
||||
@ -815,7 +815,7 @@ public:
|
||||
data(place).last = prev;
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
auto & column_array = assert_cast<ColumnArray &>(to);
|
||||
|
||||
|
@ -179,7 +179,7 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
ColumnArray & to_array = assert_cast<ColumnArray &>(to);
|
||||
IColumn & to_data = to_array.getData();
|
||||
|
@ -158,7 +158,7 @@ public:
|
||||
this->data(place).sum = value.back();
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
const auto & data = this->data(place);
|
||||
size_t size = data.value.size();
|
||||
|
@ -48,7 +48,7 @@ public:
|
||||
this->data(place).rbs.read(buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
assert_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).rbs.size());
|
||||
}
|
||||
@ -113,7 +113,7 @@ public:
|
||||
this->data(place).rbs.read(buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
assert_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).rbs.size());
|
||||
}
|
||||
|
@ -102,7 +102,7 @@ public:
|
||||
this->data(place).value.read(buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
|
||||
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
|
||||
@ -241,7 +241,7 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
|
||||
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
|
||||
|
@ -353,9 +353,9 @@ public:
|
||||
this->data(place).read(buf, max_bins);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
auto & data = this->data(const_cast<AggregateDataPtr>(place));
|
||||
auto & data = this->data(place);
|
||||
|
||||
auto & to_array = assert_cast<ColumnArray &>(to);
|
||||
ColumnArray::Offsets & offsets_to = to_array.getOffsets();
|
||||
|
@ -95,7 +95,7 @@ public:
|
||||
nested_func->deserialize(place, buf, arena);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
nested_func->insertResultInto(place, to);
|
||||
}
|
||||
|
@ -389,7 +389,7 @@ public:
|
||||
/** This function is called if aggregate function without State modifier is selected in a query.
|
||||
* Inserts all weights of the model into the column 'to', so user may use such information if needed
|
||||
*/
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
this->data(place).returnWeights(to);
|
||||
}
|
||||
|
@ -129,14 +129,14 @@ public:
|
||||
buf.read(reinterpret_cast<char *>(value.data()), size * sizeof(value[0]));
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
Int64 current_intersections = 0;
|
||||
Int64 max_intersections = 0;
|
||||
PointType position_of_max_intersections = 0;
|
||||
|
||||
/// const_cast because we will sort the array
|
||||
auto & array = const_cast<typename MaxIntersectionsData<PointType>::Array &>(this->data(place).value);
|
||||
auto & array = this->data(place).value;
|
||||
|
||||
/// Sort by position; for equal position, sort by weight to get deterministic result.
|
||||
std::sort(array.begin(), array.end());
|
||||
|
@ -93,7 +93,7 @@ public:
|
||||
nested_func->deserialize(place, buf, arena);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
nested_func->insertResultInto(place, to);
|
||||
}
|
||||
|
@ -746,7 +746,7 @@ public:
|
||||
return Data::allocatesMemoryInArena();
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
this->data(place).insertResultInto(to);
|
||||
}
|
||||
|
@ -67,7 +67,7 @@ public:
|
||||
{
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr, IColumn & to) const override
|
||||
{
|
||||
to.insertDefault();
|
||||
}
|
||||
|
@ -146,7 +146,7 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
if (result_is_nullable)
|
||||
{
|
||||
|
@ -147,7 +147,7 @@ public:
|
||||
}
|
||||
|
||||
void insertResultInto(
|
||||
ConstAggregateDataPtr place,
|
||||
AggregateDataPtr place,
|
||||
IColumn & to) const override
|
||||
{
|
||||
if (place[size_of_data])
|
||||
|
@ -138,10 +138,10 @@ public:
|
||||
this->data(place).deserialize(buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
/// const_cast is required because some data structures apply finalizaton (like sorting) for obtain a result.
|
||||
auto & data = this->data(const_cast<AggregateDataPtr>(place));
|
||||
auto & data = this->data(place);
|
||||
|
||||
if constexpr (returns_many)
|
||||
{
|
||||
|
@ -173,7 +173,7 @@ public:
|
||||
}
|
||||
|
||||
void insertResultInto(
|
||||
ConstAggregateDataPtr place,
|
||||
AggregateDataPtr place,
|
||||
IColumn & to) const override
|
||||
{
|
||||
auto & col = assert_cast<ColumnArray &>(to);
|
||||
|
@ -123,7 +123,7 @@ public:
|
||||
this->data(place).deserialize(buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
auto & data_to = assert_cast<ColumnUInt8 &>(assert_cast<ColumnArray &>(to).getData()).getData();
|
||||
auto & offsets_to = assert_cast<ColumnArray &>(to).getOffsets();
|
||||
|
@ -560,9 +560,9 @@ public:
|
||||
|
||||
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeUInt8>(); }
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
const_cast<Data &>(this->data(place)).sort();
|
||||
this->data(place).sort();
|
||||
|
||||
const auto & data_ref = this->data(place);
|
||||
|
||||
@ -588,7 +588,7 @@ public:
|
||||
|
||||
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeUInt64>(); }
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
const_cast<Data &>(this->data(place)).sort();
|
||||
assert_cast<ColumnUInt64 &>(to).getData().push_back(count(place));
|
||||
|
@ -169,7 +169,7 @@ public:
|
||||
}
|
||||
|
||||
void insertResultInto(
|
||||
ConstAggregateDataPtr place,
|
||||
AggregateDataPtr place,
|
||||
IColumn & to
|
||||
) const override
|
||||
{
|
||||
|
@ -80,9 +80,9 @@ public:
|
||||
nested_func->deserialize(place, buf, arena);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
assert_cast<ColumnAggregateFunction &>(to).getData().push_back(const_cast<AggregateDataPtr>(place));
|
||||
assert_cast<ColumnAggregateFunction &>(to).getData().push_back(place);
|
||||
}
|
||||
|
||||
/// Aggregate function or aggregate function state.
|
||||
|
@ -143,7 +143,7 @@ public:
|
||||
this->data(place).deserialize(buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
this->data(place).publish(to);
|
||||
}
|
||||
@ -395,7 +395,7 @@ public:
|
||||
this->data(place).deserialize(buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
this->data(place).publish(to);
|
||||
}
|
||||
|
@ -445,7 +445,7 @@ public:
|
||||
this->data(place).read(buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
const auto & data = this->data(place);
|
||||
auto & dst = static_cast<ColVecResult &>(to).getData();
|
||||
|
@ -156,7 +156,7 @@ public:
|
||||
this->data(place).read(buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
auto & column = static_cast<ColVecResult &>(to);
|
||||
column.getData().push_back(this->data(place).get());
|
||||
|
@ -242,10 +242,10 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
// Final step does compaction of keys that have zero values, this mutates the state
|
||||
auto & merged_maps = this->data(const_cast<AggregateDataPtr>(place)).merged_maps;
|
||||
auto & merged_maps = this->data(place).merged_maps;
|
||||
for (auto it = merged_maps.cbegin(); it != merged_maps.cend();)
|
||||
{
|
||||
// Key is not compacted if it has at least one non-zero value
|
||||
|
@ -253,7 +253,7 @@ public:
|
||||
|
||||
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override { this->data(place).deserialize(buf); }
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
const auto & value = this->data(place).result;
|
||||
size_t size = value.size();
|
||||
|
@ -84,7 +84,7 @@ public:
|
||||
set.read(buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
|
||||
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
|
||||
@ -211,7 +211,7 @@ public:
|
||||
this->data(place).value.merge(this->data(rhs).value);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
|
||||
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
|
||||
|
@ -240,7 +240,7 @@ public:
|
||||
this->data(place).set.read(buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
|
||||
}
|
||||
@ -294,7 +294,7 @@ public:
|
||||
this->data(place).set.read(buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
|
||||
}
|
||||
|
@ -167,7 +167,7 @@ public:
|
||||
this->data(place).set.read(buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
|
||||
}
|
||||
@ -229,7 +229,7 @@ public:
|
||||
this->data(place).set.read(buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
|
||||
}
|
||||
|
@ -180,7 +180,7 @@ public:
|
||||
this->data(place).read(buf, threshold);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size());
|
||||
}
|
||||
@ -242,7 +242,7 @@ public:
|
||||
this->data(place).read(buf, threshold);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size());
|
||||
}
|
||||
|
@ -151,14 +151,14 @@ private:
|
||||
// The level path must be 1---2---3---...---check_events_size, find the max event level that statisfied the path in the sliding window.
|
||||
// If found, returns the max event level, else return 0.
|
||||
// The Algorithm complexity is O(n).
|
||||
UInt8 getEventLevel(const Data & data) const
|
||||
UInt8 getEventLevel(Data & data) const
|
||||
{
|
||||
if (data.size() == 0)
|
||||
return 0;
|
||||
if (!strict_order && events_size == 1)
|
||||
return 1;
|
||||
|
||||
const_cast<Data &>(data).sort();
|
||||
data.sort();
|
||||
|
||||
/// events_timestamp stores the timestamp that latest i-th level event happen withing time window after previous level event.
|
||||
/// timestamp defaults to -1, which unsigned timestamp value never meet
|
||||
@ -279,7 +279,7 @@ public:
|
||||
this->data(place).deserialize(buf);
|
||||
}
|
||||
|
||||
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
|
||||
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
|
||||
{
|
||||
assert_cast<ColumnUInt8 &>(to).getData().push_back(getEventLevel(this->data(place)));
|
||||
}
|
||||
|
@ -104,7 +104,9 @@ public:
|
||||
}
|
||||
|
||||
/// Inserts results into a column.
|
||||
virtual void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const = 0;
|
||||
/// This method must be called once, from single thread.
|
||||
/// After this method was called for state, you can't do anything with state but destroy.
|
||||
virtual void insertResultInto(AggregateDataPtr place, IColumn & to) const = 0;
|
||||
|
||||
/// Used for machine learning methods. Predict result from trained model.
|
||||
/// Will insert result into `to` column for rows in range [offset, offset + limit).
|
||||
|
@ -167,7 +167,7 @@ namespace detail
|
||||
buf.readStrict(reinterpret_cast<char *>(elems.data()), size * sizeof(elems[0]));
|
||||
}
|
||||
|
||||
UInt16 get(double level) const
|
||||
UInt16 get(double level)
|
||||
{
|
||||
UInt16 quantile = 0;
|
||||
|
||||
@ -178,7 +178,7 @@ namespace detail
|
||||
: (elems.size() - 1);
|
||||
|
||||
/// Sorting an array will not be considered a violation of constancy.
|
||||
auto & array = const_cast<Array &>(elems);
|
||||
auto & array = elems;
|
||||
std::nth_element(array.begin(), array.begin() + n, array.end());
|
||||
quantile = array[n];
|
||||
}
|
||||
@ -187,10 +187,10 @@ namespace detail
|
||||
}
|
||||
|
||||
template <typename ResultType>
|
||||
void getMany(const double * levels, const size_t * levels_permutation, size_t size, ResultType * result) const
|
||||
void getMany(const double * levels, const size_t * levels_permutation, size_t size, ResultType * result)
|
||||
{
|
||||
size_t prev_n = 0;
|
||||
auto & array = const_cast<Array &>(elems);
|
||||
auto & array = elems;
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
{
|
||||
auto level_index = levels_permutation[i];
|
||||
@ -208,14 +208,14 @@ namespace detail
|
||||
}
|
||||
|
||||
/// Same, but in the case of an empty state, NaN is returned.
|
||||
float getFloat(double level) const
|
||||
float getFloat(double level)
|
||||
{
|
||||
return !elems.empty()
|
||||
? get(level)
|
||||
: std::numeric_limits<float>::quiet_NaN();
|
||||
}
|
||||
|
||||
void getManyFloat(const double * levels, const size_t * levels_permutation, size_t size, float * result) const
|
||||
void getManyFloat(const double * levels, const size_t * levels_permutation, size_t size, float * result)
|
||||
{
|
||||
if (!elems.empty())
|
||||
getMany(levels, levels_permutation, size, result);
|
||||
@ -707,7 +707,7 @@ public:
|
||||
}
|
||||
|
||||
/// Get the value of the `level` quantile. The level must be between 0 and 1.
|
||||
UInt16 get(double level) const
|
||||
UInt16 get(double level)
|
||||
{
|
||||
Kind kind = which();
|
||||
|
||||
@ -728,7 +728,7 @@ public:
|
||||
|
||||
/// Get the size values of the quantiles of the `levels` levels. Record `size` results starting with `result` address.
|
||||
template <typename ResultType>
|
||||
void getMany(const double * levels, const size_t * levels_permutation, size_t size, ResultType * result) const
|
||||
void getMany(const double * levels, const size_t * levels_permutation, size_t size, ResultType * result)
|
||||
{
|
||||
Kind kind = which();
|
||||
|
||||
@ -748,14 +748,14 @@ public:
|
||||
}
|
||||
|
||||
/// The same, but in the case of an empty state, NaN is returned.
|
||||
float getFloat(double level) const
|
||||
float getFloat(double level)
|
||||
{
|
||||
return tiny.count
|
||||
? get(level)
|
||||
: std::numeric_limits<float>::quiet_NaN();
|
||||
}
|
||||
|
||||
void getManyFloat(const double * levels, const size_t * levels_permutation, size_t size, float * result) const
|
||||
void getManyFloat(const double * levels, const size_t * levels_permutation, size_t size, float * result)
|
||||
{
|
||||
if (tiny.count)
|
||||
getMany(levels, levels_permutation, size, result);
|
||||
|
@ -39,7 +39,7 @@ void ColumnAggregateFunction::addArena(ConstArenaPtr arena_)
|
||||
foreign_arenas.push_back(arena_);
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnAggregateFunction::convertToValues() const
|
||||
MutableColumnPtr ColumnAggregateFunction::convertToValues(MutableColumnPtr column)
|
||||
{
|
||||
/** If the aggregate function returns an unfinalized/unfinished state,
|
||||
* then you just need to copy pointers to it and also shared ownership of data.
|
||||
@ -65,20 +65,26 @@ MutableColumnPtr ColumnAggregateFunction::convertToValues() const
|
||||
* `AggregateFunction(quantileTimingState(0.5), UInt64)`
|
||||
* into `AggregateFunction(quantileTiming(0.5), UInt64)`
|
||||
* - in the same states.
|
||||
*
|
||||
*column_aggregate_func
|
||||
* Then `finalizeAggregation` function will be calculated, which will call `convertToValues` already on the result.
|
||||
* And this converts a column of type
|
||||
* AggregateFunction(quantileTiming(0.5), UInt64)
|
||||
* into UInt16 - already finished result of `quantileTiming`.
|
||||
*/
|
||||
auto & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*column);
|
||||
auto & func = column_aggregate_func.func;
|
||||
auto & data = column_aggregate_func.data;
|
||||
|
||||
if (const AggregateFunctionState *function_state = typeid_cast<const AggregateFunctionState *>(func.get()))
|
||||
{
|
||||
auto res = createView();
|
||||
auto res = column_aggregate_func.createView();
|
||||
res->set(function_state->getNestedFunction());
|
||||
res->data.assign(data.begin(), data.end());
|
||||
return res;
|
||||
}
|
||||
|
||||
column_aggregate_func.ensureOwnership();
|
||||
|
||||
MutableColumnPtr res = func->getReturnType()->createColumn();
|
||||
res->reserve(data.size());
|
||||
|
||||
|
@ -114,14 +114,14 @@ public:
|
||||
/// Take shared ownership of Arena, that holds memory for states of aggregate functions.
|
||||
void addArena(ConstArenaPtr arena_);
|
||||
|
||||
/** Transform column with states of aggregate functions to column with final result values.
|
||||
*/
|
||||
MutableColumnPtr convertToValues() const;
|
||||
/// Transform column with states of aggregate functions to column with final result values.
|
||||
/// It expects ColumnAggregateFunction as an argument, this column will be destroyed.
|
||||
/// This method is made static and receive MutableColumnPtr object to explicitly destroy it.
|
||||
static MutableColumnPtr convertToValues(MutableColumnPtr column);
|
||||
|
||||
std::string getName() const override { return "AggregateFunction(" + func->getName() + ")"; }
|
||||
const char * getFamilyName() const override { return "AggregateFunction"; }
|
||||
|
||||
bool tryFinalizeAggregateFunction(MutableColumnPtr* res_) const;
|
||||
MutableColumnPtr predictValues(Block & block, const ColumnNumbers & arguments, const Context & context) const;
|
||||
|
||||
size_t size() const override
|
||||
|
@ -17,7 +17,10 @@ namespace DB
|
||||
{
|
||||
current.type = unfinalized_type->getReturnType();
|
||||
if (current.column)
|
||||
current.column = typeid_cast<const ColumnAggregateFunction &>(*current.column).convertToValues();
|
||||
{
|
||||
auto mut_column = (*std::move(current.column)).mutate();
|
||||
current.column = ColumnAggregateFunction::convertToValues(std::move(mut_column));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -57,15 +57,16 @@ public:
|
||||
|
||||
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override
|
||||
{
|
||||
const ColumnAggregateFunction * column_with_states
|
||||
= typeid_cast<const ColumnAggregateFunction *>(&*block.getByPosition(arguments.at(0)).column);
|
||||
if (!column_with_states)
|
||||
auto column = block.getByPosition(arguments.at(0)).column;
|
||||
if (!typeid_cast<const ColumnAggregateFunction *>(column.get()))
|
||||
throw Exception("Illegal column " + block.getByPosition(arguments.at(0)).column->getName()
|
||||
+ " of first argument of function "
|
||||
+ getName(),
|
||||
ErrorCodes::ILLEGAL_COLUMN);
|
||||
|
||||
block.getByPosition(result).column = column_with_states->convertToValues();
|
||||
/// Column is copied here, because there is no guarantee that we own it.
|
||||
auto mut_column = (*std::move(column)).mutate();
|
||||
block.getByPosition(result).column = ColumnAggregateFunction::convertToValues(std::move(mut_column));
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -21,8 +21,13 @@ void finalizeChunk(Chunk & chunk)
|
||||
auto columns = chunk.detachColumns();
|
||||
|
||||
for (auto & column : columns)
|
||||
if (const auto * agg_function = typeid_cast<const ColumnAggregateFunction *>(column.get()))
|
||||
column = agg_function->convertToValues();
|
||||
{
|
||||
if (typeid_cast<const ColumnAggregateFunction *>(column.get()))
|
||||
{
|
||||
auto mut_column = (*std::move(column)).mutate();
|
||||
column = ColumnAggregateFunction::convertToValues(std::move(mut_column));
|
||||
}
|
||||
}
|
||||
|
||||
chunk.setColumns(std::move(columns), num_rows);
|
||||
}
|
||||
|
@ -0,0 +1,18 @@
|
||||
200
|
||||
200
|
||||
200
|
||||
200
|
||||
200
|
||||
200
|
||||
200
|
||||
200
|
||||
200
|
||||
200
|
||||
200
|
||||
200
|
||||
200
|
||||
200
|
||||
200
|
||||
200
|
||||
200
|
||||
200
|
@ -0,0 +1,23 @@
|
||||
drop table if exists test_quantile;
|
||||
create table test_quantile (x AggregateFunction(quantileTiming(0.2), UInt64)) engine = Memory;
|
||||
insert into test_quantile select medianTimingState(.2)(number) from (select * from numbers(1000) order by number desc);
|
||||
select y from (
|
||||
select finalizeAggregation(x) as y from test_quantile union all
|
||||
select finalizeAggregation(x) as y from test_quantile union all
|
||||
select finalizeAggregation(x) as y from test_quantile union all
|
||||
select finalizeAggregation(x) as y from test_quantile union all
|
||||
select finalizeAggregation(x) as y from test_quantile union all
|
||||
select finalizeAggregation(x) as y from test_quantile union all
|
||||
select finalizeAggregation(x) as y from test_quantile union all
|
||||
select finalizeAggregation(x) as y from test_quantile union all
|
||||
select finalizeAggregation(x) as y from test_quantile union all
|
||||
select finalizeAggregation(x) as y from test_quantile union all
|
||||
select finalizeAggregation(x) as y from test_quantile union all
|
||||
select finalizeAggregation(x) as y from test_quantile union all
|
||||
select finalizeAggregation(x) as y from test_quantile union all
|
||||
select finalizeAggregation(x) as y from test_quantile union all
|
||||
select finalizeAggregation(x) as y from test_quantile union all
|
||||
select finalizeAggregation(x) as y from test_quantile union all
|
||||
select finalizeAggregation(x) as y from test_quantile union all
|
||||
select finalizeAggregation(x) as y from test_quantile)
|
||||
order by y;
|
Loading…
Reference in New Issue
Block a user