Remove useless header
This commit is contained in:
parent 9976006ee6
commit 2d1351c3a6
@@ -1,9 +1,23 @@
 #include <AggregateFunctions/AggregateFunctionFactory.h>
-#include <AggregateFunctions/AggregateFunctionSumMap.h>
 #include <AggregateFunctions/Helpers.h>
-#include <AggregateFunctions/FactoryHelpers.h>
 #include <Functions/FunctionHelpers.h>
 #include <IO/WriteHelpers.h>
+#include <IO/ReadHelpers.h>
+
+#include <DataTypes/DataTypeArray.h>
+#include <DataTypes/DataTypeTuple.h>
+#include <DataTypes/DataTypeNullable.h>
+
+#include <Columns/ColumnArray.h>
+#include <Columns/ColumnTuple.h>
+#include <Columns/ColumnDecimal.h>
+#include <Columns/ColumnString.h>
+
+#include <Common/FieldVisitorSum.h>
+#include <Common/assert_cast.h>
+#include <AggregateFunctions/IAggregateFunction.h>
+#include <AggregateFunctions/FactoryHelpers.h>
+#include <map>
 
 
 namespace DB
@@ -12,13 +26,636 @@ struct Settings;
 
 namespace ErrorCodes
 {
-    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+    extern const int BAD_ARGUMENTS;
     extern const int ILLEGAL_TYPE_OF_ARGUMENT;
+    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+    extern const int LOGICAL_ERROR;
 }
 
 namespace
 {
 
+template <typename T>
+struct AggregateFunctionMapData
+{
+    // Map needs to be ordered to maintain function properties
+    std::map<T, Array> merged_maps;
+};
+
+/** Aggregate function, that takes at least two arguments: keys and values, and as a result, builds a tuple of at least 2 arrays -
+  * ordered keys and variable number of argument values aggregated by corresponding keys.
+  *
+  * sumMap function is the most useful when using SummingMergeTree to sum Nested columns, which name ends in "Map".
+  *
+  * Example: sumMap(k, v...) of:
+  *  k           v
+  *  [1,2,3]     [10,10,10]
+  *  [3,4,5]     [10,10,10]
+  *  [4,5,6]     [10,10,10]
+  *  [6,7,8]     [10,10,10]
+  *  [7,5,3]     [5,15,25]
+  *  [8,9,10]    [20,20,20]
+  * will return:
+  *  ([1,2,3,4,5,6,7,8,9,10],[10,10,45,20,35,20,15,30,20,20])
+  *
+  * minMap and maxMap share the same idea, but calculate min and max correspondingly.
+  *
+  * NOTE: The implementation of these functions are "amateur grade" - not efficient and low quality.
+  */
+
+template <typename T, typename Derived, typename Visitor, bool overflow, bool tuple_argument, bool compact>
+class AggregateFunctionMapBase : public IAggregateFunctionDataHelper<
+    AggregateFunctionMapData<NearestFieldType<T>>, Derived>
+{
+private:
+    static constexpr auto STATE_VERSION_1_MIN_REVISION = 54452;
+
+    DataTypePtr keys_type;
+    SerializationPtr keys_serialization;
+    DataTypes values_types;
+    Serializations values_serializations;
+    Serializations promoted_values_serializations;
+
+public:
+    using Base = IAggregateFunctionDataHelper<
+        AggregateFunctionMapData<NearestFieldType<T>>, Derived>;
+
+    AggregateFunctionMapBase(const DataTypePtr & keys_type_,
+        const DataTypes & values_types_, const DataTypes & argument_types_)
+        : Base(argument_types_, {} /* parameters */, createResultType(keys_type_, values_types_, getName()))
+        , keys_type(keys_type_)
+        , keys_serialization(keys_type->getDefaultSerialization())
+        , values_types(values_types_)
+    {
+        values_serializations.reserve(values_types.size());
+        promoted_values_serializations.reserve(values_types.size());
+        for (const auto & type : values_types)
+        {
+            values_serializations.emplace_back(type->getDefaultSerialization());
+            if (type->canBePromoted())
+            {
+                if (type->isNullable())
+                    promoted_values_serializations.emplace_back(
+                        makeNullable(removeNullable(type)->promoteNumericType())->getDefaultSerialization());
+                else
+                    promoted_values_serializations.emplace_back(type->promoteNumericType()->getDefaultSerialization());
+            }
+            else
+            {
+                promoted_values_serializations.emplace_back(type->getDefaultSerialization());
+            }
+        }
+    }
+
+    bool isVersioned() const override { return true; }
+
+    size_t getDefaultVersion() const override { return 1; }
+
+    size_t getVersionFromRevision(size_t revision) const override
+    {
+        if (revision >= STATE_VERSION_1_MIN_REVISION)
+            return 1;
+        else
+            return 0;
+    }
+
+    static DataTypePtr createResultType(
+        const DataTypePtr & keys_type_,
+        const DataTypes & values_types_,
+        const String & name_)
+    {
+        DataTypes types;
+        types.emplace_back(std::make_shared<DataTypeArray>(keys_type_));
+
+        for (const auto & value_type : values_types_)
+        {
+            if constexpr (std::is_same_v<Visitor, FieldVisitorSum>)
+            {
+                if (!value_type->isSummable())
+                    throw Exception{ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
+                        "Values for {} cannot be summed, passed type {}",
+                        name_, value_type->getName()};
+            }
+
+            DataTypePtr result_type;
+
+            if constexpr (overflow)
+            {
+                if (value_type->onlyNull())
+                    throw Exception{ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
+                        "Cannot calculate {} of type {}",
+                        name_, value_type->getName()};
+
+                // Overflow, meaning that the returned type is the same as
+                // the input type. Nulls are skipped.
+                result_type = removeNullable(value_type);
+            }
+            else
+            {
+                auto value_type_without_nullable = removeNullable(value_type);
+
+                // No overflow, meaning we promote the types if necessary.
+                if (!value_type_without_nullable->canBePromoted())
+                    throw Exception{ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
+                        "Values for {} are expected to be Numeric, Float or Decimal, passed type {}",
+                        name_, value_type->getName()};
+
+                WhichDataType value_type_to_check(value_type_without_nullable);
+
+                /// Do not promote decimal because of implementation issues of this function design
+                /// Currently we cannot get result column type in case of decimal we cannot get decimal scale
+                /// in method void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
+                /// If we decide to make this function more efficient we should promote decimal type during summ
+                if (value_type_to_check.isDecimal())
+                    result_type = value_type_without_nullable;
+                else
+                    result_type = value_type_without_nullable->promoteNumericType();
+            }
+
+            types.emplace_back(std::make_shared<DataTypeArray>(result_type));
+        }
+
+        return std::make_shared<DataTypeTuple>(types);
+    }
+
+    bool allocatesMemoryInArena() const override { return false; }
+
+    static const auto & getArgumentColumns(const IColumn**& columns)
+    {
+        if constexpr (tuple_argument)
+        {
+            return assert_cast<const ColumnTuple *>(columns[0])->getColumns();
+        }
+        else
+        {
+            return columns;
+        }
+    }
+
+    void add(AggregateDataPtr __restrict place, const IColumn ** columns_, const size_t row_num, Arena *) const override
+    {
+        const auto & columns = getArgumentColumns(columns_);
+
+        // Column 0 contains array of keys of known type
+        const ColumnArray & array_column0 = assert_cast<const ColumnArray &>(*columns[0]);
+        const IColumn::Offsets & offsets0 = array_column0.getOffsets();
+        const IColumn & key_column = array_column0.getData();
+        const size_t keys_vec_offset = offsets0[row_num - 1];
+        const size_t keys_vec_size = (offsets0[row_num] - keys_vec_offset);
+
+        // Columns 1..n contain arrays of numeric values to sum
+        auto & merged_maps = this->data(place).merged_maps;
+        for (size_t col = 0, size = values_types.size(); col < size; ++col)
+        {
+            const auto & array_column = assert_cast<const ColumnArray &>(*columns[col + 1]);
+            const IColumn & value_column = array_column.getData();
+            const IColumn::Offsets & offsets = array_column.getOffsets();
+            const size_t values_vec_offset = offsets[row_num - 1];
+            const size_t values_vec_size = (offsets[row_num] - values_vec_offset);
+
+            // Expect key and value arrays to be of same length
+            if (keys_vec_size != values_vec_size)
+                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Sizes of keys and values arrays do not match");
+
+            // Insert column values for all keys
+            for (size_t i = 0; i < keys_vec_size; ++i)
+            {
+                auto value = value_column[values_vec_offset + i];
+                T key = static_cast<T>(key_column[keys_vec_offset + i].get<T>());
+
+                if (!keepKey(key))
+                    continue;
+
+                decltype(merged_maps.begin()) it;
+                if constexpr (is_decimal<T>)
+                {
+                    // FIXME why is storing NearestFieldType not enough, and we
+                    // have to check for decimals again here?
+                    UInt32 scale = static_cast<const ColumnDecimal<T> &>(key_column).getScale();
+                    it = merged_maps.find(DecimalField<T>(key, scale));
+                }
+                else
+                    it = merged_maps.find(key);
+
+                if (it != merged_maps.end())
+                {
+                    if (!value.isNull())
+                    {
+                        if (it->second[col].isNull())
+                            it->second[col] = value;
+                        else
+                            applyVisitor(Visitor(value), it->second[col]);
+                    }
+                }
+                else
+                {
+                    // Create a value array for this key
+                    Array new_values;
+                    new_values.resize(size);
+                    new_values[col] = value;
+
+                    if constexpr (is_decimal<T>)
+                    {
+                        UInt32 scale = static_cast<const ColumnDecimal<T> &>(key_column).getScale();
+                        merged_maps.emplace(DecimalField<T>(key, scale), std::move(new_values));
+                    }
+                    else
+                    {
+                        merged_maps.emplace(key, std::move(new_values));
+                    }
+                }
+            }
+        }
+    }
+
+    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
+    {
+        auto & merged_maps = this->data(place).merged_maps;
+        const auto & rhs_maps = this->data(rhs).merged_maps;
+
+        for (const auto & elem : rhs_maps)
+        {
+            const auto & it = merged_maps.find(elem.first);
+            if (it != merged_maps.end())
+            {
+                for (size_t col = 0; col < values_types.size(); ++col)
+                    if (!elem.second[col].isNull())
+                        applyVisitor(Visitor(elem.second[col]), it->second[col]);
+            }
+            else
+                merged_maps[elem.first] = elem.second;
+        }
+    }
+
+    void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version) const override
+    {
+        if (!version)
+            version = getDefaultVersion();
+
+        const auto & merged_maps = this->data(place).merged_maps;
+        size_t size = merged_maps.size();
+        writeVarUInt(size, buf);
+
+        std::function<void(size_t, const Array &)> serialize;
+        switch (*version)
+        {
+            case 0:
+            {
+                serialize = [&](size_t col_idx, const Array & values){ values_serializations[col_idx]->serializeBinary(values[col_idx], buf, {}); };
+                break;
+            }
+            case 1:
+            {
+                serialize = [&](size_t col_idx, const Array & values){ promoted_values_serializations[col_idx]->serializeBinary(values[col_idx], buf, {}); };
+                break;
+            }
+        }
+
+        for (const auto & elem : merged_maps)
+        {
+            keys_serialization->serializeBinary(elem.first, buf, {});
+            for (size_t col = 0; col < values_types.size(); ++col)
+                serialize(col, elem.second);
+        }
+    }
+
+    void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> version, Arena *) const override
+    {
+        if (!version)
+            version = getDefaultVersion();
+
+        auto & merged_maps = this->data(place).merged_maps;
+        size_t size = 0;
+        readVarUInt(size, buf);
+
+        std::function<void(size_t, Array &)> deserialize;
+        switch (*version)
+        {
+            case 0:
+            {
+                deserialize = [&](size_t col_idx, Array & values){ values_serializations[col_idx]->deserializeBinary(values[col_idx], buf, {}); };
+                break;
+            }
+            case 1:
+            {
+                deserialize = [&](size_t col_idx, Array & values){ promoted_values_serializations[col_idx]->deserializeBinary(values[col_idx], buf, {}); };
+                break;
+            }
+        }
+
+        for (size_t i = 0; i < size; ++i)
+        {
+            Field key;
+            keys_serialization->deserializeBinary(key, buf, {});
+
+            Array values;
+            values.resize(values_types.size());
+
+            for (size_t col = 0; col < values_types.size(); ++col)
+                deserialize(col, values);
+
+            if constexpr (is_decimal<T>)
+                merged_maps[key.get<DecimalField<T>>()] = values;
+            else
+                merged_maps[key.get<T>()] = values;
+        }
+    }
+
+    void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
+    {
+        size_t num_columns = values_types.size();
+
+        // Final step does compaction of keys that have zero values, this mutates the state
+        auto & merged_maps = this->data(place).merged_maps;
+
+        // Remove keys which are zeros or empty. This should be enabled only for sumMap.
+        if constexpr (compact)
+        {
+            for (auto it = merged_maps.cbegin(); it != merged_maps.cend();)
+            {
+                // Key is not compacted if it has at least one non-zero value
+                bool erase = true;
+                for (size_t col = 0; col < num_columns; ++col)
+                {
+                    if (!it->second[col].isNull() && it->second[col] != values_types[col]->getDefault())
+                    {
+                        erase = false;
+                        break;
+                    }
+                }
+
+                if (erase)
+                    it = merged_maps.erase(it);
+                else
+                    ++it;
+            }
+        }
+
+        size_t size = merged_maps.size();
+
+        auto & to_tuple = assert_cast<ColumnTuple &>(to);
+        auto & to_keys_arr = assert_cast<ColumnArray &>(to_tuple.getColumn(0));
+        auto & to_keys_col = to_keys_arr.getData();
+
+        // Advance column offsets
+        auto & to_keys_offsets = to_keys_arr.getOffsets();
+        to_keys_offsets.push_back(to_keys_offsets.back() + size);
+        to_keys_col.reserve(size);
+
+        for (size_t col = 0; col < num_columns; ++col)
+        {
+            auto & to_values_arr = assert_cast<ColumnArray &>(to_tuple.getColumn(col + 1));
+            auto & to_values_offsets = to_values_arr.getOffsets();
+            to_values_offsets.push_back(to_values_offsets.back() + size);
+            to_values_arr.getData().reserve(size);
+        }
+
+        // Write arrays of keys and values
+        for (const auto & elem : merged_maps)
+        {
+            // Write array of keys into column
+            to_keys_col.insert(elem.first);
+
+            // Write 0..n arrays of values
+            for (size_t col = 0; col < num_columns; ++col)
+            {
+                auto & to_values_col = assert_cast<ColumnArray &>(to_tuple.getColumn(col + 1)).getData();
+                if (elem.second[col].isNull())
+                    to_values_col.insertDefault();
+                else
+                    to_values_col.insert(elem.second[col]);
+            }
+        }
+    }
+
+    bool keepKey(const T & key) const { return static_cast<const Derived &>(*this).keepKey(key); }
+    String getName() const override { return Derived::getNameImpl(); }
+};
+
+template <typename T, bool overflow, bool tuple_argument>
+class AggregateFunctionSumMap final :
+    public AggregateFunctionMapBase<T, AggregateFunctionSumMap<T, overflow, tuple_argument>, FieldVisitorSum, overflow, tuple_argument, true>
+{
+private:
+    using Self = AggregateFunctionSumMap<T, overflow, tuple_argument>;
+    using Base = AggregateFunctionMapBase<T, Self, FieldVisitorSum, overflow, tuple_argument, true>;
+
+public:
+    AggregateFunctionSumMap(const DataTypePtr & keys_type_,
+        DataTypes & values_types_, const DataTypes & argument_types_,
+        const Array & params_)
+        : Base{keys_type_, values_types_, argument_types_}
+    {
+        // The constructor accepts parameters to have a uniform interface with
+        // sumMapFiltered, but this function doesn't have any parameters.
+        assertNoParameters(getNameImpl(), params_);
+    }
+
+    static String getNameImpl()
+    {
+        if constexpr (overflow)
+        {
+            return "sumMapWithOverflow";
+        }
+        else
+        {
+            return "sumMap";
+        }
+    }
+
+    bool keepKey(const T &) const { return true; }
+};
+
+
+template <typename T, bool overflow, bool tuple_argument>
+class AggregateFunctionSumMapFiltered final :
+    public AggregateFunctionMapBase<T,
+        AggregateFunctionSumMapFiltered<T, overflow, tuple_argument>,
+        FieldVisitorSum,
+        overflow,
+        tuple_argument,
+        true>
+{
+private:
+    using Self = AggregateFunctionSumMapFiltered<T, overflow, tuple_argument>;
+    using Base = AggregateFunctionMapBase<T, Self, FieldVisitorSum, overflow, tuple_argument, true>;
+
+    using ContainerT = std::unordered_set<T>;
+
+    ContainerT keys_to_keep;
+
+public:
+    AggregateFunctionSumMapFiltered(const DataTypePtr & keys_type_,
+        const DataTypes & values_types_, const DataTypes & argument_types_,
+        const Array & params_)
+        : Base{keys_type_, values_types_, argument_types_}
+    {
+        if (params_.size() != 1)
+            throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
+                "Aggregate function '{}' requires exactly one parameter "
+                "of Array type", getNameImpl());
+
+        Array keys_to_keep_values;
+        if (!params_.front().tryGet<Array>(keys_to_keep_values))
+            throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
+                "Aggregate function {} requires an Array as a parameter",
+                getNameImpl());
+
+        this->parameters = params_;
+
+        keys_to_keep.reserve(keys_to_keep_values.size());
+
+        for (const Field & f : keys_to_keep_values)
+            keys_to_keep.emplace(f.safeGet<T>());
+    }
+
+    static String getNameImpl()
+    {
+        if constexpr (overflow)
+        {
+            return "sumMapFilteredWithOverflow";
+        }
+        else
+        {
+            return "sumMapFiltered";
+        }
+    }
+
+    bool keepKey(const T & key) const { return keys_to_keep.count(key); }
+};
+
+
+/** Implements `Max` operation.
+ *  Returns true if changed
+ */
+class FieldVisitorMax : public StaticVisitor<bool>
+{
+private:
+    const Field & rhs;
+
+    template <typename FieldType>
+    bool compareImpl(FieldType & x) const
+    {
+        auto val = rhs.get<FieldType>();
+        if (val > x)
+        {
+            x = val;
+            return true;
+        }
+
+        return false;
+    }
+
+public:
+    explicit FieldVisitorMax(const Field & rhs_) : rhs(rhs_) {}
+
+    bool operator() (Null &) const
+    {
+        /// Do not update current value, skip nulls
+        return false;
+    }
+
+    bool operator() (AggregateFunctionStateData &) const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot compare AggregateFunctionStates"); }
+
+    bool operator() (Array & x) const { return compareImpl<Array>(x); }
+    bool operator() (Tuple & x) const { return compareImpl<Tuple>(x); }
+    template <typename T>
+    bool operator() (DecimalField<T> & x) const { return compareImpl<DecimalField<T>>(x); }
+    template <typename T>
+    bool operator() (T & x) const { return compareImpl<T>(x); }
+};
+
+/** Implements `Min` operation.
+ *  Returns true if changed
+ */
+class FieldVisitorMin : public StaticVisitor<bool>
+{
+private:
+    const Field & rhs;
+
+    template <typename FieldType>
+    bool compareImpl(FieldType & x) const
+    {
+        auto val = rhs.get<FieldType>();
+        if (val < x)
+        {
+            x = val;
+            return true;
+        }
+
+        return false;
+    }
+
+public:
+    explicit FieldVisitorMin(const Field & rhs_) : rhs(rhs_) {}
+
+
+    bool operator() (Null &) const
+    {
+        /// Do not update current value, skip nulls
+        return false;
+    }
+
+    bool operator() (AggregateFunctionStateData &) const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot sum AggregateFunctionStates"); }
+
+    bool operator() (Array & x) const { return compareImpl<Array>(x); }
+    bool operator() (Tuple & x) const { return compareImpl<Tuple>(x); }
+    template <typename T>
+    bool operator() (DecimalField<T> & x) const { return compareImpl<DecimalField<T>>(x); }
+    template <typename T>
+    bool operator() (T & x) const { return compareImpl<T>(x); }
+};
+
+
+template <typename T, bool tuple_argument>
+class AggregateFunctionMinMap final :
+    public AggregateFunctionMapBase<T, AggregateFunctionMinMap<T, tuple_argument>, FieldVisitorMin, true, tuple_argument, false>
+{
+private:
+    using Self = AggregateFunctionMinMap<T, tuple_argument>;
+    using Base = AggregateFunctionMapBase<T, Self, FieldVisitorMin, true, tuple_argument, false>;
+
+public:
+    AggregateFunctionMinMap(const DataTypePtr & keys_type_,
+        DataTypes & values_types_, const DataTypes & argument_types_,
+        const Array & params_)
+        : Base{keys_type_, values_types_, argument_types_}
+    {
+        // The constructor accepts parameters to have a uniform interface with
+        // sumMapFiltered, but this function doesn't have any parameters.
+        assertNoParameters(getNameImpl(), params_);
+    }
+
+    static String getNameImpl() { return "minMap"; }
+
+    bool keepKey(const T &) const { return true; }
+};
+
+template <typename T, bool tuple_argument>
+class AggregateFunctionMaxMap final :
+    public AggregateFunctionMapBase<T, AggregateFunctionMaxMap<T, tuple_argument>, FieldVisitorMax, true, tuple_argument, false>
+{
+private:
+    using Self = AggregateFunctionMaxMap<T, tuple_argument>;
+    using Base = AggregateFunctionMapBase<T, Self, FieldVisitorMax, true, tuple_argument, false>;
+
+public:
+    AggregateFunctionMaxMap(const DataTypePtr & keys_type_,
+        DataTypes & values_types_, const DataTypes & argument_types_,
+        const Array & params_)
+        : Base{keys_type_, values_types_, argument_types_}
+    {
+        // The constructor accepts parameters to have a uniform interface with
+        // sumMapFiltered, but this function doesn't have any parameters.
+        assertNoParameters(getNameImpl(), params_);
+    }
+
+    static String getNameImpl() { return "maxMap"; }
+
+    bool keepKey(const T &) const { return true; }
+};
+
+
 auto parseArguments(const std::string & name, const DataTypes & arguments)
 {
     DataTypes args;
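The comment in the added code above describes the sumMap result shape: ordered keys paired with per-key aggregated values. As a rough standalone illustration (not part of this commit, and deliberately using only plain std::map and int64_t rather than ClickHouse's Field/Array machinery), the accumulation step boils down to:

#include <cassert>
#include <cstdint>
#include <map>
#include <vector>

// Toy model of the sumMap state: a std::map keeps the keys ordered,
// and values belonging to equal keys are summed together.
using State = std::map<int64_t, int64_t>;

static void addRow(State & state, const std::vector<int64_t> & keys, const std::vector<int64_t> & values)
{
    assert(keys.size() == values.size());  // mirrors the BAD_ARGUMENTS check above
    for (size_t i = 0; i < keys.size(); ++i)
        state[keys[i]] += values[i];
}

int main()
{
    State state;
    addRow(state, {1, 2, 3}, {10, 10, 10});
    addRow(state, {3, 4, 5}, {10, 10, 10});
    // state is now {1:10, 2:10, 3:20, 4:10, 5:10}: ordered keys with
    // per-key sums, the same shape as the tuple of arrays described above.
    return 0;
}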
@@ -1,656 +0,0 @@
-#pragma once
-
-#include <IO/WriteHelpers.h>
-#include <IO/ReadHelpers.h>
-
-#include <DataTypes/DataTypeArray.h>
-#include <DataTypes/DataTypeTuple.h>
-#include <DataTypes/DataTypeNullable.h>
-
-#include <Columns/ColumnArray.h>
-#include <Columns/ColumnTuple.h>
-#include <Columns/ColumnVector.h>
-#include <Columns/ColumnDecimal.h>
-#include <Columns/ColumnString.h>
-
-#include <Common/FieldVisitorSum.h>
-#include <Common/assert_cast.h>
-#include <AggregateFunctions/IAggregateFunction.h>
-#include <AggregateFunctions/FactoryHelpers.h>
-#include <map>
-#include <Common/ClickHouseRevision.h>
-
-
-namespace DB
-{
-struct Settings;
-
-namespace ErrorCodes
-{
-    extern const int BAD_ARGUMENTS;
-    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
-    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
-    extern const int LOGICAL_ERROR;
-}
-
[The rest of this deleted header (AggregateFunctionMapData, AggregateFunctionMapBase, AggregateFunctionSumMap, AggregateFunctionSumMapFiltered, FieldVisitorMax, FieldVisitorMin, AggregateFunctionMinMap and AggregateFunctionMaxMap, followed by the closing brace of namespace DB) is identical to the definitions added to the source file in the hunk above.]
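One detail worth noting in the moved code is the pair of serialization sets: values_serializations (state version 0) uses the original value types, while promoted_values_serializations (state version 1) uses promoted numeric types, so a sum of narrow integers is written with a wider type. A minimal sketch of why promotion matters, under the assumption of UInt8 inputs accumulated into UInt64 (plain C++, not ClickHouse code):

#include <cstdint>
#include <iostream>
#include <vector>

// Accumulating narrow inputs in a wider ("promoted") type keeps the
// running sum from wrapping around the narrow type's maximum.
uint64_t sumPromoted(const std::vector<uint8_t> & values)
{
    uint64_t sum = 0;          // promoted accumulator
    for (uint8_t v : values)
        sum += v;              // 8-bit inputs, 64-bit arithmetic
    return sum;
}

int main()
{
    std::vector<uint8_t> values(1000, 200);
    std::cout << sumPromoted(values) << '\n';  // 200000, far beyond uint8_t's 255
    return 0;
}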
@@ -234,7 +234,7 @@ private:
     }
 
     template <typename T0, typename T1, typename ColVecT0, typename ColVecT1>
-    ColumnPtr executeRightType(
+    NO_INLINE ColumnPtr executeRightType(
         [[maybe_unused]] const ColumnUInt8 * cond_col,
         [[maybe_unused]] const ColumnsWithTypeAndName & arguments,
         [[maybe_unused]] const ColVecT0 * col_left) const
@@ -266,7 +266,7 @@ private:
     }
 
     template <typename T0, typename T1, typename ColVecT0, typename ColVecT1>
-    ColumnPtr executeConstRightType(
+    NO_INLINE ColumnPtr executeConstRightType(
         [[maybe_unused]] const ColumnUInt8 * cond_col,
         [[maybe_unused]] const ColumnsWithTypeAndName & arguments,
         [[maybe_unused]] const ColumnConst * col_left) const
@@ -298,7 +298,7 @@ private:
     }
 
     template <typename T0, typename T1, typename ColVecT0, typename ColVecT1>
-    ColumnPtr executeRightTypeArray(
+    NO_INLINE ColumnPtr executeRightTypeArray(
         [[maybe_unused]] const ColumnUInt8 * cond_col,
         [[maybe_unused]] const ColumnsWithTypeAndName & arguments,
         [[maybe_unused]] const DataTypePtr result_type,
@@ -355,7 +355,7 @@ private:
     }
 
     template <typename T0, typename T1, typename ColVecT0, typename ColVecT1>
-    ColumnPtr executeConstRightTypeArray(
+    NO_INLINE ColumnPtr executeConstRightTypeArray(
         [[maybe_unused]] const ColumnUInt8 * cond_col,
         [[maybe_unused]] const ColumnsWithTypeAndName & arguments,
         [[maybe_unused]] const DataTypePtr & result_type,
@@ -413,7 +413,7 @@ private:
     }
 
     template <typename T0, typename T1>
-    ColumnPtr executeTyped(
+    NO_INLINE ColumnPtr executeTyped(
         const ColumnUInt8 * cond_col, const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const
     {
         using ColVecT0 = ColumnVectorOrDecimal<T0>;
@@ -1086,7 +1086,7 @@ public:
             throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of first argument of function {}. "
                 "Must be ColumnUInt8 or ColumnConstUInt8.", arg_cond.column->getName(), getName());
 
-        auto call = [&](const auto & types) -> bool
+        auto call = [&](const auto & types) NO_INLINE -> bool
         {
             using Types = std::decay_t<decltype(types)>;
             using T0 = typename Types::LeftType;
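The hunks above only add NO_INLINE to a handful of heavily instantiated template methods. As a hedged sketch of the general idea (the macro below is a hypothetical stand-in; the real definition comes from ClickHouse's common headers, not from this diff), forcing such methods out of line keeps each template instantiation as a single function body instead of having it inlined into every caller:

#if defined(__GNUC__) || defined(__clang__)
#    define NO_INLINE __attribute__((noinline))
#else
#    define NO_INLINE
#endif

// A heavily instantiated helper: marking it NO_INLINE trades a call per
// invocation for smaller code at each instantiation site.
template <typename T>
NO_INLINE T addOne(T x)
{
    return x + 1;
}

int main()
{
    return addOne(41) == 42 ? 0 : 1;
}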