ClickHouse/src/Interpreters/Aggregator.h

1431 lines
56 KiB
C++
Raw Normal View History

2011-09-19 01:42:16 +00:00
#pragma once
2015-01-08 18:52:48 +00:00
#include <mutex>
#include <memory>
2015-04-16 14:27:56 +00:00
#include <functional>
2012-03-05 07:58:34 +00:00
2022-04-27 15:05:45 +00:00
#include <Common/logger_useful.h>
2021-10-02 07:13:14 +00:00
#include <base/StringRef.h>
#include <Common/Arena.h>
A proper lookup table that uses HashTable's API. This is the first step of allowing heterogeneous cells in hash tables. Performance test results:
```
1. HashMap<UInt16, UInt8, TrivialHash, HashTableFixedGrower<16>>; 2. NewLookupMap<UInt16, UInt8>
ResolutionWidth 30000    1: 223550276.46   2: 248772721.24   Best: 2 - 24877272124
ResolutionWidth 100000   1: 238498413.99   2: 261808889.98   Best: 2 - 26180888998
ResolutionWidth 300000   1: 239307348.81   2: 257592761.30   Best: 2 - 25759276130
ResolutionWidth 1000000  1: 240144759.26   2: 257093531.91   Best: 2 - 25709353191
ResolutionWidth 5000000  1: 241573260.35   2: 259314162.79   Best: 2 - 25931416279
ResolutionDepth 30000    1: 217108119.84   2: 249459504.41   Best: 2 - 24945950441
ResolutionDepth 100000   1: 229065162.17   2: 253769105.64   Best: 2 - 25376910564
ResolutionDepth 300000   1: 233079225.18   2: 256316273.78   Best: 2 - 25631627378
ResolutionDepth 1000000  1: 234184633.51   2: 261100491.57   Best: 2 - 26110049157
ResolutionDepth 5000000  1: 233118795.66   2: 252436160.41   Best: 2 - 25243616041
```
2019-02-28 09:35:38 +00:00
#include <Common/HashTable/FixedHashMap.h>
#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/TwoLevelHashMap.h>
#include <Common/HashTable/StringHashMap.h>
#include <Common/HashTable/TwoLevelStringHashMap.h>
#include <Common/ThreadPool.h>
2019-01-21 10:39:24 +00:00
#include <Common/ColumnsHashing.h>
#include <Common/assert_cast.h>
2019-10-01 18:51:33 +00:00
#include <Common/filesystemHelpers.h>
#include <Core/ColumnNumbers.h>
2021-10-15 20:18:20 +00:00
#include <QueryPipeline/SizeLimits.h>
#include <Disks/SingleDiskVolume.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/AggregationCommon.h>
#include <Interpreters/JIT/compileFunction.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnLowCardinality.h>
#include <Parsers/IAST_fwd.h>
2011-09-19 01:42:16 +00:00
namespace DB
{
/// Error code constants referenced by this header. They are only declared here (`extern`);
/// the definitions live elsewhere.
namespace ErrorCodes
{
extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
}
2017-06-02 21:37:28 +00:00
/** Different data structures that can be used for aggregation
* For efficiency, the aggregation data itself is put into the pool.
* Data and pool ownership (states of aggregate functions)
* is acquired later - in `convertToBlocks` function, by the ColumnAggregateFunction object.
*
2017-06-02 21:37:28 +00:00
* Most data structures exist in two versions: normal and two-level (TwoLevel).
* A two-level hash table works a little slower with a small number of different keys,
* but with a large number of different keys it scales better, because it allows
* parallelizing some operations (merging, post-processing) in a natural way.
*
2017-06-02 21:37:28 +00:00
* To ensure efficient work over a wide range of conditions,
* first single-level hash tables are used,
* and when the number of different keys is large enough,
* they are converted to two-level ones.
*
2017-06-02 21:37:28 +00:00
* PS. There are many different approaches to the effective implementation of parallel and distributed aggregation,
* best suited for different cases, and this approach is just one of them, chosen for a combination of reasons.
2011-12-19 08:06:31 +00:00
*/
/// No-key case: there is no table at all, just one AggregateDataPtr.
using AggregatedDataWithoutKey = AggregateDataPtr;
/// Single-level tables. Every alias below maps the key type named in the alias to an AggregateDataPtr.
/// UInt8 and UInt16 keys use the Fixed* map types; wider keys use HashMap with the hash functor given
/// as the third template argument.
using AggregatedDataWithUInt8Key = FixedImplicitZeroHashMapWithCalculatedSize<UInt8, AggregateDataPtr>;
2020-07-30 21:54:54 +00:00
using AggregatedDataWithUInt16Key = FixedImplicitZeroHashMap<UInt16, AggregateDataPtr>;
2020-03-23 13:44:21 +00:00
using AggregatedDataWithUInt32Key = HashMap<UInt32, AggregateDataPtr, HashCRC32<UInt32>>;
using AggregatedDataWithUInt64Key = HashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>;
using AggregatedDataWithShortStringKey = StringHashMap<AggregateDataPtr>;
using AggregatedDataWithStringKey = HashMapWithSavedHash<StringRef, AggregateDataPtr>;
using AggregatedDataWithKeys128 = HashMap<UInt128, AggregateDataPtr, UInt128HashCRC32>;
2021-01-27 00:54:57 +00:00
using AggregatedDataWithKeys256 = HashMap<UInt256, AggregateDataPtr, UInt256HashCRC32>;
2020-03-23 13:44:21 +00:00
/// Two-level counterparts: same key types and hash functors, but built on the TwoLevel* map types.
/// Note there are no two-level aliases for the UInt8 / UInt16 keys.
using AggregatedDataWithUInt32KeyTwoLevel = TwoLevelHashMap<UInt32, AggregateDataPtr, HashCRC32<UInt32>>;
using AggregatedDataWithUInt64KeyTwoLevel = TwoLevelHashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>;
using AggregatedDataWithShortStringKeyTwoLevel = TwoLevelStringHashMap<AggregateDataPtr>;
using AggregatedDataWithStringKeyTwoLevel = TwoLevelHashMapWithSavedHash<StringRef, AggregateDataPtr>;
using AggregatedDataWithKeys128TwoLevel = TwoLevelHashMap<UInt128, AggregateDataPtr, UInt128HashCRC32>;
2021-01-27 00:54:57 +00:00
using AggregatedDataWithKeys256TwoLevel = TwoLevelHashMap<UInt256, AggregateDataPtr, UInt256HashCRC32>;
2011-09-26 07:25:22 +00:00
/** Variants with a better hash function, using more than 32 bits for the hash.
* Used for the merging phase of external aggregation, where the number of keys may be far greater than 4 billion,
* but we keep in memory and merge only a sub-partition of them at a time.
* TODO We need to switch to a better hash function not only for external aggregation,
* but also for huge aggregation results on machines with terabytes of RAM.
*/
/// Single-level tables whose hash functor is DefaultHash / StringRefHash64 / UInt128Hash / UInt256Hash
/// rather than the *CRC32 functors used by the regular aliases.
using AggregatedDataWithUInt64KeyHash64 = HashMap<UInt64, AggregateDataPtr, DefaultHash<UInt64>>;
using AggregatedDataWithStringKeyHash64 = HashMapWithSavedHash<StringRef, AggregateDataPtr, StringRefHash64>;
using AggregatedDataWithKeys128Hash64 = HashMap<UInt128, AggregateDataPtr, UInt128Hash>;
2021-01-27 00:54:57 +00:00
using AggregatedDataWithKeys256Hash64 = HashMap<UInt256, AggregateDataPtr, UInt256Hash>;
/** Adds one extra slot to the hash table type `Base`, kept outside the table itself:
  * a flag telling whether data for the NULL key exists, and the pointer to that data.
  * `size()` and `empty()` account for this slot; `clear()` / `clearAndShrink()` reset the flag.
  */
template <typename Base>
struct AggregationDataWithNullKey : public Base
{
    using Base::Base;

    /// Mutable access: callers assign through the returned references.
    bool & hasNullKeyData() { return has_null_key_data; }
    AggregateDataPtr & getNullKeyData() { return null_key_data; }

    /// Read-only access.
    bool hasNullKeyData() const { return has_null_key_data; }
    const AggregateDataPtr & getNullKeyData() const { return null_key_data; }

    /// Entries of the underlying table, plus one if the NULL-key slot is in use.
    size_t size() const
    {
        const size_t null_key_entries = has_null_key_data ? 1 : 0;
        return Base::size() + null_key_entries;
    }

    /// True only when both the underlying table and the NULL-key slot are unused.
    bool empty() const
    {
        const bool table_is_empty = Base::empty();
        return table_is_empty && !has_null_key_data;
    }

    /// Clears the underlying table and marks the NULL-key slot as unused.
    /// Only the flag is reset; the stored pointer is left as it was.
    void clear()
    {
        Base::clear();
        has_null_key_data = false;
    }

    /// Same as clear(), but forwards to Base::clearAndShrink().
    void clearAndShrink()
    {
        Base::clearAndShrink();
        has_null_key_data = false;
    }

private:
    bool has_null_key_data = false;
    AggregateDataPtr null_key_data = nullptr;
};
/** Null-key support for a two-level table `Base`.
  * No state is stored here: every accessor forwards to the first sub-table, `impls[0]`,
  * which is expected to provide hasNullKeyData() / getNullKeyData() itself.
  */
template <typename Base>
struct AggregationDataWithNullKeyTwoLevel : public Base
{
    using Base::Base;
    using Base::impls;

    AggregationDataWithNullKeyTwoLevel() = default;

    /// Builds the table from `other` (via Base) and then copies the NULL-key flag and pointer
    /// of `other` into the first sub-table.
    template <typename Other>
    explicit AggregationDataWithNullKeyTwoLevel(const Other & other) : Base(other)
    {
        auto & first_impl = impls[0];
        first_impl.hasNullKeyData() = other.hasNullKeyData();
        first_impl.getNullKeyData() = other.getNullKeyData();
    }

    /// Mutable access to the NULL-key slot held by impls[0].
    bool & hasNullKeyData() { return impls[0].hasNullKeyData(); }
    AggregateDataPtr & getNullKeyData() { return impls[0].getNullKeyData(); }

    /// Read-only access to the NULL-key slot held by impls[0].
    bool hasNullKeyData() const { return impls[0].hasNullKeyData(); }
    const AggregateDataPtr & getNullKeyData() const { return impls[0].getNullKeyData(); }
};
/// Template adapters that wrap a table template into AggregationDataWithNullKey.
/// They are passed further down as a template-template argument of the two-level map types.
template <typename ... Types>
using HashTableWithNullKey = AggregationDataWithNullKey<HashMapTable<Types ...>>;
template <typename ... Types>
using StringHashTableWithNullKey = AggregationDataWithNullKey<StringHashMap<Types ...>>;
/// Single-level tables extended with the NULL-key slot.
using AggregatedDataWithNullableUInt8Key = AggregationDataWithNullKey<AggregatedDataWithUInt8Key>;
using AggregatedDataWithNullableUInt16Key = AggregationDataWithNullKey<AggregatedDataWithUInt16Key>;
using AggregatedDataWithNullableUInt64Key = AggregationDataWithNullKey<AggregatedDataWithUInt64Key>;
using AggregatedDataWithNullableStringKey = AggregationDataWithNullKey<AggregatedDataWithStringKey>;
/// Two-level tables with the NULL-key slot: the two-level map is instantiated with one of the
/// adapters above, and the result is wrapped into AggregationDataWithNullKeyTwoLevel.
using AggregatedDataWithNullableUInt64KeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
TwoLevelHashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>,
TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>;
using AggregatedDataWithNullableShortStringKeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
TwoLevelStringHashMap<AggregateDataPtr, HashTableAllocator, StringHashTableWithNullKey>>;
using AggregatedDataWithNullableStringKeyTwoLevel = AggregationDataWithNullKeyTwoLevel<
TwoLevelHashMapWithSavedHash<StringRef, AggregateDataPtr, DefaultHash<StringRef>,
TwoLevelHashTableGrower<>, HashTableAllocator, HashTableWithNullKey>>;
2014-10-29 02:35:16 +00:00
2017-06-02 21:37:28 +00:00
/// For the case where there is one numeric key.
/// FieldType is UInt8/16/32/64 for any type with corresponding bit width.
template <typename FieldType, typename TData,
bool consecutive_keys_optimization = true>
struct AggregationMethodOneNumber
{
using Data = TData;
using Key = typename Data::key_type;
using Mapped = typename Data::mapped_type;
Data data;
2021-01-22 09:13:22 +00:00
AggregationMethodOneNumber() = default;
explicit AggregationMethodOneNumber(size_t size_hint) : data(size_hint) { }
template <typename Other>
explicit AggregationMethodOneNumber(const Other & other) : data(other.data)
{
}
2017-06-02 21:37:28 +00:00
/// To use one `Method` in different threads, use different `State`.
using State = ColumnsHashing::HashMethodOneNumber<typename Data::value_type,
Mapped, FieldType, consecutive_keys_optimization>;
/// Use optimization for low cardinality.
2018-09-21 09:49:43 +00:00
static const bool low_cardinality_optimization = false;
2021-03-10 11:00:24 +00:00
/// Shuffle key columns before `insertKeyIntoColumns` call if needed.
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
2019-01-21 10:39:24 +00:00
// Insert the key from the hash table into columns.
2021-03-10 11:00:24 +00:00
static void insertKeyIntoColumns(const Key & key, std::vector<IColumn *> & key_columns, const Sizes & /*key_sizes*/)
{
2021-01-22 09:13:22 +00:00
const auto * key_holder = reinterpret_cast<const char *>(&key);
2021-03-10 11:00:24 +00:00
auto * column = static_cast<ColumnVectorHelper *>(key_columns[0]);
column->insertRawData<sizeof(FieldType)>(key_holder);
}
};
2017-06-02 21:37:28 +00:00
/// For the case where there is one string key.
template <typename TData>
struct AggregationMethodString
{
using Data = TData;
using Key = typename Data::key_type;
using Mapped = typename Data::mapped_type;
Data data;
2021-01-22 09:13:22 +00:00
AggregationMethodString() = default;
template <typename Other>
explicit AggregationMethodString(const Other & other) : data(other.data)
{
}
explicit AggregationMethodString(size_t size_hint) : data(size_hint) { }
2019-01-21 10:39:53 +00:00
using State = ColumnsHashing::HashMethodString<typename Data::value_type, Mapped>;
2018-09-21 09:49:43 +00:00
static const bool low_cardinality_optimization = false;
2021-03-10 11:00:24 +00:00
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
static void insertKeyIntoColumns(const StringRef & key, std::vector<IColumn *> & key_columns, const Sizes &)
{
2021-03-10 11:00:24 +00:00
static_cast<ColumnString *>(key_columns[0])->insertData(key.data, key.size);
}
};
/// Same as above but without cache
template <typename TData>
struct AggregationMethodStringNoCache
{
using Data = TData;
using Key = typename Data::key_type;
using Mapped = typename Data::mapped_type;
Data data;
2021-01-22 09:13:22 +00:00
AggregationMethodStringNoCache() = default;
explicit AggregationMethodStringNoCache(size_t size_hint) : data(size_hint) { }
template <typename Other>
explicit AggregationMethodStringNoCache(const Other & other) : data(other.data)
{
}
using State = ColumnsHashing::HashMethodString<typename Data::value_type, Mapped, true, false>;
static const bool low_cardinality_optimization = false;
2021-03-10 11:00:24 +00:00
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
static void insertKeyIntoColumns(const StringRef & key, std::vector<IColumn *> & key_columns, const Sizes &)
{
2021-03-10 11:00:24 +00:00
static_cast<ColumnString *>(key_columns[0])->insertData(key.data, key.size);
}
};
2017-06-02 21:37:28 +00:00
/// For the case where there is one fixed-length string key.
template <typename TData>
struct AggregationMethodFixedString
{
using Data = TData;
using Key = typename Data::key_type;
using Mapped = typename Data::mapped_type;
Data data;
2021-01-22 09:13:22 +00:00
AggregationMethodFixedString() = default;
explicit AggregationMethodFixedString(size_t size_hint) : data(size_hint) { }
template <typename Other>
explicit AggregationMethodFixedString(const Other & other) : data(other.data)
{
}
2019-01-21 10:39:53 +00:00
using State = ColumnsHashing::HashMethodFixedString<typename Data::value_type, Mapped>;
2018-09-21 09:49:43 +00:00
static const bool low_cardinality_optimization = false;
2021-03-10 11:00:24 +00:00
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
static void insertKeyIntoColumns(const StringRef & key, std::vector<IColumn *> & key_columns, const Sizes &)
{
2021-03-10 11:00:24 +00:00
static_cast<ColumnFixedString *>(key_columns[0])->insertData(key.data, key.size);
}
};
/// Same as above but without cache
template <typename TData>
struct AggregationMethodFixedStringNoCache
{
using Data = TData;
using Key = typename Data::key_type;
using Mapped = typename Data::mapped_type;
Data data;
2021-01-22 09:13:22 +00:00
AggregationMethodFixedStringNoCache() = default;
explicit AggregationMethodFixedStringNoCache(size_t size_hint) : data(size_hint) { }
template <typename Other>
explicit AggregationMethodFixedStringNoCache(const Other & other) : data(other.data)
{
}
using State = ColumnsHashing::HashMethodFixedString<typename Data::value_type, Mapped, true, false>;
static const bool low_cardinality_optimization = false;
2021-03-10 11:00:24 +00:00
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
static void insertKeyIntoColumns(const StringRef & key, std::vector<IColumn *> & key_columns, const Sizes &)
{
2021-03-10 11:00:24 +00:00
static_cast<ColumnFixedString *>(key_columns[0])->insertData(key.data, key.size);
}
};
/// Single low cardinality column.
template <typename SingleColumnMethod>
struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
{
using Base = SingleColumnMethod;
using BaseState = typename Base::State;
using Data = typename Base::Data;
using Key = typename Base::Key;
using Mapped = typename Base::Mapped;
using Base::data;
AggregationMethodSingleLowCardinalityColumn() = default;
template <typename Other>
explicit AggregationMethodSingleLowCardinalityColumn(const Other & other) : Base(other) {}
2019-01-21 10:39:53 +00:00
using State = ColumnsHashing::HashMethodSingleLowCardinalityColumn<BaseState, Mapped, true>;
static const bool low_cardinality_optimization = true;
2021-03-10 11:00:24 +00:00
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
static void insertKeyIntoColumns(const Key & key,
2021-03-10 11:00:24 +00:00
std::vector<IColumn *> & key_columns_low_cardinality, const Sizes & /*key_sizes*/)
{
2021-03-10 11:00:24 +00:00
auto * col = assert_cast<ColumnLowCardinality *>(key_columns_low_cardinality[0]);
if constexpr (std::is_same_v<Key, StringRef>)
{
col->insertData(key.data, key.size);
}
else
{
col->insertData(reinterpret_cast<const char *>(&key), sizeof(key));
}
}
};
2017-06-02 21:37:28 +00:00
/// For the case where all keys are of fixed length, and they fit in N (for example, 128) bits.
2020-03-23 13:44:21 +00:00
template <typename TData, bool has_nullable_keys_ = false, bool has_low_cardinality_ = false, bool use_cache = true>
struct AggregationMethodKeysFixed
{
using Data = TData;
using Key = typename Data::key_type;
using Mapped = typename Data::mapped_type;
static constexpr bool has_nullable_keys = has_nullable_keys_;
static constexpr bool has_low_cardinality = has_low_cardinality_;
Data data;
2021-01-22 09:13:22 +00:00
AggregationMethodKeysFixed() = default;
explicit AggregationMethodKeysFixed(size_t size_hint) : data(size_hint) { }
template <typename Other>
explicit AggregationMethodKeysFixed(const Other & other) : data(other.data)
{
}
using State = ColumnsHashing::HashMethodKeysFixed<
typename Data::value_type,
Key,
Mapped,
has_nullable_keys,
has_low_cardinality,
use_cache>;
2018-09-21 09:49:43 +00:00
static const bool low_cardinality_optimization = false;
2021-03-10 11:00:24 +00:00
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> & key_columns, const Sizes & key_sizes)
2021-02-26 08:52:50 +00:00
{
2021-03-10 11:00:24 +00:00
return State::shuffleKeyColumns(key_columns, key_sizes);
2021-02-26 08:52:50 +00:00
}
static void insertKeyIntoColumns(const Key & key, std::vector<IColumn *> & key_columns, const Sizes & key_sizes)
{
size_t keys_size = key_columns.size();
static constexpr auto bitmap_size = has_nullable_keys ? std::tuple_size<KeysNullMap<Key>>::value : 0;
/// In any hash key value, column values to be read start just after the bitmap, if it exists.
size_t pos = bitmap_size;
for (size_t i = 0; i < keys_size; ++i)
{
IColumn * observed_column;
ColumnUInt8 * null_map;
2019-07-02 12:31:20 +00:00
bool column_nullable = false;
if constexpr (has_nullable_keys)
column_nullable = isColumnNullable(*key_columns[i]);
/// If we have a nullable column, get its nested column and its null map.
2019-07-02 12:31:20 +00:00
if (column_nullable)
{
ColumnNullable & nullable_col = assert_cast<ColumnNullable &>(*key_columns[i]);
observed_column = &nullable_col.getNestedColumn();
null_map = assert_cast<ColumnUInt8 *>(&nullable_col.getNullMapColumn());
}
else
{
2021-02-26 08:52:50 +00:00
observed_column = key_columns[i];
null_map = nullptr;
}
2019-07-02 12:31:20 +00:00
bool is_null = false;
if (column_nullable)
{
/// The current column is nullable. Check if the value of the
/// corresponding key is nullable. Update the null map accordingly.
size_t bucket = i / 8;
size_t offset = i % 8;
UInt8 val = (reinterpret_cast<const UInt8 *>(&key)[bucket] >> offset) & 1;
null_map->insertValue(val);
is_null = val == 1;
}
if (has_nullable_keys && is_null)
observed_column->insertDefault();
else
{
size_t size = key_sizes[i];
observed_column->insertData(reinterpret_cast<const char *>(&key) + pos, size);
pos += size;
}
}
}
};
2017-06-02 21:37:28 +00:00
/** Aggregates by concatenating serialized key values.
* A serialized value can be deserialized unambiguously knowing only the position where it starts.
* For example, a string is serialized as its length followed by its bytes.
* Therefore, when aggregating by several strings, there is no ambiguity.
*/
template <typename TData>
struct AggregationMethodSerialized
{
using Data = TData;
using Key = typename Data::key_type;
using Mapped = typename Data::mapped_type;
Data data;
2021-01-22 09:13:22 +00:00
AggregationMethodSerialized() = default;
explicit AggregationMethodSerialized(size_t size_hint) : data(size_hint) { }
template <typename Other>
explicit AggregationMethodSerialized(const Other & other) : data(other.data)
{
}
2019-01-21 10:39:53 +00:00
using State = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped>;
2018-09-21 09:49:43 +00:00
static const bool low_cardinality_optimization = false;
2021-03-10 11:00:24 +00:00
std::optional<Sizes> shuffleKeyColumns(std::vector<IColumn *> &, const Sizes &) { return {}; }
static void insertKeyIntoColumns(const StringRef & key, std::vector<IColumn *> & key_columns, const Sizes &)
{
2021-01-22 09:13:22 +00:00
const auto * pos = key.data;
2019-01-21 10:39:24 +00:00
for (auto & column : key_columns)
pos = column->deserializeAndInsertFromArena(pos);
}
};
/// Forward declaration: AggregatedDataVariants below only stores a `const Aggregator *`.
class Aggregator;
2011-09-26 07:25:22 +00:00
2019-01-21 10:39:24 +00:00
/// Make the hashing context types from ColumnsHashing available in this namespace under their short names.
using ColumnsHashing::HashMethodContext;
using ColumnsHashing::HashMethodContextPtr;
struct AggregatedDataVariants : private boost::noncopyable
2011-09-26 07:25:22 +00:00
{
2017-06-02 21:37:28 +00:00
/** Working with states of aggregate functions in the pool is arranged in the following (inconvenient) way:
* - when aggregating, states are created in the pool using IAggregateFunction::create (inside - `placement new` of arbitrary structure);
* - they must then be destroyed using IAggregateFunction::destroy (inside - calling the destructor of arbitrary structure);
* - if aggregation is complete, then, in the Aggregator::convertToBlocks function, pointers to the states of aggregate functions
* are written to ColumnAggregateFunction; ColumnAggregateFunction "acquires ownership" of them, that is - calls `destroy` in its destructor.
* - if during the aggregation, before call to Aggregator::convertToBlocks, an exception was thrown,
* then the states of aggregate functions must still be destroyed,
* otherwise, for complex states (eg, AggregateFunctionUniq), there will be memory leaks;
* - in this case, to destroy states, the destructor calls Aggregator::destroyAggregateStates method,
* but only if the variable aggregator (see below) is not nullptr;
* - that is, until you transfer ownership of the aggregate function states in the ColumnAggregateFunction, set the variable `aggregator`,
* so that when an exception occurs, the states are correctly destroyed.
*
2017-06-02 21:37:28 +00:00
* PS. This can be corrected by making a pool that knows about which states of aggregate functions and in which order are put in it, and knows how to destroy them.
* But this can hardly be done simply because it is planned to put variable-length strings into the same pool.
* In this case, the pool will not be able to know with what offsets objects are stored.
*/
const Aggregator * aggregator = nullptr;
2019-04-22 16:07:09 +00:00
size_t keys_size{}; /// Number of keys. NOTE do we need this field?
2017-06-02 21:37:28 +00:00
Sizes key_sizes; /// Dimensions of keys, if keys of fixed length
2017-06-02 21:37:28 +00:00
/// Pools for states of aggregate functions. Ownership will be later transferred to ColumnAggregateFunction.
Arenas aggregates_pools;
2019-04-22 16:07:09 +00:00
Arena * aggregates_pool{}; /// The pool that is currently used for allocation.
2017-06-02 21:37:28 +00:00
/** Specialization for the case when there are no keys, and for keys not fitted into max_rows_to_group_by.
*/
AggregatedDataWithoutKey without_key = nullptr;
// Disable consecutive key optimization for Uint8/16, because they use a FixedHashMap
// and the lookup there is almost free, so we don't need to cache the last lookup result
std::unique_ptr<AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key, false>> key8;
std::unique_ptr<AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key, false>> key16;
std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64Key>> key32;
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>> key64;
std::unique_ptr<AggregationMethodStringNoCache<AggregatedDataWithShortStringKey>> key_string;
std::unique_ptr<AggregationMethodFixedStringNoCache<AggregatedDataWithShortStringKey>> key_fixed_string;
2020-03-23 13:44:21 +00:00
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithUInt16Key, false, false, false>> keys16;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithUInt32Key>> keys32;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithUInt64Key>> keys64;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128>> keys128;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256>> keys256;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKey>> serialized;
std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64KeyTwoLevel>> key32_two_level;
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyTwoLevel>> key64_two_level;
std::unique_ptr<AggregationMethodStringNoCache<AggregatedDataWithShortStringKeyTwoLevel>> key_string_two_level;
std::unique_ptr<AggregationMethodFixedStringNoCache<AggregatedDataWithShortStringKeyTwoLevel>> key_fixed_string_two_level;
2020-03-23 13:44:21 +00:00
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithUInt32KeyTwoLevel>> keys32_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithUInt64KeyTwoLevel>> keys64_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel>> keys128_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel>> keys256_two_level;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKeyTwoLevel>> serialized_two_level;
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyHash64>> key64_hash64;
std::unique_ptr<AggregationMethodString<AggregatedDataWithStringKeyHash64>> key_string_hash64;
std::unique_ptr<AggregationMethodFixedString<AggregatedDataWithStringKeyHash64>> key_fixed_string_hash64;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128Hash64>> keys128_hash64;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256Hash64>> keys256_hash64;
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKeyHash64>> serialized_hash64;
/// Support for nullable keys.
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128, true>> nullable_keys128;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256, true>> nullable_keys256;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, true>> nullable_keys128_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, true>> nullable_keys256_two_level;
/// Support for low cardinality.
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key, false>>> low_cardinality_key8;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key, false>>> low_cardinality_key16;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64Key>>> low_cardinality_key32;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key>>> low_cardinality_key64;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithNullableStringKey>>> low_cardinality_key_string;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithNullableStringKey>>> low_cardinality_key_fixed_string;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithNullableUInt64KeyTwoLevel>>> low_cardinality_key32_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithNullableUInt64KeyTwoLevel>>> low_cardinality_key64_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithNullableStringKeyTwoLevel>>> low_cardinality_key_string_two_level;
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithNullableStringKeyTwoLevel>>> low_cardinality_key_fixed_string_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128, false, true>> low_cardinality_keys128;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256, false, true>> low_cardinality_keys256;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, false, true>> low_cardinality_keys128_two_level;
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, false, true>> low_cardinality_keys256_two_level;
2017-06-02 21:37:28 +00:00
/// X-macro listing every keyed aggregation variant.
/// In this and similar macros, the option without_key is not considered.
///
/// M(NAME, IS_TWO_LEVEL) is invoked once per variant:
///  - NAME is the name shared by the data member and the Type enumerator;
///  - IS_TWO_LEVEL is `true` for the *_two_level variants and `false` for all others.
///
/// The last entry carries no line continuation, so the macro body ends with the list
/// and never absorbs the line that follows the definition.
#define APPLY_FOR_AGGREGATED_VARIANTS(M) \
    M(key8,                       false) \
    M(key16,                      false) \
    M(key32,                      false) \
    M(key64,                      false) \
    M(key_string,                 false) \
    M(key_fixed_string,           false) \
    M(keys16,                     false) \
    M(keys32,                     false) \
    M(keys64,                     false) \
    M(keys128,                    false) \
    M(keys256,                    false) \
    M(serialized,                 false) \
    M(key32_two_level,            true) \
    M(key64_two_level,            true) \
    M(key_string_two_level,       true) \
    M(key_fixed_string_two_level, true) \
    M(keys32_two_level,           true) \
    M(keys64_two_level,           true) \
    M(keys128_two_level,          true) \
    M(keys256_two_level,          true) \
    M(serialized_two_level,       true) \
    M(key64_hash64,               false) \
    M(key_string_hash64,          false) \
    M(key_fixed_string_hash64,    false) \
    M(keys128_hash64,             false) \
    M(keys256_hash64,             false) \
    M(serialized_hash64,          false) \
    M(nullable_keys128,           false) \
    M(nullable_keys256,           false) \
    M(nullable_keys128_two_level, true) \
    M(nullable_keys256_two_level, true) \
    M(low_cardinality_key8,       false) \
    M(low_cardinality_key16,      false) \
    M(low_cardinality_key32,      false) \
    M(low_cardinality_key64,      false) \
    M(low_cardinality_keys128,    false) \
    M(low_cardinality_keys256,    false) \
    M(low_cardinality_key_string, false) \
    M(low_cardinality_key_fixed_string, false) \
    M(low_cardinality_key32_two_level,  true) \
    M(low_cardinality_key64_two_level,  true) \
    M(low_cardinality_keys128_two_level, true) \
    M(low_cardinality_keys256_two_level, true) \
    M(low_cardinality_key_string_two_level, true) \
    M(low_cardinality_key_fixed_string_two_level, true)
/// Discriminator naming the aggregation method a variants object currently uses.
enum class Type
{
/// No method selected; this is the value `type` is initialized with.
EMPTY = 0,
/// Aggregation without keys; this option is not part of APPLY_FOR_AGGREGATED_VARIANTS.
without_key,
/// One enumerator per entry of APPLY_FOR_AGGREGATED_VARIANTS, in the macro's order.
#define M(NAME, IS_TWO_LEVEL) NAME,
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
};
/// Currently selected aggregation method; starts out as EMPTY.
Type type = Type::EMPTY;
/// Creates one Arena in `aggregates_pools` and points `aggregates_pool` at it.
AggregatedDataVariants() : aggregates_pools(1, std::make_shared<Arena>()), aggregates_pool(aggregates_pools.back().get()) {}
/// True while no aggregation method is selected.
bool empty() const { return type == Type::EMPTY; }
/// Resets `type` to EMPTY; nothing else is modified here.
void invalidate() { type = Type::EMPTY; }
/// Defined out of line.
~AggregatedDataVariants();
/// Defined out of line.
/// NOTE(review): presumably selects the method `type_` and uses `size_hint` for pre-sizing - confirm in the .cpp.
void init(Type type_, std::optional<size_t> size_hint = std::nullopt);
2017-06-02 21:37:28 +00:00
/// Number of rows (different keys).
/// For a keyed method this is the size of its hash table, plus one more row
/// when `without_key` is set as well.
size_t size() const
{
    switch (type)
    {
        case Type::EMPTY:
            return 0;
        case Type::without_key:
            return 1;

    #define M(VARIANT, TWO_LEVEL) \
        case Type::VARIANT: \
            return (VARIANT)->data.size() + ((without_key != nullptr) ? 1 : 0);

        APPLY_FOR_AGGREGATED_VARIANTS(M)
    #undef M
    }

    __builtin_unreachable();
}
2017-06-02 21:37:28 +00:00
/// Row count that leaves out the extra row used to accumulate data for TOTALS.
size_t sizeWithoutOverflowRow() const
{
    switch (type)
    {
        case Type::EMPTY:
            return 0;
        case Type::without_key:
            return 1;

    #define M(VARIANT, TWO_LEVEL) \
        case Type::VARIANT: \
            return (VARIANT)->data.size();

        APPLY_FOR_AGGREGATED_VARIANTS(M)
    #undef M
    }

    __builtin_unreachable();
}
/// Name of the active method as a string literal: "EMPTY", "without_key",
/// or the variant name exactly as spelled in APPLY_FOR_AGGREGATED_VARIANTS.
const char * getMethodName() const
{
    switch (type)
    {
        case Type::EMPTY:
            return "EMPTY";
        case Type::without_key:
            return "without_key";

    #define M(VARIANT, TWO_LEVEL) \
        case Type::VARIANT: \
            return #VARIANT;

        APPLY_FOR_AGGREGATED_VARIANTS(M)
    #undef M
    }

    __builtin_unreachable();
}
/// Whether the active method is a two-level one, i.e. the IS_TWO_LEVEL flag
/// of its APPLY_FOR_AGGREGATED_VARIANTS entry. EMPTY and without_key are not.
bool isTwoLevel() const
{
    switch (type)
    {
        case Type::EMPTY:
        case Type::without_key:
            return false;

    #define M(VARIANT, TWO_LEVEL) \
        case Type::VARIANT: \
            return TWO_LEVEL;

        APPLY_FOR_AGGREGATED_VARIANTS(M)
    #undef M
    }

    __builtin_unreachable();
}
/// Single-level variants that have a `<name>_two_level` counterpart in
/// APPLY_FOR_VARIANTS_TWO_LEVEL. Invokes M(NAME) once per variant.
/// The list ends without a line continuation so the macro body stops at the last entry.
#define APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M) \
    M(key32) \
    M(key64) \
    M(key_string) \
    M(key_fixed_string) \
    M(keys32) \
    M(keys64) \
    M(keys128) \
    M(keys256) \
    M(serialized) \
    M(nullable_keys128) \
    M(nullable_keys256) \
    M(low_cardinality_key32) \
    M(low_cardinality_key64) \
    M(low_cardinality_keys128) \
    M(low_cardinality_keys256) \
    M(low_cardinality_key_string) \
    M(low_cardinality_key_fixed_string)
/// Single-level variants with no two-level counterpart. Invokes M(NAME) once per variant.
/// The list ends without a line continuation so the macro body stops at the last entry.
/// NOLINTNEXTLINE
#define APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
    M(key8) \
    M(key16) \
    M(keys16) \
    M(key64_hash64) \
    M(key_string_hash64) \
    M(key_fixed_string_hash64) \
    M(keys128_hash64) \
    M(keys256_hash64) \
    M(serialized_hash64) \
    M(low_cardinality_key8) \
    M(low_cardinality_key16)
/// All single-level variants: the non-convertible ones followed by the convertible ones.
/// The second line has no continuation, so the declaration that follows is not pulled into the macro.
/// NOLINTNEXTLINE
#define APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) \
    APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
    APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
/// True when the active method is listed in APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL.
bool isConvertibleToTwoLevel() const
{
    bool convertible = false;

    switch (type)
    {
        /// Every listed variant becomes a case label sharing the statement below.
    #define M(VARIANT) case Type::VARIANT:
        APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
    #undef M
            convertible = true;
            break;

        default:
            break;
    }

    return convertible;
}
void convertToTwoLevel();
/// All two-level variants. Invokes M(NAME) once per variant.
/// The list ends without a line continuation, so the following `#define`
/// starts on a fresh logical line instead of being absorbed into this macro.
/// NOLINTNEXTLINE
#define APPLY_FOR_VARIANTS_TWO_LEVEL(M) \
    M(key32_two_level) \
    M(key64_two_level) \
    M(key_string_two_level) \
    M(key_fixed_string_two_level) \
    M(keys32_two_level) \
    M(keys64_two_level) \
    M(keys128_two_level) \
    M(keys256_two_level) \
    M(serialized_two_level) \
    M(nullable_keys128_two_level) \
    M(nullable_keys256_two_level) \
    M(low_cardinality_key32_two_level) \
    M(low_cardinality_key64_two_level) \
    M(low_cardinality_keys128_two_level) \
    M(low_cardinality_keys256_two_level) \
    M(low_cardinality_key_string_two_level) \
    M(low_cardinality_key_fixed_string_two_level)
/// All variants whose name starts with `low_cardinality_`, single-level first and
/// two-level after. Invokes M(NAME) once per variant.
#define APPLY_FOR_LOW_CARDINALITY_VARIANTS(M) \
    M(low_cardinality_key8) \
    M(low_cardinality_key16) \
    M(low_cardinality_key32) \
    M(low_cardinality_key64) \
    M(low_cardinality_keys128) \
    M(low_cardinality_keys256) \
    M(low_cardinality_key_string) \
    M(low_cardinality_key_fixed_string) \
    M(low_cardinality_key32_two_level) \
    M(low_cardinality_key64_two_level) \
    M(low_cardinality_keys128_two_level) \
    M(low_cardinality_keys256_two_level) \
    M(low_cardinality_key_string_two_level) \
    M(low_cardinality_key_fixed_string_two_level)
2021-01-22 09:13:22 +00:00
/// True when the active method is listed in APPLY_FOR_LOW_CARDINALITY_VARIANTS.
bool isLowCardinality() const
{
    bool low_cardinality = false;

    switch (type)
    {
        /// Every listed variant becomes a case label sharing the statement below.
    #define M(VARIANT) case Type::VARIANT:
        APPLY_FOR_LOW_CARDINALITY_VARIANTS(M)
    #undef M
            low_cardinality = true;
            break;

        default:
            break;
    }

    return low_cardinality;
}
2019-01-21 10:39:24 +00:00
/// Builds the hash-method context for the given aggregation method by calling
/// `State::createContext(settings)` on the method type behind the matching member.
/// Returns nullptr for without_key; throws UNKNOWN_AGGREGATED_DATA_VARIANT for
/// any other value that is not a keyed variant (EMPTY included).
static HashMethodContextPtr createCache(Type type, const HashMethodContext::Settings & settings)
{
    switch (type)
    {
        case Type::without_key:
            return nullptr;

    #define M(VARIANT, TWO_LEVEL) \
        case Type::VARIANT: \
        { \
            using Method = typename decltype(AggregatedDataVariants::VARIANT)::element_type; \
            return Method::State::createContext(settings); \
        }

        APPLY_FOR_AGGREGATED_VARIANTS(M)
    #undef M

        default:
            break;
    }

    throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}
2011-09-26 07:25:22 +00:00
};
using AggregatedDataVariantsPtr = std::shared_ptr<AggregatedDataVariants>;
using ManyAggregatedDataVariants = std::vector<AggregatedDataVariantsPtr>;
using ManyAggregatedDataVariantsPtr = std::shared_ptr<ManyAggregatedDataVariants>;
2012-02-27 06:28:20 +00:00
class CompiledAggregateFunctionsHolder;
2021-10-08 17:21:19 +00:00
class NativeWriter;
2017-06-02 21:37:28 +00:00
/** How are "total" values calculated with WITH TOTALS?
2020-06-01 16:31:06 +00:00
* (For more details, see TotalsHavingTransform.)
2015-11-09 18:45:55 +00:00
*
2017-06-02 21:37:28 +00:00
* In the absence of group_by_overflow_mode = 'any', the data is aggregated as usual, but the states of the aggregate functions are not finalized.
* Later, the aggregate function states for all rows (passed through HAVING) are merged into one - this will be TOTALS.
2015-11-09 18:45:55 +00:00
*
2017-06-02 21:37:28 +00:00
* If there is group_by_overflow_mode = 'any', the data is aggregated as usual, except for the keys that did not fit in max_rows_to_group_by.
* For these keys, the data is aggregated into one additional row - see below under the names `overflow_row`, `overflows`...
* Later, the aggregate function states for all rows (passed through HAVING) are merged into one,
* and overflow_row is either added or not (depending on the totals_mode setting) - this will be TOTALS.
2015-11-09 18:45:55 +00:00
*/
2011-09-19 01:42:16 +00:00
2017-06-02 21:37:28 +00:00
/** Aggregates the source of the blocks.
2011-09-19 01:42:16 +00:00
*/
2021-04-25 22:17:24 +00:00
class Aggregator final
2011-09-19 01:42:16 +00:00
{
public:
struct Params
{
/// Data structure of source blocks.
Block src_header;
/// Data structure of intermediate blocks before merge.
Block intermediate_header;
2017-06-02 21:37:28 +00:00
/// What to count.
2022-05-10 16:12:03 +00:00
const ColumnNumbers keys;
const AggregateDescriptions aggregates;
2022-05-05 18:13:00 +00:00
const size_t keys_size;
const size_t aggregates_size;
2017-06-02 21:37:28 +00:00
/// The settings of approximate calculation of GROUP BY.
const bool overflow_row; /// Do we need to put into AggregatedDataVariants::without_key aggregates for keys that are not in max_rows_to_group_by.
const size_t max_rows_to_group_by;
const OverflowMode group_by_overflow_mode;
2017-06-02 21:37:28 +00:00
/// Two-level aggregation settings (used for a large number of keys).
/** With how many keys or the size of the aggregation state in bytes,
* two-level aggregation begins to be used. Enough to reach of at least one of the thresholds.
* 0 - the corresponding threshold is not specified.
*/
size_t group_by_two_level_threshold;
size_t group_by_two_level_threshold_bytes;
/// Settings to flush temporary data to the filesystem (external aggregation).
2017-06-02 21:37:28 +00:00
const size_t max_bytes_before_external_group_by; /// 0 - do not use external aggregation.
/// Return empty result when aggregating without keys on empty set.
bool empty_result_for_aggregation_by_empty_set;
2020-07-08 14:25:23 +00:00
VolumePtr tmp_volume;
2018-09-20 17:59:47 +00:00
/// Settings is used to determine cache size. No threads are created.
size_t max_threads;
const size_t min_free_disk_space;
2021-05-31 08:05:40 +00:00
bool compile_aggregate_expressions;
size_t min_count_to_compile_aggregate_expression;
struct StatsCollectingParams
{
StatsCollectingParams();
StatsCollectingParams(
const ASTPtr & select_query_,
bool collect_hash_table_stats_during_aggregation_,
size_t max_entries_for_hash_table_stats_,
size_t max_size_to_preallocate_for_aggregation_);
bool isCollectionAndUseEnabled() const { return key != 0; }
void disable() { key = 0; }
UInt64 key = 0;
const size_t max_entries_for_hash_table_stats = 0;
const size_t max_size_to_preallocate_for_aggregation = 0;
};
StatsCollectingParams stats_collecting_params;
2021-05-13 14:51:07 +00:00
Params(
const Block & src_header_,
2021-05-16 19:44:20 +00:00
const ColumnNumbers & keys_,
2021-12-14 19:15:14 +00:00
const AggregateDescriptions & aggregates_,
bool overflow_row_,
size_t max_rows_to_group_by_,
OverflowMode group_by_overflow_mode_,
size_t group_by_two_level_threshold_,
size_t group_by_two_level_threshold_bytes_,
2021-05-13 14:51:07 +00:00
size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_,
2021-12-14 19:15:14 +00:00
VolumePtr tmp_volume_,
size_t max_threads_,
2021-05-15 16:41:22 +00:00
size_t min_free_disk_space_,
bool compile_aggregate_expressions_,
size_t min_count_to_compile_aggregate_expression_,
const Block & intermediate_header_ = {},
2022-05-10 16:00:00 +00:00
const StatsCollectingParams & stats_collecting_params_ = {})
2021-12-14 19:15:14 +00:00
: src_header(src_header_)
, intermediate_header(intermediate_header_)
, keys(keys_)
, aggregates(aggregates_)
, keys_size(keys.size())
, aggregates_size(aggregates.size())
, overflow_row(overflow_row_)
, max_rows_to_group_by(max_rows_to_group_by_)
, group_by_overflow_mode(group_by_overflow_mode_)
, group_by_two_level_threshold(group_by_two_level_threshold_)
, group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_)
, max_bytes_before_external_group_by(max_bytes_before_external_group_by_)
, empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_)
, tmp_volume(tmp_volume_)
, max_threads(max_threads_)
, min_free_disk_space(min_free_disk_space_)
, compile_aggregate_expressions(compile_aggregate_expressions_)
, min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_)
, stats_collecting_params(stats_collecting_params_)
{
}
2021-05-13 14:51:07 +00:00
2017-06-02 21:37:28 +00:00
/// Only parameters that matter during merge.
2022-05-05 13:56:16 +00:00
Params(
const Block & intermediate_header_,
const ColumnNumbers & keys_,
const AggregateDescriptions & aggregates_,
bool overflow_row_,
size_t max_threads_)
2022-05-10 16:00:00 +00:00
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0, false, 0, {}, {})
{
intermediate_header = intermediate_header_;
}
static Block getHeader(
const Block & src_header,
const Block & intermediate_header,
const ColumnNumbers & keys,
const AggregateDescriptions & aggregates,
bool final);
Block getHeader(bool final) const
{
2022-05-10 16:00:00 +00:00
return getHeader(src_header, intermediate_header, keys, aggregates, final);
}
/// Returns keys and aggregated for EXPLAIN query
2020-07-07 19:51:32 +00:00
void explain(WriteBuffer & out, size_t indent) const;
void explain(JSONBuilder::JSONMap & map) const;
};
2021-04-25 22:17:24 +00:00
explicit Aggregator(const Params & params_);
using AggregateColumns = std::vector<ColumnRawPtrs>;
using AggregateColumnsData = std::vector<ColumnAggregateFunction::Container *>;
using AggregateColumnsConstData = std::vector<const ColumnAggregateFunction::Container *>;
2021-06-06 21:49:55 +00:00
using AggregateFunctionsPlainPtrs = std::vector<const IAggregateFunction *>;
2017-06-02 21:37:28 +00:00
/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
/// `key_columns` and `aggregate_columns` are caller-owned scratch containers reused across calls.
/// NOTE(review): `no_more_keys` is presumably the flag described for checkLimits() - confirm in the .cpp.
bool executeOnBlock(const Block & block,
AggregatedDataVariants & result,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys) const;
/// Overload taking the columns directly together with a row range.
/// NOTE(review): whether row_end is exclusive is not visible in this header - confirm in the .cpp.
bool executeOnBlock(Columns columns,
size_t row_begin, size_t row_end,
AggregatedDataVariants & result,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
bool & no_more_keys) const;
/// Used for aggregate projection.
/// Takes `block` by value.
bool mergeOnBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys) const;
2019-08-31 08:58:16 +00:00
2017-06-02 21:37:28 +00:00
/** Convert the aggregation data structure into a block.
  * If overflow_row = true, then aggregates for rows that are not included in max_rows_to_group_by are put in the first block.
  *
  * If final = false, then ColumnAggregateFunction is created as the aggregation columns with the state of the calculations,
  * which can then be combined with other states (for distributed query processing).
  * If final = true, then columns with ready values are created as aggregate columns.
  *
  * The result is returned as a list of blocks.
  * NOTE(review): `max_threads` presumably bounds the parallelism of the conversion - confirm in the .cpp.
  */
BlocksList convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const;
ManyAggregatedDataVariants prepareVariantsToMerge(ManyAggregatedDataVariants & data_variants) const;
2019-03-04 16:06:28 +00:00
using BucketToBlocks = std::map<Int32, BlocksList>;
/// Merge partially aggregated blocks separated to buckets into one data structure.
void mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads);
/// Merge several partially aggregated blocks into one.
/// Precondition: for all blocks block.info.is_overflows flag must be the same.
/// (either all blocks are from overflow data or none blocks are).
/// The resulting block has the same value of is_overflows flag.
Block mergeBlocks(BlocksList & blocks, bool final);
/** Split block with partially-aggregated data to many blocks, as if two-level method of aggregation was used.
* This is needed to simplify merging of that data with other results, that are already two-level.
*/
std::vector<Block> convertBlockToTwoLevel(const Block & block) const;
2017-06-02 21:37:28 +00:00
/// For external aggregation.
void writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path) const;
void writeToTemporaryFile(AggregatedDataVariants & data_variants) const;
bool hasTemporaryFiles() const { return !temporary_files.empty(); }
/// Temporary files together with their accumulated sizes.
struct TemporaryFiles
{
    std::vector<std::unique_ptr<Poco::TemporaryFile>> files;
    size_t sum_size_uncompressed = 0;
    size_t sum_size_compressed = 0;
    /// Mutable so that const accessors such as empty() can lock it.
    mutable std::mutex mutex;

    /// Checks, under the mutex, whether any file has been registered.
    bool empty() const
    {
        std::lock_guard<std::mutex> guard(mutex);
        const bool no_files = files.empty();
        return no_files;
    }
};
const TemporaryFiles & getTemporaryFiles() const { return temporary_files; }
/// Get data structure of the result.
Block getHeader(bool final) const;
2021-04-25 22:17:24 +00:00
private:
friend struct AggregatedDataVariants;
friend class ConvertingAggregatedToChunksTransform;
friend class ConvertingAggregatedToChunksSource;
2020-04-18 09:51:21 +00:00
friend class AggregatingInOrderTransform;
Params params;
2018-08-27 17:42:13 +00:00
AggregatedDataVariants::Type method_chosen;
Sizes key_sizes;
2019-01-21 10:39:24 +00:00
HashMethodContextPtr aggregation_state_cache;
AggregateFunctionsPlainPtrs aggregate_functions;
2017-06-02 21:37:28 +00:00
/** One entry of the instruction array handed to the aggregation loops.
  * The array serves two purposes:
  *
  * - function arguments are laid out side by side, so they do not have to be gathered
  *   from different places; the array is also zero-terminated;
  * - the inner loop (for the without_key case) becomes almost twice as compact,
  *   for a performance gain of about 30%.
  */
struct AggregateFunctionInstruction
{
    const IAggregateFunction * that = nullptr;
    size_t state_offset = 0;
    const IColumn ** arguments = nullptr;
    const IAggregateFunction * batch_that = nullptr;
    const IColumn ** batch_arguments = nullptr;
    const UInt64 * offsets = nullptr;
    bool has_sparse_arguments = false;
};
using AggregateFunctionInstructions = std::vector<AggregateFunctionInstruction>;
2020-05-15 19:27:18 +00:00
using NestedColumnsHolder = std::vector<std::vector<const IColumn *>>;
2017-06-02 21:37:28 +00:00
Sizes offsets_of_aggregate_states; /// The offset to the n-th aggregate function in a row of aggregate functions.
size_t total_size_of_aggregate_states = 0; /// The total size of the row from the aggregate functions.
2018-08-05 08:45:15 +00:00
2018-09-01 03:17:43 +00:00
// add info to track alignment requirement
2020-05-05 14:35:23 +00:00
// If there are states whose alignment are v1, ..vn, align_aggregate_states will be max(v1, ... vn)
2018-08-05 08:45:15 +00:00
size_t align_aggregate_states = 1;
bool all_aggregates_has_trivial_destructor = false;
2017-06-02 21:37:28 +00:00
/// How many RAM were used to process the query before processing the first block.
Int64 memory_usage_before_aggregation = 0;
2020-05-30 21:57:37 +00:00
Poco::Logger * log = &Poco::Logger::get("Aggregator");
2017-06-02 21:37:28 +00:00
/// For external aggregation.
mutable TemporaryFiles temporary_files;
2021-06-04 10:43:11 +00:00
#if USE_EMBEDDED_COMPILER
std::shared_ptr<CompiledAggregateFunctionsHolder> compiled_aggregate_functions_holder;
2021-06-04 10:43:11 +00:00
#endif
std::vector<bool> is_aggregate_function_compiled;
/** Try to compile aggregate functions.
*/
void compileAggregateFunctionsIfNeeded();
2017-06-02 21:37:28 +00:00
/** Select the aggregation method based on the number and types of keys. */
AggregatedDataVariants::Type chooseAggregationMethod();
2017-06-02 21:37:28 +00:00
/** Create states of aggregate functions for one key.
*/
template <bool skip_compiled_aggregate_functions = false>
void createAggregateStates(AggregateDataPtr & aggregate_data) const;
2017-06-02 21:37:28 +00:00
/** Call `destroy` methods for states of aggregate functions.
* Used in the exception handler for aggregation, since RAII in this case is not applicable.
*/
void destroyAllAggregateStates(AggregatedDataVariants & result) const;
/// Used for optimize_aggregation_in_order:
/// - No two-level aggregation
/// - No external aggregation
/// - No without_key support (it is implemented using executeOnIntervalWithoutKeyImpl())
void executeOnBlockSmall(
AggregatedDataVariants & result,
size_t row_begin,
size_t row_end,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions) const;
void executeImpl(
AggregatedDataVariants & result,
size_t row_begin,
size_t row_end,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys = false,
AggregateDataPtr overflow_row = nullptr) const;
2017-06-02 21:37:28 +00:00
/// Process one data block, aggregate the data into a hash table.
template <typename Method>
void executeImpl(
Method & method,
Arena * aggregates_pool,
size_t row_begin,
size_t row_end,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys,
AggregateDataPtr overflow_row) const;
2017-06-02 21:37:28 +00:00
/// Specialization for a particular value no_more_keys.
template <bool no_more_keys, bool use_compiled_functions, typename Method>
2021-03-18 09:31:14 +00:00
void executeImplBatch(
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
AggregateDataPtr overflow_row) const;
2017-06-02 21:37:28 +00:00
/// For case when there are no keys (all aggregate into one row).
template <bool use_compiled_functions>
void executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const;
Fix memory leak in AggregatingInOrderTransform Reproducer: # NOTE: we need clickhouse from 33957 since right now LSan is broken due to getauxval(). $ url=https://s3.amazonaws.com/clickhouse-builds/33957/e04b862673644d313712607a0078f5d1c48b5377/package_asan/clickhouse $ wget $url -o clickhouse-asan $ chmod +x clickhouse-asan $ ./clickhouse-asan server & $ ./clickhouse-asan client :) create table data (key Int, value String) engine=MergeTree() order by key :) insert into data select number%5, toString(number) from numbers(10e6) # usually it is enough one query, benchmark is just for stability of the results # note, that if the exception was not happen from AggregatingInOrderTransform then add --continue_on_errors and wait $ ./clickhouse-asan benchmark --query 'select key, uniqCombined64(value), groupArray(value) from data group by key' --optimize_aggregation_in_order=1 --memory_tracker_fault_probability=0.01, max_untracked_memory='2Mi' LSan report: ==24595==ERROR: LeakSanitizer: detected memory leaks Direct leak of 3932160 byte(s) in 6 object(s) allocated from: 0 0xcadba93 in realloc () 1 0xcc108d9 in Allocator<false, false>::realloc() obj-x86_64-linux-gnu/../src/Common/Allocator.h:134:30 2 0xde19eae in void DB::PODArrayBase<>::realloc<DB::Arena*&>(unsigned long, DB::Arena*&) obj-x86_64-linux-gnu/../src/Common/PODArray.h:161:25 3 0xde5f039 in void DB::PODArrayBase<>::reserveForNextSize<DB::Arena*&>(DB::Arena*&) obj-x86_64-linux-gnu/../src/Common/PODArray.h 4 0xde5f039 in void DB::PODArray<>::push_back<>(DB::GroupArrayNodeString*&, DB::Arena*&) obj-x86_64-linux-gnu/../src/Common/PODArray.h:432:19 5 0xde5f039 in DB::GroupArrayGeneralImpl<>::add() const obj-x86_64-linux-gnu/../src/AggregateFunctions/AggregateFunctionGroupArray.h:465:31 6 0xde5f039 in DB::IAggregateFunctionHelper<>::addBatchSinglePlaceFromInterval() const obj-x86_64-linux-gnu/../src/AggregateFunctions/IAggregateFunction.h:481:53 7 0x299df134 in DB::Aggregator::executeOnIntervalWithoutKeyImpl() 
obj-x86_64-linux-gnu/../src/Interpreters/Aggregator.cpp:869:31 8 0x2ca75f7d in DB::AggregatingInOrderTransform::consume() obj-x86_64-linux-gnu/../src/Processors/Transforms/AggregatingInOrderTransform.cpp:124:13 ... SUMMARY: AddressSanitizer: 4523184 byte(s) leaked in 12 allocation(s). Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-02-01 15:40:48 +00:00
void executeOnIntervalWithoutKeyImpl(
AggregatedDataVariants & data_variants,
2020-04-18 09:51:21 +00:00
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Fix memory leak in AggregatingInOrderTransform Reproducer: # NOTE: we need clickhouse from 33957 since right now LSan is broken due to getauxval(). $ url=https://s3.amazonaws.com/clickhouse-builds/33957/e04b862673644d313712607a0078f5d1c48b5377/package_asan/clickhouse $ wget $url -o clickhouse-asan $ chmod +x clickhouse-asan $ ./clickhouse-asan server & $ ./clickhouse-asan client :) create table data (key Int, value String) engine=MergeTree() order by key :) insert into data select number%5, toString(number) from numbers(10e6) # usually it is enough one query, benchmark is just for stability of the results # note, that if the exception was not happen from AggregatingInOrderTransform then add --continue_on_errors and wait $ ./clickhouse-asan benchmark --query 'select key, uniqCombined64(value), groupArray(value) from data group by key' --optimize_aggregation_in_order=1 --memory_tracker_fault_probability=0.01, max_untracked_memory='2Mi' LSan report: ==24595==ERROR: LeakSanitizer: detected memory leaks Direct leak of 3932160 byte(s) in 6 object(s) allocated from: 0 0xcadba93 in realloc () 1 0xcc108d9 in Allocator<false, false>::realloc() obj-x86_64-linux-gnu/../src/Common/Allocator.h:134:30 2 0xde19eae in void DB::PODArrayBase<>::realloc<DB::Arena*&>(unsigned long, DB::Arena*&) obj-x86_64-linux-gnu/../src/Common/PODArray.h:161:25 3 0xde5f039 in void DB::PODArrayBase<>::reserveForNextSize<DB::Arena*&>(DB::Arena*&) obj-x86_64-linux-gnu/../src/Common/PODArray.h 4 0xde5f039 in void DB::PODArray<>::push_back<>(DB::GroupArrayNodeString*&, DB::Arena*&) obj-x86_64-linux-gnu/../src/Common/PODArray.h:432:19 5 0xde5f039 in DB::GroupArrayGeneralImpl<>::add() const obj-x86_64-linux-gnu/../src/AggregateFunctions/AggregateFunctionGroupArray.h:465:31 6 0xde5f039 in DB::IAggregateFunctionHelper<>::addBatchSinglePlaceFromInterval() const obj-x86_64-linux-gnu/../src/AggregateFunctions/IAggregateFunction.h:481:53 7 0x299df134 in DB::Aggregator::executeOnIntervalWithoutKeyImpl() 
obj-x86_64-linux-gnu/../src/Interpreters/Aggregator.cpp:869:31 8 0x2ca75f7d in DB::AggregatingInOrderTransform::consume() obj-x86_64-linux-gnu/../src/Processors/Transforms/AggregatingInOrderTransform.cpp:124:13 ... SUMMARY: AddressSanitizer: 4523184 byte(s) leaked in 12 allocation(s). Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-02-01 15:40:48 +00:00
Arena * arena) const;
2020-04-18 09:51:21 +00:00
template <typename Method>
void writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
Method & method,
2021-10-08 17:21:19 +00:00
NativeWriter & out) const;
/// Merge NULL key data from hash table `src` into `dst`.
template <typename Method, typename Table>
void mergeDataNullKey(
Table & table_dst,
Table & table_src,
Arena * arena) const;
2017-06-02 21:37:28 +00:00
/// Merge data from hash table `src` into `dst`.
template <typename Method, bool use_compiled_functions, typename Table>
void mergeDataImpl(
Table & table_dst,
Table & table_src,
Arena * arena) const;
2017-06-02 21:37:28 +00:00
/// Merge data from hash table `src` into `dst`, but only for keys that already exist in dst. In other cases, merge the data into `overflows`.
template <typename Method, typename Table>
void mergeDataNoMoreKeysImpl(
Table & table_dst,
AggregatedDataWithoutKey & overflows,
Table & table_src,
Arena * arena) const;
2017-06-02 21:37:28 +00:00
/// Same, but ignores the rest of the keys.
template <typename Method, typename Table>
void mergeDataOnlyExistingKeysImpl(
Table & table_dst,
Table & table_src,
Arena * arena) const;
void mergeWithoutKeyDataImpl(
ManyAggregatedDataVariants & non_empty_data) const;
template <typename Method>
void mergeSingleLevelDataImpl(
ManyAggregatedDataVariants & non_empty_data) const;
template <typename Method, typename Table>
void convertToBlockImpl(
Method & method,
Table & data,
MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns,
2020-06-17 19:36:27 +00:00
Arena * arena,
bool final) const;
template <typename Mapped>
void insertAggregatesIntoColumns(
Mapped & mapped,
2020-06-17 19:36:27 +00:00
MutableColumns & final_aggregate_columns,
Arena * arena) const;
template <typename Method, bool use_compiled_functions, typename Table>
void convertToBlockImplFinal(
Method & method,
Table & data,
2021-03-10 11:00:24 +00:00
std::vector<IColumn *> key_columns,
2020-06-17 19:36:27 +00:00
MutableColumns & final_aggregate_columns,
Arena * arena) const;
template <typename Method, typename Table>
void convertToBlockImplNotFinal(
Method & method,
Table & data,
2021-03-10 11:00:24 +00:00
std::vector<IColumn *> key_columns,
2018-08-27 18:05:28 +00:00
AggregateColumnsData & aggregate_columns) const;
template <typename Filler>
Block prepareBlockAndFill(
AggregatedDataVariants & data_variants,
bool final,
size_t rows,
Filler && filler) const;
template <typename Method>
Block convertOneBucketToBlock(
AggregatedDataVariants & data_variants,
Method & method,
Arena * arena,
bool final,
size_t bucket) const;
Block mergeAndConvertOneBucketToBlock(
ManyAggregatedDataVariants & variants,
Arena * arena,
bool final,
2020-01-10 20:24:59 +00:00
size_t bucket,
std::atomic<bool> * is_cancelled = nullptr) const;
Block prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const;
Block prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const;
BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const;
template <typename Method>
BlocksList prepareBlocksAndFillTwoLevelImpl(
AggregatedDataVariants & data_variants,
Method & method,
bool final,
ThreadPool * thread_pool) const;
template <bool no_more_keys, typename Method, typename Table>
void mergeStreamsImplCase(
Block & block,
Arena * aggregates_pool,
Method & method,
Table & data,
AggregateDataPtr overflow_row) const;
/// Same parameters as mergeStreamsImplCase, except that `no_more_keys` is a run-time
/// `bool` argument here instead of a template parameter.
/// Presumably selects the matching mergeStreamsImplCase instantiation — TODO confirm in the definition.
template <typename Method, typename Table>
void mergeStreamsImpl(
Block & block,
Arena * aggregates_pool,
Method & method,
Table & data,
AggregateDataPtr overflow_row,
bool no_more_keys) const;
/// Takes the input `block` and the destination `result`, both by mutable reference.
/// Presumably the keyless counterpart of mergeStreamsImpl: merges the states from `block`
/// into the without-key data of `result` — TODO confirm.
void mergeWithoutKeyStreamsImpl(
Block & block,
AggregatedDataVariants & result) const;
/// Note: `bucket` is an `Int32` here, whereas the bucket-to-block conversion declarations
/// in this class use `size_t`. `is_cancelled` is optional and defaults to nullptr.
/// Presumably merges the given bucket of every element of `data` together, stopping early
/// if `is_cancelled` becomes true — verify against the definition.
template <typename Method>
void mergeBucketImpl(
2020-01-10 20:24:59 +00:00
ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena, std::atomic<bool> * is_cancelled = nullptr) const;
/// `source` is read-only (const reference); `destinations` is a mutable vector of blocks
/// that receives the output. `key_columns` is passed by mutable reference.
/// Presumably distributes the rows of `source` among `destinations` by bucket — TODO confirm.
template <typename Method>
void convertBlockToTwoLevelImpl(
Method & method,
Arena * pool,
ColumnRawPtrs & key_columns,
const Block & source,
std::vector<Block> & destinations) const;
/// `Method` does not occur in the parameter list, so it cannot be deduced from the call
/// arguments and has to be given explicitly; `Table` can be deduced from `table`.
/// Presumably destroys the aggregate states stored in `table` — TODO confirm.
template <typename Method, typename Table>
2017-12-01 20:21:35 +00:00
void destroyImpl(Table & table) const;
/// Takes `result` by mutable reference.
/// Presumably destroys the aggregate states kept for the keyless case of `result` — TODO confirm.
void destroyWithoutKey(
AggregatedDataVariants & result) const;
2017-06-02 21:37:28 +00:00
/** Checks constraints on the maximum number of keys for aggregation.
  * If it is exceeded, then, depending on the group_by_overflow_mode, either
  * - throws an exception;
  * - returns false, which means that execution must be aborted;
  * - sets the variable no_more_keys to true.
  *
  * result_size - presumably the current number of keys that is compared with the limit (TODO confirm).
  * no_more_keys - passed by non-const reference so that this method can set it.
  * NOTE(review): the value returned when the limit is not exceeded is not stated here;
  * presumably true - confirm against the definition.
  */
bool checkLimits(size_t result_size, bool & no_more_keys) const;
2020-04-18 09:51:21 +00:00
2020-05-07 20:13:51 +00:00
/// `columns` is taken by value; the remaining four parameters are mutable references.
/// Presumably `columns` is the input and the references are filled as outputs
/// (`materialized_columns` and `nested_columns_holder` look like holders for temporary
/// columns) — TODO confirm against the definition.
void prepareAggregateInstructions(
2020-05-07 14:54:15 +00:00
Columns columns,
AggregateColumns & aggregate_columns,
Columns & materialized_columns,
2020-05-15 19:27:18 +00:00
AggregateFunctionInstructions & instructions,
NestedColumnsHolder & nested_columns_holder) const;
2020-05-07 14:54:15 +00:00
2021-04-01 22:16:54 +00:00
/// Takes `data_variants` by non-const reference and the output `aggregate_columns`
/// by mutable reference.
/// Presumably appends the aggregate states of a single key held in `data_variants`
/// to `aggregate_columns` — TODO confirm against the definition.
void addSingleKeyToAggregateColumns(
Fix memory leak in AggregatingInOrderTransform Reproducer: # NOTE: we need clickhouse from 33957 since right now LSan is broken due to getauxval(). $ url=https://s3.amazonaws.com/clickhouse-builds/33957/e04b862673644d313712607a0078f5d1c48b5377/package_asan/clickhouse $ wget $url -o clickhouse-asan $ chmod +x clickhouse-asan $ ./clickhouse-asan server & $ ./clickhouse-asan client :) create table data (key Int, value String) engine=MergeTree() order by key :) insert into data select number%5, toString(number) from numbers(10e6) # usually it is enough one query, benchmark is just for stability of the results # note, that if the exception was not happen from AggregatingInOrderTransform then add --continue_on_errors and wait $ ./clickhouse-asan benchmark --query 'select key, uniqCombined64(value), groupArray(value) from data group by key' --optimize_aggregation_in_order=1 --memory_tracker_fault_probability=0.01, max_untracked_memory='2Mi' LSan report: ==24595==ERROR: LeakSanitizer: detected memory leaks Direct leak of 3932160 byte(s) in 6 object(s) allocated from: 0 0xcadba93 in realloc () 1 0xcc108d9 in Allocator<false, false>::realloc() obj-x86_64-linux-gnu/../src/Common/Allocator.h:134:30 2 0xde19eae in void DB::PODArrayBase<>::realloc<DB::Arena*&>(unsigned long, DB::Arena*&) obj-x86_64-linux-gnu/../src/Common/PODArray.h:161:25 3 0xde5f039 in void DB::PODArrayBase<>::reserveForNextSize<DB::Arena*&>(DB::Arena*&) obj-x86_64-linux-gnu/../src/Common/PODArray.h 4 0xde5f039 in void DB::PODArray<>::push_back<>(DB::GroupArrayNodeString*&, DB::Arena*&) obj-x86_64-linux-gnu/../src/Common/PODArray.h:432:19 5 0xde5f039 in DB::GroupArrayGeneralImpl<>::add() const obj-x86_64-linux-gnu/../src/AggregateFunctions/AggregateFunctionGroupArray.h:465:31 6 0xde5f039 in DB::IAggregateFunctionHelper<>::addBatchSinglePlaceFromInterval() const obj-x86_64-linux-gnu/../src/AggregateFunctions/IAggregateFunction.h:481:53 7 0x299df134 in DB::Aggregator::executeOnIntervalWithoutKeyImpl() 
obj-x86_64-linux-gnu/../src/Interpreters/Aggregator.cpp:869:31 8 0x2ca75f7d in DB::AggregatingInOrderTransform::consume() obj-x86_64-linux-gnu/../src/Processors/Transforms/AggregatingInOrderTransform.cpp:124:13 ... SUMMARY: AddressSanitizer: 4523184 byte(s) leaked in 12 allocation(s). Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-02-01 15:40:48 +00:00
AggregatedDataVariants & data_variants,
2021-04-02 12:10:49 +00:00
MutableColumns & aggregate_columns) const;
2021-03-18 20:17:09 +00:00
2021-03-22 22:12:14 +00:00
/// `data_variants` is read-only here (const reference); `aggregate_columns` is mutable.
/// Presumably attaches the arenas owned by `data_variants` to the aggregate columns so the
/// memory behind the states stays alive with them — TODO confirm against the definition.
void addArenasToAggregateColumns(
const AggregatedDataVariants & data_variants,
2021-04-02 12:10:49 +00:00
MutableColumns & aggregate_columns) const;
2020-04-18 09:51:21 +00:00
/// `key_row` is a `size_t` index; `key_columns` and `final_key_columns` are both mutable references.
/// Presumably creates fresh aggregate states in `data_variants` and copies row `key_row`
/// of `key_columns` into `final_key_columns` — TODO confirm against the definition.
void createStatesAndFillKeyColumnsWithSingleKey(
AggregatedDataVariants & data_variants,
2020-05-08 13:28:18 +00:00
Columns & key_columns, size_t key_row,
2021-04-02 12:10:49 +00:00
MutableColumns & final_key_columns) const;
/// Static helper: needs no Aggregator instance.
/// Takes a raw pointer to `AggregateFunctionInstruction`; presumably this points at a
/// sequence of instructions and the result tells whether any of them has sparse
/// arguments — how the end of the sequence is determined is not visible here, TODO confirm.
static bool hasSparseArguments(AggregateFunctionInstruction * aggregate_instructions);
2011-09-19 01:42:16 +00:00
};
2017-06-02 21:37:28 +00:00
/** Get the aggregation variant by its type.
  * The primary template is only declared. The M macro below defines one explicit
  * specialization for every (NAME, IS_TWO_LEVEL) pair that APPLY_FOR_AGGREGATED_VARIANTS
  * passes to it. Each specialization is keyed on the `element_type` of the member
  * `AggregatedDataVariants::NAME` and returns `*variants.NAME`; the dereference is not
  * checked here. IS_TWO_LEVEL is accepted by the macro but unused in this expansion.
  */
template <typename Method> Method & getDataVariant(AggregatedDataVariants & variants);
#define M(NAME, IS_TWO_LEVEL) \
template <> inline decltype(AggregatedDataVariants::NAME)::element_type & getDataVariant<decltype(AggregatedDataVariants::NAME)::element_type>(AggregatedDataVariants & variants) { return *variants.NAME; } /// NOLINT
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
/// Plain aggregate of three counters; each one is zero unless explicitly set.
/// The meaning of the counters (presumably cache size, lookup hits and lookup misses)
/// is defined by the code that fills them in — confirm there.
struct HashTablesCacheStatistics
{
    size_t entries{0};
    size_t hits{0};
    size_t misses{0};
};
/// Free function returning the statistics wrapped in `std::optional`, so the result may be empty.
/// Under which conditions it is empty is not visible from this declaration — TODO confirm in the definition.
std::optional<HashTablesCacheStatistics> getHashTablesCacheStatistics();
2011-09-19 01:42:16 +00:00
}