2011-09-19 01:42:16 +00:00
|
|
|
#pragma once
|
|
|
|
|
2015-01-08 18:52:48 +00:00
|
|
|
#include <mutex>
|
2015-01-11 02:00:26 +00:00
|
|
|
#include <memory>
|
2015-04-16 14:27:56 +00:00
|
|
|
#include <functional>
|
2012-03-05 07:58:34 +00:00
|
|
|
|
2015-11-30 19:57:46 +00:00
|
|
|
#include <Poco/TemporaryFile.h>
|
|
|
|
|
2015-09-29 19:19:54 +00:00
|
|
|
#include <common/logger_useful.h>
|
2012-09-05 19:51:09 +00:00
|
|
|
|
2017-06-23 20:22:35 +00:00
|
|
|
#include <common/StringRef.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Common/Arena.h>
|
|
|
|
#include <Common/HashTable/HashMap.h>
|
|
|
|
#include <Common/HashTable/TwoLevelHashMap.h>
|
2017-06-23 20:22:35 +00:00
|
|
|
#include <common/ThreadPool.h>
|
2018-09-07 10:08:09 +00:00
|
|
|
#include <Common/UInt128.h>
|
2018-09-12 13:27:00 +00:00
|
|
|
#include <Common/LRUCache.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
|
|
|
|
#include <DataStreams/IBlockInputStream.h>
|
2018-03-11 00:15:26 +00:00
|
|
|
#include <DataStreams/SizeLimits.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
|
|
|
|
#include <Interpreters/AggregateDescription.h>
|
|
|
|
#include <Interpreters/AggregationCommon.h>
|
|
|
|
#include <Interpreters/Compiler.h>
|
|
|
|
|
|
|
|
#include <Columns/ColumnString.h>
|
|
|
|
#include <Columns/ColumnFixedString.h>
|
|
|
|
#include <Columns/ColumnAggregateFunction.h>
|
|
|
|
#include <Columns/ColumnVector.h>
|
|
|
|
#include <Columns/ColumnNullable.h>
|
2018-08-21 14:53:51 +00:00
|
|
|
#include <Columns/ColumnWithDictionary.h>
|
2014-05-10 00:31:22 +00:00
|
|
|
|
|
|
|
|
2011-09-19 01:42:16 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2016-01-11 21:46:36 +00:00
|
|
|
namespace ErrorCodes
{
    extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
    /// Thrown by the aggregation-method helpers in this header on internal misuse
    /// (wrong column type, missing cache, unexpected index width, forbidden calls).
    extern const int LOGICAL_ERROR;
}

class IBlockOutputStream;
|
|
|
|
|
2011-09-19 01:42:16 +00:00
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Different data structures that can be used for aggregation
|
|
|
|
* For efficiency, the aggregation data itself is put into the pool.
|
|
|
|
* Data and pool ownership (states of aggregate functions)
|
|
|
|
* is acquired later - in `convertToBlocks` function, by the ColumnAggregateFunction object.
|
2015-01-02 00:35:33 +00:00
|
|
|
*
|
2017-06-02 21:37:28 +00:00
|
|
|
* Most data structures exist in two versions: normal and two-level (TwoLevel).
|
|
|
|
* A two-level hash table works a little slower with a small number of different keys,
|
|
|
|
* but with a large number of different keys scales better, because it allows
|
|
|
|
* parallelize some operations (merging, post-processing) in a natural way.
|
2015-01-02 00:35:33 +00:00
|
|
|
*
|
2017-06-02 21:37:28 +00:00
|
|
|
* To ensure efficient work over a wide range of conditions,
|
|
|
|
* first single-level hash tables are used,
|
|
|
|
* and when the number of different keys is large enough,
|
|
|
|
* they are converted to two-level ones.
|
2015-01-02 00:35:33 +00:00
|
|
|
*
|
2017-06-02 21:37:28 +00:00
|
|
|
* PS. There are many different approaches to the effective implementation of parallel and distributed aggregation,
|
|
|
|
* best suited for different cases, and this approach is just one of them, chosen for a combination of reasons.
|
2011-12-19 08:06:31 +00:00
|
|
|
*/
|
2016-09-23 05:49:55 +00:00
|
|
|
|
2016-05-28 10:35:44 +00:00
|
|
|
using AggregatedDataWithoutKey = AggregateDataPtr;
|
2014-12-30 10:16:23 +00:00
|
|
|
|
2016-09-23 05:49:55 +00:00
|
|
|
using AggregatedDataWithUInt8Key = HashMap<UInt64, AggregateDataPtr, TrivialHash, HashTableFixedGrower<8>>;
|
|
|
|
using AggregatedDataWithUInt16Key = HashMap<UInt64, AggregateDataPtr, TrivialHash, HashTableFixedGrower<16>>;
|
|
|
|
|
2016-05-28 10:35:44 +00:00
|
|
|
using AggregatedDataWithUInt64Key = HashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>;
|
|
|
|
using AggregatedDataWithStringKey = HashMapWithSavedHash<StringRef, AggregateDataPtr>;
|
|
|
|
using AggregatedDataWithKeys128 = HashMap<UInt128, AggregateDataPtr, UInt128HashCRC32>;
|
|
|
|
using AggregatedDataWithKeys256 = HashMap<UInt256, AggregateDataPtr, UInt256HashCRC32>;
|
2014-12-30 10:16:23 +00:00
|
|
|
|
2016-05-28 10:35:44 +00:00
|
|
|
using AggregatedDataWithUInt64KeyTwoLevel = TwoLevelHashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>;
|
|
|
|
using AggregatedDataWithStringKeyTwoLevel = TwoLevelHashMapWithSavedHash<StringRef, AggregateDataPtr>;
|
|
|
|
using AggregatedDataWithKeys128TwoLevel = TwoLevelHashMap<UInt128, AggregateDataPtr, UInt128HashCRC32>;
|
|
|
|
using AggregatedDataWithKeys256TwoLevel = TwoLevelHashMap<UInt256, AggregateDataPtr, UInt256HashCRC32>;
|
2011-09-26 07:25:22 +00:00
|
|
|
|
2016-09-23 05:49:55 +00:00
|
|
|
/** Variants with better hash function, using more than 32 bits for hash.
|
|
|
|
* Using for merging phase of external aggregation, where number of keys may be far greater than 4 billion,
|
|
|
|
* but we keep in memory and merge only sub-partition of them simultaneously.
|
|
|
|
* TODO We need to switch for better hash function not only for external aggregation,
|
|
|
|
* but also for huge aggregation results on machines with terabytes of RAM.
|
|
|
|
*/
|
|
|
|
|
|
|
|
using AggregatedDataWithUInt64KeyHash64 = HashMap<UInt64, AggregateDataPtr, DefaultHash<UInt64>>;
|
|
|
|
using AggregatedDataWithStringKeyHash64 = HashMapWithSavedHash<StringRef, AggregateDataPtr, StringRefHash64>;
|
|
|
|
using AggregatedDataWithKeys128Hash64 = HashMap<UInt128, AggregateDataPtr, UInt128Hash>;
|
|
|
|
using AggregatedDataWithKeys256Hash64 = HashMap<UInt256, AggregateDataPtr, UInt256Hash>;
|
2014-10-29 01:18:50 +00:00
|
|
|
|
2018-09-12 13:27:00 +00:00
|
|
|
/// Cache which can be used by the states of aggregation methods. The object is shared by all threads.
/// Concrete caches derive from this type; aggregation methods that need no cache return nullptr.
struct AggregationStateCache
{
    virtual ~AggregationStateCache() = default;

    struct Settings
    {
        /// Forwarded to the concrete cache (e.g. as its capacity). Zero-initialized so that a
        /// default-constructed Settings never carries an indeterminate value; `Settings{n}` still works.
        size_t max_threads = 0;
    };
};

using AggregationStateCachePtr = std::shared_ptr<AggregationStateCache>;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// For the case where there is one numeric key.
|
|
|
|
template <typename FieldType, typename TData> /// UInt8/16/32/64 for any type with corresponding bit width.
|
2014-10-29 01:18:50 +00:00
|
|
|
struct AggregationMethodOneNumber
|
2014-05-10 00:31:22 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
using Data = TData;
|
|
|
|
using Key = typename Data::key_type;
|
|
|
|
using Mapped = typename Data::mapped_type;
|
|
|
|
using iterator = typename Data::iterator;
|
|
|
|
using const_iterator = typename Data::const_iterator;
|
|
|
|
|
|
|
|
Data data;
|
|
|
|
|
|
|
|
AggregationMethodOneNumber() {}
|
|
|
|
|
|
|
|
template <typename Other>
|
|
|
|
AggregationMethodOneNumber(const Other & other) : data(other.data) {}
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// To use one `Method` in different threads, use different `State`.
|
2017-04-01 07:20:54 +00:00
|
|
|
struct State
|
|
|
|
{
|
|
|
|
const FieldType * vec;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Called at the start of each block processing.
|
2017-12-17 05:21:04 +00:00
|
|
|
* Sets the variables needed for the other methods called in inner loops.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
2017-12-13 01:27:53 +00:00
|
|
|
void init(ColumnRawPtrs & key_columns)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
vec = &static_cast<const ColumnVector<FieldType> *>(key_columns[0])->getData()[0];
|
|
|
|
}
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// Get the key from the key columns for insertion into the hash table.
|
2018-09-03 00:00:56 +00:00
|
|
|
ALWAYS_INLINE Key getKey(
|
2017-12-13 01:27:53 +00:00
|
|
|
const ColumnRawPtrs & /*key_columns*/,
|
2017-12-01 19:34:51 +00:00
|
|
|
size_t /*keys_size*/, /// Number of key columns.
|
2017-06-02 21:37:28 +00:00
|
|
|
size_t i, /// From which row of the block, get the key.
|
2017-12-01 19:34:51 +00:00
|
|
|
const Sizes & /*key_sizes*/, /// If the keys of a fixed length - their lengths. It is not used in aggregation methods for variable length keys.
|
|
|
|
StringRefs & /*keys*/, /// Here references to key data in columns can be written. They can be used in the future.
|
|
|
|
Arena & /*pool*/) const
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
return unionCastToUInt64(vec[i]);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// From the value in the hash table, get AggregateDataPtr.
|
2017-04-01 07:20:54 +00:00
|
|
|
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
|
|
|
|
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Place additional data, if necessary, in case a new key was inserted into the hash table.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
2018-09-03 00:00:56 +00:00
|
|
|
static ALWAYS_INLINE void onNewKey(typename Data::value_type & /*value*/, size_t /*keys_size*/, StringRefs & /*keys*/, Arena & /*pool*/)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
}
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** The action to be taken if the key is not new. For example, roll back the memory allocation in the pool.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
2018-09-03 00:00:56 +00:00
|
|
|
static ALWAYS_INLINE void onExistingKey(const Key & /*key*/, StringRefs & /*keys*/, Arena & /*pool*/) {}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Do not use optimization for consecutive keys.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
2018-08-23 13:22:03 +00:00
|
|
|
static const bool no_consecutive_keys_optimization = true;
|
|
|
|
/// Use optimization for low cardinality.
|
|
|
|
static constexpr bool low_cardinality_optimization = false;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Insert the key from the hash table into columns.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
2017-12-15 03:47:43 +00:00
|
|
|
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t /*keys_size*/, const Sizes & /*key_sizes*/)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2017-12-15 03:47:43 +00:00
|
|
|
static_cast<ColumnVector<FieldType> *>(key_columns[0].get())->insertData(reinterpret_cast<const char *>(&value.first), sizeof(value.first));
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2018-08-21 14:53:51 +00:00
|
|
|
|
|
|
|
static StringRef getRef(const typename Data::value_type & value)
|
|
|
|
{
|
|
|
|
return StringRef(reinterpret_cast<const char *>(&value.first), sizeof(value.first));
|
|
|
|
}
|
2018-09-12 13:27:00 +00:00
|
|
|
|
|
|
|
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
|
2014-05-10 00:31:22 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// For the case where there is one string key.
|
2014-12-30 10:16:23 +00:00
|
|
|
template <typename TData>
|
2014-05-10 00:31:22 +00:00
|
|
|
struct AggregationMethodString
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
using Data = TData;
|
|
|
|
using Key = typename Data::key_type;
|
|
|
|
using Mapped = typename Data::mapped_type;
|
|
|
|
using iterator = typename Data::iterator;
|
|
|
|
using const_iterator = typename Data::const_iterator;
|
|
|
|
|
|
|
|
Data data;
|
|
|
|
|
|
|
|
AggregationMethodString() {}
|
|
|
|
|
|
|
|
template <typename Other>
|
|
|
|
AggregationMethodString(const Other & other) : data(other.data) {}
|
|
|
|
|
|
|
|
struct State
|
|
|
|
{
|
2017-12-15 21:32:25 +00:00
|
|
|
const ColumnString::Offsets * offsets;
|
2017-04-01 07:20:54 +00:00
|
|
|
const ColumnString::Chars_t * chars;
|
|
|
|
|
2017-12-13 01:27:53 +00:00
|
|
|
void init(ColumnRawPtrs & key_columns)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
const IColumn & column = *key_columns[0];
|
|
|
|
const ColumnString & column_string = static_cast<const ColumnString &>(column);
|
|
|
|
offsets = &column_string.getOffsets();
|
|
|
|
chars = &column_string.getChars();
|
|
|
|
}
|
|
|
|
|
2018-09-03 00:00:56 +00:00
|
|
|
ALWAYS_INLINE Key getKey(
|
2017-12-13 01:27:53 +00:00
|
|
|
const ColumnRawPtrs & /*key_columns*/,
|
2017-12-01 19:34:51 +00:00
|
|
|
size_t /*keys_size*/,
|
2017-04-01 07:20:54 +00:00
|
|
|
size_t i,
|
2017-12-01 19:34:51 +00:00
|
|
|
const Sizes & /*key_sizes*/,
|
|
|
|
StringRefs & /*keys*/,
|
|
|
|
Arena & /*pool*/) const
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
return StringRef(
|
|
|
|
&(*chars)[i == 0 ? 0 : (*offsets)[i - 1]],
|
|
|
|
(i == 0 ? (*offsets)[i] : ((*offsets)[i] - (*offsets)[i - 1])) - 1);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
|
|
|
|
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
|
|
|
|
|
2018-09-03 00:00:56 +00:00
|
|
|
static ALWAYS_INLINE void onNewKey(typename Data::value_type & value, size_t /*keys_size*/, StringRefs & /*keys*/, Arena & pool)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
value.first.data = pool.insert(value.first.data, value.first.size);
|
|
|
|
}
|
|
|
|
|
2018-09-03 00:00:56 +00:00
|
|
|
static ALWAYS_INLINE void onExistingKey(const Key & /*key*/, StringRefs & /*keys*/, Arena & /*pool*/) {}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
static const bool no_consecutive_keys_optimization = false;
|
2018-08-23 13:22:03 +00:00
|
|
|
static constexpr bool low_cardinality_optimization = false;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-08-21 14:53:51 +00:00
|
|
|
static StringRef getRef(const typename Data::value_type & value)
|
|
|
|
{
|
|
|
|
return StringRef(value.first.data, value.first.size);
|
|
|
|
}
|
|
|
|
|
2017-12-15 03:47:43 +00:00
|
|
|
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t, const Sizes &)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
key_columns[0]->insertData(value.first.data, value.first.size);
|
|
|
|
}
|
2018-09-12 13:27:00 +00:00
|
|
|
|
|
|
|
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
|
2014-05-10 00:31:22 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// For the case where there is one fixed-length string key.
|
2014-12-30 10:16:23 +00:00
|
|
|
template <typename TData>
|
2014-05-10 00:31:22 +00:00
|
|
|
struct AggregationMethodFixedString
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
using Data = TData;
|
|
|
|
using Key = typename Data::key_type;
|
|
|
|
using Mapped = typename Data::mapped_type;
|
|
|
|
using iterator = typename Data::iterator;
|
|
|
|
using const_iterator = typename Data::const_iterator;
|
|
|
|
|
|
|
|
Data data;
|
|
|
|
|
|
|
|
AggregationMethodFixedString() {}
|
|
|
|
|
|
|
|
template <typename Other>
|
|
|
|
AggregationMethodFixedString(const Other & other) : data(other.data) {}
|
|
|
|
|
|
|
|
struct State
|
|
|
|
{
|
|
|
|
size_t n;
|
|
|
|
const ColumnFixedString::Chars_t * chars;
|
|
|
|
|
2017-12-13 01:27:53 +00:00
|
|
|
void init(ColumnRawPtrs & key_columns)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
const IColumn & column = *key_columns[0];
|
|
|
|
const ColumnFixedString & column_string = static_cast<const ColumnFixedString &>(column);
|
|
|
|
n = column_string.getN();
|
|
|
|
chars = &column_string.getChars();
|
|
|
|
}
|
|
|
|
|
2018-09-03 00:00:56 +00:00
|
|
|
ALWAYS_INLINE Key getKey(
|
2017-12-13 01:27:53 +00:00
|
|
|
const ColumnRawPtrs &,
|
2017-12-01 19:34:51 +00:00
|
|
|
size_t,
|
2017-04-01 07:20:54 +00:00
|
|
|
size_t i,
|
2017-12-01 19:34:51 +00:00
|
|
|
const Sizes &,
|
|
|
|
StringRefs &,
|
|
|
|
Arena &) const
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
return StringRef(&(*chars)[i * n], n);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
|
|
|
|
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
|
|
|
|
|
2018-09-03 00:00:56 +00:00
|
|
|
static ALWAYS_INLINE void onNewKey(typename Data::value_type & value, size_t, StringRefs &, Arena & pool)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
value.first.data = pool.insert(value.first.data, value.first.size);
|
|
|
|
}
|
|
|
|
|
2018-09-03 00:00:56 +00:00
|
|
|
static ALWAYS_INLINE void onExistingKey(const Key &, StringRefs &, Arena &) {}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
static const bool no_consecutive_keys_optimization = false;
|
2018-08-23 13:22:03 +00:00
|
|
|
static constexpr bool low_cardinality_optimization = false;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-08-21 14:53:51 +00:00
|
|
|
static StringRef getRef(const typename Data::value_type & value)
|
|
|
|
{
|
|
|
|
return StringRef(value.first.data, value.first.size);
|
|
|
|
}
|
|
|
|
|
2017-12-15 03:47:43 +00:00
|
|
|
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t, const Sizes &)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
key_columns[0]->insertData(value.first.data, value.first.size);
|
|
|
|
}
|
2018-09-12 13:27:00 +00:00
|
|
|
|
|
|
|
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
|
2014-05-10 00:31:22 +00:00
|
|
|
};
|
|
|
|
|
2018-09-13 17:05:39 +00:00
|
|
|
/// Cache stores dictionaries and saved_hash per dictionary key.
|
2018-09-12 13:27:00 +00:00
|
|
|
class LowCardinalityDictionaryCache : public AggregationStateCache
|
|
|
|
{
|
|
|
|
public:
|
|
|
|
/// Will assume that dictionaries with same hash has the same keys.
|
|
|
|
/// Just in case, check that they have also the same size.
|
|
|
|
struct DictionaryKey
|
|
|
|
{
|
|
|
|
UInt128 hash;
|
|
|
|
UInt64 size;
|
|
|
|
|
|
|
|
bool operator== (const DictionaryKey & other) const { return hash == other.hash && size == other.size; }
|
|
|
|
};
|
|
|
|
|
|
|
|
struct DictionaryKeyHash
|
|
|
|
{
|
|
|
|
size_t operator()(const DictionaryKey & key) const
|
|
|
|
{
|
|
|
|
SipHash hash;
|
|
|
|
hash.update(key.hash.low);
|
|
|
|
hash.update(key.hash.high);
|
|
|
|
hash.update(key.size);
|
|
|
|
return hash.get64();
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
struct CachedValues
|
|
|
|
{
|
|
|
|
/// Store ptr to dictionary to be sure it won't be deleted.
|
|
|
|
ColumnPtr dictionary_holder;
|
|
|
|
/// Hashes for dictionary keys.
|
|
|
|
const UInt64 * saved_hash = nullptr;
|
|
|
|
};
|
|
|
|
|
|
|
|
using CachedValuesPtr = std::shared_ptr<CachedValues>;
|
|
|
|
|
2018-09-13 17:05:39 +00:00
|
|
|
explicit LowCardinalityDictionaryCache(const AggregationStateCache::Settings & settings) : cache(settings.max_threads) {}
|
2018-09-12 13:27:00 +00:00
|
|
|
|
|
|
|
CachedValuesPtr get(const DictionaryKey & key) { return cache.get(key); }
|
|
|
|
void set(const DictionaryKey & key, const CachedValuesPtr & mapped) { cache.set(key, mapped); }
|
|
|
|
|
|
|
|
private:
|
|
|
|
using Cache = LRUCache<DictionaryKey, CachedValues, DictionaryKeyHash>;
|
|
|
|
Cache cache;
|
|
|
|
};
|
2018-08-21 14:53:51 +00:00
|
|
|
|
|
|
|
/// Single low cardinality column.
|
|
|
|
template <typename SingleColumnMethod>
|
|
|
|
struct AggregationMethodSingleLowCardinalityColumn : public SingleColumnMethod
|
|
|
|
{
|
|
|
|
using Base = SingleColumnMethod;
|
|
|
|
using BaseState = typename Base::State;
|
|
|
|
|
|
|
|
using Data = typename Base::Data;
|
|
|
|
using Key = typename Base::Key;
|
|
|
|
using Mapped = typename Base::Mapped;
|
|
|
|
using iterator = typename Base::iterator;
|
|
|
|
using const_iterator = typename Base::const_iterator;
|
|
|
|
|
2018-08-23 13:22:03 +00:00
|
|
|
using Base::data;
|
2018-08-21 14:53:51 +00:00
|
|
|
|
2018-09-12 13:27:00 +00:00
|
|
|
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & settings)
|
|
|
|
{
|
|
|
|
return std::make_shared<LowCardinalityDictionaryCache>(settings);
|
|
|
|
}
|
|
|
|
|
2018-08-21 14:53:51 +00:00
|
|
|
AggregationMethodSingleLowCardinalityColumn() = default;
|
|
|
|
|
|
|
|
template <typename Other>
|
|
|
|
explicit AggregationMethodSingleLowCardinalityColumn(const Other & other) : Base(other) {}
|
|
|
|
|
|
|
|
struct State : public BaseState
|
|
|
|
{
|
|
|
|
ColumnRawPtrs key;
|
2018-08-23 13:22:03 +00:00
|
|
|
const IColumn * positions = nullptr;
|
|
|
|
size_t size_of_index_type = 0;
|
2018-08-21 14:53:51 +00:00
|
|
|
|
2018-09-13 17:05:39 +00:00
|
|
|
/// saved hash is from current column or from cache.
|
2018-08-24 15:45:17 +00:00
|
|
|
const UInt64 * saved_hash = nullptr;
|
2018-09-13 17:05:39 +00:00
|
|
|
/// Hold dictionary in case saved_hash is from cache to be sure it won't be deleted.
|
2018-09-12 13:27:00 +00:00
|
|
|
ColumnPtr dictionary_holder;
|
|
|
|
|
2018-09-13 17:05:39 +00:00
|
|
|
/// Cache AggregateDataPtr for current column in order to decrease the number of hash table usages.
|
2018-08-24 15:45:17 +00:00
|
|
|
PaddedPODArray<AggregateDataPtr> aggregate_data;
|
|
|
|
PaddedPODArray<AggregateDataPtr> * aggregate_data_cache;
|
|
|
|
|
|
|
|
void init(ColumnRawPtrs &)
|
|
|
|
{
|
|
|
|
throw Exception("Expected cache for AggregationMethodSingleLowCardinalityColumn::init", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
}
|
|
|
|
|
2018-09-12 13:27:00 +00:00
|
|
|
void init(ColumnRawPtrs & key_columns, const AggregationStateCachePtr & cache_ptr)
|
2018-08-21 14:53:51 +00:00
|
|
|
{
|
2018-08-23 13:22:03 +00:00
|
|
|
auto column = typeid_cast<const ColumnWithDictionary *>(key_columns[0]);
|
2018-08-21 14:53:51 +00:00
|
|
|
if (!column)
|
|
|
|
throw Exception("Invalid aggregation key type for AggregationMethodSingleLowCardinalityColumn method. "
|
|
|
|
"Excepted LowCardinality, got " + key_columns[0]->getName(), ErrorCodes::LOGICAL_ERROR);
|
2018-08-24 15:45:17 +00:00
|
|
|
|
|
|
|
if (!cache_ptr)
|
2018-09-12 13:27:00 +00:00
|
|
|
throw Exception("Cache wasn't created for AggregationMethodSingleLowCardinalityColumn", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
auto cache = typeid_cast<LowCardinalityDictionaryCache *>(cache_ptr.get());
|
|
|
|
if (!cache)
|
|
|
|
{
|
|
|
|
const auto & cached_val = *cache_ptr;
|
|
|
|
throw Exception("Invalid type for AggregationMethodSingleLowCardinalityColumn cache: "
|
|
|
|
+ demangle(typeid(cached_val).name()), ErrorCodes::LOGICAL_ERROR);
|
|
|
|
}
|
2018-08-24 15:45:17 +00:00
|
|
|
|
2018-09-07 10:08:09 +00:00
|
|
|
auto * dict = column->getDictionary().getNestedColumn().get();
|
|
|
|
key = {dict};
|
2018-08-24 15:45:17 +00:00
|
|
|
bool is_shared_dict = column->isSharedDictionary();
|
|
|
|
|
2018-09-12 13:27:00 +00:00
|
|
|
typename LowCardinalityDictionaryCache::DictionaryKey dictionary_key;
|
|
|
|
typename LowCardinalityDictionaryCache::CachedValuesPtr cached_values;
|
2018-08-24 15:45:17 +00:00
|
|
|
|
2018-09-12 13:27:00 +00:00
|
|
|
if (is_shared_dict)
|
2018-08-24 15:45:17 +00:00
|
|
|
{
|
2018-09-12 13:27:00 +00:00
|
|
|
dictionary_key = {column->getDictionary().getHash(), dict->size()};
|
|
|
|
cached_values = cache->get(dictionary_key);
|
2018-08-24 15:45:17 +00:00
|
|
|
}
|
|
|
|
|
2018-09-12 13:27:00 +00:00
|
|
|
if (cached_values)
|
|
|
|
{
|
|
|
|
saved_hash = cached_values->saved_hash;
|
|
|
|
dictionary_holder = cached_values->dictionary_holder;
|
|
|
|
}
|
|
|
|
else
|
2018-08-24 15:45:17 +00:00
|
|
|
{
|
|
|
|
saved_hash = column->getDictionary().tryGetSavedHash();
|
2018-09-12 13:27:00 +00:00
|
|
|
dictionary_holder = column->getDictionaryPtr();
|
2018-08-24 15:45:17 +00:00
|
|
|
|
|
|
|
if (is_shared_dict)
|
|
|
|
{
|
2018-09-12 13:27:00 +00:00
|
|
|
cached_values = std::make_shared<typename LowCardinalityDictionaryCache::CachedValues>();
|
|
|
|
cached_values->saved_hash = saved_hash;
|
|
|
|
cached_values->dictionary_holder = dictionary_holder;
|
|
|
|
|
|
|
|
cache->set(dictionary_key, cached_values);
|
2018-08-24 15:45:17 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-09-12 13:27:00 +00:00
|
|
|
AggregateDataPtr default_data = nullptr;
|
|
|
|
aggregate_data.assign(key[0]->size(), default_data);
|
|
|
|
aggregate_data_cache = &aggregate_data;
|
|
|
|
|
2018-08-23 13:22:03 +00:00
|
|
|
size_of_index_type = column->getSizeOfIndexType();
|
2018-08-24 15:45:17 +00:00
|
|
|
positions = column->getIndexesPtr().get();
|
2018-08-21 14:53:51 +00:00
|
|
|
|
|
|
|
BaseState::init(key);
|
2018-08-23 13:22:03 +00:00
|
|
|
}
|
|
|
|
|
2018-09-12 13:27:00 +00:00
|
|
|
ALWAYS_INLINE size_t getIndexAt(size_t row) const
|
2018-08-23 13:22:03 +00:00
|
|
|
{
|
|
|
|
switch (size_of_index_type)
|
|
|
|
{
|
|
|
|
case sizeof(UInt8): return static_cast<const ColumnUInt8 *>(positions)->getElement(row);
|
|
|
|
case sizeof(UInt16): return static_cast<const ColumnUInt16 *>(positions)->getElement(row);
|
|
|
|
case sizeof(UInt32): return static_cast<const ColumnUInt32 *>(positions)->getElement(row);
|
|
|
|
case sizeof(UInt64): return static_cast<const ColumnUInt64 *>(positions)->getElement(row);
|
|
|
|
default: throw Exception("Unexpected size of index type for low cardinality column.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
}
|
2018-08-21 14:53:51 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Get the key from the key columns for insertion into the hash table.
|
2018-09-12 13:27:00 +00:00
|
|
|
ALWAYS_INLINE Key getKey(
|
2018-08-23 13:22:03 +00:00
|
|
|
const ColumnRawPtrs & /*key_columns*/,
|
|
|
|
size_t /*keys_size*/,
|
|
|
|
size_t i,
|
|
|
|
const Sizes & key_sizes,
|
|
|
|
StringRefs & keys,
|
|
|
|
Arena & pool) const
|
2018-08-21 14:53:51 +00:00
|
|
|
{
|
2018-08-23 13:22:03 +00:00
|
|
|
size_t row = getIndexAt(i);
|
2018-08-21 14:53:51 +00:00
|
|
|
return BaseState::getKey(key, 1, row, key_sizes, keys, pool);
|
|
|
|
}
|
2018-08-23 13:22:03 +00:00
|
|
|
|
|
|
|
template <typename D>
|
2018-09-12 13:27:00 +00:00
|
|
|
ALWAYS_INLINE AggregateDataPtr * emplaceKeyFromRow(
|
2018-08-23 13:22:03 +00:00
|
|
|
D & data,
|
|
|
|
size_t i,
|
|
|
|
bool & inserted,
|
|
|
|
size_t keys_size,
|
|
|
|
StringRefs & keys,
|
|
|
|
Arena & pool)
|
|
|
|
{
|
|
|
|
size_t row = getIndexAt(i);
|
2018-08-24 15:45:17 +00:00
|
|
|
if ((*aggregate_data_cache)[row])
|
2018-08-23 13:22:03 +00:00
|
|
|
{
|
|
|
|
inserted = false;
|
2018-08-24 15:45:17 +00:00
|
|
|
return &(*aggregate_data_cache)[row];
|
2018-08-23 13:22:03 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
ColumnRawPtrs key_columns;
|
|
|
|
Sizes key_sizes;
|
|
|
|
auto key = getKey(key_columns, 0, i, key_sizes, keys, pool);
|
|
|
|
|
|
|
|
typename D::iterator it;
|
|
|
|
if (saved_hash)
|
|
|
|
data.emplace(key, it, inserted, saved_hash[row]);
|
|
|
|
else
|
|
|
|
data.emplace(key, it, inserted);
|
|
|
|
|
|
|
|
if (inserted)
|
|
|
|
Base::onNewKey(*it, keys_size, keys, pool);
|
|
|
|
else
|
2018-08-24 15:45:17 +00:00
|
|
|
(*aggregate_data_cache)[row] = Base::getAggregateData(it->second);
|
2018-08-23 13:22:03 +00:00
|
|
|
|
|
|
|
return &Base::getAggregateData(it->second);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-09-12 13:27:00 +00:00
|
|
|
ALWAYS_INLINE void cacheAggregateData(size_t i, AggregateDataPtr data)
|
2018-08-23 13:22:03 +00:00
|
|
|
{
|
|
|
|
size_t row = getIndexAt(i);
|
2018-08-24 15:45:17 +00:00
|
|
|
(*aggregate_data_cache)[row] = data;
|
2018-08-23 13:22:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
template <typename D>
|
2018-09-12 13:27:00 +00:00
|
|
|
ALWAYS_INLINE AggregateDataPtr * findFromRow(D & data, size_t i)
|
2018-08-23 13:22:03 +00:00
|
|
|
{
|
|
|
|
size_t row = getIndexAt(i);
|
2018-08-24 15:45:17 +00:00
|
|
|
if (!(*aggregate_data_cache)[row])
|
2018-08-23 13:22:03 +00:00
|
|
|
{
|
|
|
|
ColumnRawPtrs key_columns;
|
|
|
|
Sizes key_sizes;
|
|
|
|
StringRefs keys;
|
|
|
|
Arena pool;
|
|
|
|
auto key = getKey(key_columns, 0, i, key_sizes, keys, pool);
|
|
|
|
|
|
|
|
typename D::iterator it;
|
|
|
|
if (saved_hash)
|
|
|
|
it = data.find(key, saved_hash[row]);
|
|
|
|
else
|
|
|
|
it = data.find(key);
|
|
|
|
|
|
|
|
if (it != data.end())
|
2018-08-24 15:45:17 +00:00
|
|
|
(*aggregate_data_cache)[row] = Base::getAggregateData(it->second);
|
2018-08-23 13:22:03 +00:00
|
|
|
}
|
2018-08-24 15:45:17 +00:00
|
|
|
return &(*aggregate_data_cache)[row];
|
2018-08-23 13:22:03 +00:00
|
|
|
}
|
2018-08-21 14:53:51 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
static AggregateDataPtr & getAggregateData(Mapped & value) { return Base::getAggregateData(value); }
|
|
|
|
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return Base::getAggregateData(value); }
|
|
|
|
|
|
|
|
static void onNewKey(typename Data::value_type & value, size_t keys_size, StringRefs & keys, Arena & pool)
|
|
|
|
{
|
|
|
|
return Base::onNewKey(value, keys_size, keys, pool);
|
|
|
|
}
|
|
|
|
|
|
|
|
static void onExistingKey(const Key & key, StringRefs & keys, Arena & pool)
|
|
|
|
{
|
|
|
|
return Base::onExistingKey(key, keys, pool);
|
|
|
|
}
|
|
|
|
|
2018-08-23 13:22:03 +00:00
|
|
|
static const bool no_consecutive_keys_optimization = true;
|
|
|
|
static const bool low_cardinality_optimization = true;
|
2018-08-21 14:53:51 +00:00
|
|
|
|
|
|
|
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t /*keys_size*/, const Sizes & /*key_sizes*/)
|
|
|
|
{
|
|
|
|
auto ref = Base::getRef(value);
|
|
|
|
static_cast<ColumnWithDictionary *>(key_columns[0].get())->insertData(ref.data, ref.size);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
|
2016-10-21 16:50:41 +00:00
|
|
|
namespace aggregator_impl
|
|
|
|
{
|
|
|
|
|
2016-10-18 10:09:48 +00:00
|
|
|
/// This class is designed to provide the functionality that is required for
|
|
|
|
/// supporting nullable keys in AggregationMethodKeysFixed. If there are
|
|
|
|
/// no nullable keys, this class is merely implemented as an empty shell.
|
|
|
|
template <typename Key, bool has_nullable_keys>
|
|
|
|
class BaseStateKeysFixed;
|
|
|
|
|
|
|
|
/// Case where nullable keys are supported.
|
|
|
|
template <typename Key>
|
|
|
|
class BaseStateKeysFixed<Key, true>
|
|
|
|
{
|
|
|
|
protected:
|
2017-12-13 01:27:53 +00:00
|
|
|
void init(const ColumnRawPtrs & key_columns)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
null_maps.reserve(key_columns.size());
|
|
|
|
actual_columns.reserve(key_columns.size());
|
|
|
|
|
|
|
|
for (const auto & col : key_columns)
|
|
|
|
{
|
2017-12-09 10:14:45 +00:00
|
|
|
if (col->isColumnNullable())
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
const auto & nullable_col = static_cast<const ColumnNullable &>(*col);
|
2017-12-14 03:56:56 +00:00
|
|
|
actual_columns.push_back(&nullable_col.getNestedColumn());
|
|
|
|
null_maps.push_back(&nullable_col.getNullMapColumn());
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
actual_columns.push_back(col);
|
|
|
|
null_maps.push_back(nullptr);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Return the columns which actually contain the values of the keys.
|
|
|
|
/// For a given key column, if it is nullable, we return its nested
|
|
|
|
/// column. Otherwise we return the key column itself.
|
2017-12-13 01:27:53 +00:00
|
|
|
inline const ColumnRawPtrs & getActualColumns() const
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
return actual_columns;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Create a bitmap that indicates whether, for a particular row,
|
|
|
|
/// a key column bears a null value or not.
|
|
|
|
KeysNullMap<Key> createBitmap(size_t row) const
|
|
|
|
{
|
|
|
|
KeysNullMap<Key> bitmap{};
|
|
|
|
|
|
|
|
for (size_t k = 0; k < null_maps.size(); ++k)
|
|
|
|
{
|
|
|
|
if (null_maps[k] != nullptr)
|
|
|
|
{
|
|
|
|
const auto & null_map = static_cast<const ColumnUInt8 &>(*null_maps[k]).getData();
|
|
|
|
if (null_map[row] == 1)
|
|
|
|
{
|
|
|
|
size_t bucket = k / 8;
|
|
|
|
size_t offset = k % 8;
|
|
|
|
bitmap[bucket] |= UInt8(1) << offset;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return bitmap;
|
|
|
|
}
|
2016-10-18 10:09:48 +00:00
|
|
|
|
|
|
|
private:
|
2017-12-13 01:27:53 +00:00
|
|
|
ColumnRawPtrs actual_columns;
|
|
|
|
ColumnRawPtrs null_maps;
|
2016-10-18 10:09:48 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
/// Case where nullable keys are not supported.
|
|
|
|
template <typename Key>
|
|
|
|
class BaseStateKeysFixed<Key, false>
|
|
|
|
{
|
|
|
|
protected:
|
2017-12-13 01:27:53 +00:00
|
|
|
void init(const ColumnRawPtrs &)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
throw Exception{"Internal error: calling init() for non-nullable"
|
|
|
|
" keys is forbidden", ErrorCodes::LOGICAL_ERROR};
|
|
|
|
}
|
|
|
|
|
2017-12-13 01:27:53 +00:00
|
|
|
const ColumnRawPtrs & getActualColumns() const
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
throw Exception{"Internal error: calling getActualColumns() for non-nullable"
|
|
|
|
" keys is forbidden", ErrorCodes::LOGICAL_ERROR};
|
|
|
|
}
|
|
|
|
|
2017-12-01 19:34:51 +00:00
|
|
|
KeysNullMap<Key> createBitmap(size_t) const
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
throw Exception{"Internal error: calling createBitmap() for non-nullable keys"
|
|
|
|
" is forbidden", ErrorCodes::LOGICAL_ERROR};
|
|
|
|
}
|
2016-10-18 10:09:48 +00:00
|
|
|
};
|
2014-05-10 00:31:22 +00:00
|
|
|
|
2016-10-21 16:50:41 +00:00
|
|
|
}
|
|
|
|
|
2018-09-14 13:02:03 +00:00
|
|
|
// Oprional mask for low cardinality columns.
|
|
|
|
template <bool has_low_cardinality>
|
|
|
|
struct LowCardinalityKeys
|
|
|
|
{
|
|
|
|
ColumnRawPtrs nested_columns;
|
|
|
|
ColumnRawPtrs positions;
|
|
|
|
Sizes position_sizes;
|
|
|
|
};
|
|
|
|
|
|
|
|
template <>
|
|
|
|
struct LowCardinalityKeys<false> {};
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// For the case where all keys are of fixed length, and they fit in N (for example, 128) bits.
|
2018-09-14 13:02:03 +00:00
|
|
|
template <typename TData, bool has_nullable_keys_ = false, bool has_low_cardinality_ = false>
|
2015-02-22 05:53:16 +00:00
|
|
|
struct AggregationMethodKeysFixed
|
2014-05-10 00:31:22 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
using Data = TData;
|
|
|
|
using Key = typename Data::key_type;
|
|
|
|
using Mapped = typename Data::mapped_type;
|
|
|
|
using iterator = typename Data::iterator;
|
|
|
|
using const_iterator = typename Data::const_iterator;
|
|
|
|
static constexpr bool has_nullable_keys = has_nullable_keys_;
|
2018-09-14 13:02:03 +00:00
|
|
|
static constexpr bool has_low_cardinality = has_low_cardinality_;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
Data data;
|
|
|
|
|
|
|
|
AggregationMethodKeysFixed() {}
|
|
|
|
|
|
|
|
template <typename Other>
|
|
|
|
AggregationMethodKeysFixed(const Other & other) : data(other.data) {}
|
|
|
|
|
|
|
|
class State final : private aggregator_impl::BaseStateKeysFixed<Key, has_nullable_keys>
|
|
|
|
{
|
2018-09-14 13:02:03 +00:00
|
|
|
LowCardinalityKeys<has_low_cardinality> low_cardinality_keys;
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
public:
|
|
|
|
using Base = aggregator_impl::BaseStateKeysFixed<Key, has_nullable_keys>;
|
|
|
|
|
2017-12-13 01:27:53 +00:00
|
|
|
void init(ColumnRawPtrs & key_columns)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2018-09-14 13:02:03 +00:00
|
|
|
if constexpr (has_low_cardinality)
|
|
|
|
{
|
|
|
|
low_cardinality_keys.nested_columns.resize(key_columns.size());
|
|
|
|
low_cardinality_keys.positions.assign(key_columns.size(), nullptr);
|
|
|
|
low_cardinality_keys.position_sizes.resize(key_columns.size());
|
|
|
|
for (size_t i = 0; i < key_columns.size(); ++i)
|
|
|
|
{
|
|
|
|
if (auto * low_cardinality_col = typeid_cast<const ColumnWithDictionary *>(key_columns[i]))
|
|
|
|
{
|
|
|
|
low_cardinality_keys.nested_columns[i] = low_cardinality_col->getDictionary().getNestedColumn().get();
|
|
|
|
low_cardinality_keys.positions[i] = &low_cardinality_col->getIndexes();
|
|
|
|
low_cardinality_keys.position_sizes[i] = low_cardinality_col->getSizeOfIndexType();
|
|
|
|
}
|
|
|
|
else
|
|
|
|
low_cardinality_keys.nested_columns[i] = key_columns[i];
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (has_nullable_keys)
|
|
|
|
Base::init(key_columns);
|
|
|
|
}
|
|
|
|
|
2018-09-03 00:00:56 +00:00
|
|
|
ALWAYS_INLINE Key getKey(
|
2017-12-13 01:27:53 +00:00
|
|
|
const ColumnRawPtrs & key_columns,
|
2017-04-01 07:20:54 +00:00
|
|
|
size_t keys_size,
|
|
|
|
size_t i,
|
|
|
|
const Sizes & key_sizes,
|
2017-12-01 19:34:51 +00:00
|
|
|
StringRefs &,
|
|
|
|
Arena &) const
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
if (has_nullable_keys)
|
|
|
|
{
|
|
|
|
auto bitmap = Base::createBitmap(i);
|
|
|
|
return packFixed<Key>(i, keys_size, Base::getActualColumns(), key_sizes, bitmap);
|
|
|
|
}
|
|
|
|
else
|
2018-09-14 13:02:03 +00:00
|
|
|
{
|
|
|
|
if constexpr (has_low_cardinality)
|
|
|
|
return packFixed<Key, true>(i, keys_size, low_cardinality_keys.nested_columns, key_sizes,
|
|
|
|
&low_cardinality_keys.positions, &low_cardinality_keys.position_sizes);
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
return packFixed<Key>(i, keys_size, key_columns, key_sizes);
|
2018-09-14 13:02:03 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
|
|
|
|
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
|
|
|
|
|
2018-09-03 00:00:56 +00:00
|
|
|
static ALWAYS_INLINE void onNewKey(typename Data::value_type &, size_t, StringRefs &, Arena &)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
}
|
|
|
|
|
2018-09-03 00:00:56 +00:00
|
|
|
static ALWAYS_INLINE void onExistingKey(const Key &, StringRefs &, Arena &) {}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
static const bool no_consecutive_keys_optimization = false;
|
2018-08-23 13:22:03 +00:00
|
|
|
static constexpr bool low_cardinality_optimization = false;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-12-15 03:47:43 +00:00
|
|
|
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t keys_size, const Sizes & key_sizes)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
static constexpr auto bitmap_size = has_nullable_keys ? std::tuple_size<KeysNullMap<Key>>::value : 0;
|
|
|
|
/// In any hash key value, column values to be read start just after the bitmap, if it exists.
|
|
|
|
size_t pos = bitmap_size;
|
|
|
|
|
|
|
|
for (size_t i = 0; i < keys_size; ++i)
|
|
|
|
{
|
|
|
|
IColumn * observed_column;
|
|
|
|
ColumnUInt8 * null_map;
|
|
|
|
|
|
|
|
/// If we have a nullable column, get its nested column and its null map.
|
2017-12-09 10:14:45 +00:00
|
|
|
if (has_nullable_keys && key_columns[i]->isColumnNullable())
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
ColumnNullable & nullable_col = static_cast<ColumnNullable &>(*key_columns[i]);
|
2017-12-14 03:56:56 +00:00
|
|
|
observed_column = &nullable_col.getNestedColumn();
|
2017-12-14 04:25:22 +00:00
|
|
|
null_map = static_cast<ColumnUInt8 *>(&nullable_col.getNullMapColumn());
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2017-12-15 03:47:43 +00:00
|
|
|
observed_column = key_columns[i].get();
|
2017-04-01 07:20:54 +00:00
|
|
|
null_map = nullptr;
|
|
|
|
}
|
|
|
|
|
|
|
|
bool is_null;
|
2017-12-09 10:14:45 +00:00
|
|
|
if (has_nullable_keys && key_columns[i]->isColumnNullable())
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
/// The current column is nullable. Check if the value of the
|
|
|
|
/// corresponding key is nullable. Update the null map accordingly.
|
|
|
|
size_t bucket = i / 8;
|
|
|
|
size_t offset = i % 8;
|
|
|
|
UInt8 val = (reinterpret_cast<const UInt8 *>(&value.first)[bucket] >> offset) & 1;
|
|
|
|
null_map->insert(val);
|
|
|
|
is_null = val == 1;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
is_null = false;
|
|
|
|
|
|
|
|
if (has_nullable_keys && is_null)
|
|
|
|
observed_column->insertDefault();
|
|
|
|
else
|
|
|
|
{
|
|
|
|
size_t size = key_sizes[i];
|
|
|
|
observed_column->insertData(reinterpret_cast<const char *>(&value.first) + pos, size);
|
|
|
|
pos += size;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2018-09-12 13:27:00 +00:00
|
|
|
|
|
|
|
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
|
2014-05-10 00:31:22 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Aggregates by concatenating serialized key values.
|
|
|
|
* The serialized value differs in that it uniquely allows to deserialize it, having only the position with which it starts.
|
|
|
|
* That is, for example, for strings, it contains first the serialized length of the string, and then the bytes.
|
|
|
|
* Therefore, when aggregating by several strings, there is no ambiguity.
|
2015-10-04 06:10:48 +00:00
|
|
|
*/
|
|
|
|
template <typename TData>
|
|
|
|
struct AggregationMethodSerialized
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
using Data = TData;
|
|
|
|
using Key = typename Data::key_type;
|
|
|
|
using Mapped = typename Data::mapped_type;
|
|
|
|
using iterator = typename Data::iterator;
|
|
|
|
using const_iterator = typename Data::const_iterator;
|
|
|
|
|
|
|
|
Data data;
|
|
|
|
|
|
|
|
AggregationMethodSerialized() {}
|
|
|
|
|
|
|
|
template <typename Other>
|
|
|
|
AggregationMethodSerialized(const Other & other) : data(other.data) {}
|
|
|
|
|
|
|
|
struct State
|
|
|
|
{
|
2017-12-13 01:27:53 +00:00
|
|
|
void init(ColumnRawPtrs &)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
}
|
|
|
|
|
2018-09-03 00:00:56 +00:00
|
|
|
ALWAYS_INLINE Key getKey(
|
2017-12-13 01:27:53 +00:00
|
|
|
const ColumnRawPtrs & key_columns,
|
2017-04-01 07:20:54 +00:00
|
|
|
size_t keys_size,
|
|
|
|
size_t i,
|
2017-12-01 19:34:51 +00:00
|
|
|
const Sizes &,
|
|
|
|
StringRefs &,
|
2017-04-01 07:20:54 +00:00
|
|
|
Arena & pool) const
|
|
|
|
{
|
2017-12-01 17:49:12 +00:00
|
|
|
return serializeKeysToPoolContiguous(i, keys_size, key_columns, pool);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
static AggregateDataPtr & getAggregateData(Mapped & value) { return value; }
|
|
|
|
static const AggregateDataPtr & getAggregateData(const Mapped & value) { return value; }
|
|
|
|
|
2018-09-03 00:00:56 +00:00
|
|
|
static ALWAYS_INLINE void onNewKey(typename Data::value_type &, size_t, StringRefs &, Arena &)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
}
|
|
|
|
|
2018-09-03 00:00:56 +00:00
|
|
|
static ALWAYS_INLINE void onExistingKey(const Key & key, StringRefs &, Arena & pool)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
pool.rollback(key.size);
|
|
|
|
}
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// If the key already was, it is removed from the pool (overwritten), and the next key can not be compared with it.
|
2017-04-01 07:20:54 +00:00
|
|
|
static const bool no_consecutive_keys_optimization = true;
|
2018-08-23 13:22:03 +00:00
|
|
|
static constexpr bool low_cardinality_optimization = false;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-12-15 03:47:43 +00:00
|
|
|
static void insertKeyIntoColumns(const typename Data::value_type & value, MutableColumns & key_columns, size_t keys_size, const Sizes &)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
auto pos = value.first.data;
|
|
|
|
for (size_t i = 0; i < keys_size; ++i)
|
|
|
|
pos = key_columns[i]->deserializeAndInsertFromArena(pos);
|
|
|
|
}
|
2018-09-12 13:27:00 +00:00
|
|
|
|
|
|
|
static AggregationStateCachePtr createCache(const AggregationStateCache::Settings & /*settings*/) { return nullptr; }
|
2015-10-04 06:10:48 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
|
2014-05-10 00:31:22 +00:00
|
|
|
class Aggregator;
|
2011-09-26 07:25:22 +00:00
|
|
|
|
2013-12-16 02:32:00 +00:00
|
|
|
struct AggregatedDataVariants : private boost::noncopyable
|
2011-09-26 07:25:22 +00:00
|
|
|
{
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Working with states of aggregate functions in the pool is arranged in the following (inconvenient) way:
|
|
|
|
* - when aggregating, states are created in the pool using IAggregateFunction::create (inside - `placement new` of arbitrary structure);
|
|
|
|
* - they must then be destroyed using IAggregateFunction::destroy (inside - calling the destructor of arbitrary structure);
|
|
|
|
* - if aggregation is complete, then, in the Aggregator::convertToBlocks function, pointers to the states of aggregate functions
|
|
|
|
* are written to ColumnAggregateFunction; ColumnAggregateFunction "acquires ownership" of them, that is - calls `destroy` in its destructor.
|
|
|
|
* - if during the aggregation, before call to Aggregator::convertToBlocks, an exception was thrown,
|
|
|
|
* then the states of aggregate functions must still be destroyed,
|
|
|
|
* otherwise, for complex states (eg, AggregateFunctionUniq), there will be memory leaks;
|
|
|
|
* - in this case, to destroy states, the destructor calls Aggregator::destroyAggregateStates method,
|
|
|
|
* but only if the variable aggregator (see below) is not nullptr;
|
|
|
|
* - that is, until you transfer ownership of the aggregate function states in the ColumnAggregateFunction, set the variable `aggregator`,
|
|
|
|
* so that when an exception occurs, the states are correctly destroyed.
|
2017-04-01 07:20:54 +00:00
|
|
|
*
|
2017-06-02 21:37:28 +00:00
|
|
|
* PS. This can be corrected by making a pool that knows about which states of aggregate functions and in which order are put in it, and knows how to destroy them.
|
|
|
|
* But this can hardly be done simply because it is planned to put variable-length strings into the same pool.
|
|
|
|
* In this case, the pool will not be able to know with what offsets objects are stored.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
Aggregator * aggregator = nullptr;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
size_t keys_size; /// Number of keys. NOTE do we need this field?
|
|
|
|
Sizes key_sizes; /// Dimensions of keys, if keys of fixed length
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// Pools for states of aggregate functions. Ownership will be later transferred to ColumnAggregateFunction.
|
2017-04-01 07:20:54 +00:00
|
|
|
Arenas aggregates_pools;
|
2017-06-02 21:37:28 +00:00
|
|
|
Arena * aggregates_pool; /// The pool that is currently used for allocation.
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Specialization for the case when there are no keys, and for keys not fitted into max_rows_to_group_by.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
AggregatedDataWithoutKey without_key = nullptr;
|
|
|
|
|
2017-12-15 03:47:43 +00:00
|
|
|
std::unique_ptr<AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key>> key8;
|
|
|
|
std::unique_ptr<AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key>> key16;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-12-15 03:47:43 +00:00
|
|
|
std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64Key>> key32;
|
|
|
|
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>> key64;
|
2017-04-01 07:20:54 +00:00
|
|
|
std::unique_ptr<AggregationMethodString<AggregatedDataWithStringKey>> key_string;
|
2017-12-15 03:47:43 +00:00
|
|
|
std::unique_ptr<AggregationMethodFixedString<AggregatedDataWithStringKey>> key_fixed_string;
|
|
|
|
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128>> keys128;
|
|
|
|
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256>> keys256;
|
2017-04-01 07:20:54 +00:00
|
|
|
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKey>> serialized;
|
|
|
|
|
2017-12-15 03:47:43 +00:00
|
|
|
std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64KeyTwoLevel>> key32_two_level;
|
|
|
|
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyTwoLevel>> key64_two_level;
|
|
|
|
std::unique_ptr<AggregationMethodString<AggregatedDataWithStringKeyTwoLevel>> key_string_two_level;
|
|
|
|
std::unique_ptr<AggregationMethodFixedString<AggregatedDataWithStringKeyTwoLevel>> key_fixed_string_two_level;
|
|
|
|
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel>> keys128_two_level;
|
|
|
|
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel>> keys256_two_level;
|
|
|
|
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKeyTwoLevel>> serialized_two_level;
|
|
|
|
|
|
|
|
std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyHash64>> key64_hash64;
|
|
|
|
std::unique_ptr<AggregationMethodString<AggregatedDataWithStringKeyHash64>> key_string_hash64;
|
2017-04-01 07:20:54 +00:00
|
|
|
std::unique_ptr<AggregationMethodFixedString<AggregatedDataWithStringKeyHash64>> key_fixed_string_hash64;
|
|
|
|
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128Hash64>> keys128_hash64;
|
|
|
|
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256Hash64>> keys256_hash64;
|
2017-12-15 03:47:43 +00:00
|
|
|
std::unique_ptr<AggregationMethodSerialized<AggregatedDataWithStringKeyHash64>> serialized_hash64;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
/// Support for nullable keys.
|
2017-12-15 03:47:43 +00:00
|
|
|
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128, true>> nullable_keys128;
|
|
|
|
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256, true>> nullable_keys256;
|
|
|
|
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, true>> nullable_keys128_two_level;
|
|
|
|
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, true>> nullable_keys256_two_level;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-08-21 14:53:51 +00:00
|
|
|
/// Support for low cardinality.
|
|
|
|
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key>>> low_cardinality_key8;
|
|
|
|
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key>>> low_cardinality_key16;
|
|
|
|
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64Key>>> low_cardinality_key32;
|
|
|
|
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>>> low_cardinality_key64;
|
|
|
|
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithStringKey>>> low_cardinality_key_string;
|
|
|
|
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithStringKey>>> low_cardinality_key_fixed_string;
|
|
|
|
|
|
|
|
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64KeyTwoLevel>>> low_cardinality_key32_two_level;
|
|
|
|
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64KeyTwoLevel>>> low_cardinality_key64_two_level;
|
|
|
|
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodString<AggregatedDataWithStringKeyTwoLevel>>> low_cardinality_key_string_two_level;
|
|
|
|
std::unique_ptr<AggregationMethodSingleLowCardinalityColumn<AggregationMethodFixedString<AggregatedDataWithStringKeyTwoLevel>>> low_cardinality_key_fixed_string_two_level;
|
|
|
|
|
2018-09-14 13:02:03 +00:00
|
|
|
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128, false, true>> low_cardinality_keys128;
|
|
|
|
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256, false, true>> low_cardinality_keys256;
|
|
|
|
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys128TwoLevel, false, true>> low_cardinality_keys128_two_level;
|
|
|
|
std::unique_ptr<AggregationMethodKeysFixed<AggregatedDataWithKeys256TwoLevel, false, true>> low_cardinality_keys256_two_level;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// In this and similar macros, the option without_key is not considered.
|
2017-04-01 07:20:54 +00:00
|
|
|
#define APPLY_FOR_AGGREGATED_VARIANTS(M) \
|
2017-12-15 03:47:43 +00:00
|
|
|
M(key8, false) \
|
|
|
|
M(key16, false) \
|
|
|
|
M(key32, false) \
|
|
|
|
M(key64, false) \
|
|
|
|
M(key_string, false) \
|
|
|
|
M(key_fixed_string, false) \
|
|
|
|
M(keys128, false) \
|
|
|
|
M(keys256, false) \
|
|
|
|
M(serialized, false) \
|
|
|
|
M(key32_two_level, true) \
|
|
|
|
M(key64_two_level, true) \
|
|
|
|
M(key_string_two_level, true) \
|
|
|
|
M(key_fixed_string_two_level, true) \
|
|
|
|
M(keys128_two_level, true) \
|
|
|
|
M(keys256_two_level, true) \
|
|
|
|
M(serialized_two_level, true) \
|
|
|
|
M(key64_hash64, false) \
|
|
|
|
M(key_string_hash64, false) \
|
2017-04-01 07:20:54 +00:00
|
|
|
M(key_fixed_string_hash64, false) \
|
2017-12-15 03:47:43 +00:00
|
|
|
M(keys128_hash64, false) \
|
|
|
|
M(keys256_hash64, false) \
|
|
|
|
M(serialized_hash64, false) \
|
|
|
|
M(nullable_keys128, false) \
|
|
|
|
M(nullable_keys256, false) \
|
|
|
|
M(nullable_keys128_two_level, true) \
|
|
|
|
M(nullable_keys256_two_level, true) \
|
2018-08-21 14:53:51 +00:00
|
|
|
M(low_cardinality_key8, false) \
|
|
|
|
M(low_cardinality_key16, false) \
|
|
|
|
M(low_cardinality_key32, false) \
|
|
|
|
M(low_cardinality_key64, false) \
|
2018-09-14 13:02:03 +00:00
|
|
|
M(low_cardinality_keys128, false) \
|
|
|
|
M(low_cardinality_keys256, false) \
|
2018-08-21 14:53:51 +00:00
|
|
|
M(low_cardinality_key_string, false) \
|
|
|
|
M(low_cardinality_key_fixed_string, false) \
|
|
|
|
M(low_cardinality_key32_two_level, true) \
|
|
|
|
M(low_cardinality_key64_two_level, true) \
|
2018-09-14 13:02:03 +00:00
|
|
|
M(low_cardinality_keys128_two_level, true) \
|
|
|
|
M(low_cardinality_keys256_two_level, true) \
|
2018-08-21 14:53:51 +00:00
|
|
|
M(low_cardinality_key_string_two_level, true) \
|
|
|
|
M(low_cardinality_key_fixed_string_two_level, true) \
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
enum class Type
|
|
|
|
{
|
|
|
|
EMPTY = 0,
|
|
|
|
without_key,
|
|
|
|
|
|
|
|
#define M(NAME, IS_TWO_LEVEL) NAME,
|
|
|
|
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
|
|
|
#undef M
|
|
|
|
};
|
|
|
|
Type type = Type::EMPTY;
|
|
|
|
|
|
|
|
AggregatedDataVariants() : aggregates_pools(1, std::make_shared<Arena>()), aggregates_pool(aggregates_pools.back().get()) {}
|
|
|
|
bool empty() const { return type == Type::EMPTY; }
|
|
|
|
void invalidate() { type = Type::EMPTY; }
|
|
|
|
|
|
|
|
~AggregatedDataVariants();
|
|
|
|
|
|
|
|
void init(Type type_)
|
|
|
|
{
|
|
|
|
switch (type_)
|
|
|
|
{
|
2018-02-16 20:53:47 +00:00
|
|
|
case Type::EMPTY: break;
|
|
|
|
case Type::without_key: break;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
#define M(NAME, IS_TWO_LEVEL) \
|
|
|
|
case Type::NAME: NAME = std::make_unique<decltype(NAME)::element_type>(); break;
|
|
|
|
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
|
|
|
#undef M
|
|
|
|
|
|
|
|
default:
|
|
|
|
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
|
|
|
|
}
|
|
|
|
|
|
|
|
type = type_;
|
|
|
|
}
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// Number of rows (different keys).
|
2017-04-01 07:20:54 +00:00
|
|
|
size_t size() const
|
|
|
|
{
|
|
|
|
switch (type)
|
|
|
|
{
|
2018-02-16 20:53:47 +00:00
|
|
|
case Type::EMPTY: return 0;
|
|
|
|
case Type::without_key: return 1;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
#define M(NAME, IS_TWO_LEVEL) \
|
|
|
|
case Type::NAME: return NAME->data.size() + (without_key != nullptr);
|
|
|
|
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
|
|
|
#undef M
|
|
|
|
|
|
|
|
default:
|
|
|
|
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// The size without taking into account the row in which data is written for the calculation of TOTALS.
|
2017-04-01 07:20:54 +00:00
|
|
|
size_t sizeWithoutOverflowRow() const
|
|
|
|
{
|
|
|
|
switch (type)
|
|
|
|
{
|
2018-02-16 20:53:47 +00:00
|
|
|
case Type::EMPTY: return 0;
|
|
|
|
case Type::without_key: return 1;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
#define M(NAME, IS_TWO_LEVEL) \
|
|
|
|
case Type::NAME: return NAME->data.size();
|
|
|
|
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
|
|
|
#undef M
|
|
|
|
|
|
|
|
default:
|
|
|
|
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
const char * getMethodName() const
|
|
|
|
{
|
|
|
|
switch (type)
|
|
|
|
{
|
2018-02-16 20:53:47 +00:00
|
|
|
case Type::EMPTY: return "EMPTY";
|
|
|
|
case Type::without_key: return "without_key";
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
#define M(NAME, IS_TWO_LEVEL) \
|
|
|
|
case Type::NAME: return #NAME;
|
|
|
|
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
|
|
|
#undef M
|
|
|
|
|
|
|
|
default:
|
|
|
|
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
bool isTwoLevel() const
|
|
|
|
{
|
|
|
|
switch (type)
|
|
|
|
{
|
2018-02-16 20:53:47 +00:00
|
|
|
case Type::EMPTY: return false;
|
|
|
|
case Type::without_key: return false;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
#define M(NAME, IS_TWO_LEVEL) \
|
|
|
|
case Type::NAME: return IS_TWO_LEVEL;
|
|
|
|
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
|
|
|
#undef M
|
|
|
|
|
|
|
|
default:
|
|
|
|
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#define APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M) \
|
|
|
|
M(key32) \
|
|
|
|
M(key64) \
|
2018-01-09 00:19:58 +00:00
|
|
|
M(key_string) \
|
|
|
|
M(key_fixed_string) \
|
|
|
|
M(keys128) \
|
|
|
|
M(keys256) \
|
|
|
|
M(serialized) \
|
|
|
|
M(nullable_keys128) \
|
|
|
|
M(nullable_keys256) \
|
2018-08-21 14:53:51 +00:00
|
|
|
M(low_cardinality_key32) \
|
|
|
|
M(low_cardinality_key64) \
|
2018-09-14 13:02:03 +00:00
|
|
|
M(low_cardinality_keys128) \
|
|
|
|
M(low_cardinality_keys256) \
|
2018-08-21 14:53:51 +00:00
|
|
|
M(low_cardinality_key_string) \
|
|
|
|
M(low_cardinality_key_fixed_string) \
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
#define APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
|
2018-01-09 00:19:58 +00:00
|
|
|
M(key8) \
|
2017-04-01 07:20:54 +00:00
|
|
|
M(key16) \
|
|
|
|
M(key64_hash64) \
|
2018-01-09 00:19:58 +00:00
|
|
|
M(key_string_hash64)\
|
2017-04-01 07:20:54 +00:00
|
|
|
M(key_fixed_string_hash64) \
|
2018-01-09 00:19:58 +00:00
|
|
|
M(keys128_hash64) \
|
|
|
|
M(keys256_hash64) \
|
2017-04-01 07:20:54 +00:00
|
|
|
M(serialized_hash64) \
|
2018-08-21 14:53:51 +00:00
|
|
|
M(low_cardinality_key8) \
|
|
|
|
M(low_cardinality_key16) \
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
#define APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) \
|
|
|
|
APPLY_FOR_VARIANTS_NOT_CONVERTIBLE_TO_TWO_LEVEL(M) \
|
|
|
|
APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M) \
|
|
|
|
|
|
|
|
bool isConvertibleToTwoLevel() const
|
|
|
|
{
|
|
|
|
switch (type)
|
|
|
|
{
|
|
|
|
#define M(NAME) \
|
|
|
|
case Type::NAME: return true;
|
|
|
|
|
|
|
|
APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
|
|
|
|
|
|
|
|
#undef M
|
|
|
|
default:
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-09-12 13:27:00 +00:00
|
|
|
void convertToTwoLevel();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
#define APPLY_FOR_VARIANTS_TWO_LEVEL(M) \
|
2018-01-09 00:19:58 +00:00
|
|
|
M(key32_two_level) \
|
|
|
|
M(key64_two_level) \
|
|
|
|
M(key_string_two_level) \
|
|
|
|
M(key_fixed_string_two_level) \
|
|
|
|
M(keys128_two_level) \
|
|
|
|
M(keys256_two_level) \
|
|
|
|
M(serialized_two_level) \
|
|
|
|
M(nullable_keys128_two_level) \
|
2018-08-21 14:53:51 +00:00
|
|
|
M(nullable_keys256_two_level) \
|
|
|
|
M(low_cardinality_key32_two_level) \
|
|
|
|
M(low_cardinality_key64_two_level) \
|
2018-09-14 13:02:03 +00:00
|
|
|
M(low_cardinality_keys128_two_level) \
|
|
|
|
M(low_cardinality_keys256_two_level) \
|
2018-08-21 14:53:51 +00:00
|
|
|
M(low_cardinality_key_string_two_level) \
|
|
|
|
M(low_cardinality_key_fixed_string_two_level) \
|
|
|
|
|
|
|
|
#define APPLY_FOR_LOW_CARDINALITY_VARIANTS(M) \
|
|
|
|
M(low_cardinality_key8) \
|
|
|
|
M(low_cardinality_key16) \
|
|
|
|
M(low_cardinality_key32) \
|
|
|
|
M(low_cardinality_key64) \
|
2018-09-14 13:02:03 +00:00
|
|
|
M(low_cardinality_keys128) \
|
|
|
|
M(low_cardinality_keys256) \
|
2018-08-21 14:53:51 +00:00
|
|
|
M(low_cardinality_key_string) \
|
|
|
|
M(low_cardinality_key_fixed_string) \
|
|
|
|
M(low_cardinality_key32_two_level) \
|
|
|
|
M(low_cardinality_key64_two_level) \
|
2018-09-14 13:02:03 +00:00
|
|
|
M(low_cardinality_keys128_two_level) \
|
|
|
|
M(low_cardinality_keys256_two_level) \
|
2018-08-21 14:53:51 +00:00
|
|
|
M(low_cardinality_key_string_two_level) \
|
|
|
|
M(low_cardinality_key_fixed_string_two_level) \
|
|
|
|
|
|
|
|
bool isLowCardinality()
|
|
|
|
{
|
|
|
|
switch (type)
|
|
|
|
{
|
|
|
|
#define M(NAME) \
|
|
|
|
case Type::NAME: return true;
|
|
|
|
|
|
|
|
APPLY_FOR_LOW_CARDINALITY_VARIANTS(M)
|
|
|
|
#undef M
|
|
|
|
default:
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
2018-09-12 13:27:00 +00:00
|
|
|
|
|
|
|
static AggregationStateCachePtr createCache(Type type, const AggregationStateCache::Settings & settings)
|
|
|
|
{
|
|
|
|
switch (type)
|
|
|
|
{
|
2018-09-14 13:02:03 +00:00
|
|
|
case Type::without_key: return nullptr;
|
|
|
|
|
2018-09-12 13:27:00 +00:00
|
|
|
#define M(NAME, IS_TWO_LEVEL) \
|
|
|
|
case Type::NAME: \
|
|
|
|
{ \
|
|
|
|
using TPtr ## NAME = decltype(AggregatedDataVariants::NAME); \
|
|
|
|
using T ## NAME = typename TPtr ## NAME ::element_type; \
|
|
|
|
return T ## NAME ::createCache(settings); \
|
|
|
|
}
|
|
|
|
|
|
|
|
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
|
|
|
#undef M
|
2018-09-14 13:02:03 +00:00
|
|
|
|
2018-09-12 13:27:00 +00:00
|
|
|
default:
|
|
|
|
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
|
|
|
|
}
|
|
|
|
}
|
2011-09-26 07:25:22 +00:00
|
|
|
};
|
|
|
|
|
2016-05-28 14:14:18 +00:00
|
|
|
using AggregatedDataVariantsPtr = std::shared_ptr<AggregatedDataVariants>;
|
2016-05-28 10:35:44 +00:00
|
|
|
using ManyAggregatedDataVariants = std::vector<AggregatedDataVariantsPtr>;
|
2012-02-27 06:28:20 +00:00
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** How are "total" values calculated with WITH TOTALS?
|
|
|
|
* (For more details, see TotalsHavingBlockInputStream.)
|
2015-11-09 18:45:55 +00:00
|
|
|
*
|
2017-06-02 21:37:28 +00:00
|
|
|
* In the absence of group_by_overflow_mode = 'any', the data is aggregated as usual, but the states of the aggregate functions are not finalized.
|
|
|
|
* Later, the aggregate function states for all rows (passed through HAVING) are merged into one - this will be TOTALS.
|
2015-11-09 18:45:55 +00:00
|
|
|
*
|
2017-06-02 21:37:28 +00:00
|
|
|
* If there is group_by_overflow_mode = 'any', the data is aggregated as usual, except for the keys that did not fit in max_rows_to_group_by.
|
|
|
|
* For these keys, the data is aggregated into one additional row - see below under the names `overflow_row`, `overflows`...
|
|
|
|
* Later, the aggregate function states for all rows (passed through HAVING) are merged into one,
|
|
|
|
* and overflow_row is either added to them or not (depending on the totals_mode setting) - this will be TOTALS.
|
2015-11-09 18:45:55 +00:00
|
|
|
*/
|
|
|
|
|
2011-09-19 01:42:16 +00:00
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Aggregates the data read from a source of blocks.
|
2011-09-19 01:42:16 +00:00
|
|
|
*/
|
2011-09-19 03:34:23 +00:00
|
|
|
class Aggregator
|
2011-09-19 01:42:16 +00:00
|
|
|
{
|
|
|
|
public:
|
2017-04-01 07:20:54 +00:00
|
|
|
struct Params
|
|
|
|
{
|
2018-01-06 18:10:44 +00:00
|
|
|
/// Data structure of source blocks.
|
|
|
|
Block src_header;
|
|
|
|
/// Data structure of intermediate blocks before merge.
|
|
|
|
Block intermediate_header;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// What to count.
|
2018-01-06 18:10:44 +00:00
|
|
|
ColumnNumbers keys;
|
2017-04-01 07:20:54 +00:00
|
|
|
AggregateDescriptions aggregates;
|
|
|
|
size_t keys_size;
|
|
|
|
size_t aggregates_size;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// The settings of approximate calculation of GROUP BY.
|
|
|
|
const bool overflow_row; /// Do we need to put into AggregatedDataVariants::without_key aggregates for keys that are not in max_rows_to_group_by.
|
2017-04-01 07:20:54 +00:00
|
|
|
const size_t max_rows_to_group_by;
|
|
|
|
const OverflowMode group_by_overflow_mode;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// For dynamic compilation.
|
2017-04-01 07:20:54 +00:00
|
|
|
Compiler * compiler;
|
|
|
|
const UInt32 min_count_to_compile;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// Two-level aggregation settings (used for a large number of keys).
|
|
|
|
/** With how many keys or the size of the aggregation state in bytes,
|
|
|
|
* two-level aggregation begins to be used. Enough to reach of at least one of the thresholds.
|
|
|
|
* 0 - the corresponding threshold is not specified.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
const size_t group_by_two_level_threshold;
|
|
|
|
const size_t group_by_two_level_threshold_bytes;
|
|
|
|
|
2017-06-21 19:07:08 +00:00
|
|
|
/// Settings to flush temporary data to the filesystem (external aggregation).
|
2017-06-02 21:37:28 +00:00
|
|
|
const size_t max_bytes_before_external_group_by; /// 0 - do not use external aggregation.
|
2018-02-18 05:35:48 +00:00
|
|
|
|
|
|
|
/// Return empty result when aggregating without keys on empty set.
|
|
|
|
bool empty_result_for_aggregation_by_empty_set;
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
const std::string tmp_path;
|
|
|
|
|
2018-09-20 17:59:47 +00:00
|
|
|
/// Settings is used to determine cache size. No threads are created.
|
2018-09-12 13:27:00 +00:00
|
|
|
size_t max_threads;
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
Params(
|
2018-01-06 18:10:44 +00:00
|
|
|
const Block & src_header_,
|
|
|
|
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_,
|
2017-04-01 07:20:54 +00:00
|
|
|
bool overflow_row_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
|
|
|
|
Compiler * compiler_, UInt32 min_count_to_compile_,
|
|
|
|
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
|
2018-02-18 05:35:48 +00:00
|
|
|
size_t max_bytes_before_external_group_by_,
|
|
|
|
bool empty_result_for_aggregation_by_empty_set_,
|
2018-09-12 13:27:00 +00:00
|
|
|
const std::string & tmp_path_, size_t max_threads_)
|
2018-01-06 18:10:44 +00:00
|
|
|
: src_header(src_header_),
|
|
|
|
keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
|
2017-04-01 07:20:54 +00:00
|
|
|
overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
|
|
|
|
compiler(compiler_), min_count_to_compile(min_count_to_compile_),
|
|
|
|
group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
|
2018-02-18 05:35:48 +00:00
|
|
|
max_bytes_before_external_group_by(max_bytes_before_external_group_by_),
|
|
|
|
empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_),
|
2018-09-12 13:27:00 +00:00
|
|
|
tmp_path(tmp_path_), max_threads(max_threads_)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
}
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// Only parameters that matter during merge.
|
2018-01-06 18:10:44 +00:00
|
|
|
Params(const Block & intermediate_header_,
|
2018-09-12 13:27:00 +00:00
|
|
|
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
|
|
|
|
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, false, "", max_threads_)
|
2018-01-06 18:10:44 +00:00
|
|
|
{
|
|
|
|
intermediate_header = intermediate_header_;
|
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-01-06 18:10:44 +00:00
|
|
|
/// Calculate the column numbers in `keys` and `aggregates`.
|
2017-04-01 07:20:54 +00:00
|
|
|
void calculateColumnNumbers(const Block & block);
|
|
|
|
};
|
|
|
|
|
2018-01-06 18:10:44 +00:00
|
|
|
Aggregator(const Params & params_);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// Aggregate the source. Get the result in the form of one of the data structures.
|
2017-09-08 02:29:47 +00:00
|
|
|
void execute(const BlockInputStreamPtr & stream, AggregatedDataVariants & result);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-12-13 01:27:53 +00:00
|
|
|
using AggregateColumns = std::vector<ColumnRawPtrs>;
|
2017-12-15 21:32:25 +00:00
|
|
|
using AggregateColumnsData = std::vector<ColumnAggregateFunction::Container *>;
|
|
|
|
using AggregateColumnsConstData = std::vector<const ColumnAggregateFunction::Container *>;
|
2017-04-01 07:20:54 +00:00
|
|
|
using AggregateFunctionsPlainPtrs = std::vector<IAggregateFunction *>;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
|
2018-02-18 05:35:48 +00:00
|
|
|
bool executeOnBlock(const Block & block, AggregatedDataVariants & result,
|
2017-12-13 01:27:53 +00:00
|
|
|
ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
|
2018-02-18 04:17:11 +00:00
|
|
|
StringRefs & keys, /// - pass the corresponding objects that are initially empty.
|
2017-04-01 07:20:54 +00:00
|
|
|
bool & no_more_keys);
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Convert the aggregation data structure into a block.
|
|
|
|
* If overflow_row = true, then aggregates for rows that are not included in max_rows_to_group_by are put in the first block.
|
2017-04-01 07:20:54 +00:00
|
|
|
*
|
2017-06-02 21:37:28 +00:00
|
|
|
* If final = false, then ColumnAggregateFunction is created as the aggregation columns with the state of the calculations,
|
|
|
|
* which can then be combined with other states (for distributed query processing).
|
|
|
|
* If final = true, then columns with ready values are created as aggregate columns.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
BlocksList convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Merge several aggregation data structures and output the result as a block stream.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
std::unique_ptr<IBlockInputStream> mergeAndConvertToBlocks(ManyAggregatedDataVariants & data_variants, bool final, size_t max_threads) const;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Merge the stream of partially aggregated blocks into one data structure.
|
|
|
|
* (Pre-aggregate several blocks that represent the result of independent aggregations from remote servers.)
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
2017-09-08 02:29:47 +00:00
|
|
|
void mergeStream(const BlockInputStreamPtr & stream, AggregatedDataVariants & result, size_t max_threads);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-07-25 12:16:14 +00:00
|
|
|
/// Merge several partially aggregated blocks into one.
|
|
|
|
/// Precondition: for all blocks block.info.is_overflows flag must be the same.
|
|
|
|
/// (either all blocks are from overflow data or none blocks are).
|
|
|
|
/// The resulting block has the same value of is_overflows flag.
|
2017-04-01 07:20:54 +00:00
|
|
|
Block mergeBlocks(BlocksList & blocks, bool final);
|
|
|
|
|
|
|
|
/** Split block with partially-aggregated data to many blocks, as if two-level method of aggregation was used.
|
|
|
|
* This is needed to simplify merging of that data with other results, that are already two-level.
|
|
|
|
*/
|
2018-09-12 13:27:00 +00:00
|
|
|
std::vector<Block> convertBlockToTwoLevel(const Block & block);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
using CancellationHook = std::function<bool()>;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Set a function that checks whether the current task can be aborted.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
void setCancellationHook(const CancellationHook cancellation_hook);
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// For external aggregation.
|
2017-07-25 13:09:52 +00:00
|
|
|
void writeToTemporaryFile(AggregatedDataVariants & data_variants);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
bool hasTemporaryFiles() const { return !temporary_files.empty(); }
|
|
|
|
|
|
|
|
struct TemporaryFiles
|
|
|
|
{
|
|
|
|
std::vector<std::unique_ptr<Poco::TemporaryFile>> files;
|
|
|
|
size_t sum_size_uncompressed = 0;
|
|
|
|
size_t sum_size_compressed = 0;
|
|
|
|
mutable std::mutex mutex;
|
|
|
|
|
|
|
|
bool empty() const
|
|
|
|
{
|
|
|
|
std::lock_guard<std::mutex> lock(mutex);
|
|
|
|
return files.empty();
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
const TemporaryFiles & getTemporaryFiles() const { return temporary_files; }
|
2015-12-01 14:09:05 +00:00
|
|
|
|
2018-01-06 18:10:44 +00:00
|
|
|
/// Get data structure of the result.
|
|
|
|
Block getHeader(bool final) const;
|
|
|
|
|
2013-09-15 07:17:26 +00:00
|
|
|
protected:
|
2017-04-01 07:20:54 +00:00
|
|
|
friend struct AggregatedDataVariants;
|
|
|
|
friend class MergingAndConvertingBlockInputStream;
|
|
|
|
|
|
|
|
Params params;
|
|
|
|
|
2018-08-27 17:42:13 +00:00
|
|
|
AggregatedDataVariants::Type method_chosen;
|
2018-02-18 04:17:11 +00:00
|
|
|
Sizes key_sizes;
|
|
|
|
|
2018-09-12 13:27:00 +00:00
|
|
|
AggregationStateCachePtr aggregation_state_cache;
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
AggregateFunctionsPlainPtrs aggregate_functions;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** This array serves two purposes.
|
2017-04-01 07:20:54 +00:00
|
|
|
*
|
2017-06-02 21:37:28 +00:00
|
|
|
* 1. Function arguments are collected side by side, and they do not need to be collected from different places. Also the array is made zero-terminated.
|
|
|
|
* The inner loop (for the case without_key) is almost twice as compact; performance gain of about 30%.
|
2017-04-01 07:20:54 +00:00
|
|
|
*
|
2017-06-02 21:37:28 +00:00
|
|
|
* 2. Calling a function by pointer is better than a virtual call, because in the case of a virtual call,
|
|
|
|
* GCC 5.1.2 generates code that, at each iteration of the loop, reloads the function address from memory into the register
|
|
|
|
* (the offset value in the virtual function table).
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
struct AggregateFunctionInstruction
|
|
|
|
{
|
|
|
|
const IAggregateFunction * that;
|
|
|
|
IAggregateFunction::AddFunc func;
|
|
|
|
size_t state_offset;
|
|
|
|
const IColumn ** arguments;
|
|
|
|
};
|
|
|
|
|
|
|
|
using AggregateFunctionInstructions = std::vector<AggregateFunctionInstruction>;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
Sizes offsets_of_aggregate_states; /// The offset to the n-th aggregate function in a row of aggregate functions.
|
|
|
|
size_t total_size_of_aggregate_states = 0; /// The total size of the row from the aggregate functions.
|
2018-08-05 08:45:15 +00:00
|
|
|
|
2018-09-01 03:17:43 +00:00
|
|
|
// add info to track alignment requirement
|
|
|
|
// If there are states whose alignmentment are v1, ..vn, align_aggregate_states will be max(v1, ... vn)
|
2018-08-05 08:45:15 +00:00
|
|
|
size_t align_aggregate_states = 1;
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
bool all_aggregates_has_trivial_destructor = false;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// How many RAM were used to process the query before processing the first block.
|
2017-04-01 07:20:54 +00:00
|
|
|
Int64 memory_usage_before_aggregation = 0;
|
|
|
|
|
|
|
|
std::mutex mutex;
|
|
|
|
|
|
|
|
Logger * log = &Logger::get("Aggregator");
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Dynamically compiled library for aggregation, if any.
|
|
|
|
* The meaning of dynamic compilation is to specialize code
|
|
|
|
* for a specific list of aggregate functions.
|
|
|
|
* This allows you to expand the loop to create and update states of aggregate functions,
|
|
|
|
* and also use inline-code instead of virtual calls.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
struct CompiledData
|
|
|
|
{
|
|
|
|
SharedLibraryPtr compiled_aggregator;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// Obtained with dlsym. It is still necessary to make reinterpret_cast to the function pointer.
|
2017-04-01 07:20:54 +00:00
|
|
|
void * compiled_method_ptr = nullptr;
|
|
|
|
void * compiled_two_level_method_ptr = nullptr;
|
|
|
|
};
|
2017-06-02 21:37:28 +00:00
|
|
|
/// shared_ptr - to pass into a callback, that can survive Aggregator.
|
2017-04-01 07:20:54 +00:00
|
|
|
std::shared_ptr<CompiledData> compiled_data { new CompiledData };
|
|
|
|
|
|
|
|
bool compiled_if_possible = false;
|
|
|
|
void compileIfPossible(AggregatedDataVariants::Type type);
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// Returns true if you can abort the current task.
|
2017-04-01 07:20:54 +00:00
|
|
|
CancellationHook isCancelled;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// For external aggregation.
|
2017-04-01 07:20:54 +00:00
|
|
|
TemporaryFiles temporary_files;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Select the aggregation method based on the number and types of keys. */
|
2018-02-18 04:17:11 +00:00
|
|
|
AggregatedDataVariants::Type chooseAggregationMethod();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Create states of aggregate functions for one key.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
void createAggregateStates(AggregateDataPtr & aggregate_data) const;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Call `destroy` methods for states of aggregate functions.
|
|
|
|
* Used in the exception handler for aggregation, since RAII in this case is not applicable.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
void destroyAllAggregateStates(AggregatedDataVariants & result);
|
|
|
|
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// Process one data block, aggregate the data into a hash table.
|
2017-04-01 07:20:54 +00:00
|
|
|
template <typename Method>
|
|
|
|
void executeImpl(
|
|
|
|
Method & method,
|
|
|
|
Arena * aggregates_pool,
|
|
|
|
size_t rows,
|
2017-12-13 01:27:53 +00:00
|
|
|
ColumnRawPtrs & key_columns,
|
2017-04-01 07:20:54 +00:00
|
|
|
AggregateFunctionInstruction * aggregate_instructions,
|
|
|
|
StringRefs & keys,
|
|
|
|
bool no_more_keys,
|
|
|
|
AggregateDataPtr overflow_row) const;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// Specialization for a particular value no_more_keys.
|
2017-04-01 07:20:54 +00:00
|
|
|
template <bool no_more_keys, typename Method>
|
|
|
|
void executeImplCase(
|
|
|
|
Method & method,
|
|
|
|
typename Method::State & state,
|
|
|
|
Arena * aggregates_pool,
|
|
|
|
size_t rows,
|
2017-12-13 01:27:53 +00:00
|
|
|
ColumnRawPtrs & key_columns,
|
2017-04-01 07:20:54 +00:00
|
|
|
AggregateFunctionInstruction * aggregate_instructions,
|
|
|
|
StringRefs & keys,
|
|
|
|
AggregateDataPtr overflow_row) const;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// For case when there are no keys (all aggregate into one row).
|
2017-04-01 07:20:54 +00:00
|
|
|
void executeWithoutKeyImpl(
|
|
|
|
AggregatedDataWithoutKey & res,
|
|
|
|
size_t rows,
|
|
|
|
AggregateFunctionInstruction * aggregate_instructions,
|
|
|
|
Arena * arena) const;
|
|
|
|
|
|
|
|
template <typename Method>
|
|
|
|
void writeToTemporaryFileImpl(
|
|
|
|
AggregatedDataVariants & data_variants,
|
|
|
|
Method & method,
|
2017-12-01 21:13:25 +00:00
|
|
|
IBlockOutputStream & out);
|
2015-11-30 19:57:46 +00:00
|
|
|
|
2015-01-11 00:57:21 +00:00
|
|
|
public:
|
2017-06-02 21:37:28 +00:00
|
|
|
/// Templates that are instantiated by dynamic code compilation - see SpecializedAggregator.h
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
template <typename Method, typename AggregateFunctionsList>
|
|
|
|
void executeSpecialized(
|
|
|
|
Method & method,
|
|
|
|
Arena * aggregates_pool,
|
|
|
|
size_t rows,
|
2017-12-13 01:27:53 +00:00
|
|
|
ColumnRawPtrs & key_columns,
|
2017-04-01 07:20:54 +00:00
|
|
|
AggregateColumns & aggregate_columns,
|
|
|
|
StringRefs & keys,
|
|
|
|
bool no_more_keys,
|
|
|
|
AggregateDataPtr overflow_row) const;
|
|
|
|
|
|
|
|
template <bool no_more_keys, typename Method, typename AggregateFunctionsList>
|
|
|
|
void executeSpecializedCase(
|
|
|
|
Method & method,
|
|
|
|
typename Method::State & state,
|
|
|
|
Arena * aggregates_pool,
|
|
|
|
size_t rows,
|
2017-12-13 01:27:53 +00:00
|
|
|
ColumnRawPtrs & key_columns,
|
2017-04-01 07:20:54 +00:00
|
|
|
AggregateColumns & aggregate_columns,
|
|
|
|
StringRefs & keys,
|
|
|
|
AggregateDataPtr overflow_row) const;
|
|
|
|
|
|
|
|
template <typename AggregateFunctionsList>
|
|
|
|
void executeSpecializedWithoutKey(
|
|
|
|
AggregatedDataWithoutKey & res,
|
|
|
|
size_t rows,
|
|
|
|
AggregateColumns & aggregate_columns,
|
|
|
|
Arena * arena) const;
|
2015-01-13 03:03:45 +00:00
|
|
|
|
2015-01-11 00:57:21 +00:00
|
|
|
protected:
|
2017-06-02 21:37:28 +00:00
|
|
|
/// Merge data from hash table `src` into `dst`.
|
2017-04-01 07:20:54 +00:00
|
|
|
template <typename Method, typename Table>
|
|
|
|
void mergeDataImpl(
|
|
|
|
Table & table_dst,
|
|
|
|
Table & table_src,
|
|
|
|
Arena * arena) const;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// Merge data from hash table `src` into `dst`, but only for keys that already exist in dst. In other cases, merge the data into `overflows`.
|
2017-04-01 07:20:54 +00:00
|
|
|
template <typename Method, typename Table>
|
|
|
|
void mergeDataNoMoreKeysImpl(
|
|
|
|
Table & table_dst,
|
|
|
|
AggregatedDataWithoutKey & overflows,
|
|
|
|
Table & table_src,
|
|
|
|
Arena * arena) const;
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/// Same, but ignores the rest of the keys.
|
2017-04-01 07:20:54 +00:00
|
|
|
template <typename Method, typename Table>
|
|
|
|
void mergeDataOnlyExistingKeysImpl(
|
|
|
|
Table & table_dst,
|
|
|
|
Table & table_src,
|
|
|
|
Arena * arena) const;
|
|
|
|
|
|
|
|
void mergeWithoutKeyDataImpl(
|
|
|
|
ManyAggregatedDataVariants & non_empty_data) const;
|
|
|
|
|
|
|
|
template <typename Method>
|
|
|
|
void mergeSingleLevelDataImpl(
|
|
|
|
ManyAggregatedDataVariants & non_empty_data) const;
|
|
|
|
|
|
|
|
template <typename Method, typename Table>
|
|
|
|
void convertToBlockImpl(
|
|
|
|
Method & method,
|
|
|
|
Table & data,
|
2017-12-15 03:47:43 +00:00
|
|
|
MutableColumns & key_columns,
|
2017-04-01 07:20:54 +00:00
|
|
|
AggregateColumnsData & aggregate_columns,
|
2017-12-15 03:47:43 +00:00
|
|
|
MutableColumns & final_aggregate_columns,
|
2017-04-01 07:20:54 +00:00
|
|
|
bool final) const;
|
|
|
|
|
|
|
|
template <typename Method, typename Table>
|
|
|
|
void convertToBlockImplFinal(
|
|
|
|
Method & method,
|
|
|
|
Table & data,
|
2017-12-15 03:47:43 +00:00
|
|
|
MutableColumns & key_columns,
|
2018-08-27 18:05:28 +00:00
|
|
|
MutableColumns & final_aggregate_columns) const;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
template <typename Method, typename Table>
|
|
|
|
void convertToBlockImplNotFinal(
|
|
|
|
Method & method,
|
|
|
|
Table & data,
|
2017-12-15 03:47:43 +00:00
|
|
|
MutableColumns & key_columns,
|
2018-08-27 18:05:28 +00:00
|
|
|
AggregateColumnsData & aggregate_columns) const;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
template <typename Filler>
|
|
|
|
Block prepareBlockAndFill(
|
|
|
|
AggregatedDataVariants & data_variants,
|
|
|
|
bool final,
|
|
|
|
size_t rows,
|
|
|
|
Filler && filler) const;
|
|
|
|
|
|
|
|
template <typename Method>
|
|
|
|
Block convertOneBucketToBlock(
|
|
|
|
AggregatedDataVariants & data_variants,
|
|
|
|
Method & method,
|
|
|
|
bool final,
|
|
|
|
size_t bucket) const;
|
|
|
|
|
2017-07-25 12:16:14 +00:00
|
|
|
Block prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool is_overflows) const;
|
|
|
|
Block prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_variants, bool final) const;
|
2017-04-01 07:20:54 +00:00
|
|
|
BlocksList prepareBlocksAndFillTwoLevel(AggregatedDataVariants & data_variants, bool final, ThreadPool * thread_pool) const;
|
|
|
|
|
|
|
|
template <typename Method>
|
|
|
|
BlocksList prepareBlocksAndFillTwoLevelImpl(
|
|
|
|
AggregatedDataVariants & data_variants,
|
|
|
|
Method & method,
|
|
|
|
bool final,
|
|
|
|
ThreadPool * thread_pool) const;
|
|
|
|
|
|
|
|
template <bool no_more_keys, typename Method, typename Table>
|
|
|
|
void mergeStreamsImplCase(
|
|
|
|
Block & block,
|
|
|
|
Arena * aggregates_pool,
|
|
|
|
Method & method,
|
|
|
|
Table & data,
|
|
|
|
AggregateDataPtr overflow_row) const;
|
|
|
|
|
|
|
|
template <typename Method, typename Table>
|
|
|
|
void mergeStreamsImpl(
|
|
|
|
Block & block,
|
|
|
|
Arena * aggregates_pool,
|
|
|
|
Method & method,
|
|
|
|
Table & data,
|
|
|
|
AggregateDataPtr overflow_row,
|
|
|
|
bool no_more_keys) const;
|
|
|
|
|
|
|
|
void mergeWithoutKeyStreamsImpl(
|
|
|
|
Block & block,
|
|
|
|
AggregatedDataVariants & result) const;
|
|
|
|
|
|
|
|
template <typename Method>
|
|
|
|
void mergeBucketImpl(
|
|
|
|
ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena) const;
|
|
|
|
|
|
|
|
template <typename Method>
|
|
|
|
void convertBlockToTwoLevelImpl(
|
|
|
|
Method & method,
|
|
|
|
Arena * pool,
|
2017-12-13 01:27:53 +00:00
|
|
|
ColumnRawPtrs & key_columns,
|
2017-04-01 07:20:54 +00:00
|
|
|
StringRefs & keys,
|
|
|
|
const Block & source,
|
|
|
|
std::vector<Block> & destinations) const;
|
|
|
|
|
|
|
|
template <typename Method, typename Table>
|
2017-12-01 20:21:35 +00:00
|
|
|
void destroyImpl(Table & table) const;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
void destroyWithoutKey(
|
|
|
|
AggregatedDataVariants & result) const;
|
|
|
|
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Checks constraints on the maximum number of keys for aggregation.
|
|
|
|
* If it is exceeded, then, depending on the group_by_overflow_mode, either
|
|
|
|
* - throws an exception;
|
|
|
|
* - returns false, which means that execution must be aborted;
|
|
|
|
* - sets the variable no_more_keys to true.
|
2017-04-01 07:20:54 +00:00
|
|
|
*/
|
|
|
|
bool checkLimits(size_t result_size, bool & no_more_keys) const;
|
2011-09-19 01:42:16 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
|
2017-06-02 21:37:28 +00:00
|
|
|
/** Get the aggregation variant by its type. */
|
2015-07-30 23:41:02 +00:00
|
|
|
template <typename Method> Method & getDataVariant(AggregatedDataVariants & variants);
|
|
|
|
|
|
|
|
#define M(NAME, IS_TWO_LEVEL) \
|
2017-04-01 07:20:54 +00:00
|
|
|
template <> inline decltype(AggregatedDataVariants::NAME)::element_type & getDataVariant<decltype(AggregatedDataVariants::NAME)::element_type>(AggregatedDataVariants & variants) { return *variants.NAME; }
|
2015-07-30 23:41:02 +00:00
|
|
|
|
|
|
|
APPLY_FOR_AGGREGATED_VARIANTS(M)
|
|
|
|
|
|
|
|
#undef M
|
|
|
|
|
|
|
|
|
2011-09-19 01:42:16 +00:00
|
|
|
}
|