Optimize merge of uniqExact without_key (#43072)

* impl for uniqExact

* rm unused (read|write)Text methods

* fix style

* small fixes

* impl for variadic uniqExact

* refactor

* fix style

* more aggressive inlining

* disable if max_threads=1

* small improvements

* review fixes

* Revert "rm unused (read|write)Text methods"

This reverts commit a7e7480584.

* encapsulate is_able_to_parallelize_merge in Data

* encapsulate is_exact & argument_is_tuple in Data
This commit is contained in:
Nikita Taranov 2022-11-17 13:19:02 +01:00 committed by GitHub
parent a29c5b9e3a
commit 7beb58b0cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 832 additions and 171 deletions

View File

@ -9,6 +9,7 @@
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeUUID.h>
#include <Core/Settings.h>
namespace DB
{
@ -28,8 +29,9 @@ namespace
/** `DataForVariadic` is a data structure that will be used for `uniq` aggregate function of multiple arguments.
* It differs, for example, in that it uses a trivial hash function, since `uniq` of many arguments first hashes them out itself.
*/
template <typename Data, typename DataForVariadic>
AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
template <typename Data, template <bool, bool> typename DataForVariadic>
AggregateFunctionPtr
createAggregateFunctionUniq(const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
{
assertNoParameters(name, params);
@ -61,21 +63,22 @@ AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const
else if (which.isTuple())
{
if (use_exact_hash_function)
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, true, true>>(argument_types);
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<true, true>>>(argument_types);
else
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, false, true>>(argument_types);
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<false, true>>>(argument_types);
}
}
/// "Variadic" method also works as a fallback generic case for single argument.
if (use_exact_hash_function)
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, true, false>>(argument_types);
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<true, false>>>(argument_types);
else
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, false, false>>(argument_types);
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<false, false>>>(argument_types);
}
template <bool is_exact, template <typename> class Data, typename DataForVariadic>
AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
template <bool is_exact, template <typename, bool> typename Data, template <bool, bool, bool> typename DataForVariadic, bool is_able_to_parallelize_merge>
AggregateFunctionPtr
createAggregateFunctionUniq(const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
{
assertNoParameters(name, params);
@ -91,35 +94,35 @@ AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const
{
const IDataType & argument_type = *argument_types[0];
AggregateFunctionPtr res(createWithNumericType<AggregateFunctionUniq, Data>(*argument_types[0], argument_types));
AggregateFunctionPtr res(createWithNumericType<AggregateFunctionUniq, Data, is_able_to_parallelize_merge>(*argument_types[0], argument_types));
WhichDataType which(argument_type);
if (res)
return res;
else if (which.isDate())
return std::make_shared<AggregateFunctionUniq<DataTypeDate::FieldType, Data<DataTypeDate::FieldType>>>(argument_types);
return std::make_shared<AggregateFunctionUniq<DataTypeDate::FieldType, Data<DataTypeDate::FieldType, is_able_to_parallelize_merge>>>(argument_types);
else if (which.isDate32())
return std::make_shared<AggregateFunctionUniq<DataTypeDate32::FieldType, Data<DataTypeDate32::FieldType>>>(argument_types);
return std::make_shared<AggregateFunctionUniq<DataTypeDate32::FieldType, Data<DataTypeDate32::FieldType, is_able_to_parallelize_merge>>>(argument_types);
else if (which.isDateTime())
return std::make_shared<AggregateFunctionUniq<DataTypeDateTime::FieldType, Data<DataTypeDateTime::FieldType>>>(argument_types);
return std::make_shared<AggregateFunctionUniq<DataTypeDateTime::FieldType, Data<DataTypeDateTime::FieldType, is_able_to_parallelize_merge>>>(argument_types);
else if (which.isStringOrFixedString())
return std::make_shared<AggregateFunctionUniq<String, Data<String>>>(argument_types);
return std::make_shared<AggregateFunctionUniq<String, Data<String, is_able_to_parallelize_merge>>>(argument_types);
else if (which.isUUID())
return std::make_shared<AggregateFunctionUniq<DataTypeUUID::FieldType, Data<DataTypeUUID::FieldType>>>(argument_types);
return std::make_shared<AggregateFunctionUniq<DataTypeUUID::FieldType, Data<DataTypeUUID::FieldType, is_able_to_parallelize_merge>>>(argument_types);
else if (which.isTuple())
{
if (use_exact_hash_function)
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, true, true>>(argument_types);
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<true, true, is_able_to_parallelize_merge>>>(argument_types);
else
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, false, true>>(argument_types);
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<false, true, is_able_to_parallelize_merge>>>(argument_types);
}
}
/// "Variadic" method also works as a fallback generic case for single argument.
if (use_exact_hash_function)
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, true, false>>(argument_types);
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<true, false, is_able_to_parallelize_merge>>>(argument_types);
else
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic, false, false>>(argument_types);
return std::make_shared<AggregateFunctionUniqVariadic<DataForVariadic<false, false, is_able_to_parallelize_merge>>>(argument_types);
}
}
@ -132,14 +135,23 @@ void registerAggregateFunctionsUniq(AggregateFunctionFactory & factory)
{createAggregateFunctionUniq<AggregateFunctionUniqUniquesHashSetData, AggregateFunctionUniqUniquesHashSetDataForVariadic>, properties});
factory.registerFunction("uniqHLL12",
{createAggregateFunctionUniq<false, AggregateFunctionUniqHLL12Data, AggregateFunctionUniqHLL12DataForVariadic>, properties});
{createAggregateFunctionUniq<false, AggregateFunctionUniqHLL12Data, AggregateFunctionUniqHLL12DataForVariadic, false /* is_able_to_parallelize_merge */>, properties});
factory.registerFunction("uniqExact",
{createAggregateFunctionUniq<true, AggregateFunctionUniqExactData, AggregateFunctionUniqExactData<String>>, properties});
/// Creator callback for "uniqExact": chooses the `is_able_to_parallelize_merge` template argument at function-creation time.
/// The parallel-merge variant is created only when `settings` is non-null and `max_threads > 1`;
/// in every other case (including a null `settings`) the non-parallel variant is created.
auto assign_bool_param = [](const std::string & name, const DataTypes & argument_types, const Array & params, const Settings * settings)
{
/// Using two level hash set if we wouldn't be able to merge in parallel can cause ~10% slowdown.
if (settings && settings->max_threads > 1)
return createAggregateFunctionUniq<
true, AggregateFunctionUniqExactData, AggregateFunctionUniqExactDataForVariadic, true /* is_able_to_parallelize_merge */>(name, argument_types, params, settings);
else
return createAggregateFunctionUniq<
true, AggregateFunctionUniqExactData, AggregateFunctionUniqExactDataForVariadic, false /* is_able_to_parallelize_merge */>(name, argument_types, params, settings);
};
factory.registerFunction("uniqExact", {assign_bool_param, properties});
#if USE_DATASKETCHES
factory.registerFunction("uniqTheta",
{createAggregateFunctionUniq<AggregateFunctionUniqThetaData, AggregateFunctionUniqThetaData>, properties});
{createAggregateFunctionUniq<AggregateFunctionUniqThetaData, AggregateFunctionUniqThetaDataForVariadic>, properties});
#endif
}

View File

@ -1,7 +1,10 @@
#pragma once
#include <city.h>
#include <atomic>
#include <memory>
#include <type_traits>
#include <utility>
#include <city.h>
#include <base/bit_cast.h>
@ -13,17 +16,18 @@
#include <Interpreters/AggregationCommon.h>
#include <Common/CombinedCardinalityEstimator.h>
#include <Common/HashTable/Hash.h>
#include <Common/HashTable/HashSet.h>
#include <Common/HyperLogLogWithSmallSetOptimization.h>
#include <Common/CombinedCardinalityEstimator.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#include <AggregateFunctions/UniquesHashSet.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/ThetaSketchData.h>
#include <AggregateFunctions/UniqExactSet.h>
#include <AggregateFunctions/UniqVariadicHash.h>
#include <AggregateFunctions/UniquesHashSet.h>
namespace DB
@ -37,94 +41,128 @@ struct AggregateFunctionUniqUniquesHashSetData
using Set = UniquesHashSet<DefaultHash<UInt64>>;
Set set;
constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_variadic = false;
static String getName() { return "uniq"; }
};
/// State of `uniq` for the multi-argument case. The arguments are hashed before they reach the set,
/// which is why the set itself uses TrivialHash.
/// The template flags are re-exported as static members so that generic code can read them from the Data type.
template <bool is_exact_, bool argument_is_tuple_>
struct AggregateFunctionUniqUniquesHashSetDataForVariadic
{
    using Set = UniquesHashSet<TrivialHash>;
    Set set;

    /// This state never takes part in a parallel merge.
    static constexpr bool is_able_to_parallelize_merge = false;
    static constexpr bool is_variadic = true;
    static constexpr bool is_exact = is_exact_;
    static constexpr bool argument_is_tuple = argument_is_tuple_;

    static String getName() { return "uniq"; }
};
/// uniqHLL12
template <typename T>
template <typename T, bool is_able_to_parallelize_merge_>
struct AggregateFunctionUniqHLL12Data
{
using Set = HyperLogLogWithSmallSetOptimization<T, 16, 12>;
Set set;
static String getName() { return "uniqHLL12"; }
};
template <>
struct AggregateFunctionUniqHLL12Data<String>
{
using Set = HyperLogLogWithSmallSetOptimization<UInt64, 16, 12>;
Set set;
constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_variadic = false;
static String getName() { return "uniqHLL12"; }
};
template <>
struct AggregateFunctionUniqHLL12Data<UUID>
struct AggregateFunctionUniqHLL12Data<String, false>
{
using Set = HyperLogLogWithSmallSetOptimization<UInt64, 16, 12>;
Set set;
constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_variadic = false;
static String getName() { return "uniqHLL12"; }
};
/// uniqHLL12 state for UUID arguments: the set is keyed by UInt64 rather than by the UUID type itself.
/// Only the non-parallel (`false`) variant is specialized.
template <>
struct AggregateFunctionUniqHLL12Data<UUID, false>
{
    using Set = HyperLogLogWithSmallSetOptimization<UInt64, 16, 12>;
    Set set;

    static constexpr bool is_able_to_parallelize_merge = false;
    static constexpr bool is_variadic = false;

    static String getName() { return "uniqHLL12"; }
};
/// uniqHLL12 state for the multi-argument case: a UInt64-keyed set with TrivialHash.
/// The template flags are re-exported as static members so that generic code can read them from the Data type.
template <bool is_exact_, bool argument_is_tuple_, bool is_able_to_parallelize_merge_>
struct AggregateFunctionUniqHLL12DataForVariadic
{
    using Set = HyperLogLogWithSmallSetOptimization<UInt64, 16, 12, TrivialHash>;
    Set set;

    static constexpr bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
    static constexpr bool is_variadic = true;
    static constexpr bool is_exact = is_exact_;
    static constexpr bool argument_is_tuple = argument_is_tuple_;

    static String getName() { return "uniqHLL12"; }
};
/// uniqExact
template <typename T>
template <typename T, bool is_able_to_parallelize_merge_>
struct AggregateFunctionUniqExactData
{
using Key = T;
/// When creating, the hash table must be small.
using Set = HashSet<
Key,
HashCRC32<Key>,
HashTableGrower<4>,
HashTableAllocatorWithStackMemory<sizeof(Key) * (1 << 4)>>;
using SingleLevelSet = HashSet<Key, HashCRC32<Key>, HashTableGrower<4>, HashTableAllocatorWithStackMemory<sizeof(Key) * (1 << 4)>>;
using TwoLevelSet = TwoLevelHashSet<Key, HashCRC32<Key>>;
using Set = UniqExactSet<SingleLevelSet, TwoLevelSet>;
Set set;
constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_variadic = false;
static String getName() { return "uniqExact"; }
};
/// For rows, we put the SipHash values (128 bits) into the hash table.
template <>
struct AggregateFunctionUniqExactData<String>
template <bool is_able_to_parallelize_merge_>
struct AggregateFunctionUniqExactData<String, is_able_to_parallelize_merge_>
{
using Key = UInt128;
/// When creating, the hash table must be small.
using Set = HashSet<
Key,
UInt128TrivialHash,
HashTableGrower<3>,
HashTableAllocatorWithStackMemory<sizeof(Key) * (1 << 3)>>;
using SingleLevelSet = HashSet<Key, UInt128TrivialHash, HashTableGrower<3>, HashTableAllocatorWithStackMemory<sizeof(Key) * (1 << 3)>>;
using TwoLevelSet = TwoLevelHashSet<Key, UInt128TrivialHash>;
using Set = UniqExactSet<SingleLevelSet, TwoLevelSet>;
Set set;
constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
constexpr static bool is_variadic = false;
static String getName() { return "uniqExact"; }
};
/// uniqExact state for the multi-argument case. Storage (`Set`, `set`, `getName`) is inherited from the
/// String specialization of AggregateFunctionUniqExactData; this struct only adds the variadic flags.
template <bool is_exact_, bool argument_is_tuple_, bool is_able_to_parallelize_merge_>
struct AggregateFunctionUniqExactDataForVariadic : AggregateFunctionUniqExactData<String, is_able_to_parallelize_merge_>
{
    static constexpr bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
    /// Hides the base's `is_variadic = false`.
    static constexpr bool is_variadic = true;
    static constexpr bool is_exact = is_exact_;
    static constexpr bool argument_is_tuple = argument_is_tuple_;
};
/// uniqTheta
#if USE_DATASKETCHES
@ -134,14 +172,37 @@ struct AggregateFunctionUniqThetaData
using Set = ThetaSketchData<UInt64>;
Set set;
constexpr static bool is_able_to_parallelize_merge = false;
constexpr static bool is_variadic = false;
static String getName() { return "uniqTheta"; }
};
/// uniqTheta state for the multi-argument case. Storage is inherited from AggregateFunctionUniqThetaData;
/// this struct only adds the variadic flags.
template <bool is_exact_, bool argument_is_tuple_>
struct AggregateFunctionUniqThetaDataForVariadic : AggregateFunctionUniqThetaData
{
    static constexpr bool is_able_to_parallelize_merge = false;
    /// Hides the base's `is_variadic = false`.
    static constexpr bool is_variadic = true;
    static constexpr bool is_exact = is_exact_;
    static constexpr bool argument_is_tuple = argument_is_tuple_;
};
#endif
namespace detail
{
/// Type trait: `value` is true exactly when T is some instantiation of UniqExactSet.
template <typename T>
struct IsUniqExactSet : std::false_type {};

/// Matches every UniqExactSet<T1, T2>.
template <typename T1, typename T2>
struct IsUniqExactSet<UniqExactSet<T1, T2>> : std::true_type {};
/** Hash function for uniq.
*/
template <typename T> struct AggregateFunctionUniqTraits
@ -162,17 +223,31 @@ template <typename T> struct AggregateFunctionUniqTraits
};
/** The structure for the delegation work to add one element to the `uniq` aggregate functions.
/** The structure for the delegation work to add elements to the `uniq` aggregate functions.
* Used for partial specialization to add strings.
*/
template <typename T, typename Data>
struct OneAdder
struct Adder
{
static void ALWAYS_INLINE add(Data & data, const IColumn & column, size_t row_num)
/// We have to introduce this template parameter (and a bunch of ugly code dealing with it), because we cannot
/// add runtime branches in whatever_hash_set::insert - it will immediately pop up in the perf top.
template <bool use_single_level_hash_table = true>
static void ALWAYS_INLINE add(Data & data, const IColumn ** columns, size_t num_args, size_t row_num)
{
if constexpr (std::is_same_v<Data, AggregateFunctionUniqUniquesHashSetData>
|| std::is_same_v<Data, AggregateFunctionUniqHLL12Data<T>>)
if constexpr (Data::is_variadic)
{
if constexpr (IsUniqExactSet<typename Data::Set>::value)
data.set.template insert<T, use_single_level_hash_table>(
UniqVariadicHash<Data::is_exact, Data::argument_is_tuple>::apply(num_args, columns, row_num));
else
data.set.insert(T{UniqVariadicHash<Data::is_exact, Data::argument_is_tuple>::apply(num_args, columns, row_num)});
}
else if constexpr (
std::is_same_v<
Data,
AggregateFunctionUniqUniquesHashSetData> || std::is_same_v<Data, AggregateFunctionUniqHLL12Data<T, Data::is_able_to_parallelize_merge>>)
{
const auto & column = *columns[0];
if constexpr (!std::is_same_v<T, String>)
{
using ValueType = typename decltype(data.set)::value_type;
@ -185,11 +260,13 @@ struct OneAdder
data.set.insert(CityHash_v1_0_2::CityHash64(value.data, value.size));
}
}
else if constexpr (std::is_same_v<Data, AggregateFunctionUniqExactData<T>>)
else if constexpr (std::is_same_v<Data, AggregateFunctionUniqExactData<T, Data::is_able_to_parallelize_merge>>)
{
const auto & column = *columns[0];
if constexpr (!std::is_same_v<T, String>)
{
data.set.insert(assert_cast<const ColumnVector<T> &>(column).getData()[row_num]);
data.set.template insert<const T &, use_single_level_hash_table>(
assert_cast<const ColumnVector<T> &>(column).getData()[row_num]);
}
else
{
@ -200,16 +277,72 @@ struct OneAdder
hash.update(value.data, value.size);
hash.get128(key);
data.set.insert(key);
data.set.template insert<const UInt128 &, use_single_level_hash_table>(key);
}
}
#if USE_DATASKETCHES
else if constexpr (std::is_same_v<Data, AggregateFunctionUniqThetaData>)
{
const auto & column = *columns[0];
data.set.insertOriginal(column.getDataAt(row_num));
}
#endif
}
/// Batch variant: adds rows [row_begin, row_end) to `data`.
/// `flags` may be null; when present, only rows with a non-zero flag are added.
/// `null_map` may be null; when present, rows with a non-zero entry are skipped.
/// The single-level / two-level choice is made once per batch, so the per-row insert carries no runtime branch on it.
static void ALWAYS_INLINE
add(Data & data, const IColumn ** columns, size_t num_args, size_t row_begin, size_t row_end, const char8_t * flags, const UInt8 * null_map)
{
    if constexpr (Data::is_able_to_parallelize_merge)
    {
        if (data.set.isSingleLevel())
            addImpl<true>(data, columns, num_args, row_begin, row_end, flags, null_map);
        else
            addImpl<false>(data, columns, num_args, row_begin, row_end, flags, null_map);

        /// After the batch, switch a single-level set that has grown past 100'000 elements to two levels.
        if (data.set.isSingleLevel() && data.set.size() > 100'000)
            data.set.convertToTwoLevel();
    }
    else
    {
        /// Without parallel merge the set is always treated as single-level.
        addImpl<true>(data, columns, num_args, row_begin, row_end, flags, null_map);
    }
}
private:
/// Row loop behind the batch `add`. Each combination of (flags present, null_map present) has its own loop,
/// so a filter that is absent costs nothing per row.
template <bool use_single_level_hash_table>
static void ALWAYS_INLINE
addImpl(Data & data, const IColumn ** columns, size_t num_args, size_t row_begin, size_t row_end, const char8_t * flags, const UInt8 * null_map)
{
    if (flags && null_map)
    {
        /// Both filters: the row must be non-null and have a non-zero flag.
        for (size_t i = row_begin; i < row_end; ++i)
            if (!null_map[i] && flags[i])
                add<use_single_level_hash_table>(data, columns, num_args, i);
        return;
    }

    if (flags)
    {
        for (size_t i = row_begin; i < row_end; ++i)
            if (flags[i])
                add<use_single_level_hash_table>(data, columns, num_args, i);
        return;
    }

    if (null_map)
    {
        for (size_t i = row_begin; i < row_end; ++i)
            if (!null_map[i])
                add<use_single_level_hash_table>(data, columns, num_args, i);
        return;
    }

    /// No filters at all: every row in the range is added.
    for (size_t i = row_begin; i < row_end; ++i)
        add<use_single_level_hash_table>(data, columns, num_args, i);
}
};
}
@ -219,9 +352,15 @@ struct OneAdder
template <typename T, typename Data>
class AggregateFunctionUniq final : public IAggregateFunctionDataHelper<Data, AggregateFunctionUniq<T, Data>>
{
private:
static constexpr size_t num_args = 1;
static constexpr bool is_able_to_parallelize_merge = Data::is_able_to_parallelize_merge;
public:
AggregateFunctionUniq(const DataTypes & argument_types_)
: IAggregateFunctionDataHelper<Data, AggregateFunctionUniq<T, Data>>(argument_types_, {}) {}
explicit AggregateFunctionUniq(const DataTypes & argument_types_)
: IAggregateFunctionDataHelper<Data, AggregateFunctionUniq<T, Data>>(argument_types_, {})
{
}
String getName() const override { return Data::getName(); }
@ -235,7 +374,18 @@ public:
/// ALWAYS_INLINE is required to have better code layout for uniqHLL12 function
void ALWAYS_INLINE add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
{
detail::OneAdder<T, Data>::add(this->data(place), *columns[0], row_num);
detail::Adder<T, Data>::add(this->data(place), columns, num_args, row_num);
}
/// Adds rows [row_begin, row_end) to the state at `place` in a single call.
/// A non-negative `if_argument_pos` selects the UInt8 column whose data is passed to Adder as per-row flags;
/// a negative value means no flags are passed. No null map is used here.
void ALWAYS_INLINE addBatchSinglePlace(
    size_t row_begin, size_t row_end, AggregateDataPtr __restrict place, const IColumn ** columns, Arena *, ssize_t if_argument_pos)
    const override
{
    const char8_t * flags = if_argument_pos < 0
        ? nullptr
        : assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData().data();

    detail::Adder<T, Data>::add(this->data(place), columns, num_args, row_begin, row_end, flags, nullptr /* null_map */);
}
void addManyDefaults(
@ -244,7 +394,23 @@ public:
size_t /*length*/,
Arena * /*arena*/) const override
{
detail::OneAdder<T, Data>::add(this->data(place), *columns[0], 0);
detail::Adder<T, Data>::add(this->data(place), columns, num_args, 0);
}
/// Same as addBatchSinglePlace, but additionally forwards `null_map` to Adder so that rows marked in it are skipped.
/// A non-negative `if_argument_pos` selects the UInt8 column whose data is passed as per-row flags.
void addBatchSinglePlaceNotNull(
    size_t row_begin,
    size_t row_end,
    AggregateDataPtr __restrict place,
    const IColumn ** columns,
    const UInt8 * null_map,
    Arena *,
    ssize_t if_argument_pos) const override
{
    const char8_t * flags = if_argument_pos < 0
        ? nullptr
        : assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData().data();

    detail::Adder<T, Data>::add(this->data(place), columns, num_args, row_begin, row_end, flags, null_map);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
@ -252,6 +418,16 @@ public:
this->data(place).set.merge(this->data(rhs).set);
}
/// Returns the compile-time flag taken from Data::is_able_to_parallelize_merge.
bool isAbleToParallelizeMerge() const override { return is_able_to_parallelize_merge; }
/// Merge overload that receives a thread pool. The pool is handed to the set only when Data allows
/// parallel merge; otherwise this is the same as the plain merge.
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena *) const override
{
    auto & dst = this->data(place).set;
    const auto & src = this->data(rhs).set;

    if constexpr (!is_able_to_parallelize_merge)
        dst.merge(src);
    else
        dst.merge(src, &thread_pool);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
this->data(place).set.write(buf);
@ -273,15 +449,20 @@ public:
* You can pass multiple arguments as is; You can also pass one argument - a tuple.
* But (for the possibility of efficient implementation), you can not pass several arguments, among which there are tuples.
*/
template <typename Data, bool is_exact, bool argument_is_tuple>
class AggregateFunctionUniqVariadic final : public IAggregateFunctionDataHelper<Data, AggregateFunctionUniqVariadic<Data, is_exact, argument_is_tuple>>
template <typename Data>
class AggregateFunctionUniqVariadic final : public IAggregateFunctionDataHelper<Data, AggregateFunctionUniqVariadic<Data>>
{
private:
using T = typename Data::Set::value_type;
static constexpr size_t is_able_to_parallelize_merge = Data::is_able_to_parallelize_merge;
static constexpr size_t argument_is_tuple = Data::argument_is_tuple;
size_t num_args = 0;
public:
AggregateFunctionUniqVariadic(const DataTypes & arguments)
: IAggregateFunctionDataHelper<Data, AggregateFunctionUniqVariadic<Data, is_exact, argument_is_tuple>>(arguments, {})
explicit AggregateFunctionUniqVariadic(const DataTypes & arguments)
: IAggregateFunctionDataHelper<Data, AggregateFunctionUniqVariadic<Data>>(arguments, {})
{
if (argument_is_tuple)
num_args = typeid_cast<const DataTypeTuple &>(*arguments[0]).getElements().size();
@ -300,8 +481,34 @@ public:
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
{
this->data(place).set.insert(typename Data::Set::value_type(
UniqVariadicHash<is_exact, argument_is_tuple>::apply(num_args, columns, row_num)));
detail::Adder<T, Data>::add(this->data(place), columns, num_args, row_num);
}
/// Adds rows [row_begin, row_end) to the state at `place` in a single call.
/// A non-negative `if_argument_pos` selects the UInt8 column whose data is passed to Adder as per-row flags;
/// a negative value means no flags are passed. No null map is used here.
void addBatchSinglePlace(
    size_t row_begin, size_t row_end, AggregateDataPtr __restrict place, const IColumn ** columns, Arena *, ssize_t if_argument_pos)
    const override
{
    const char8_t * flags = if_argument_pos < 0
        ? nullptr
        : assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData().data();

    detail::Adder<T, Data>::add(this->data(place), columns, num_args, row_begin, row_end, flags, nullptr /* null_map */);
}
/// Same as addBatchSinglePlace, but additionally forwards `null_map` to Adder so that rows marked in it are skipped.
/// A non-negative `if_argument_pos` selects the UInt8 column whose data is passed as per-row flags.
void addBatchSinglePlaceNotNull(
    size_t row_begin,
    size_t row_end,
    AggregateDataPtr __restrict place,
    const IColumn ** columns,
    const UInt8 * null_map,
    Arena *,
    ssize_t if_argument_pos) const override
{
    const char8_t * flags = if_argument_pos < 0
        ? nullptr
        : assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData().data();

    detail::Adder<T, Data>::add(this->data(place), columns, num_args, row_begin, row_end, flags, null_map);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
@ -309,6 +516,16 @@ public:
this->data(place).set.merge(this->data(rhs).set);
}
/// Returns the compile-time flag taken from Data::is_able_to_parallelize_merge.
bool isAbleToParallelizeMerge() const override { return is_able_to_parallelize_merge; }
/// Merge overload that receives a thread pool. The pool is handed to the set only when Data allows
/// parallel merge; otherwise this is the same as the plain merge.
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena *) const override
{
    auto & dst = this->data(place).set;
    const auto & src = this->data(rhs).set;

    if constexpr (!is_able_to_parallelize_merge)
        dst.merge(src);
    else
        dst.merge(src, &thread_pool);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
this->data(place).set.write(buf);

View File

@ -74,6 +74,19 @@ static IAggregateFunction * createWithNumericType(const IDataType & argument_typ
return nullptr;
}
/// Overload of createWithNumericType for `Data` templates that take an extra bool parameter after the value type.
/// Dispatches on the type index of `argument_type` and returns a `new`-allocated
/// AggregateFunctionTemplate<TYPE, Data<TYPE, bool_param>> constructed from `args`.
/// Returns nullptr when the type is neither one of FOR_NUMERIC_TYPES nor Enum8/Enum16.
template <template <typename, typename> class AggregateFunctionTemplate, template <typename, bool> class Data, bool bool_param, typename... TArgs>
static IAggregateFunction * createWithNumericType(const IDataType & argument_type, TArgs && ... args)
{
WhichDataType which(argument_type);
#define DISPATCH(TYPE) \
if (which.idx == TypeIndex::TYPE) return new AggregateFunctionTemplate<TYPE, Data<TYPE, bool_param>>(std::forward<TArgs>(args)...); /// NOLINT
FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
/// Enum8 and Enum16 are instantiated with Int8 and Int16 respectively.
if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate<Int8, Data<Int8, bool_param>>(std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate<Int16, Data<Int16, bool_param>>(std::forward<TArgs>(args)...);
return nullptr;
}
template <template <typename, typename> class AggregateFunctionTemplate, template <typename> class Data, typename... TArgs>
static IAggregateFunction * createWithUnsignedIntegerType(const IDataType & argument_type, TArgs && ... args)
{

View File

@ -1,14 +1,15 @@
#pragma once
#include <Columns/ColumnSparse.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnSparse.h>
#include <Core/Block.h>
#include <Core/ColumnNumbers.h>
#include <Core/Field.h>
#include <Interpreters/Context_fwd.h>
#include <Common/Exception.h>
#include <base/types.h>
#include <Common/Exception.h>
#include <Common/ThreadPool.h>
#include "config.h"
@ -147,6 +148,16 @@ public:
/// Merges state (on which place points to) with other state of current aggregation function.
virtual void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const = 0;
/// Tells if merge() with thread pool parameter could be used.
/// The default answer is false; the default thread-pool merge() throws NOT_IMPLEMENTED.
virtual bool isAbleToParallelizeMerge() const { return false; }
/// Merge variant that receives a thread pool.
/// Should be used only if isAbleToParallelizeMerge() returned true.
/// This default implementation ignores all arguments and throws NOT_IMPLEMENTED with the function name.
virtual void
merge(AggregateDataPtr __restrict /*place*/, ConstAggregateDataPtr /*rhs*/, ThreadPool & /*thread_pool*/, Arena * /*arena*/) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "merge() with thread pool parameter isn't implemented for {} ", getName());
}
/// Serializes state (to transmit it over the network, for example).
virtual void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> version = std::nullopt) const = 0; /// NOLINT

View File

@ -0,0 +1,112 @@
#pragma once
#include <Common/CurrentThread.h>
#include <Common/HashTable/HashSet.h>
#include <Common/ThreadPool.h>
#include <Common/setThreadName.h>
namespace DB
{
/** Set used by uniqExact that holds its elements either in a single-level hash set or in a two-level one.
  *
  * It starts out single-level (`two_level_set` is null). convertToTwoLevel() builds the two-level set from
  * the single-level contents and clears the single-level set. There is no conversion back.
  * The two-level form lets merge() process buckets independently, optionally on a thread pool.
  * Both set types must have the same value_type.
  */
template <typename SingleLevelSet, typename TwoLevelSet>
class UniqExactSet
{
    static_assert(std::is_same_v<typename SingleLevelSet::value_type, typename TwoLevelSet::value_type>);

public:
    using value_type = typename SingleLevelSet::value_type;

    /// Inserts into the representation selected at compile time by `use_single_level_hash_table`.
    /// The caller must pass the flag that matches the current state: with `false` the two-level set
    /// is dereferenced without a check.
    template <typename Arg, bool use_single_level_hash_table = true>
    auto ALWAYS_INLINE insert(Arg && arg)
    {
        if constexpr (use_single_level_hash_table)
            asSingleLevel().insert(std::forward<Arg>(arg));
        else
            asTwoLevel().insert(std::forward<Arg>(arg));
    }

    /// Merges `other` into this set.
    /// If `other` is two-level, this set is converted to two-level first. If both are single-level, a plain
    /// single-level merge is done. Otherwise the merge goes bucket by bucket: sequentially when `thread_pool`
    /// is null, or as jobs scheduled on `thread_pool` when it is provided.
    /// Exceptions from scheduling or from the pool's wait() propagate to the caller, but only after all
    /// scheduled jobs have finished.
    auto merge(const UniqExactSet & other, ThreadPool * thread_pool = nullptr)
    {
        if (isSingleLevel() && other.isTwoLevel())
            convertToTwoLevel();

        if (isSingleLevel())
        {
            asSingleLevel().merge(other.asSingleLevel());
        }
        else
        {
            auto & lhs = asTwoLevel();
            /// For a single-level `other` this is a temporary two-level set; it must outlive every job below.
            const auto rhs_ptr = other.getTwoLevelSet();
            const auto & rhs = *rhs_ptr;
            if (!thread_pool)
            {
                for (size_t i = 0; i < rhs.NUM_BUCKETS; ++i)
                    lhs.impls[i].merge(rhs.impls[i]);
            }
            else
            {
                /// Shared counter from which each job claims the next bucket index.
                auto next_bucket_to_merge = std::make_shared<std::atomic_uint32_t>(0);

                auto thread_func = [&lhs, &rhs, next_bucket_to_merge, thread_group = CurrentThread::getGroup()]()
                {
                    if (thread_group)
                        CurrentThread::attachToIfDetached(thread_group);
                    setThreadName("UniqExactMerger");

                    while (true)
                    {
                        const auto bucket = next_bucket_to_merge->fetch_add(1);
                        if (bucket >= rhs.NUM_BUCKETS)
                            return;
                        lhs.impls[bucket].merge(rhs.impls[bucket]);
                    }
                };

                const size_t num_jobs = std::min<size_t>(thread_pool->getMaxThreads(), rhs.NUM_BUCKETS);
                try
                {
                    for (size_t i = 0; i < num_jobs; ++i)
                        thread_pool->scheduleOrThrowOnError(thread_func);
                    thread_pool->wait();
                }
                catch (...)
                {
                    /// Jobs that were already scheduled capture `lhs` and `rhs` by reference.
                    /// Wait for them before unwinding, otherwise they could touch destroyed objects
                    /// (`rhs_ptr` may be the only owner of `rhs`).
                    thread_pool->wait();
                    throw;
                }
            }
        }
    }

    /// Reads into the single-level set.
    /// NOTE(review): this does not look at `two_level_set`; presumably it is only called on a set that is
    /// still single-level - confirm against callers.
    void read(ReadBuffer & in) { asSingleLevel().read(in); }

    /// Writes the set. A two-level set is written via writeAsSingleLevel().
    void write(WriteBuffer & out) const
    {
        if (isSingleLevel())
            asSingleLevel().write(out);
        else
            /// We have to preserve compatibility with the old implementation that used only single level hash sets.
            asTwoLevel().writeAsSingleLevel(out);
    }

    /// Number of elements in whichever representation is active.
    size_t size() const { return isSingleLevel() ? asSingleLevel().size() : asTwoLevel().size(); }

    /// To convert set to two level before merging (we cannot just call convertToTwoLevel() on right hand side set, because it is declared const).
    /// Returns the existing two-level set if there is one, otherwise a new one constructed from the single-level set.
    std::shared_ptr<TwoLevelSet> getTwoLevelSet() const
    {
        return two_level_set ? two_level_set : std::make_shared<TwoLevelSet>(asSingleLevel());
    }

    /// Switches to the two-level representation and clears the single-level set.
    void convertToTwoLevel()
    {
        two_level_set = getTwoLevelSet();
        single_level_set.clear();
    }

    bool isSingleLevel() const { return !two_level_set; }
    bool isTwoLevel() const { return !!two_level_set; }

private:
    SingleLevelSet & asSingleLevel() { return single_level_set; }
    const SingleLevelSet & asSingleLevel() const { return single_level_set; }

    /// Valid only when isTwoLevel() is true.
    TwoLevelSet & asTwoLevel() { return *two_level_set; }
    const TwoLevelSet & asTwoLevel() const { return *two_level_set; }

    SingleLevelSet single_level_set;
    /// Null while the set is single-level.
    std::shared_ptr<TwoLevelSet> two_level_set;
};
}

View File

@ -329,7 +329,7 @@ public:
free();
}
void insert(Value x)
void ALWAYS_INLINE insert(Value x)
{
HashValue hash_value = hash(x);
if (!good(hash_value))

View File

@ -3,6 +3,7 @@
#include <Common/HashTable/Hash.h>
#include <Common/HashTable/HashTable.h>
#include <Common/HashTable/HashTableAllocator.h>
#include <Common/HashTable/TwoLevelHashTable.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
@ -10,6 +11,14 @@
#include <IO/ReadHelpers.h>
#include <IO/VarInt.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
/** NOTE HashSet could only be used for memmoveable (position independent) types.
* Example: std::string is not position independent in libstdc++ with C++11 ABI or in libc++.
* Also, key must be of type, that zero bytes is compared equals to zero key.
@ -64,6 +73,47 @@ public:
};
/// Two-level hash table whose buckets are HashSetTable instances, with one extra serialization method.
template <
    typename Key,
    typename TCell, /// Supposed to have no state (HashTableNoState)
    typename Hash = DefaultHash<Key>,
    typename Grower = TwoLevelHashTableGrower<>,
    typename Allocator = HashTableAllocator>
class TwoLevelHashSetTable
    : public TwoLevelHashTable<Key, TCell, Hash, Grower, Allocator, HashSetTable<Key, TCell, Hash, Grower, Allocator>>
{
public:
    using Self = TwoLevelHashSetTable;
    using Base = TwoLevelHashTable<Key, TCell, Hash, Grower, Allocator, HashSetTable<Key, TCell, Hash, Grower, Allocator>>;

    using Base::Base;

    /// Writes its content in a way that it will be correctly read by HashSetTable.
    /// Used by uniqExact to preserve backward compatibility.
    /// Order of output: element count, then the zero value (if some bucket holds one), then all non-zero cells.
    /// Throws LOGICAL_ERROR if more than one bucket reports a zero value.
    void writeAsSingleLevel(DB::WriteBuffer & wb) const
    {
        DB::writeVarUInt(this->size(), wb);

        bool zero_written = false;
        for (size_t bucket = 0; bucket < Base::NUM_BUCKETS; ++bucket)
        {
            const auto & impl = this->impls[bucket];
            if (!impl.hasZero())
                continue;

            if (zero_written)
                throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "No more than one zero value expected");

            impl.zeroValue()->write(wb);
            zero_written = true;
        }

        static constexpr HashTableNoState state;
        for (auto it = this->begin(); it != this->end(); ++it)
        {
            const auto * cell = it.getPtr();
            /// The zero value was already written above.
            if (cell->isZero(state))
                continue;
            cell->write(wb);
        }
    }
};
template <typename Key, typename Hash, typename TState = HashTableNoState>
struct HashSetCellWithSavedHash : public HashTableCell<Key, Hash, TState>
{
@ -89,6 +139,13 @@ template <
typename Allocator = HashTableAllocator>
using HashSet = HashSetTable<Key, HashTableCell<Key, Hash>, Hash, Grower, Allocator>;
/// Convenience alias: TwoLevelHashSetTable instantiated with the plain HashTableCell<Key, Hash> cell type,
/// defaulting to DefaultHash, TwoLevelHashTableGrower and HashTableAllocator.
template <
typename Key,
typename Hash = DefaultHash<Key>,
typename Grower = TwoLevelHashTableGrower<>,
typename Allocator = HashTableAllocator>
using TwoLevelHashSet = TwoLevelHashSetTable<Key, HashTableCell<Key, Hash>, Hash, Grower, Allocator>;
template <typename Key, typename Hash, size_t initial_size_degree>
using HashSetWithStackMemory = HashSet<
Key,

View File

@ -432,20 +432,12 @@ struct AllocatorBufferDeleter<true, Allocator, Cell>
// The HashTable
template
<
typename Key,
typename Cell,
typename Hash,
typename Grower,
typename Allocator
>
class HashTable :
private boost::noncopyable,
protected Hash,
protected Allocator,
protected Cell::State,
protected ZeroValueStorage<Cell::need_zero_value_storage, Cell> /// empty base optimization
template <typename Key, typename Cell, typename Hash, typename Grower, typename Allocator>
class HashTable : private boost::noncopyable,
protected Hash,
protected Allocator,
protected Cell::State,
public ZeroValueStorage<Cell::need_zero_value_storage, Cell> /// empty base optimization
{
public:
// If we use an allocator with inline memory, check that the initial

View File

@ -159,14 +159,16 @@ public:
class const_iterator /// NOLINT
{
Self * container{};
const Self * container{};
size_t bucket{};
typename Impl::const_iterator current_it{};
friend class TwoLevelHashTable;
const_iterator(Self * container_, size_t bucket_, typename Impl::const_iterator current_it_)
: container(container_), bucket(bucket_), current_it(current_it_) {}
const_iterator(const Self * container_, size_t bucket_, typename Impl::const_iterator current_it_)
: container(container_), bucket(bucket_), current_it(current_it_)
{
}
public:
const_iterator() = default;

View File

@ -27,7 +27,7 @@ int main(int, char **)
std::cerr << x.getValue() << std::endl;
DB::WriteBufferFromOwnString wb;
cont.writeText(wb);
cont.write(wb);
std::cerr << "dump: " << wb.str() << std::endl;
}

View File

@ -15,6 +15,17 @@
using namespace DB;
namespace
{
std::vector<UInt64> getVectorWithNumbersUpToN(size_t n)
{
std::vector<UInt64> res(n);
std::iota(res.begin(), res.end(), 0);
return res;
}
}
/// To test dump functionality without using other hashes that can change
template <typename T>
@ -371,3 +382,48 @@ TEST(HashTable, Resize)
ASSERT_EQ(actual, expected);
}
}
/// Keys that one parameterized test run inserts into the hash set.
using HashSetContent = std::vector<UInt64>;
/// Value-parameterized gtest fixture; the parameter is the set content (see INSTANTIATE_TEST_SUITE_P below).
class TwoLevelHashSetFixture : public ::testing::TestWithParam<HashSetContent>
{
};
/// Round trip: a TwoLevelHashSet dumped with writeAsSingleLevel must be readable by a plain HashSet,
/// which then reports the same element count and finds every inserted key.
TEST_P(TwoLevelHashSetFixture, WriteAsSingleLevel)
{
    using Key = UInt64;
    using Hasher = HashCRC32<Key>;

    const auto & expected_keys = GetParam();

    TwoLevelHashSet<Key, Hasher> source_set;
    for (const auto & key : expected_keys)
        source_set.insert(key);

    /// Serialize in the single-level layout ...
    WriteBufferFromOwnString serialized;
    source_set.writeAsSingleLevel(serialized);

    /// ... and parse it back with the single-level reader.
    ReadBufferFromString input(serialized.str());
    HashSet<Key, Hasher> restored_set;
    restored_set.read(input);

    EXPECT_EQ(restored_set.size(), expected_keys.size());
    for (const auto & key : expected_keys)
        EXPECT_NE(restored_set.find(key), nullptr);
}
/// Runs the fixture's tests on an empty set and on the sequences 0..n-1
/// for n = 1, 100, 1000, 10000, 100000 and 1000000.
INSTANTIATE_TEST_SUITE_P(
TwoLevelHashSetTests,
TwoLevelHashSetFixture,
::testing::Values(
HashSetContent{},
getVectorWithNumbersUpToN(1),
getVectorWithNumbersUpToN(100),
getVectorWithNumbersUpToN(1000),
getVectorWithNumbersUpToN(10000),
getVectorWithNumbersUpToN(100000),
getVectorWithNumbersUpToN(1000000)));

View File

@ -2508,6 +2508,8 @@ void NO_INLINE Aggregator::mergeDataOnlyExistingKeysImpl(
void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(
ManyAggregatedDataVariants & non_empty_data) const
{
ThreadPool thread_pool{params.max_threads};
AggregatedDataVariantsPtr & res = non_empty_data[0];
/// We merge all aggregation results to the first.
@ -2517,7 +2519,15 @@ void NO_INLINE Aggregator::mergeWithoutKeyDataImpl(
AggregatedDataWithoutKey & current_data = non_empty_data[result_num]->without_key;
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(res_data + offsets_of_aggregate_states[i], current_data + offsets_of_aggregate_states[i], res->aggregates_pool);
if (aggregate_functions[i]->isAbleToParallelizeMerge())
aggregate_functions[i]->merge(
res_data + offsets_of_aggregate_states[i],
current_data + offsets_of_aggregate_states[i],
thread_pool,
res->aggregates_pool);
else
aggregate_functions[i]->merge(
res_data + offsets_of_aggregate_states[i], current_data + offsets_of_aggregate_states[i], res->aggregates_pool);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(current_data + offsets_of_aggregate_states[i]);

View File

@ -0,0 +1,228 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)

# node1 and node2 are created with identical settings: both are pinned to the
# yandex/clickhouse-server image with tag 19.16.9.37.
_old_version_settings = {
    "with_zookeeper": False,
    "image": "yandex/clickhouse-server",
    "tag": "19.16.9.37",
    "stay_alive": True,
    "with_installed_binary": True,
}
node1 = cluster.add_instance("node1", **_old_version_settings)
node2 = cluster.add_instance("node2", **_old_version_settings)

# node3 and node4 do not pin an image or a tag.
node3 = cluster.add_instance("node3", with_zookeeper=False)
node4 = cluster.add_instance("node4", with_zookeeper=False)
@pytest.fixture(scope="module")
def start_cluster():
    """Module-scoped fixture: start the cluster, hand it to the tests, and always shut it down."""
    try:
        cluster.start()
        yield cluster
    finally:
        # Runs even when startup or a test raised.
        cluster.shutdown()
# We will test that serialization of internal state of "avg" function is compatible between different versions.
# TODO Implement versioning of serialization format for aggregate function states.
# NOTE This test is too ad-hoc.
def test_backward_compatability_for_avg(start_cluster):
    """Check that avg over all four nodes gives the same answer whichever node runs the query,
    and that a persisted avg state on node1 is still readable after restarting on the latest version.
    """
    nodes = (node1, node2, node3, node4)

    for node in nodes:
        node.query("create table tab (x UInt64) engine = Memory")

    # node1 stores 1, node2 stores 2, and so on, so the distributed average is 2.5.
    for value, node in enumerate(nodes, start=1):
        node.query(f"INSERT INTO tab VALUES ({value})")

    distributed_avg = "SELECT avg(x) FROM remote('node{1..4}', default, tab)"
    for node in nodes:
        assert node.query(distributed_avg) == "2.5\n"

    # Also check with persisted aggregate function state
    node1.query("create table state (x AggregateFunction(avg, UInt64)) engine = Log")
    node1.query(
        "INSERT INTO state SELECT avgState(arrayJoin(CAST([1, 2, 3, 4] AS Array(UInt64))))"
    )

    merge_state = "SELECT avgMerge(x) FROM state"
    assert node1.query(merge_state) == "2.5\n"
    node1.restart_with_latest_version(fix_metadata=True)
    assert node1.query(merge_state) == "2.5\n"

    node1.query("drop table tab")
    node1.query("drop table state")
    for node in (node2, node3, node4):
        node.query("drop table tab")
@pytest.mark.parametrize("uniq_keys", [1000, 500000])
def test_backward_compatability_for_uniq_exact(start_cluster, uniq_keys):
    """Check that uniqExact over all four nodes gives the same answer whichever node runs the query,
    and that a persisted uniqExact state on node1 is still readable after restarting on the latest version.

    Cleanup runs in ``finally``: the table names depend only on ``uniq_keys`` and are reused by
    other tests in this module, so tables left behind by a failed run would break them.
    """
    nodes = (node1, node2, node3, node4)
    tab = f"tab_{uniq_keys}"
    state = f"state_{uniq_keys}"

    try:
        for node in nodes:
            node.query(f"CREATE TABLE {tab} (x UInt64) Engine = Memory")

        # Node i stores [i, i + uniq_keys), so the union over four nodes has uniq_keys + 3 values.
        for offset, node in enumerate(nodes):
            node.query(
                f"INSERT INTO {tab} SELECT number FROM numbers_mt({offset}, {uniq_keys})"
            )

        distributed_uniq = (
            f"SELECT uniqExact(x) FROM remote('node{{1..4}}', default, {tab})"
        )
        for node in nodes:
            assert node.query(distributed_uniq) == f"{uniq_keys + 3}\n"

        # Also check with persisted aggregate function state
        node1.query(
            f"CREATE TABLE {state} (x AggregateFunction(uniqExact, UInt64)) Engine = Log"
        )
        node1.query(
            f"INSERT INTO {state} SELECT uniqExactState(number) FROM numbers_mt({uniq_keys})"
        )

        merge_state = f"SELECT uniqExactMerge(x) FROM {state}"
        assert node1.query(merge_state) == f"{uniq_keys}\n"
        node1.restart_with_latest_version()
        assert node1.query(merge_state) == f"{uniq_keys}\n"
    finally:
        # IF EXISTS: the failure may have happened before some of the tables were created.
        node1.query(f"DROP TABLE IF EXISTS {state}")
        for node in nodes:
            node.query(f"DROP TABLE IF EXISTS {tab}")
@pytest.mark.parametrize("uniq_keys", [1000, 500000])
def test_backward_compatability_for_uniq_exact_variadic(start_cluster, uniq_keys):
    """Same check as the single-argument uniqExact test, but for uniqExact with two arguments.

    Cleanup runs in ``finally``: the table names depend only on ``uniq_keys`` and are reused by
    other tests in this module, so tables left behind by a failed run would break them.
    """
    nodes = (node1, node2, node3, node4)
    tab = f"tab_{uniq_keys}"
    state = f"state_{uniq_keys}"

    try:
        for node in nodes:
            node.query(f"CREATE TABLE {tab} (x UInt64, y UInt64) Engine = Memory")

        # Node i stores rows for numbers in [i, i + uniq_keys); the test expects
        # uniq_keys + 3 distinct (x, y) pairs over the four nodes.
        for offset, node in enumerate(nodes):
            node.query(
                f"INSERT INTO {tab} SELECT number, number/2 FROM numbers_mt({offset}, {uniq_keys})"
            )

        distributed_uniq = (
            f"SELECT uniqExact(x, y) FROM remote('node{{1..4}}', default, {tab})"
        )
        for node in nodes:
            assert node.query(distributed_uniq) == f"{uniq_keys + 3}\n"

        # Also check with persisted aggregate function state
        node1.query(
            f"CREATE TABLE {state} (x AggregateFunction(uniqExact, UInt64, UInt64)) Engine = Log"
        )
        node1.query(
            f"INSERT INTO {state} SELECT uniqExactState(number, intDiv(number,2)) FROM numbers_mt({uniq_keys})"
        )

        merge_state = f"SELECT uniqExactMerge(x) FROM {state}"
        assert node1.query(merge_state) == f"{uniq_keys}\n"
        node1.restart_with_latest_version()
        assert node1.query(merge_state) == f"{uniq_keys}\n"
    finally:
        # IF EXISTS: the failure may have happened before some of the tables were created.
        node1.query(f"DROP TABLE IF EXISTS {state}")
        for node in nodes:
            node.query(f"DROP TABLE IF EXISTS {tab}")

View File

@ -1,82 +0,0 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)

# node1 and node2 are created with identical settings: both are pinned to the
# yandex/clickhouse-server image with tag 19.16.9.37.
_old_version_settings = {
    "with_zookeeper": False,
    "image": "yandex/clickhouse-server",
    "tag": "19.16.9.37",
    "stay_alive": True,
    "with_installed_binary": True,
}
node1 = cluster.add_instance("node1", **_old_version_settings)
node2 = cluster.add_instance("node2", **_old_version_settings)

# node3 and node4 do not pin an image or a tag.
node3 = cluster.add_instance("node3", with_zookeeper=False)
node4 = cluster.add_instance("node4", with_zookeeper=False)
@pytest.fixture(scope="module")
def start_cluster():
    """Module-scoped fixture: start the cluster, hand it to the tests, and always shut it down."""
    try:
        cluster.start()
        yield cluster
    finally:
        # Runs even when startup or a test raised.
        cluster.shutdown()
# We will test that serialization of internal state of "avg" function is compatible between different versions.
# TODO Implement versioning of serialization format for aggregate function states.
# NOTE This test is too ad-hoc.
def test_backward_compatability(start_cluster):
    """Check that avg over all four nodes gives the same answer whichever node runs the query,
    and that a persisted avg state on node1 is still readable after restarting on the latest version.
    """
    nodes = (node1, node2, node3, node4)

    for node in nodes:
        node.query("create table tab (x UInt64) engine = Memory")

    # node1 stores 1, node2 stores 2, and so on, so the distributed average is 2.5.
    for value, node in enumerate(nodes, start=1):
        node.query(f"INSERT INTO tab VALUES ({value})")

    distributed_avg = "SELECT avg(x) FROM remote('node{1..4}', default, tab)"
    for node in nodes:
        assert node.query(distributed_avg) == "2.5\n"

    # Also check with persisted aggregate function state
    node1.query("create table state (x AggregateFunction(avg, UInt64)) engine = Log")
    node1.query(
        "INSERT INTO state SELECT avgState(arrayJoin(CAST([1, 2, 3, 4] AS Array(UInt64))))"
    )

    merge_state = "SELECT avgMerge(x) FROM state"
    assert node1.query(merge_state) == "2.5\n"
    node1.restart_with_latest_version(fix_metadata=True)
    assert node1.query(merge_state) == "2.5\n"

    node1.query("drop table tab")
    node1.query("drop table state")
    for node in (node2, node3, node4):
        node.query("drop table tab")

View File

@ -0,0 +1,33 @@
<test>
<!-- Every query containing {uniq_keys} is expanded once per value listed below. -->
<substitutions>
<substitution>
<name>uniq_keys</name>
<values>
<value>10000</value>
<value>50000</value>
<value>100000</value>
<value>250000</value>
<value>500000</value>
<value>1000000</value>
</values>
</substitution>
</substitutions>
<!-- One table per substitution value; 5e7 rows of number % uniq_keys give exactly uniq_keys distinct values of a. -->
<create_query>create table t_{uniq_keys}(a UInt64) engine=MergeTree order by tuple()</create_query>
<fill_query>insert into t_{uniq_keys} select number % {uniq_keys} from numbers_mt(5e7)</fill_query>
<!-- Aggregation with a GROUP BY key. -->
<query>SELECT count(distinct a) FROM t_{uniq_keys} GROUP BY a FORMAT Null</query>
<query>SELECT uniqExact(a) FROM t_{uniq_keys} GROUP BY a FORMAT Null</query>
<!-- Aggregation without a key. -->
<query>SELECT count(distinct a) FROM t_{uniq_keys}</query>
<query>SELECT uniqExact(a) FROM t_{uniq_keys}</query>
<!-- Aggregation without a key where every input value is distinct (single-argument form). -->
<query>SELECT uniqExact(number) from numbers_mt(1e7)</query>
<query>SELECT uniqExact(number) from numbers_mt(5e7)</query>
<!-- Same, for the two-argument (variadic) form. -->
<query>SELECT uniqExact(number, number) from numbers_mt(5e6)</query>
<query>SELECT uniqExact(number, number) from numbers_mt(1e7)</query>
<drop_query>drop table t_{uniq_keys}</drop_query>
</test>