This commit is contained in:
Sofia Antipushina 2020-05-14 22:37:53 +03:00
parent 4739b87732
commit 6e2b93e5af

View File

@ -11,24 +11,35 @@
namespace DB
{
namespace ErrorCodes
{
namespace ErrorCodes {
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
struct AggregateFunctionDistinctData {
using Key = UInt128;
HashSet<
Key,
UInt128TrivialHash,
HashTableGrower<3>,
HashTableAllocatorWithStackMemory<sizeof(Key) * (1 << 3)>
> data;
std::mutex mutex;
bool ALWAYS_INLINE TryToInsert(const Key& key) {
std::lock_guard lock(mutex);
return data.insert(key).second;
}
};
/** Adaptor for aggregate functions.
* Adding -Distinct suffix to aggregate function
**/
class AggregateFunctionDistinct final : public IAggregateFunctionHelper<AggregateFunctionDistinct> {
private:
mutable std::mutex mutex;
AggregateFunctionPtr nested_func;
mutable HashSet<
UInt128,
UInt128TrivialHash,
HashTableGrower<3>,
HashTableAllocatorWithStackMemory<sizeof(UInt128) * (1 << 3)>> storage;
mutable AggregateFunctionDistinctData storage;
public:
AggregateFunctionDistinct(AggregateFunctionPtr nested, const DataTypes & arguments)
@ -71,17 +82,14 @@ public:
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const override {
UInt128 key;
SipHash hash;
columns[0]->updateHashWithValue(row_num, hash);
UInt128 key;
hash.get128(key.low, key.high);
{
std::lock_guard lock(mutex);
if (!storage.insert(key).second) {
return;
}
}
nested_func->add(place, columns, row_num, arena);
if (storage.TryToInsert(key))
nested_func->add(place, columns, row_num, arena);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const override {