diff --git a/src/Common/ColumnsHashing.h b/src/Common/ColumnsHashing.h index 54446950567..dbdbd588bc5 100644 --- a/src/Common/ColumnsHashing.h +++ b/src/Common/ColumnsHashing.h @@ -36,6 +36,8 @@ struct HashMethodOneNumber using Self = HashMethodOneNumber; using Base = columns_hashing_impl::HashMethodBase; + static constexpr bool has_cheap_key_calculation = true; + const char * vec; /// If the keys of a fixed length then key_sizes contains their lengths, empty otherwise. @@ -78,6 +80,8 @@ struct HashMethodString using Self = HashMethodString; using Base = columns_hashing_impl::HashMethodBase; + static constexpr bool has_cheap_key_calculation = false; + const IColumn::Offset * offsets; const UInt8 * chars; @@ -117,6 +121,8 @@ struct HashMethodFixedString using Self = HashMethodFixedString; using Base = columns_hashing_impl::HashMethodBase; + static constexpr bool has_cheap_key_calculation = false; + size_t n; const ColumnFixedString::Chars * chars; @@ -210,6 +216,8 @@ struct HashMethodSingleLowCardinalityColumn : public SingleColumnMethod using EmplaceResult = columns_hashing_impl::EmplaceResultImpl; using FindResult = columns_hashing_impl::FindResultImpl; + static constexpr bool has_cheap_key_calculation = Base::has_cheap_key_calculation; + static HashMethodContextPtr createContext(const HashMethodContext::Settings & settings) { return std::make_shared(settings); @@ -479,6 +487,8 @@ struct HashMethodKeysFixed static constexpr bool has_nullable_keys = has_nullable_keys_; static constexpr bool has_low_cardinality = has_low_cardinality_; + static constexpr bool has_cheap_key_calculation = true; + LowCardinalityKeys low_cardinality_keys; Sizes key_sizes; size_t keys_size; @@ -653,13 +663,14 @@ struct HashMethodSerialized using Self = HashMethodSerialized; using Base = columns_hashing_impl::HashMethodBase; + static constexpr bool has_cheap_key_calculation = false; + ColumnRawPtrs key_columns; size_t keys_size; HashMethodSerialized(const ColumnRawPtrs & key_columns_, const Sizes & /*key_sizes*/, const HashMethodContextPtr &) : key_columns(key_columns_), keys_size(key_columns_.size()) {} -protected: friend class columns_hashing_impl::HashMethodBase; ALWAYS_INLINE SerializedKeyHolder getKeyHolder(size_t row, Arena & pool) const @@ -679,6 +690,8 @@ struct HashMethodHashed using Self = HashMethodHashed; using Base = columns_hashing_impl::HashMethodBase; + static constexpr bool has_cheap_key_calculation = false; + ColumnRawPtrs key_columns; HashMethodHashed(ColumnRawPtrs key_columns_, const Sizes &, const HashMethodContextPtr &) diff --git a/src/Common/ColumnsHashingImpl.h b/src/Common/ColumnsHashingImpl.h index 7b0650487f5..3f2c214ef47 100644 --- a/src/Common/ColumnsHashingImpl.h +++ b/src/Common/ColumnsHashingImpl.h @@ -212,9 +212,9 @@ protected: if constexpr (has_mapped) cached = &it->getMapped(); - if (inserted) + if constexpr (has_mapped) { - if constexpr (has_mapped) + if (inserted) { new (&it->getMapped()) Mapped(); } diff --git a/src/Common/HashTable/FixedHashMap.h b/src/Common/HashTable/FixedHashMap.h index 64d932f7c2f..40b76d9b4eb 100644 --- a/src/Common/HashTable/FixedHashMap.h +++ b/src/Common/HashTable/FixedHashMap.h @@ -109,7 +109,7 @@ public: using Base::Base; - template + template void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func) { for (auto it = this->begin(), end = this->end(); it != end; ++it) diff --git a/src/Common/HashTable/HashMap.h b/src/Common/HashTable/HashMap.h index 32fc9a8d76c..dc3e9a392bc 100644 --- a/src/Common/HashTable/HashMap.h +++ b/src/Common/HashTable/HashMap.h @@ -3,6 +3,7 @@ #include #include #include +#include /** NOTE HashMap could only be used for memmoveable (position independent) types. @@ -189,8 +190,10 @@ public: using Self = HashMapTable; using Base = HashTable; using LookupResult = typename Base::LookupResult; + using Iterator = typename Base::iterator; using Base::Base; + using Base::prefetch; /// Merge every cell's value of current map into the destination map via emplace. /// Func should have signature void(Mapped & dst, Mapped & src, bool emplaced). @@ -198,11 +201,32 @@ public: /// have a key equals to the given cell, a new cell gets emplaced into that map, /// and func is invoked with the third argument emplaced set to true. Otherwise /// emplaced is set to false. - template + template void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func) { - for (auto it = this->begin(), end = this->end(); it != end; ++it) + DB::PrefetchingHelper prefetching; + size_t prefetch_look_ahead = prefetching.getInitialLookAheadValue(); + + size_t i = 0; + auto prefetch_it = advanceIterator(this->begin(), prefetch_look_ahead); + + for (auto it = this->begin(), end = this->end(); it != end; ++it, ++i) { + if constexpr (prefetch) + { + if (i == prefetching.iterationsToMeasure()) + { + prefetch_look_ahead = prefetching.calcPrefetchLookAhead(); + prefetch_it = advanceIterator(prefetch_it, prefetch_look_ahead - prefetching.getInitialLookAheadValue()); + } + + if (prefetch_it != end) + { + that.prefetchByHash(prefetch_it.getHash()); + ++prefetch_it; + } + } + typename Self::LookupResult res_it; bool inserted; that.emplace(Cell::getKey(it->getValue()), res_it, inserted, it.getHash()); @@ -276,6 +300,18 @@ public: return it->getMapped(); throw DB::Exception("Cannot find element in HashMap::at method", DB::ErrorCodes::LOGICAL_ERROR); } + +private: + Iterator advanceIterator(Iterator it, size_t n) + { + size_t i = 0; + while (i < n && it != this->end()) + { + ++i; + ++it; + } + return it; + } }; namespace std diff --git a/src/Common/HashTable/HashTable.h b/src/Common/HashTable/HashTable.h index e8a204c4043..6fa002139df 100644 --- a/src/Common/HashTable/HashTable.h +++ b/src/Common/HashTable/HashTable.h @@ -911,10 +911,10 @@ protected: bool ALWAYS_INLINE emplaceIfZero(const Key & x, LookupResult & it, bool & inserted, size_t hash_value) { /// If it is claimed that the zero key can not be inserted into the table. - if (!Cell::need_zero_value_storage) + if constexpr (!Cell::need_zero_value_storage) return false; - if (Cell::isZero(x, *this)) + if (unlikely(Cell::isZero(x, *this))) { it = this->zeroValue(); @@ -990,6 +990,11 @@ protected: emplaceNonZeroImpl(place_value, key_holder, it, inserted, hash_value); } + void ALWAYS_INLINE prefetchByHash(size_t hash_key) const + { + const auto place = grower.place(hash_key); + __builtin_prefetch(&buf[place]); + } public: void reserve(size_t num_elements) @@ -1014,7 +1019,6 @@ public: return res; } - /// Reinsert node pointed to by iterator void ALWAYS_INLINE reinsert(iterator & it, size_t hash_value) { @@ -1025,6 +1029,13 @@ public: Cell::move(it.getPtr(), &buf[place_value]); } + template + void ALWAYS_INLINE prefetch(KeyHolder && key_holder) const + { + const auto & key = keyHolderGetKey(key_holder); + const auto key_hash = hash(key); + prefetchByHash(key_hash); + } /** Insert the key. * Return values: diff --git a/src/Common/HashTable/Prefetching.h b/src/Common/HashTable/Prefetching.h new file mode 100644 index 00000000000..89081d78186 --- /dev/null +++ b/src/Common/HashTable/Prefetching.h @@ -0,0 +1,52 @@ +#pragma once + +#include + +#include + +namespace DB +{ + +/** + * The purpose of this helper class is to provide a good value for prefetch look ahead (how distant row we should prefetch on the given iteration) + * based on the latency of a single iteration of the given cycle. + * + * Assumed usage pattern is the following: + * + * PrefetchingHelper prefetching; /// When object is created, it starts a watch to measure iteration latency. + * size_t prefetch_look_ahead = prefetching.getInitialLookAheadValue(); /// Initially it provides you with some reasonable default value. + * + * for (size_t i = 0; i < end; ++i) + * { + * if (i == prefetching.iterationsToMeasure()) /// When enough iterations passed, we are able to make a fairly accurate estimation of a single iteration latency. + * prefetch_look_ahead = prefetching.calcPrefetchLookAhead(); /// Based on this estimation we can choose a good value for prefetch_look_ahead. + * + * ... main loop body ... + * } + * + */ +class PrefetchingHelper +{ +public: + size_t calcPrefetchLookAhead() + { + static constexpr auto assumed_load_latency_ns = 100; + static constexpr auto just_coefficient = 4; + const auto single_iteration_latency = std::max(1.0L * watch.elapsedNanoseconds() / iterations_to_measure, 1); + return std::clamp( + ceil(just_coefficient * assumed_load_latency_ns / single_iteration_latency), min_look_ahead_value, max_look_ahead_value); + } + + static constexpr size_t getInitialLookAheadValue() { return min_look_ahead_value; } + + static constexpr size_t iterationsToMeasure() { return iterations_to_measure; } + +private: + static constexpr size_t iterations_to_measure = 100; + static constexpr size_t min_look_ahead_value = 4; + static constexpr size_t max_look_ahead_value = 32; + + Stopwatch watch; +}; + +} diff --git a/src/Common/HashTable/StringHashMap.h b/src/Common/HashTable/StringHashMap.h index ada10180786..828a54d357a 100644 --- a/src/Common/HashTable/StringHashMap.h +++ b/src/Common/HashTable/StringHashMap.h @@ -98,7 +98,7 @@ public: /// have a key equals to the given cell, a new cell gets emplaced into that map, /// and func is invoked with the third argument emplaced set to true. Otherwise /// emplaced is set to false. - template + template void ALWAYS_INLINE mergeToViaEmplace(Self & that, Func && func) { if (this->m0.hasZero() && that.m0.hasZero()) diff --git a/src/Common/HashTable/TwoLevelHashMap.h b/src/Common/HashTable/TwoLevelHashMap.h index 7bebf0d8af5..df3b5255545 100644 --- a/src/Common/HashTable/TwoLevelHashMap.h +++ b/src/Common/HashTable/TwoLevelHashMap.h @@ -17,9 +17,11 @@ class TwoLevelHashMapTable : public TwoLevelHashTable; + using Base = TwoLevelHashTable>; using LookupResult = typename Impl::LookupResult; - using TwoLevelHashTable>::TwoLevelHashTable; + using Base::Base; + using Base::prefetch; template void ALWAYS_INLINE forEachMapped(Func && func) diff --git a/src/Common/HashTable/TwoLevelHashTable.h b/src/Common/HashTable/TwoLevelHashTable.h index 27cc075acd7..b8d5eedd430 100644 --- a/src/Common/HashTable/TwoLevelHashTable.h +++ b/src/Common/HashTable/TwoLevelHashTable.h @@ -227,6 +227,14 @@ public: return res; } + template + void ALWAYS_INLINE prefetch(KeyHolder && key_holder) const + { + const auto & key = keyHolderGetKey(key_holder); + const auto key_hash = hash(key); + const auto bucket = getBucketFromHash(key_hash); + impls[bucket].prefetchByHash(key_hash); + } /** Insert the key, * return an iterator to a position that can be used for `placement new` of value, diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 2ee655396d4..2db081194f1 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -537,6 +537,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(UInt64, max_size_to_preallocate_for_aggregation, 10'000'000, "For how many elements it is allowed to preallocate space in all hash tables in total before aggregation", 0) \ \ M(Bool, kafka_disable_num_consumers_limit, false, "Disable limit on kafka_num_consumers that depends on the number of available CPU cores", 0) \ + M(Bool, enable_software_prefetch_in_aggregation, true, "Enable use of software prefetch in aggregation", 0) \ /** Experimental feature for moving data between shards. */ \ \ M(Bool, allow_experimental_query_deduplication, false, "Experimental data deduplication for SELECT queries based on part UUIDs", 0) \ diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index 2080f36a531..9bb61d5b381 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -3,6 +3,10 @@ #include #include +#ifdef OS_LINUX +# include +#endif + #include #include #include @@ -282,6 +286,26 @@ DB::ColumnNumbers calculateKeysPositions(const DB::Block & header, const DB::Agg keys_positions[i] = header.getPositionByName(params.keys[i]); return keys_positions; } + +template +concept HasPrefetchMemberFunc = requires +{ + {std::declval().prefetch(std::declval())}; +}; + +size_t getMinBytesForPrefetch() +{ + size_t l2_size = 0; + +#if defined(OS_LINUX) && defined(_SC_LEVEL2_CACHE_SIZE) + if (auto ret = sysconf(_SC_LEVEL2_CACHE_SIZE); ret != -1) + l2_size = ret; +#endif + + /// 256KB looks like a reasonable default L2 size. 4 is empirical constant. + return 4 * std::max(l2_size, 256 * 1024); +} + } namespace DB @@ -325,7 +349,7 @@ void AggregatedDataVariants::convertToTwoLevel() switch (type) { - #define M(NAME) \ +#define M(NAME) \ case Type::NAME: \ NAME ## _two_level = std::make_unique(*(NAME)); \ (NAME).reset(); \ @@ -539,7 +563,10 @@ public: #endif Aggregator::Aggregator(const Block & header_, const Params & params_) - : header(header_), keys_positions(calculateKeysPositions(header, params_)), params(params_) + : header(header_) + , keys_positions(calculateKeysPositions(header, params_)) + , params(params_) + , min_bytes_for_prefetch(getMinBytesForPrefetch()) { /// Use query-level memory tracker if (auto * memory_tracker_child = CurrentThread::getMemoryTracker()) @@ -970,24 +997,38 @@ void NO_INLINE Aggregator::executeImpl( if (!no_more_keys) { + /// Prefetching doesn't make sense for small hash tables, because they fit in caches entirely. + const bool prefetch = Method::State::has_cheap_key_calculation && params.enable_prefetch + && (method.data.getBufferSizeInBytes() > min_bytes_for_prefetch); + #if USE_EMBEDDED_COMPILER if (compiled_aggregate_functions_holder && !hasSparseArguments(aggregate_instructions)) { - executeImplBatch(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row); + if (prefetch) + executeImplBatch( + method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row); + else + executeImplBatch( + method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row); } else #endif { - executeImplBatch(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row); + if (prefetch) + executeImplBatch( + method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row); + else + executeImplBatch( + method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row); } } else { - executeImplBatch(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row); + executeImplBatch(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row); } } -template +template void NO_INLINE Aggregator::executeImplBatch( Method & method, typename Method::State & state, @@ -997,6 +1038,12 @@ void NO_INLINE Aggregator::executeImplBatch( AggregateFunctionInstruction * aggregate_instructions, AggregateDataPtr overflow_row) const { + using KeyHolder = decltype(state.getKeyHolder(0, std::declval())); + + /// During processing of row #i we will prefetch HashTable cell for row #(i + prefetch_look_ahead). + PrefetchingHelper prefetching; + size_t prefetch_look_ahead = prefetching.getInitialLookAheadValue(); + /// Optimization for special case when there are no aggregate functions. if (params.aggregates_size == 0) { @@ -1006,7 +1053,21 @@ void NO_INLINE Aggregator::executeImplBatch( /// For all rows. AggregateDataPtr place = aggregates_pool->alloc(0); for (size_t i = row_begin; i < row_end; ++i) + { + if constexpr (prefetch && HasPrefetchMemberFunc) + { + if (i == row_begin + prefetching.iterationsToMeasure()) + prefetch_look_ahead = prefetching.calcPrefetchLookAhead(); + + if (i + prefetch_look_ahead < row_end) + { + auto && key_holder = state.getKeyHolder(i + prefetch_look_ahead, *aggregates_pool); + method.data.prefetch(std::move(key_holder)); + } + } + state.emplaceKey(method.data, i, *aggregates_pool).setMapped(place); + } return; } @@ -1059,6 +1120,18 @@ void NO_INLINE Aggregator::executeImplBatch( if constexpr (!no_more_keys) { + if constexpr (prefetch && HasPrefetchMemberFunc) + { + if (i == row_begin + prefetching.iterationsToMeasure()) + prefetch_look_ahead = prefetching.calcPrefetchLookAhead(); + + if (i + prefetch_look_ahead < row_end) + { + auto && key_holder = state.getKeyHolder(i + prefetch_look_ahead, *aggregates_pool); + method.data.prefetch(std::move(key_holder)); + } + } + auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool); /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. @@ -1131,7 +1204,7 @@ void NO_INLINE Aggregator::executeImplBatch( continue; AggregateFunctionInstruction * inst = aggregate_instructions + i; - size_t arguments_size = inst->that->getArgumentTypes().size(); + size_t arguments_size = inst->that->getArgumentTypes().size(); // NOLINT for (size_t argument_index = 0; argument_index < arguments_size; ++argument_index) columns_data.emplace_back(getColumnData(inst->batch_arguments[argument_index])); @@ -2338,16 +2411,13 @@ void NO_INLINE Aggregator::mergeDataNullKey( } -template -void NO_INLINE Aggregator::mergeDataImpl( - Table & table_dst, - Table & table_src, - Arena * arena) const +template +void NO_INLINE Aggregator::mergeDataImpl(Table & table_dst, Table & table_src, Arena * arena) const { if constexpr (Method::low_cardinality_optimization) mergeDataNullKey(table_dst, table_src, arena); - table_src.mergeToViaEmplace(table_dst, [&](AggregateDataPtr & __restrict dst, AggregateDataPtr & __restrict src, bool inserted) + auto merge = [&](AggregateDataPtr & __restrict dst, AggregateDataPtr & __restrict src, bool inserted) { if (!inserted) { @@ -2362,7 +2432,8 @@ void NO_INLINE Aggregator::mergeDataImpl( for (size_t i = 0; i < params.aggregates_size; ++i) { if (!is_aggregate_function_compiled[i]) - aggregate_functions[i]->merge(dst + offsets_of_aggregate_states[i], src + offsets_of_aggregate_states[i], arena); + aggregate_functions[i]->merge( + dst + offsets_of_aggregate_states[i], src + offsets_of_aggregate_states[i], arena); } for (size_t i = 0; i < params.aggregates_size; ++i) @@ -2388,7 +2459,9 @@ void NO_INLINE Aggregator::mergeDataImpl( } src = nullptr; - }); + }; + + table_src.template mergeToViaEmplace(table_dst, std::move(merge)); table_src.clearAndShrink(); } @@ -2483,6 +2556,9 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl( AggregatedDataVariantsPtr & res = non_empty_data[0]; bool no_more_keys = false; + const bool prefetch = Method::State::has_cheap_key_calculation && params.enable_prefetch + && (getDataVariant(*res).data.getBufferSizeInBytes() > min_bytes_for_prefetch); + /// We merge all aggregation results to the first. for (size_t result_num = 1, size = non_empty_data.size(); result_num < size; ++result_num) { @@ -2496,18 +2572,22 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl( #if USE_EMBEDDED_COMPILER if (compiled_aggregate_functions_holder) { - mergeDataImpl( - getDataVariant(*res).data, - getDataVariant(current).data, - res->aggregates_pool); + if (prefetch) + mergeDataImpl( + getDataVariant(*res).data, getDataVariant(current).data, res->aggregates_pool); + else + mergeDataImpl( + getDataVariant(*res).data, getDataVariant(current).data, res->aggregates_pool); } else #endif { - mergeDataImpl( - getDataVariant(*res).data, - getDataVariant(current).data, - res->aggregates_pool); + if (prefetch) + mergeDataImpl( + getDataVariant(*res).data, getDataVariant(current).data, res->aggregates_pool); + else + mergeDataImpl( + getDataVariant(*res).data, getDataVariant(current).data, res->aggregates_pool); } } else if (res->without_key) @@ -2543,6 +2623,10 @@ void NO_INLINE Aggregator::mergeBucketImpl( { /// We merge all aggregation results to the first. AggregatedDataVariantsPtr & res = data[0]; + + const bool prefetch = Method::State::has_cheap_key_calculation && params.enable_prefetch + && (Method::Data::NUM_BUCKETS * getDataVariant(*res).data.impls[bucket].getBufferSizeInBytes() > min_bytes_for_prefetch); + for (size_t result_num = 1, size = data.size(); result_num < size; ++result_num) { if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst)) @@ -2552,18 +2636,22 @@ void NO_INLINE Aggregator::mergeBucketImpl( #if USE_EMBEDDED_COMPILER if (compiled_aggregate_functions_holder) { - mergeDataImpl( - getDataVariant(*res).data.impls[bucket], - getDataVariant(current).data.impls[bucket], - arena); + if (prefetch) + mergeDataImpl( + getDataVariant(*res).data.impls[bucket], getDataVariant(current).data.impls[bucket], arena); + else + mergeDataImpl( + getDataVariant(*res).data.impls[bucket], getDataVariant(current).data.impls[bucket], arena); } else #endif { - mergeDataImpl( - getDataVariant(*res).data.impls[bucket], - getDataVariant(current).data.impls[bucket], - arena); + if (prefetch) + mergeDataImpl( + getDataVariant(*res).data.impls[bucket], getDataVariant(current).data.impls[bucket], arena); + else + mergeDataImpl( + getDataVariant(*res).data.impls[bucket], getDataVariant(current).data.impls[bucket], arena); } } } diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index 1d317e0a93a..fedb9ad0651 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -939,6 +939,8 @@ public: bool only_merge; + bool enable_prefetch; + struct StatsCollectingParams { StatsCollectingParams(); @@ -974,7 +976,8 @@ public: bool compile_aggregate_expressions_, size_t min_count_to_compile_aggregate_expression_, size_t max_block_size_, - bool only_merge_ = false, // true for projections + bool enable_prefetch_, + bool only_merge_, // true for projections const StatsCollectingParams & stats_collecting_params_ = {}) : keys(keys_) , aggregates(aggregates_) @@ -994,6 +997,7 @@ public: , min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_) , max_block_size(max_block_size_) , only_merge(only_merge_) + , enable_prefetch(enable_prefetch_) , stats_collecting_params(stats_collecting_params_) { } @@ -1001,7 +1005,7 @@ public: /// Only parameters that matter during merge. Params(const Names & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_, size_t max_block_size_) : Params( - keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0, false, 0, max_block_size_, true, {}) + keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0, false, 0, max_block_size_, false, true, {}) { } @@ -1146,6 +1150,8 @@ private: /// For external aggregation. mutable TemporaryFiles temporary_files; + size_t min_bytes_for_prefetch = 0; + #if USE_EMBEDDED_COMPILER std::shared_ptr compiled_aggregate_functions_holder; #endif @@ -1211,7 +1217,7 @@ private: AggregateDataPtr overflow_row) const; /// Specialization for a particular value no_more_keys. - template + template void executeImplBatch( Method & method, typename Method::State & state, @@ -1255,11 +1261,8 @@ private: Arena * arena) const; /// Merge data from hash table `src` into `dst`. - template - void mergeDataImpl( - Table & table_dst, - Table & table_src, - Arena * arena) const; + template + void mergeDataImpl(Table & table_dst, Table & table_src, Arena * arena) const; /// Merge data from hash table `src` into `dst`, but only for keys that already exist in dst. In other cases, merge the data into `overflows`. template diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index c73db82a27b..b561cb77f95 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -2360,6 +2360,7 @@ static Aggregator::Params getAggregatorParams( settings.compile_aggregate_expressions, settings.min_count_to_compile_aggregate_expression, settings.max_block_size, + settings.enable_software_prefetch_in_aggregation, /* only_merge */ false, stats_collecting_params }; diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp index 03f346d8f72..86f5c02df83 100644 --- a/src/Processors/QueryPlan/AggregatingStep.cpp +++ b/src/Processors/QueryPlan/AggregatingStep.cpp @@ -183,6 +183,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B transform_params->params.compile_aggregate_expressions, transform_params->params.min_count_to_compile_aggregate_expression, transform_params->params.max_block_size, + transform_params->params.enable_prefetch, /* only_merge */ false, transform_params->params.stats_collecting_params}; auto transform_params_for_set = std::make_shared(src_header, std::move(params_for_set), final); diff --git a/src/Processors/TTL/TTLAggregationAlgorithm.cpp b/src/Processors/TTL/TTLAggregationAlgorithm.cpp index 6a813a770cf..187d68216cd 100644 --- a/src/Processors/TTL/TTLAggregationAlgorithm.cpp +++ b/src/Processors/TTL/TTLAggregationAlgorithm.cpp @@ -39,7 +39,9 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm( settings.min_free_disk_space_for_temporary_data, settings.compile_aggregate_expressions, settings.min_count_to_compile_aggregate_expression, - settings.max_block_size); + settings.max_block_size, + settings.enable_software_prefetch_in_aggregation, + false /* only_merge */); aggregator = std::make_unique(header, params); diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index bb93f7a0a44..286739781c5 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -316,6 +316,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( settings.compile_aggregate_expressions, settings.min_count_to_compile_aggregate_expression, settings.max_block_size, + settings.enable_software_prefetch_in_aggregation, only_merge); return std::make_pair(params, only_merge); diff --git a/tests/performance/group_by_fixed_keys.xml b/tests/performance/group_by_fixed_keys.xml index 0be29ff11ac..a64208eb3de 100644 --- a/tests/performance/group_by_fixed_keys.xml +++ b/tests/performance/group_by_fixed_keys.xml @@ -1,7 +1,38 @@ + + 1 + + WITH toUInt8(number) AS k, toUInt64(k) AS k1, k AS k2 SELECT k1, k2, count() FROM numbers(100000000) GROUP BY k1, k2 WITH toUInt8(number) AS k, toUInt16(k) AS k1, toUInt32(k) AS k2, k AS k3 SELECT k1, k2, k3, count() FROM numbers(100000000) GROUP BY k1, k2, k3 WITH toUInt8(number) AS k, k AS k1, k + 1 AS k2 SELECT k1, k2, count() FROM numbers(100000000) GROUP BY k1, k2 WITH toUInt8(number) AS k, k AS k1, k + 1 AS k2, k + 2 AS k3, k + 3 AS k4 SELECT k1, k2, k3, k4, count() FROM numbers(100000000) GROUP BY k1, k2, k3, k4 WITH toUInt8(number) AS k, toUInt64(k) AS k1, k1 + 1 AS k2 SELECT k1, k2, count() FROM numbers(100000000) GROUP BY k1, k2 + + create table group_by_fk(a UInt32, b UInt32, c LowCardinality(UInt32), d Nullable(UInt32), e UInt64, f UInt64, g UInt64, h LowCardinality(UInt64), i Nullable(UInt64)) engine=MergeTree order by tuple() + + insert into group_by_fk select number, number, number % 10000, number % 2 == 0 ? number : Null, number, number, number, number % 10000, number % 2 == 0 ? number : Null from numbers_mt(3e7) + + + select a, b from group_by_fk group by a, b format Null + + select a, c from group_by_fk group by a, c format Null + + select a, d from group_by_fk group by a, d format Null + + + select e, f from group_by_fk group by e, f format Null + + select e, h from group_by_fk group by e, h format Null + + select e, i from group_by_fk group by e, i format Null + + + select e, f, g from group_by_fk group by e, f, g format Null + + select e, f, h from group_by_fk group by e, f, h format Null + + select e, f, i from group_by_fk group by e, f, i format Null + + drop table group_by_fk diff --git a/tests/performance/hash_table_sizes_stats.xml b/tests/performance/hash_table_sizes_stats.xml index 28f9ea4181d..ead0cdfff7e 100644 --- a/tests/performance/hash_table_sizes_stats.xml +++ b/tests/performance/hash_table_sizes_stats.xml @@ -1,4 +1,5 @@ + 1000000000 diff --git a/tests/performance/prefetch_in_aggregation.xml b/tests/performance/prefetch_in_aggregation.xml new file mode 100644 index 00000000000..c059d965335 --- /dev/null +++ b/tests/performance/prefetch_in_aggregation.xml @@ -0,0 +1,32 @@ + + + + size + + 10000000 + 50000000 + + + + table + + numbers + numbers_mt + + + + + create table t_str_key_{size}(a String, b FixedString(25)) engine=MergeTree order by tuple() + insert into t_str_key_{size} select toString(number) as s, toFixedString(s, 25) from numbers_mt({size}) + + select a from t_str_key_{size} group by a format Null + select b from t_str_key_{size} group by b format Null + + select number from {table}({size}) group by number format Null + select count() from {table}({size}) group by number format Null + + select number from numbers_mt(1e8) group by number format Null + select count() from numbers_mt(1e8) group by number format Null + + drop table t_str_key_{size} + diff --git a/tests/performance/single_fixed_string_groupby.xml b/tests/performance/single_fixed_string_groupby.xml index 8e166a124ff..f2b867f0bf0 100644 --- a/tests/performance/single_fixed_string_groupby.xml +++ b/tests/performance/single_fixed_string_groupby.xml @@ -1,4 +1,5 @@ + DROP TABLE IF EXISTS perf_lc_fixed_str_groupby CREATE TABLE perf_lc_fixed_str_groupby( a LowCardinality(FixedString(14)), diff --git a/tests/performance/synthetic_hardware_benchmark.xml b/tests/performance/synthetic_hardware_benchmark.xml index d4d93447265..033ce727715 100644 --- a/tests/performance/synthetic_hardware_benchmark.xml +++ b/tests/performance/synthetic_hardware_benchmark.xml @@ -1,4 +1,5 @@ + 30000000000 diff --git a/tests/performance/website.xml b/tests/performance/website.xml index d83abcf3365..7ff578bdce2 100644 --- a/tests/performance/website.xml +++ b/tests/performance/website.xml @@ -1,4 +1,5 @@ + 30000000000