Prefetching in aggregation (#39304)

* impl

* stash

* clean up

* do not apply when HT is small

* make branch static

* also in merge

* do not hardcode look ahead value

* fix

* apply to methods with cheap key calculation

* more tests

* silence tidy

* fix build

* support HashMethodKeysFixed

* apply during merge only for cheap

* stash

* fixes

* rename method

* add feature flag

* cache prefetch threshold value

* fix

* fix

* Update HashMap.h

* fix typo

* 256KB as default l2 size

Co-authored-by: Alexey Milovidov <milovidov@clickhouse.com>
This commit is contained in:
Nikita Taranov 2022-09-21 18:59:07 +02:00 committed by GitHub
parent 1204f643f1
commit 100c055510
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 337 additions and 51 deletions

View File

@ -36,6 +36,8 @@ struct HashMethodOneNumber
using Self = HashMethodOneNumber<Value, Mapped, FieldType, use_cache, need_offset>; using Self = HashMethodOneNumber<Value, Mapped, FieldType, use_cache, need_offset>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache, need_offset>; using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache, need_offset>;
static constexpr bool has_cheap_key_calculation = true;
const char * vec; const char * vec;
/// If the keys of a fixed length then key_sizes contains their lengths, empty otherwise. /// If the keys of a fixed length then key_sizes contains their lengths, empty otherwise.
@ -78,6 +80,8 @@ struct HashMethodString
using Self = HashMethodString<Value, Mapped, place_string_to_arena, use_cache, need_offset>; using Self = HashMethodString<Value, Mapped, place_string_to_arena, use_cache, need_offset>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache, need_offset>; using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache, need_offset>;
static constexpr bool has_cheap_key_calculation = false;
const IColumn::Offset * offsets; const IColumn::Offset * offsets;
const UInt8 * chars; const UInt8 * chars;
@ -117,6 +121,8 @@ struct HashMethodFixedString
using Self = HashMethodFixedString<Value, Mapped, place_string_to_arena, use_cache, need_offset>; using Self = HashMethodFixedString<Value, Mapped, place_string_to_arena, use_cache, need_offset>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache, need_offset>; using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache, need_offset>;
static constexpr bool has_cheap_key_calculation = false;
size_t n; size_t n;
const ColumnFixedString::Chars * chars; const ColumnFixedString::Chars * chars;
@ -210,6 +216,8 @@ struct HashMethodSingleLowCardinalityColumn : public SingleColumnMethod
using EmplaceResult = columns_hashing_impl::EmplaceResultImpl<Mapped>; using EmplaceResult = columns_hashing_impl::EmplaceResultImpl<Mapped>;
using FindResult = columns_hashing_impl::FindResultImpl<Mapped>; using FindResult = columns_hashing_impl::FindResultImpl<Mapped>;
static constexpr bool has_cheap_key_calculation = Base::has_cheap_key_calculation;
static HashMethodContextPtr createContext(const HashMethodContext::Settings & settings) static HashMethodContextPtr createContext(const HashMethodContext::Settings & settings)
{ {
return std::make_shared<LowCardinalityDictionaryCache>(settings); return std::make_shared<LowCardinalityDictionaryCache>(settings);
@ -479,6 +487,8 @@ struct HashMethodKeysFixed
static constexpr bool has_nullable_keys = has_nullable_keys_; static constexpr bool has_nullable_keys = has_nullable_keys_;
static constexpr bool has_low_cardinality = has_low_cardinality_; static constexpr bool has_low_cardinality = has_low_cardinality_;
static constexpr bool has_cheap_key_calculation = true;
LowCardinalityKeys<has_low_cardinality> low_cardinality_keys; LowCardinalityKeys<has_low_cardinality> low_cardinality_keys;
Sizes key_sizes; Sizes key_sizes;
size_t keys_size; size_t keys_size;
@ -653,13 +663,14 @@ struct HashMethodSerialized
using Self = HashMethodSerialized<Value, Mapped>; using Self = HashMethodSerialized<Value, Mapped>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>; using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;
static constexpr bool has_cheap_key_calculation = false;
ColumnRawPtrs key_columns; ColumnRawPtrs key_columns;
size_t keys_size; size_t keys_size;
HashMethodSerialized(const ColumnRawPtrs & key_columns_, const Sizes & /*key_sizes*/, const HashMethodContextPtr &) HashMethodSerialized(const ColumnRawPtrs & key_columns_, const Sizes & /*key_sizes*/, const HashMethodContextPtr &)
: key_columns(key_columns_), keys_size(key_columns_.size()) {} : key_columns(key_columns_), keys_size(key_columns_.size()) {}
protected:
friend class columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>; friend class columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;
ALWAYS_INLINE SerializedKeyHolder getKeyHolder(size_t row, Arena & pool) const ALWAYS_INLINE SerializedKeyHolder getKeyHolder(size_t row, Arena & pool) const
@ -679,6 +690,8 @@ struct HashMethodHashed
using Self = HashMethodHashed<Value, Mapped, use_cache, need_offset>; using Self = HashMethodHashed<Value, Mapped, use_cache, need_offset>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache, need_offset>; using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache, need_offset>;
static constexpr bool has_cheap_key_calculation = false;
ColumnRawPtrs key_columns; ColumnRawPtrs key_columns;
HashMethodHashed(ColumnRawPtrs key_columns_, const Sizes &, const HashMethodContextPtr &) HashMethodHashed(ColumnRawPtrs key_columns_, const Sizes &, const HashMethodContextPtr &)

View File

@ -212,9 +212,9 @@ protected:
if constexpr (has_mapped) if constexpr (has_mapped)
cached = &it->getMapped(); cached = &it->getMapped();
if (inserted)
{
if constexpr (has_mapped) if constexpr (has_mapped)
{
if (inserted)
{ {
new (&it->getMapped()) Mapped(); new (&it->getMapped()) Mapped();
} }

View File

@ -109,7 +109,7 @@ public:
using Base::Base; using Base::Base;
template <typename Func> template <typename Func, bool>
void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func) void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func)
{ {
for (auto it = this->begin(), end = this->end(); it != end; ++it) for (auto it = this->begin(), end = this->end(); it != end; ++it)

View File

@ -3,6 +3,7 @@
#include <Common/HashTable/Hash.h> #include <Common/HashTable/Hash.h>
#include <Common/HashTable/HashTable.h> #include <Common/HashTable/HashTable.h>
#include <Common/HashTable/HashTableAllocator.h> #include <Common/HashTable/HashTableAllocator.h>
#include <Common/HashTable/Prefetching.h>
/** NOTE HashMap could only be used for memmoveable (position independent) types. /** NOTE HashMap could only be used for memmoveable (position independent) types.
@ -189,8 +190,10 @@ public:
using Self = HashMapTable; using Self = HashMapTable;
using Base = HashTable<Key, Cell, Hash, Grower, Allocator>; using Base = HashTable<Key, Cell, Hash, Grower, Allocator>;
using LookupResult = typename Base::LookupResult; using LookupResult = typename Base::LookupResult;
using Iterator = typename Base::iterator;
using Base::Base; using Base::Base;
using Base::prefetch;
/// Merge every cell's value of current map into the destination map via emplace. /// Merge every cell's value of current map into the destination map via emplace.
/// Func should have signature void(Mapped & dst, Mapped & src, bool emplaced). /// Func should have signature void(Mapped & dst, Mapped & src, bool emplaced).
@ -198,11 +201,32 @@ public:
/// have a key equals to the given cell, a new cell gets emplaced into that map, /// have a key equals to the given cell, a new cell gets emplaced into that map,
/// and func is invoked with the third argument emplaced set to true. Otherwise /// and func is invoked with the third argument emplaced set to true. Otherwise
/// emplaced is set to false. /// emplaced is set to false.
template <typename Func> template <typename Func, bool prefetch = false>
void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func) void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func)
{ {
for (auto it = this->begin(), end = this->end(); it != end; ++it) DB::PrefetchingHelper prefetching;
size_t prefetch_look_ahead = prefetching.getInitialLookAheadValue();
size_t i = 0;
auto prefetch_it = advanceIterator(this->begin(), prefetch_look_ahead);
for (auto it = this->begin(), end = this->end(); it != end; ++it, ++i)
{ {
if constexpr (prefetch)
{
if (i == prefetching.iterationsToMeasure())
{
prefetch_look_ahead = prefetching.calcPrefetchLookAhead();
prefetch_it = advanceIterator(prefetch_it, prefetch_look_ahead - prefetching.getInitialLookAheadValue());
}
if (prefetch_it != end)
{
that.prefetchByHash(prefetch_it.getHash());
++prefetch_it;
}
}
typename Self::LookupResult res_it; typename Self::LookupResult res_it;
bool inserted; bool inserted;
that.emplace(Cell::getKey(it->getValue()), res_it, inserted, it.getHash()); that.emplace(Cell::getKey(it->getValue()), res_it, inserted, it.getHash());
@ -276,6 +300,18 @@ public:
return it->getMapped(); return it->getMapped();
throw DB::Exception("Cannot find element in HashMap::at method", DB::ErrorCodes::LOGICAL_ERROR); throw DB::Exception("Cannot find element in HashMap::at method", DB::ErrorCodes::LOGICAL_ERROR);
} }
private:
Iterator advanceIterator(Iterator it, size_t n)
{
size_t i = 0;
while (i < n && it != this->end())
{
++i;
++it;
}
return it;
}
}; };
namespace std namespace std

View File

@ -911,10 +911,10 @@ protected:
bool ALWAYS_INLINE emplaceIfZero(const Key & x, LookupResult & it, bool & inserted, size_t hash_value) bool ALWAYS_INLINE emplaceIfZero(const Key & x, LookupResult & it, bool & inserted, size_t hash_value)
{ {
/// If it is claimed that the zero key can not be inserted into the table. /// If it is claimed that the zero key can not be inserted into the table.
if (!Cell::need_zero_value_storage) if constexpr (!Cell::need_zero_value_storage)
return false; return false;
if (Cell::isZero(x, *this)) if (unlikely(Cell::isZero(x, *this)))
{ {
it = this->zeroValue(); it = this->zeroValue();
@ -990,6 +990,11 @@ protected:
emplaceNonZeroImpl(place_value, key_holder, it, inserted, hash_value); emplaceNonZeroImpl(place_value, key_holder, it, inserted, hash_value);
} }
void ALWAYS_INLINE prefetchByHash(size_t hash_key) const
{
const auto place = grower.place(hash_key);
__builtin_prefetch(&buf[place]);
}
public: public:
void reserve(size_t num_elements) void reserve(size_t num_elements)
@ -1014,7 +1019,6 @@ public:
return res; return res;
} }
/// Reinsert node pointed to by iterator /// Reinsert node pointed to by iterator
void ALWAYS_INLINE reinsert(iterator & it, size_t hash_value) void ALWAYS_INLINE reinsert(iterator & it, size_t hash_value)
{ {
@ -1025,6 +1029,13 @@ public:
Cell::move(it.getPtr(), &buf[place_value]); Cell::move(it.getPtr(), &buf[place_value]);
} }
template <typename KeyHolder>
void ALWAYS_INLINE prefetch(KeyHolder && key_holder) const
{
const auto & key = keyHolderGetKey(key_holder);
const auto key_hash = hash(key);
prefetchByHash(key_hash);
}
/** Insert the key. /** Insert the key.
* Return values: * Return values:

View File

@ -0,0 +1,52 @@
#pragma once
#include <Common/Stopwatch.h>
#include <algorithm>
namespace DB
{
/**
* The purpose of this helper class is to provide a good value for prefetch look ahead (how distant row we should prefetch on the given iteration)
* based on the latency of a single iteration of the given cycle.
*
* Assumed usage pattern is the following:
*
* PrefetchingHelper prefetching; /// When object is created, it starts a watch to measure iteration latency.
* size_t prefetch_look_ahead = prefetching.getInitialLookAheadValue(); /// Initially it provides you with some reasonable default value.
*
* for (size_t i = 0; i < end; ++i)
* {
* if (i == prefetching.iterationsToMeasure()) /// When enough iterations passed, we are able to make a fairly accurate estimation of a single iteration latency.
* prefetch_look_ahead = prefetching.calcPrefetchLookAhead(); /// Based on this estimation we can choose a good value for prefetch_look_ahead.
*
* ... main loop body ...
* }
*
*/
class PrefetchingHelper
{
public:
size_t calcPrefetchLookAhead()
{
static constexpr auto assumed_load_latency_ns = 100;
static constexpr auto just_coefficient = 4;
const auto single_iteration_latency = std::max<double>(1.0L * watch.elapsedNanoseconds() / iterations_to_measure, 1);
return std::clamp<size_t>(
ceil(just_coefficient * assumed_load_latency_ns / single_iteration_latency), min_look_ahead_value, max_look_ahead_value);
}
static constexpr size_t getInitialLookAheadValue() { return min_look_ahead_value; }
static constexpr size_t iterationsToMeasure() { return iterations_to_measure; }
private:
static constexpr size_t iterations_to_measure = 100;
static constexpr size_t min_look_ahead_value = 4;
static constexpr size_t max_look_ahead_value = 32;
Stopwatch watch;
};
}

View File

@ -98,7 +98,7 @@ public:
/// have a key equals to the given cell, a new cell gets emplaced into that map, /// have a key equals to the given cell, a new cell gets emplaced into that map,
/// and func is invoked with the third argument emplaced set to true. Otherwise /// and func is invoked with the third argument emplaced set to true. Otherwise
/// emplaced is set to false. /// emplaced is set to false.
template <typename Func> template <typename Func, bool>
void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func) void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func)
{ {
if (this->m0.hasZero() && that.m0.hasZero()) if (this->m0.hasZero() && that.m0.hasZero())

View File

@ -17,9 +17,11 @@ class TwoLevelHashMapTable : public TwoLevelHashTable<Key, Cell, Hash, Grower, A
{ {
public: public:
using Impl = ImplTable<Key, Cell, Hash, Grower, Allocator>; using Impl = ImplTable<Key, Cell, Hash, Grower, Allocator>;
using Base = TwoLevelHashTable<Key, Cell, Hash, Grower, Allocator, ImplTable<Key, Cell, Hash, Grower, Allocator>>;
using LookupResult = typename Impl::LookupResult; using LookupResult = typename Impl::LookupResult;
using TwoLevelHashTable<Key, Cell, Hash, Grower, Allocator, ImplTable<Key, Cell, Hash, Grower, Allocator>>::TwoLevelHashTable; using Base::Base;
using Base::prefetch;
template <typename Func> template <typename Func>
void ALWAYS_INLINE forEachMapped(Func && func) void ALWAYS_INLINE forEachMapped(Func && func)

View File

@ -227,6 +227,14 @@ public:
return res; return res;
} }
template <typename KeyHolder>
void ALWAYS_INLINE prefetch(KeyHolder && key_holder) const
{
const auto & key = keyHolderGetKey(key_holder);
const auto key_hash = hash(key);
const auto bucket = getBucketFromHash(key_hash);
impls[bucket].prefetchByHash(key_hash);
}
/** Insert the key, /** Insert the key,
* return an iterator to a position that can be used for `placement new` of value, * return an iterator to a position that can be used for `placement new` of value,

View File

@ -537,6 +537,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, max_size_to_preallocate_for_aggregation, 10'000'000, "For how many elements it is allowed to preallocate space in all hash tables in total before aggregation", 0) \ M(UInt64, max_size_to_preallocate_for_aggregation, 10'000'000, "For how many elements it is allowed to preallocate space in all hash tables in total before aggregation", 0) \
\ \
M(Bool, kafka_disable_num_consumers_limit, false, "Disable limit on kafka_num_consumers that depends on the number of available CPU cores", 0) \ M(Bool, kafka_disable_num_consumers_limit, false, "Disable limit on kafka_num_consumers that depends on the number of available CPU cores", 0) \
M(Bool, enable_software_prefetch_in_aggregation, true, "Enable use of software prefetch in aggregation", 0) \
/** Experimental feature for moving data between shards. */ \ /** Experimental feature for moving data between shards. */ \
\ \
M(Bool, allow_experimental_query_deduplication, false, "Experimental data deduplication for SELECT queries based on part UUIDs", 0) \ M(Bool, allow_experimental_query_deduplication, false, "Experimental data deduplication for SELECT queries based on part UUIDs", 0) \

View File

@ -3,6 +3,10 @@
#include <numeric> #include <numeric>
#include <Poco/Util/Application.h> #include <Poco/Util/Application.h>
#ifdef OS_LINUX
# include <unistd.h>
#endif
#include <base/sort.h> #include <base/sort.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
@ -282,6 +286,26 @@ DB::ColumnNumbers calculateKeysPositions(const DB::Block & header, const DB::Agg
keys_positions[i] = header.getPositionByName(params.keys[i]); keys_positions[i] = header.getPositionByName(params.keys[i]);
return keys_positions; return keys_positions;
} }
template <typename HashTable, typename KeyHolder>
concept HasPrefetchMemberFunc = requires
{
{std::declval<HashTable>().prefetch(std::declval<KeyHolder>())};
};
size_t getMinBytesForPrefetch()
{
size_t l2_size = 0;
#if defined(OS_LINUX) && defined(_SC_LEVEL2_CACHE_SIZE)
if (auto ret = sysconf(_SC_LEVEL2_CACHE_SIZE); ret != -1)
l2_size = ret;
#endif
/// 256KB looks like a reasonable default L2 size. 4 is empirical constant.
return 4 * std::max<size_t>(l2_size, 256 * 1024);
}
} }
namespace DB namespace DB
@ -325,7 +349,7 @@ void AggregatedDataVariants::convertToTwoLevel()
switch (type) switch (type)
{ {
#define M(NAME) \ #define M(NAME) \
case Type::NAME: \ case Type::NAME: \
NAME ## _two_level = std::make_unique<decltype(NAME ## _two_level)::element_type>(*(NAME)); \ NAME ## _two_level = std::make_unique<decltype(NAME ## _two_level)::element_type>(*(NAME)); \
(NAME).reset(); \ (NAME).reset(); \
@ -539,7 +563,10 @@ public:
#endif #endif
Aggregator::Aggregator(const Block & header_, const Params & params_) Aggregator::Aggregator(const Block & header_, const Params & params_)
: header(header_), keys_positions(calculateKeysPositions(header, params_)), params(params_) : header(header_)
, keys_positions(calculateKeysPositions(header, params_))
, params(params_)
, min_bytes_for_prefetch(getMinBytesForPrefetch())
{ {
/// Use query-level memory tracker /// Use query-level memory tracker
if (auto * memory_tracker_child = CurrentThread::getMemoryTracker()) if (auto * memory_tracker_child = CurrentThread::getMemoryTracker())
@ -970,24 +997,38 @@ void NO_INLINE Aggregator::executeImpl(
if (!no_more_keys) if (!no_more_keys)
{ {
/// Prefetching doesn't make sense for small hash tables, because they fit in caches entirely.
const bool prefetch = Method::State::has_cheap_key_calculation && params.enable_prefetch
&& (method.data.getBufferSizeInBytes() > min_bytes_for_prefetch);
#if USE_EMBEDDED_COMPILER #if USE_EMBEDDED_COMPILER
if (compiled_aggregate_functions_holder && !hasSparseArguments(aggregate_instructions)) if (compiled_aggregate_functions_holder && !hasSparseArguments(aggregate_instructions))
{ {
executeImplBatch<false, true>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row); if (prefetch)
executeImplBatch<false, true, true>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
else
executeImplBatch<false, true, false>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
} }
else else
#endif #endif
{ {
executeImplBatch<false, false>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row); if (prefetch)
executeImplBatch<false, false, true>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
else
executeImplBatch<false, false, false>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
} }
} }
else else
{ {
executeImplBatch<true, false>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row); executeImplBatch<true, false, false>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
} }
} }
template <bool no_more_keys, bool use_compiled_functions, typename Method> template <bool no_more_keys, bool use_compiled_functions, bool prefetch, typename Method>
void NO_INLINE Aggregator::executeImplBatch( void NO_INLINE Aggregator::executeImplBatch(
Method & method, Method & method,
typename Method::State & state, typename Method::State & state,
@ -997,6 +1038,12 @@ void NO_INLINE Aggregator::executeImplBatch(
AggregateFunctionInstruction * aggregate_instructions, AggregateFunctionInstruction * aggregate_instructions,
AggregateDataPtr overflow_row) const AggregateDataPtr overflow_row) const
{ {
using KeyHolder = decltype(state.getKeyHolder(0, std::declval<Arena &>()));
/// During processing of row #i we will prefetch HashTable cell for row #(i + prefetch_look_ahead).
PrefetchingHelper prefetching;
size_t prefetch_look_ahead = prefetching.getInitialLookAheadValue();
/// Optimization for special case when there are no aggregate functions. /// Optimization for special case when there are no aggregate functions.
if (params.aggregates_size == 0) if (params.aggregates_size == 0)
{ {
@ -1006,7 +1053,21 @@ void NO_INLINE Aggregator::executeImplBatch(
/// For all rows. /// For all rows.
AggregateDataPtr place = aggregates_pool->alloc(0); AggregateDataPtr place = aggregates_pool->alloc(0);
for (size_t i = row_begin; i < row_end; ++i) for (size_t i = row_begin; i < row_end; ++i)
{
if constexpr (prefetch && HasPrefetchMemberFunc<decltype(method.data), KeyHolder>)
{
if (i == row_begin + prefetching.iterationsToMeasure())
prefetch_look_ahead = prefetching.calcPrefetchLookAhead();
if (i + prefetch_look_ahead < row_end)
{
auto && key_holder = state.getKeyHolder(i + prefetch_look_ahead, *aggregates_pool);
method.data.prefetch(std::move(key_holder));
}
}
state.emplaceKey(method.data, i, *aggregates_pool).setMapped(place); state.emplaceKey(method.data, i, *aggregates_pool).setMapped(place);
}
return; return;
} }
@ -1059,6 +1120,18 @@ void NO_INLINE Aggregator::executeImplBatch(
if constexpr (!no_more_keys) if constexpr (!no_more_keys)
{ {
if constexpr (prefetch && HasPrefetchMemberFunc<decltype(method.data), KeyHolder>)
{
if (i == row_begin + prefetching.iterationsToMeasure())
prefetch_look_ahead = prefetching.calcPrefetchLookAhead();
if (i + prefetch_look_ahead < row_end)
{
auto && key_holder = state.getKeyHolder(i + prefetch_look_ahead, *aggregates_pool);
method.data.prefetch(std::move(key_holder));
}
}
auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool); auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool);
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
@ -1131,7 +1204,7 @@ void NO_INLINE Aggregator::executeImplBatch(
continue; continue;
AggregateFunctionInstruction * inst = aggregate_instructions + i; AggregateFunctionInstruction * inst = aggregate_instructions + i;
size_t arguments_size = inst->that->getArgumentTypes().size(); size_t arguments_size = inst->that->getArgumentTypes().size(); // NOLINT
for (size_t argument_index = 0; argument_index < arguments_size; ++argument_index) for (size_t argument_index = 0; argument_index < arguments_size; ++argument_index)
columns_data.emplace_back(getColumnData(inst->batch_arguments[argument_index])); columns_data.emplace_back(getColumnData(inst->batch_arguments[argument_index]));
@ -2338,16 +2411,13 @@ void NO_INLINE Aggregator::mergeDataNullKey(
} }
template <typename Method, bool use_compiled_functions, typename Table> template <typename Method, bool use_compiled_functions, bool prefetch, typename Table>
void NO_INLINE Aggregator::mergeDataImpl( void NO_INLINE Aggregator::mergeDataImpl(Table & table_dst, Table & table_src, Arena * arena) const
Table & table_dst,
Table & table_src,
Arena * arena) const
{ {
if constexpr (Method::low_cardinality_optimization) if constexpr (Method::low_cardinality_optimization)
mergeDataNullKey<Method, Table>(table_dst, table_src, arena); mergeDataNullKey<Method, Table>(table_dst, table_src, arena);
table_src.mergeToViaEmplace(table_dst, [&](AggregateDataPtr & __restrict dst, AggregateDataPtr & __restrict src, bool inserted) auto merge = [&](AggregateDataPtr & __restrict dst, AggregateDataPtr & __restrict src, bool inserted)
{ {
if (!inserted) if (!inserted)
{ {
@ -2362,7 +2432,8 @@ void NO_INLINE Aggregator::mergeDataImpl(
for (size_t i = 0; i < params.aggregates_size; ++i) for (size_t i = 0; i < params.aggregates_size; ++i)
{ {
if (!is_aggregate_function_compiled[i]) if (!is_aggregate_function_compiled[i])
aggregate_functions[i]->merge(dst + offsets_of_aggregate_states[i], src + offsets_of_aggregate_states[i], arena); aggregate_functions[i]->merge(
dst + offsets_of_aggregate_states[i], src + offsets_of_aggregate_states[i], arena);
} }
for (size_t i = 0; i < params.aggregates_size; ++i) for (size_t i = 0; i < params.aggregates_size; ++i)
@ -2388,7 +2459,9 @@ void NO_INLINE Aggregator::mergeDataImpl(
} }
src = nullptr; src = nullptr;
}); };
table_src.template mergeToViaEmplace<decltype(merge), prefetch>(table_dst, std::move(merge));
table_src.clearAndShrink(); table_src.clearAndShrink();
} }
@ -2483,6 +2556,9 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
AggregatedDataVariantsPtr & res = non_empty_data[0]; AggregatedDataVariantsPtr & res = non_empty_data[0];
bool no_more_keys = false; bool no_more_keys = false;
const bool prefetch = Method::State::has_cheap_key_calculation && params.enable_prefetch
&& (getDataVariant<Method>(*res).data.getBufferSizeInBytes() > min_bytes_for_prefetch);
/// We merge all aggregation results to the first. /// We merge all aggregation results to the first.
for (size_t result_num = 1, size = non_empty_data.size(); result_num < size; ++result_num) for (size_t result_num = 1, size = non_empty_data.size(); result_num < size; ++result_num)
{ {
@ -2496,18 +2572,22 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
#if USE_EMBEDDED_COMPILER #if USE_EMBEDDED_COMPILER
if (compiled_aggregate_functions_holder) if (compiled_aggregate_functions_holder)
{ {
mergeDataImpl<Method, true>( if (prefetch)
getDataVariant<Method>(*res).data, mergeDataImpl<Method, true, true>(
getDataVariant<Method>(current).data, getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool);
res->aggregates_pool); else
mergeDataImpl<Method, true, false>(
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool);
} }
else else
#endif #endif
{ {
mergeDataImpl<Method, false>( if (prefetch)
getDataVariant<Method>(*res).data, mergeDataImpl<Method, false, true>(
getDataVariant<Method>(current).data, getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool);
res->aggregates_pool); else
mergeDataImpl<Method, false, false>(
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool);
} }
} }
else if (res->without_key) else if (res->without_key)
@ -2543,6 +2623,10 @@ void NO_INLINE Aggregator::mergeBucketImpl(
{ {
/// We merge all aggregation results to the first. /// We merge all aggregation results to the first.
AggregatedDataVariantsPtr & res = data[0]; AggregatedDataVariantsPtr & res = data[0];
const bool prefetch = Method::State::has_cheap_key_calculation && params.enable_prefetch
&& (Method::Data::NUM_BUCKETS * getDataVariant<Method>(*res).data.impls[bucket].getBufferSizeInBytes() > min_bytes_for_prefetch);
for (size_t result_num = 1, size = data.size(); result_num < size; ++result_num) for (size_t result_num = 1, size = data.size(); result_num < size; ++result_num)
{ {
if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst)) if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst))
@ -2552,18 +2636,22 @@ void NO_INLINE Aggregator::mergeBucketImpl(
#if USE_EMBEDDED_COMPILER #if USE_EMBEDDED_COMPILER
if (compiled_aggregate_functions_holder) if (compiled_aggregate_functions_holder)
{ {
mergeDataImpl<Method, true>( if (prefetch)
getDataVariant<Method>(*res).data.impls[bucket], mergeDataImpl<Method, true, true>(
getDataVariant<Method>(current).data.impls[bucket], getDataVariant<Method>(*res).data.impls[bucket], getDataVariant<Method>(current).data.impls[bucket], arena);
arena); else
mergeDataImpl<Method, true, false>(
getDataVariant<Method>(*res).data.impls[bucket], getDataVariant<Method>(current).data.impls[bucket], arena);
} }
else else
#endif #endif
{ {
mergeDataImpl<Method, false>( if (prefetch)
getDataVariant<Method>(*res).data.impls[bucket], mergeDataImpl<Method, false, true>(
getDataVariant<Method>(current).data.impls[bucket], getDataVariant<Method>(*res).data.impls[bucket], getDataVariant<Method>(current).data.impls[bucket], arena);
arena); else
mergeDataImpl<Method, false, false>(
getDataVariant<Method>(*res).data.impls[bucket], getDataVariant<Method>(current).data.impls[bucket], arena);
} }
} }
} }

View File

@ -939,6 +939,8 @@ public:
bool only_merge; bool only_merge;
bool enable_prefetch;
struct StatsCollectingParams struct StatsCollectingParams
{ {
StatsCollectingParams(); StatsCollectingParams();
@ -974,7 +976,8 @@ public:
bool compile_aggregate_expressions_, bool compile_aggregate_expressions_,
size_t min_count_to_compile_aggregate_expression_, size_t min_count_to_compile_aggregate_expression_,
size_t max_block_size_, size_t max_block_size_,
bool only_merge_ = false, // true for projections bool enable_prefetch_,
bool only_merge_, // true for projections
const StatsCollectingParams & stats_collecting_params_ = {}) const StatsCollectingParams & stats_collecting_params_ = {})
: keys(keys_) : keys(keys_)
, aggregates(aggregates_) , aggregates(aggregates_)
@ -994,6 +997,7 @@ public:
, min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_) , min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_)
, max_block_size(max_block_size_) , max_block_size(max_block_size_)
, only_merge(only_merge_) , only_merge(only_merge_)
, enable_prefetch(enable_prefetch_)
, stats_collecting_params(stats_collecting_params_) , stats_collecting_params(stats_collecting_params_)
{ {
} }
@ -1001,7 +1005,7 @@ public:
/// Only parameters that matter during merge. /// Only parameters that matter during merge.
Params(const Names & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_, size_t max_block_size_) Params(const Names & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_, size_t max_block_size_)
: Params( : Params(
keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0, false, 0, max_block_size_, true, {}) keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0, false, 0, max_block_size_, false, true, {})
{ {
} }
@ -1146,6 +1150,8 @@ private:
/// For external aggregation. /// For external aggregation.
mutable TemporaryFiles temporary_files; mutable TemporaryFiles temporary_files;
size_t min_bytes_for_prefetch = 0;
#if USE_EMBEDDED_COMPILER #if USE_EMBEDDED_COMPILER
std::shared_ptr<CompiledAggregateFunctionsHolder> compiled_aggregate_functions_holder; std::shared_ptr<CompiledAggregateFunctionsHolder> compiled_aggregate_functions_holder;
#endif #endif
@ -1211,7 +1217,7 @@ private:
AggregateDataPtr overflow_row) const; AggregateDataPtr overflow_row) const;
/// Specialization for a particular value no_more_keys. /// Specialization for a particular value no_more_keys.
template <bool no_more_keys, bool use_compiled_functions, typename Method> template <bool no_more_keys, bool use_compiled_functions, bool prefetch, typename Method>
void executeImplBatch( void executeImplBatch(
Method & method, Method & method,
typename Method::State & state, typename Method::State & state,
@ -1255,11 +1261,8 @@ private:
Arena * arena) const; Arena * arena) const;
/// Merge data from hash table `src` into `dst`. /// Merge data from hash table `src` into `dst`.
template <typename Method, bool use_compiled_functions, typename Table> template <typename Method, bool use_compiled_functions, bool prefetch, typename Table>
void mergeDataImpl( void mergeDataImpl(Table & table_dst, Table & table_src, Arena * arena) const;
Table & table_dst,
Table & table_src,
Arena * arena) const;
/// Merge data from hash table `src` into `dst`, but only for keys that already exist in dst. In other cases, merge the data into `overflows`. /// Merge data from hash table `src` into `dst`, but only for keys that already exist in dst. In other cases, merge the data into `overflows`.
template <typename Method, typename Table> template <typename Method, typename Table>

View File

@ -2360,6 +2360,7 @@ static Aggregator::Params getAggregatorParams(
settings.compile_aggregate_expressions, settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression, settings.min_count_to_compile_aggregate_expression,
settings.max_block_size, settings.max_block_size,
settings.enable_software_prefetch_in_aggregation,
/* only_merge */ false, /* only_merge */ false,
stats_collecting_params stats_collecting_params
}; };

View File

@ -183,6 +183,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
transform_params->params.compile_aggregate_expressions, transform_params->params.compile_aggregate_expressions,
transform_params->params.min_count_to_compile_aggregate_expression, transform_params->params.min_count_to_compile_aggregate_expression,
transform_params->params.max_block_size, transform_params->params.max_block_size,
transform_params->params.enable_prefetch,
/* only_merge */ false, /* only_merge */ false,
transform_params->params.stats_collecting_params}; transform_params->params.stats_collecting_params};
auto transform_params_for_set = std::make_shared<AggregatingTransformParams>(src_header, std::move(params_for_set), final); auto transform_params_for_set = std::make_shared<AggregatingTransformParams>(src_header, std::move(params_for_set), final);

View File

@ -39,7 +39,9 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm(
settings.min_free_disk_space_for_temporary_data, settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions, settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression, settings.min_count_to_compile_aggregate_expression,
settings.max_block_size); settings.max_block_size,
settings.enable_software_prefetch_in_aggregation,
false /* only_merge */);
aggregator = std::make_unique<Aggregator>(header, params); aggregator = std::make_unique<Aggregator>(header, params);

View File

@ -316,6 +316,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
settings.compile_aggregate_expressions, settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression, settings.min_count_to_compile_aggregate_expression,
settings.max_block_size, settings.max_block_size,
settings.enable_software_prefetch_in_aggregation,
only_merge); only_merge);
return std::make_pair(params, only_merge); return std::make_pair(params, only_merge);

View File

@ -1,7 +1,38 @@
<test> <test>
<settings>
<allow_suspicious_low_cardinality_types>1</allow_suspicious_low_cardinality_types>
</settings>
<query>WITH toUInt8(number) AS k, toUInt64(k) AS k1, k AS k2 SELECT k1, k2, count() FROM numbers(100000000) GROUP BY k1, k2</query> <query>WITH toUInt8(number) AS k, toUInt64(k) AS k1, k AS k2 SELECT k1, k2, count() FROM numbers(100000000) GROUP BY k1, k2</query>
<query>WITH toUInt8(number) AS k, toUInt16(k) AS k1, toUInt32(k) AS k2, k AS k3 SELECT k1, k2, k3, count() FROM numbers(100000000) GROUP BY k1, k2, k3</query> <query>WITH toUInt8(number) AS k, toUInt16(k) AS k1, toUInt32(k) AS k2, k AS k3 SELECT k1, k2, k3, count() FROM numbers(100000000) GROUP BY k1, k2, k3</query>
<query>WITH toUInt8(number) AS k, k AS k1, k + 1 AS k2 SELECT k1, k2, count() FROM numbers(100000000) GROUP BY k1, k2</query> <query>WITH toUInt8(number) AS k, k AS k1, k + 1 AS k2 SELECT k1, k2, count() FROM numbers(100000000) GROUP BY k1, k2</query>
<query>WITH toUInt8(number) AS k, k AS k1, k + 1 AS k2, k + 2 AS k3, k + 3 AS k4 SELECT k1, k2, k3, k4, count() FROM numbers(100000000) GROUP BY k1, k2, k3, k4</query> <query>WITH toUInt8(number) AS k, k AS k1, k + 1 AS k2, k + 2 AS k3, k + 3 AS k4 SELECT k1, k2, k3, k4, count() FROM numbers(100000000) GROUP BY k1, k2, k3, k4</query>
<query>WITH toUInt8(number) AS k, toUInt64(k) AS k1, k1 + 1 AS k2 SELECT k1, k2, count() FROM numbers(100000000) GROUP BY k1, k2</query> <query>WITH toUInt8(number) AS k, toUInt64(k) AS k1, k1 + 1 AS k2 SELECT k1, k2, count() FROM numbers(100000000) GROUP BY k1, k2</query>
<create_query>create table group_by_fk(a UInt32, b UInt32, c LowCardinality(UInt32), d Nullable(UInt32), e UInt64, f UInt64, g UInt64, h LowCardinality(UInt64), i Nullable(UInt64)) engine=MergeTree order by tuple()</create_query>
<fill_query>insert into group_by_fk select number, number, number % 10000, number % 2 == 0 ? number : Null, number, number, number, number % 10000, number % 2 == 0 ? number : Null from numbers_mt(3e7)</fill_query>
<!-- keys64_two_level -->
<query>select a, b from group_by_fk group by a, b format Null</query>
<!-- low_cardinality_keys128_two_level -->
<query>select a, c from group_by_fk group by a, c format Null</query>
<!-- nullable_keys128_two_level -->
<query>select a, d from group_by_fk group by a, d format Null</query>
<!-- keys128_two_level -->
<query>select e, f from group_by_fk group by e, f format Null</query>
<!-- low_cardinality_keys128_two_level -->
<query>select e, h from group_by_fk group by e, h format Null</query>
<!-- nullable_keys256_two_level -->
<query>select e, i from group_by_fk group by e, i format Null</query>
<!-- keys256_two_level -->
<query>select e, f, g from group_by_fk group by e, f, g format Null</query>
<!-- low_cardinality_keys256_two_level -->
<query>select e, f, h from group_by_fk group by e, f, h format Null</query>
<!-- nullable_keys256_two_level -->
<query>select e, f, i from group_by_fk group by e, f, i format Null</query>
<drop_query>drop table group_by_fk</drop_query>
</test> </test>

View File

@ -1,4 +1,5 @@
<test> <test>
<!-- -->
<settings> <settings>
<max_size_to_preallocate_for_aggregation>1000000000</max_size_to_preallocate_for_aggregation> <max_size_to_preallocate_for_aggregation>1000000000</max_size_to_preallocate_for_aggregation>
</settings> </settings>

View File

@ -0,0 +1,32 @@
<test>
<substitutions>
<substitution>
<name>size</name>
<values>
<value>10000000</value>
<value>50000000</value>
</values>
</substitution>
<substitution>
<name>table</name>
<values>
<value>numbers</value>
<value>numbers_mt</value>
</values>
</substitution>
</substitutions>
<create_query>create table t_str_key_{size}(a String, b FixedString(25)) engine=MergeTree order by tuple()</create_query>
<fill_query>insert into t_str_key_{size} select toString(number) as s, toFixedString(s, 25) from numbers_mt({size})</fill_query>
<query>select a from t_str_key_{size} group by a format Null</query>
<query>select b from t_str_key_{size} group by b format Null</query>
<query>select number from {table}({size}) group by number format Null</query>
<query>select count() from {table}({size}) group by number format Null</query>
<query>select number from numbers_mt(1e8) group by number format Null</query>
<query>select count() from numbers_mt(1e8) group by number format Null</query>
<drop_query>drop table t_str_key_{size}</drop_query>
</test>

View File

@ -1,4 +1,5 @@
<test> <test>
<!-- -->
<create_query>DROP TABLE IF EXISTS perf_lc_fixed_str_groupby</create_query> <create_query>DROP TABLE IF EXISTS perf_lc_fixed_str_groupby</create_query>
<create_query>CREATE TABLE perf_lc_fixed_str_groupby( <create_query>CREATE TABLE perf_lc_fixed_str_groupby(
a LowCardinality(FixedString(14)), a LowCardinality(FixedString(14)),

View File

@ -1,4 +1,5 @@
<test> <test>
<!-- -->
<settings> <settings>
<max_memory_usage>30000000000</max_memory_usage> <max_memory_usage>30000000000</max_memory_usage>
</settings> </settings>

View File

@ -1,4 +1,5 @@
<test> <test>
<!-- -->
<settings> <settings>
<max_memory_usage>30000000000</max_memory_usage> <max_memory_usage>30000000000</max_memory_usage>
</settings> </settings>