Merge pull request #11729 from CurtizJ/distinct-combinator

Merging #10930. (Distinct combinator)
This commit is contained in:
Anton Popov 2020-06-26 13:40:32 +03:00 committed by GitHub
commit 39fe7d6c3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
63 changed files with 529 additions and 120 deletions

View File

@ -93,7 +93,7 @@ public:
buf.read(c);
}
void insertResultInto(AggregateDataPtr, IColumn & to) const override
void insertResultInto(AggregateDataPtr, IColumn & to, Arena *) const override
{
to.insertDefault();
}

View File

@ -85,7 +85,7 @@ public:
return Data::allocatesMemoryInArena();
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
this->data(place).result.insertResultInto(to);
}

View File

@ -119,9 +119,9 @@ public:
nested_func->deserialize(place, buf, arena);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override
{
nested_func->insertResultInto(place, to);
nested_func->insertResultInto(place, to, arena);
}
bool allocatesMemoryInArena() const override

View File

@ -80,7 +80,7 @@ public:
readBinary(this->data(place).denominator, buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
auto & column = static_cast<ColVecResult &>(to);
column.getData().push_back(this->data(place).template result<ResultType>());

View File

@ -74,7 +74,7 @@ public:
readBinary(this->data(place).value, buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
assert_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).value);
}

View File

@ -150,7 +150,7 @@ public:
data(place).deserialize(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
assert_cast<ColumnFloat64 &>(to).getData().push_back(getBoundingRatio(data(place)));
}

View File

@ -119,8 +119,8 @@ public:
void insertResultInto(
AggregateDataPtr place,
IColumn & to
) const override
IColumn & to,
Arena *) const override
{
auto & col = static_cast<ColumnArray &>(to);
auto & data_col = static_cast<ColumnFloat64 &>(col.getData());

View File

@ -57,7 +57,7 @@ public:
readVarUInt(data(place).count, buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(data(place).count);
}
@ -112,7 +112,7 @@ public:
readVarUInt(data(place).count, buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(data(place).count);
}

View File

@ -0,0 +1,64 @@
#include <AggregateFunctions/AggregateFunctionDistinct.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include <AggregateFunctions/Helpers.h>
#include <Common/typeid_cast.h>
#include "registerAggregateFunctions.h"
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
class AggregateFunctionCombinatorDistinct final : public IAggregateFunctionCombinator
{
public:
String getName() const override { return "Distinct"; }
DataTypes transformArguments(const DataTypes & arguments) const override
{
if (arguments.empty())
throw Exception("Incorrect number of arguments for aggregate function with " + getName() + " suffix",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return arguments;
}
AggregateFunctionPtr transformAggregateFunction(
const AggregateFunctionPtr & nested_function,
const AggregateFunctionProperties &,
const DataTypes & arguments,
const Array &) const override
{
AggregateFunctionPtr res;
if (arguments.size() == 1)
{
res.reset(createWithNumericType<
AggregateFunctionDistinct,
AggregateFunctionDistinctSingleNumericData>(*arguments[0], nested_function, arguments));
if (res)
return res;
if (arguments[0]->isValueUnambiguouslyRepresentedInContiguousMemoryRegion())
return std::make_shared<
AggregateFunctionDistinct<
AggregateFunctionDistinctSingleGenericData<true>>>(nested_function, arguments);
else
return std::make_shared<
AggregateFunctionDistinct<
AggregateFunctionDistinctSingleGenericData<false>>>(nested_function, arguments);
}
return std::make_shared<AggregateFunctionDistinct<AggregateFunctionDistinctMultipleGenericData>>(nested_function, arguments);
}
};
void registerAggregateFunctionCombinatorDistinct(AggregateFunctionCombinatorFactory & factory)
{
factory.registerCombinator(std::make_shared<AggregateFunctionCombinatorDistinct>());
}
}

View File

@ -0,0 +1,240 @@
#pragma once
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/KeyHolderHelpers.h>
#include <Columns/ColumnArray.h>
#include <Common/assert_cast.h>
#include <DataTypes/DataTypeArray.h>
#include <Interpreters/AggregationCommon.h>
#include <Common/HashTable/HashSet.h>
#include <Common/HashTable/HashMap.h>
#include <Common/SipHash.h>
#include <Common/FieldVisitors.h>
namespace DB
{
template <typename T>
struct AggregateFunctionDistinctSingleNumericData
{
/// When creating, the hash table must be small.
using Set = HashSetWithStackMemory<T, DefaultHash<T>, 4>;
using Self = AggregateFunctionDistinctSingleNumericData<T>;
Set set;
void add(const IColumn ** columns, size_t /* columns_num */, size_t row_num, Arena *)
{
const auto & vec = assert_cast<const ColumnVector<T> &>(*columns[0]).getData();
set.insert(vec[row_num]);
}
void merge(const Self & rhs, Arena *)
{
set.merge(rhs.set);
}
void serialize(WriteBuffer & buf) const
{
set.write(buf);
}
void deserialize(ReadBuffer & buf, Arena *)
{
set.read(buf);
}
MutableColumns getArguments(const DataTypes & argument_types) const
{
MutableColumns argument_columns;
argument_columns.emplace_back(argument_types[0]->createColumn());
for (const auto & elem : set)
argument_columns[0]->insert(elem.getValue());
return argument_columns;
}
};
struct AggregateFunctionDistinctGenericData
{
/// When creating, the hash table must be small.
using Set = HashSetWithSavedHashWithStackMemory<StringRef, StringRefHash, 4>;
using Self = AggregateFunctionDistinctGenericData;
Set set;
void merge(const Self & rhs, Arena * arena)
{
Set::LookupResult it;
bool inserted;
for (const auto & elem : rhs.set)
set.emplace(ArenaKeyHolder{elem.getValue(), *arena}, it, inserted);
}
void serialize(WriteBuffer & buf) const
{
writeVarUInt(set.size(), buf);
for (const auto & elem : set)
writeStringBinary(elem.getValue(), buf);
}
void deserialize(ReadBuffer & buf, Arena * arena)
{
size_t size;
readVarUInt(size, buf);
for (size_t i = 0; i < size; ++i)
set.insert(readStringBinaryInto(*arena, buf));
}
};
template <bool is_plain_column>
struct AggregateFunctionDistinctSingleGenericData : public AggregateFunctionDistinctGenericData
{
void add(const IColumn ** columns, size_t /* columns_num */, size_t row_num, Arena * arena)
{
Set::LookupResult it;
bool inserted;
auto key_holder = getKeyHolder<is_plain_column>(*columns[0], row_num, *arena);
set.emplace(key_holder, it, inserted);
}
MutableColumns getArguments(const DataTypes & argument_types) const
{
MutableColumns argument_columns;
argument_columns.emplace_back(argument_types[0]->createColumn());
for (const auto & elem : set)
deserializeAndInsert<is_plain_column>(elem.getValue(), *argument_columns[0]);
return argument_columns;
}
};
struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDistinctGenericData
{
void add(const IColumn ** columns, size_t columns_num, size_t row_num, Arena * arena)
{
const char * begin = nullptr;
StringRef value(begin, 0);
for (size_t i = 0; i < columns_num; ++i)
{
auto cur_ref = columns[i]->serializeValueIntoArena(row_num, *arena, begin);
value.data = cur_ref.data - value.size;
value.size += cur_ref.size;
}
Set::LookupResult it;
bool inserted;
auto key_holder = SerializedKeyHolder{value, *arena};
set.emplace(key_holder, it, inserted);
}
MutableColumns getArguments(const DataTypes & argument_types) const
{
MutableColumns argument_columns(argument_types.size());
for (size_t i = 0; i < argument_types.size(); ++i)
argument_columns[i] = argument_types[i]->createColumn();
for (const auto & elem : set)
{
const char * begin = elem.getValue().data;
for (auto & column : argument_columns)
begin = column->deserializeAndInsertFromArena(begin);
}
return argument_columns;
}
};
/** Adaptor for aggregate functions.
* Adding -Distinct suffix to aggregate function
**/
template <typename Data>
class AggregateFunctionDistinct : public IAggregateFunctionDataHelper<Data, AggregateFunctionDistinct<Data>>
{
private:
static constexpr auto prefix_size = sizeof(Data);
AggregateFunctionPtr nested_func;
size_t arguments_num;
AggregateDataPtr getNestedPlace(AggregateDataPtr place) const noexcept
{
return place + prefix_size;
}
ConstAggregateDataPtr getNestedPlace(ConstAggregateDataPtr place) const noexcept
{
return place + prefix_size;
}
public:
AggregateFunctionDistinct(AggregateFunctionPtr nested_func_, const DataTypes & arguments)
: IAggregateFunctionDataHelper<Data, AggregateFunctionDistinct>(arguments, nested_func_->getParameters())
, nested_func(nested_func_)
, arguments_num(arguments.size()) {}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
this->data(place).add(columns, arguments_num, row_num, arena);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
this->data(place).merge(this->data(rhs), arena);
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).serialize(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const override
{
this->data(place).deserialize(buf, arena);
}
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override
{
auto arguments = this->data(place).getArguments(this->argument_types);
ColumnRawPtrs arguments_raw(arguments.size());
for (size_t i = 0; i < arguments.size(); ++i)
arguments_raw[i] = arguments[i].get();
assert(!arguments.empty());
nested_func->addBatchSinglePlace(arguments[0]->size(), getNestedPlace(place), arguments_raw.data(), arena);
nested_func->insertResultInto(getNestedPlace(place), to, arena);
}
size_t sizeOfData() const override
{
return prefix_size + nested_func->sizeOfData();
}
void create(AggregateDataPtr place) const override
{
new (place) Data;
nested_func->create(getNestedPlace(place));
}
void destroy(AggregateDataPtr place) const noexcept override
{
this->data(place).~Data();
nested_func->destroy(getNestedPlace(place));
}
String getName() const override
{
return nested_func->getName() + "Distinct";
}
DataTypePtr getReturnType() const override
{
return nested_func->getReturnType();
}
bool allocatesMemoryInArena() const override
{
return true;
}
};
}

View File

@ -132,7 +132,7 @@ public:
this->data(place).deserialize(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
auto & column = assert_cast<ColumnVector<Float64> &>(to);
column.getData().push_back(this->data(place).get());

View File

@ -225,7 +225,7 @@ public:
}
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override
{
AggregateFunctionForEachData & state = data(place);
@ -236,7 +236,7 @@ public:
char * nested_state = state.array_of_aggregate_datas;
for (size_t i = 0; i < state.dynamic_array_size; ++i)
{
nested_func->insertResultInto(nested_state, elems_to);
nested_func->insertResultInto(nested_state, elems_to, arena);
nested_state += nested_size_of_data;
}

View File

@ -282,7 +282,7 @@ public:
// if constexpr (Trait::sampler == Sampler::DETERMINATOR)
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
const auto & value = this->data(place).value;
size_t size = value.size();
@ -600,7 +600,7 @@ public:
// if constexpr (Trait::sampler == Sampler::DETERMINATOR)
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
auto & column_array = assert_cast<ColumnArray &>(to);
@ -815,7 +815,7 @@ public:
data(place).last = prev;
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
auto & column_array = assert_cast<ColumnArray &>(to);

View File

@ -179,7 +179,7 @@ public:
}
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
ColumnArray & to_array = assert_cast<ColumnArray &>(to);
IColumn & to_data = to_array.getData();

View File

@ -158,7 +158,7 @@ public:
this->data(place).sum = value.back();
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
const auto & data = this->data(place);
size_t size = data.value.size();

View File

@ -48,7 +48,7 @@ public:
this->data(place).rbs.read(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
assert_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).rbs.size());
}
@ -113,7 +113,7 @@ public:
this->data(place).rbs.read(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
assert_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).rbs.size());
}

View File

@ -16,6 +16,7 @@
#include <Common/assert_cast.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/KeyHolderHelpers.h>
#define AGGREGATE_FUNCTION_GROUP_ARRAY_UNIQ_MAX_SIZE 0xFFFFFF
@ -97,7 +98,7 @@ public:
this->data(place).value.read(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
@ -147,26 +148,6 @@ class AggregateFunctionGroupUniqArrayGeneric
using State = AggregateFunctionGroupUniqArrayGenericData;
static auto getKeyHolder(const IColumn & column, size_t row_num, Arena & arena)
{
if constexpr (is_plain_column)
{
return ArenaKeyHolder{column.getDataAt(row_num), arena};
}
else
{
const char * begin = nullptr;
StringRef serialized = column.serializeValueIntoArena(row_num, arena, begin);
assert(serialized.data != nullptr);
return SerializedKeyHolder{serialized, arena};
}
}
static void deserializeAndInsert(StringRef str, IColumn & data_to)
{
return deserializeAndInsertImpl<is_plain_column>(str, data_to);
}
public:
AggregateFunctionGroupUniqArrayGeneric(const DataTypePtr & input_data_type_, UInt64 max_elems_ = std::numeric_limits<UInt64>::max())
: IAggregateFunctionDataHelper<AggregateFunctionGroupUniqArrayGenericData, AggregateFunctionGroupUniqArrayGeneric<is_plain_column, Tlimit_num_elem>>({input_data_type_}, {})
@ -215,7 +196,7 @@ public:
bool inserted;
State::Set::LookupResult it;
auto key_holder = getKeyHolder(*columns[0], row_num, *arena);
auto key_holder = getKeyHolder<is_plain_column>(*columns[0], row_num, *arena);
set.emplace(key_holder, it, inserted);
}
@ -237,7 +218,7 @@ public:
}
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
@ -247,22 +228,10 @@ public:
offsets_to.push_back(offsets_to.back() + set.size());
for (auto & elem : set)
deserializeAndInsert(elem.getValue(), data_to);
deserializeAndInsert<is_plain_column>(elem.getValue(), data_to);
}
};
template <>
inline void deserializeAndInsertImpl<false>(StringRef str, IColumn & data_to)
{
data_to.deserializeAndInsertFromArena(str.data);
}
template <>
inline void deserializeAndInsertImpl<true>(StringRef str, IColumn & data_to)
{
data_to.insertData(str.data, str.size);
}
#undef AGGREGATE_FUNCTION_GROUP_ARRAY_UNIQ_MAX_SIZE
}

View File

@ -353,7 +353,7 @@ public:
this->data(place).read(buf, max_bins);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
auto & data = this->data(place);

View File

@ -95,9 +95,9 @@ public:
nested_func->deserialize(place, buf, arena);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override
{
nested_func->insertResultInto(place, to);
nested_func->insertResultInto(place, to, arena);
}
bool allocatesMemoryInArena() const override

View File

@ -388,7 +388,7 @@ public:
/** This function is called if aggregate function without State modifier is selected in a query.
* Inserts all weights of the model into the column 'to', so user may use such information if needed
*/
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
this->data(place).returnWeights(to);
}

View File

@ -129,7 +129,7 @@ public:
buf.read(reinterpret_cast<char *>(value.data()), size * sizeof(value[0]));
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
Int64 current_intersections = 0;
Int64 max_intersections = 0;

View File

@ -93,9 +93,9 @@ public:
nested_func->deserialize(place, buf, arena);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override
{
nested_func->insertResultInto(place, to);
nested_func->insertResultInto(place, to, arena);
}
bool allocatesMemoryInArena() const override

View File

@ -746,7 +746,7 @@ public:
return Data::allocatesMemoryInArena();
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
this->data(place).insertResultInto(to);
}

View File

@ -67,7 +67,7 @@ public:
{
}
void insertResultInto(AggregateDataPtr, IColumn & to) const override
void insertResultInto(AggregateDataPtr, IColumn & to, Arena *) const override
{
to.insertDefault();
}

View File

@ -150,14 +150,14 @@ public:
}
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const override
{
if constexpr (result_is_nullable)
{
ColumnNullable & to_concrete = assert_cast<ColumnNullable &>(to);
if (getFlag(place))
{
nested_function->insertResultInto(nestedPlace(place), to_concrete.getNestedColumn());
nested_function->insertResultInto(nestedPlace(place), to_concrete.getNestedColumn(), arena);
to_concrete.getNullMapData().push_back(0);
}
else
@ -167,7 +167,7 @@ public:
}
else
{
nested_function->insertResultInto(nestedPlace(place), to);
nested_function->insertResultInto(nestedPlace(place), to, arena);
}
}

View File

@ -148,7 +148,8 @@ public:
void insertResultInto(
AggregateDataPtr place,
IColumn & to) const override
IColumn & to,
Arena * arena) const override
{
if (place[size_of_data])
{
@ -157,20 +158,20 @@ public:
// -OrNull
if (inner_nullable)
nested_function->insertResultInto(place, to);
nested_function->insertResultInto(place, to, arena);
else
{
ColumnNullable & col = typeid_cast<ColumnNullable &>(to);
col.getNullMapColumn().insertDefault();
nested_function->insertResultInto(place, col.getNestedColumn());
nested_function->insertResultInto(place, col.getNestedColumn(), arena);
}
}
else
{
// -OrDefault
nested_function->insertResultInto(place, to);
nested_function->insertResultInto(place, to, arena);
}
}
else

View File

@ -138,7 +138,7 @@ public:
this->data(place).deserialize(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
/// const_cast is required because some data structures apply finalizaton (like sorting) for obtain a result.
auto & data = this->data(place);

View File

@ -174,13 +174,14 @@ public:
void insertResultInto(
AggregateDataPtr place,
IColumn & to) const override
IColumn & to,
Arena * arena) const override
{
auto & col = assert_cast<ColumnArray &>(to);
auto & col_offsets = assert_cast<ColumnArray::ColumnOffsets &>(col.getOffsetsColumn());
for (size_t i = 0; i < total; ++i)
nested_function->insertResultInto(place + i * size_of_data, col.getData());
nested_function->insertResultInto(place + i * size_of_data, col.getData(), arena);
col_offsets.getData().push_back(col.getData().size());
}

View File

@ -123,7 +123,7 @@ public:
this->data(place).deserialize(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
auto & data_to = assert_cast<ColumnUInt8 &>(assert_cast<ColumnArray &>(to).getData()).getData();
auto & offsets_to = assert_cast<ColumnArray &>(to).getOffsets();

View File

@ -560,7 +560,7 @@ public:
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeUInt8>(); }
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
this->data(place).sort();
@ -588,7 +588,7 @@ public:
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeUInt64>(); }
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
const_cast<Data &>(this->data(place)).sort();
assert_cast<ColumnUInt64 &>(to).getData().push_back(count(place));

View File

@ -170,8 +170,8 @@ public:
void insertResultInto(
AggregateDataPtr place,
IColumn & to
) const override
IColumn & to,
Arena *) const override
{
Ret k = this->data(place).getK();
Ret b = this->data(place).getB(k);

View File

@ -80,7 +80,7 @@ public:
nested_func->deserialize(place, buf, arena);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
assert_cast<ColumnAggregateFunction &>(to).getData().push_back(place);
}

View File

@ -143,7 +143,7 @@ public:
this->data(place).deserialize(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
this->data(place).publish(to);
}
@ -395,7 +395,7 @@ public:
this->data(place).deserialize(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
this->data(place).publish(to);
}

View File

@ -455,7 +455,7 @@ public:
this->data(place).read(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
const auto & data = this->data(place);
auto & dst = static_cast<ColVecResult &>(to).getData();

View File

@ -305,7 +305,7 @@ public:
this->data(place).read(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
auto & column = static_cast<ColVecResult &>(to);
column.getData().push_back(this->data(place).get());

View File

@ -246,7 +246,7 @@ public:
}
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
// Final step does compaction of keys that have zero values, this mutates the state
auto & merged_maps = this->data(place).merged_maps;

View File

@ -253,7 +253,7 @@ public:
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override { this->data(place).deserialize(buf); }
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
const auto & value = this->data(place).result;
size_t size = value.size();

View File

@ -79,7 +79,7 @@ public:
set.read(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
@ -200,7 +200,7 @@ public:
this->data(place).value.merge(this->data(rhs).value);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();

View File

@ -240,7 +240,7 @@ public:
this->data(place).set.read(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}
@ -294,7 +294,7 @@ public:
this->data(place).set.read(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}

View File

@ -167,7 +167,7 @@ public:
this->data(place).set.read(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}
@ -229,7 +229,7 @@ public:
this->data(place).set.read(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).set.size());
}

View File

@ -180,7 +180,7 @@ public:
this->data(place).read(buf, threshold);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size());
}
@ -242,7 +242,7 @@ public:
this->data(place).read(buf, threshold);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(this->data(place).size());
}

View File

@ -280,7 +280,7 @@ public:
this->data(place).deserialize(buf);
}
void insertResultInto(AggregateDataPtr place, IColumn & to) const override
void insertResultInto(AggregateDataPtr place, IColumn & to, Arena *) const override
{
assert_cast<ColumnUInt8 &>(to).getData().push_back(getEventLevel(this->data(place)));
}

View File

@ -33,6 +33,19 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ
return nullptr;
}
template <template <typename> class AggregateFunctionTemplate, template <typename> class Data, typename... TArgs>
static IAggregateFunction * createWithNumericType(const IDataType & argument_type, TArgs && ... args)
{
WhichDataType which(argument_type);
#define DISPATCH(TYPE) \
if (which.idx == TypeIndex::TYPE) return new AggregateFunctionTemplate<Data<TYPE>>(std::forward<TArgs>(args)...);
FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate<Data<Int8>>(std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate<Data<Int16>>(std::forward<TArgs>(args)...);
return nullptr;
}
template <template <typename, bool> class AggregateFunctionTemplate, bool bool_param, typename... TArgs>
static IAggregateFunction * createWithNumericType(const IDataType & argument_type, TArgs && ... args)
{

View File

@ -106,7 +106,7 @@ public:
/// Inserts results into a column.
/// This method must be called once, from single thread.
/// After this method was called for state, you can't do anything with state but destroy.
virtual void insertResultInto(AggregateDataPtr place, IColumn & to) const = 0;
virtual void insertResultInto(AggregateDataPtr place, IColumn & to, Arena * arena) const = 0;
/// Used for machine learning methods. Predict result from trained model.
/// Will insert result into `to` column for rows in range [offset, offset + limit).

View File

@ -0,0 +1,34 @@
#pragma once
#include <Common/HashTable/HashTableKeyHolder.h>
#include <Columns/IColumn.h>
namespace DB
{
template <bool is_plain_column = false>
static auto getKeyHolder(const IColumn & column, size_t row_num, Arena & arena)
{
if constexpr (is_plain_column)
{
return ArenaKeyHolder{column.getDataAt(row_num), arena};
}
else
{
const char * begin = nullptr;
StringRef serialized = column.serializeValueIntoArena(row_num, arena, begin);
assert(serialized.data != nullptr);
return SerializedKeyHolder{serialized, arena};
}
}
template <bool is_plain_column>
static void deserializeAndInsert(StringRef str, IColumn & data_to)
{
if constexpr (is_plain_column)
data_to.insertData(str.data, str.size);
else
data_to.deserializeAndInsertFromArena(str.data);
}
}

View File

@ -58,6 +58,7 @@ void registerAggregateFunctions()
registerAggregateFunctionCombinatorNull(factory);
registerAggregateFunctionCombinatorOrFill(factory);
registerAggregateFunctionCombinatorResample(factory);
registerAggregateFunctionCombinatorDistinct(factory);
}
}

View File

@ -45,6 +45,7 @@ void registerAggregateFunctionCombinatorMerge(AggregateFunctionCombinatorFactory
void registerAggregateFunctionCombinatorNull(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionCombinatorOrFill(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionCombinatorResample(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionCombinatorDistinct(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctions();

View File

@ -14,6 +14,7 @@ SRCS(
AggregateFunctionCategoricalInformationValue.cpp
AggregateFunctionCombinatorFactory.cpp
AggregateFunctionCount.cpp
AggregateFunctionDistinct.cpp
AggregateFunctionEntropy.cpp
AggregateFunctionFactory.cpp
AggregateFunctionForEach.cpp

View File

@ -135,7 +135,7 @@ MutableColumnPtr ColumnAggregateFunction::convertToValues(MutableColumnPtr colum
res->reserve(data.size());
for (auto * val : data)
func->insertResultInto(val, *res);
func->insertResultInto(val, *res, &column_aggregate_func.createOrGetArena());
return res;
}

View File

@ -188,7 +188,7 @@ void FunctionArrayReduce::executeImpl(Block & block, const ColumnNumbers & argum
for (size_t i = 0; i < input_rows_count; ++i)
if (!res_col_aggregate_function)
agg_func.insertResultInto(places[i], res_col);
agg_func.insertResultInto(places[i], res_col, arena.get());
else
res_col_aggregate_function->insertFrom(places[i]);
block.getByPosition(result).column = std::move(result_holder);

View File

@ -377,7 +377,7 @@ void FunctionArrayReduceInRanges::executeImpl(Block & block, const ColumnNumbers
}
if (!res_col_aggregate_function)
agg_func.insertResultInto(place, result_data);
agg_func.insertResultInto(place, result_data, arena.get());
else
res_col_aggregate_function->insertFrom(place);
}

View File

@ -124,7 +124,7 @@ public:
}
agg_func.merge(place.data(), state_to_add, arena.get());
agg_func.insertResultInto(place.data(), result_column);
agg_func.insertResultInto(place.data(), result_column, arena.get());
++row_number;
}

View File

@ -831,10 +831,11 @@ Block Aggregator::convertOneBucketToBlock(
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns,
Arena * arena,
bool final_)
{
convertToBlockImpl(method, method.data.impls[bucket],
key_columns, aggregate_columns, final_aggregate_columns, final_);
key_columns, aggregate_columns, final_aggregate_columns, arena, final_);
});
block.info.bucket_num = bucket;
@ -992,6 +993,7 @@ void Aggregator::convertToBlockImpl(
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns,
Arena * arena,
bool final) const
{
if (data.empty())
@ -1001,7 +1003,7 @@ void Aggregator::convertToBlockImpl(
throw Exception{"Aggregate. Unexpected key columns size.", ErrorCodes::LOGICAL_ERROR};
if (final)
convertToBlockImplFinal(method, data, key_columns, final_aggregate_columns);
convertToBlockImplFinal(method, data, key_columns, final_aggregate_columns, arena);
else
convertToBlockImplNotFinal(method, data, key_columns, aggregate_columns);
/// In order to release memory early.
@ -1012,7 +1014,8 @@ void Aggregator::convertToBlockImpl(
template <typename Mapped>
inline void Aggregator::insertAggregatesIntoColumns(
Mapped & mapped,
MutableColumns & final_aggregate_columns) const
MutableColumns & final_aggregate_columns,
Arena * arena) const
{
/** Final values of aggregate functions are inserted to columns.
* Then states of aggregate functions, that are not longer needed, are destroyed.
@ -1043,7 +1046,8 @@ inline void Aggregator::insertAggregatesIntoColumns(
for (; insert_i < params.aggregates_size; ++insert_i)
aggregate_functions[insert_i]->insertResultInto(
mapped + offsets_of_aggregate_states[insert_i],
*final_aggregate_columns[insert_i]);
*final_aggregate_columns[insert_i],
arena);
}
catch (...)
{
@ -1080,21 +1084,22 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
Method & method,
Table & data,
MutableColumns & key_columns,
MutableColumns & final_aggregate_columns) const
MutableColumns & final_aggregate_columns,
Arena * arena) const
{
if constexpr (Method::low_cardinality_optimization)
{
if (data.hasNullKeyData())
{
key_columns[0]->insertDefault();
insertAggregatesIntoColumns(data.getNullKeyData(), final_aggregate_columns);
insertAggregatesIntoColumns(data.getNullKeyData(), final_aggregate_columns, arena);
}
}
data.forEachValue([&](const auto & key, auto & mapped)
{
method.insertKeyIntoColumns(key, key_columns, key_sizes);
insertAggregatesIntoColumns(mapped, final_aggregate_columns);
insertAggregatesIntoColumns(mapped, final_aggregate_columns, arena);
});
}
@ -1183,7 +1188,7 @@ Block Aggregator::prepareBlockAndFill(
}
}
filler(key_columns, aggregate_columns_data, final_aggregate_columns, final);
filler(key_columns, aggregate_columns_data, final_aggregate_columns, data_variants.aggregates_pool, final);
Block res = header.cloneEmpty();
@ -1207,6 +1212,7 @@ Block Aggregator::prepareBlockAndFill(
return res;
}
void Aggregator::fillAggregateColumnsWithSingleKey(
AggregatedDataVariants & data_variants,
MutableColumns & final_aggregate_columns)
@ -1249,6 +1255,7 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns,
Arena * arena,
bool final_)
{
if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row)
@ -1263,7 +1270,7 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
}
else
{
insertAggregatesIntoColumns(data, final_aggregate_columns);
insertAggregatesIntoColumns(data, final_aggregate_columns, arena);
}
if (params.overflow_row)
@ -1291,12 +1298,13 @@ Block Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_v
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns,
Arena * arena,
bool final_)
{
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
convertToBlockImpl(*data_variants.NAME, data_variants.NAME->data, \
key_columns, aggregate_columns, final_aggregate_columns, final_);
key_columns, aggregate_columns, final_aggregate_columns, arena, final_);
if (false) {} // NOLINT
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)

View File

@ -1176,19 +1176,22 @@ protected:
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns,
Arena * arena,
bool final) const;
template <typename Mapped>
void insertAggregatesIntoColumns(
Mapped & mapped,
MutableColumns & final_aggregate_columns) const;
MutableColumns & final_aggregate_columns,
Arena * arena) const;
template <typename Method, typename Table>
void convertToBlockImplFinal(
Method & method,
Table & data,
MutableColumns & key_columns,
MutableColumns & final_aggregate_columns) const;
MutableColumns & final_aggregate_columns,
Arena * arena) const;
template <typename Method, typename Table>
void convertToBlockImplNotFinal(

View File

@ -89,7 +89,10 @@ struct CustomizeFunctionsData
};
char countdistinct[] = "countdistinct";
using CustomizeFunctionsVisitor = InDepthNodeVisitor<OneTypeMatcher<CustomizeFunctionsData<countdistinct>>, true>;
using CustomizeCountDistinctVisitor = InDepthNodeVisitor<OneTypeMatcher<CustomizeFunctionsData<countdistinct>>, true>;
char countifdistinct[] = "countifdistinct";
using CustomizeCountIfDistinctVisitor = InDepthNodeVisitor<OneTypeMatcher<CustomizeFunctionsData<countifdistinct>>, true>;
char in[] = "in";
using CustomizeInVisitor = InDepthNodeVisitor<OneTypeMatcher<CustomizeFunctionsData<in>>, true>;
@ -103,6 +106,26 @@ using CustomizeGlobalInVisitor = InDepthNodeVisitor<OneTypeMatcher<CustomizeFunc
char globalNotIn[] = "globalnotin";
using CustomizeGlobalNotInVisitor = InDepthNodeVisitor<OneTypeMatcher<CustomizeFunctionsData<globalNotIn>>, true>;
template <char const * func_suffix>
struct CustomizeFunctionsSuffixData
{
using TypeToVisit = ASTFunction;
const String & customized_func_suffix;
void visit(ASTFunction & func, ASTPtr &)
{
if (endsWith(Poco::toLower(func.name), func_suffix))
{
size_t prefix_len = func.name.length() - strlen(func_suffix);
func.name = func.name.substr(0, prefix_len) + customized_func_suffix;
}
}
};
/// Swap 'if' and 'distinct' suffixes to make execution more optimal.
char ifDistinct[] = "ifdistinct";
using CustomizeIfDistinctVisitor = InDepthNodeVisitor<OneTypeMatcher<CustomizeFunctionsSuffixData<ifDistinct>>, true>;
/// Translate qualified names such as db.table.column, table.column, table_alias.column to names' normal form.
/// Expand asterisks and qualified asterisks with column names.
@ -1044,8 +1067,14 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
void SyntaxAnalyzer::normalize(ASTPtr & query, Aliases & aliases, const Settings & settings)
{
CustomizeFunctionsVisitor::Data data{settings.count_distinct_implementation};
CustomizeFunctionsVisitor(data).visit(query);
CustomizeCountDistinctVisitor::Data data_count_distinct{settings.count_distinct_implementation};
CustomizeCountDistinctVisitor(data_count_distinct).visit(query);
CustomizeCountIfDistinctVisitor::Data data_count_if_distinct{settings.count_distinct_implementation.toString() + "If"};
CustomizeCountIfDistinctVisitor(data_count_if_distinct).visit(query);
CustomizeIfDistinctVisitor::Data data_distinct_if{"DistinctIf"};
CustomizeIfDistinctVisitor(data_distinct_if).visit(query);
if (settings.transform_null_in)
{

View File

@ -223,7 +223,7 @@ void AggregatingSortedAlgorithm::AggregatingMergedData::finishGroup()
/// Write the simple aggregation result for the current group.
for (auto & desc : def.columns_to_simple_aggregate)
{
desc.function->insertResultInto(desc.state.data(), *desc.column);
desc.function->insertResultInto(desc.state.data(), *desc.column, arena.get());
desc.destroyState();
}

View File

@ -308,7 +308,7 @@ void GraphiteRollupSortedAlgorithm::GraphiteRollupMergedData::insertRow(
const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(current_rule);
if (aggregate_state_created)
{
aggregation_pattern->function->insertResultInto(place_for_aggregate_state.data(), *value_column);
aggregation_pattern->function->insertResultInto(place_for_aggregate_state.data(), *value_column, nullptr);
aggregation_pattern->function->destroy(place_for_aggregate_state.data());
aggregate_state_created = false;
}

View File

@ -498,7 +498,7 @@ void SummingSortedAlgorithm::SummingMergedData::finishGroup()
{
try
{
desc.function->insertResultInto(desc.state.data(), *desc.merged_column);
desc.function->insertResultInto(desc.state.data(), *desc.merged_column, nullptr);
/// Update zero status of current row
if (desc.column_numbers.size() == 1)

View File

@ -0,0 +1,6 @@
<test>
<query>SELECT x, sum(DISTINCT y) from (SELECT number % 12 AS x, number % 12321 AS y FROM numbers(10000000)) GROUP BY x</query>
<query>SELECT x, sum(y) from (SELECT DISTINCT number % 12 AS x, number % 12321 AS y FROM numbers(10000000)) GROUP BY x</query>
<query>SELECT sumIf(DISTINCT number % 10, number % 5 = 2) FROM numbers(100000000)</query>
<query>SELECT countIf(DISTINCT number % 10, number % 5 = 2) FROM numbers(100000000)</query>
</test>

View File

@ -0,0 +1,15 @@
4999950000
78
[0,1,2,3,4,5,6,7,8,9,10,11,12]
20
0.49237
0.49237
15
18
21
24
27
2
SELECT uniqExactIf(number % 10, (number % 5) = 2)\nFROM numbers(10000)
9
SELECT sumDistinctIf(number % 10, (number % 5) = 2)\nFROM numbers(10000)

View File

@ -0,0 +1,15 @@
SELECT sum(DISTINCT number) FROM numbers_mt(100000);
SELECT sum(DISTINCT number % 13) FROM numbers_mt(100000);
SELECT arraySort(groupArray(DISTINCT number % 13)) FROM numbers_mt(100000);
SELECT finalizeAggregation(countState(DISTINCT toString(number % 20))) FROM numbers_mt(100000);
SELECT round(corrStable(DISTINCT x, y), 5) FROM (SELECT number % 10 AS x, number % 5 AS y FROM numbers(1000));
SELECT round(corrStable(x, y), 5) FROM (SELECT DISTINCT number % 10 AS x, number % 5 AS y FROM numbers(1000));
SELECT sum(DISTINCT y) FROM (SELECT number % 5 AS x, number % 15 AS y FROM numbers(1000)) GROUP BY x;
SET enable_debug_queries = 1;
SELECT countIf(DISTINCT number % 10, number % 5 = 2) FROM numbers(10000);
ANALYZE SELECT countIf(DISTINCT number % 10, number % 5 = 2) FROM numbers(10000);
SELECT sumIf(DISTINCT number % 10, number % 5 = 2) FROM numbers(10000);
ANALYZE SELECT sumIf(DISTINCT number % 10, number % 5 = 2) FROM numbers(10000);

View File

@ -0,0 +1,4 @@
78
[0,1,2,3,4,5,6,7,8,9,10,11,12]
20
0.49237

View File

@ -0,0 +1,4 @@
SELECT sum(DISTINCT number % 13) FROM remote('127.0.0.{1,2}', numbers_mt(100000));
SELECT arraySort(groupArray(DISTINCT number % 13)) FROM remote('127.0.0.{1,2}', numbers_mt(100000));
SELECT finalizeAggregation(countState(DISTINCT toString(number % 20))) FROM remote('127.0.0.{1,2}', numbers_mt(100000));
SELECT round(corrStable(DISTINCT x, y), 5) FROM (SELECT number % 10 AS x, number % 5 AS y FROM remote('127.0.0.{1,2}', numbers(1000)));