slightly faster and perf test

This commit is contained in:
Anton Popov 2023-12-15 21:46:59 +00:00
parent 97ba04e4c7
commit 5faf5e913e
4 changed files with 75 additions and 56 deletions

View File

@ -35,6 +35,12 @@ struct LastElementCacheStats
{
UInt64 hits = 0;
UInt64 misses = 0;
void update(size_t num_tries, size_t num_misses)
{
hits += num_tries - num_misses;
misses += num_misses;
}
};
namespace columns_hashing_impl
@ -48,15 +54,15 @@ struct LastElementCache
Value value;
bool empty = true;
bool found = false;
LastElementCacheStats stats;
UInt64 misses = 0;
bool check(const Value & value_) const { return value == value_; }
template <typename Key>
bool check(const Key & key) const { return value.first == key; }
bool hasOnlyOneValue() const { return !empty && found && stats.misses == 0; }
LastElementCacheStats getStats() const { return stats; }
bool hasOnlyOneValue() const { return found && misses == 1; }
UInt64 getMisses() const { return misses; }
};
template <typename Data>
@ -212,7 +218,7 @@ public:
{
cache.empty = true;
cache.found = false;
cache.stats = {};
cache.misses = 0;
}
}
@ -223,11 +229,11 @@ public:
return false;
}
ALWAYS_INLINE LastElementCacheStats getCacheStatsSinceLastReset() const
ALWAYS_INLINE UInt64 getCacheMissesSinceLastReset() const
{
if constexpr (consecutive_keys_optimization)
return cache.getStats();
return {0, 0};
return cache.getMisses();
return 0;
}
ALWAYS_INLINE bool isNullAt(size_t row) const
@ -271,20 +277,12 @@ protected:
{
if constexpr (consecutive_keys_optimization)
{
if (!cache.empty)
if (cache.found && cache.check(keyHolderGetKey(key_holder)))
{
if (cache.found && cache.check(keyHolderGetKey(key_holder)))
{
++cache.stats.hits;
if constexpr (has_mapped)
return EmplaceResult(cache.value.second, cache.value.second, false);
else
return EmplaceResult(false);
}
if constexpr (has_mapped)
return EmplaceResult(cache.value.second, cache.value.second, false);
else
{
++cache.stats.misses;
}
return EmplaceResult(false);
}
}
@ -308,6 +306,7 @@ protected:
{
cache.found = true;
cache.empty = false;
++cache.misses;
if constexpr (has_mapped)
{
@ -330,25 +329,17 @@ protected:
template <typename Data, typename Key>
ALWAYS_INLINE FindResult findKeyImpl(Key key, Data & data)
{
if constexpr (Cache::consecutive_keys_optimization)
if constexpr (consecutive_keys_optimization)
{
/// It's possible to support such combination, but code will became more complex.
/// Now there's not place where we need this options enabled together
static_assert(!FindResult::has_offset, "`consecutive_keys_optimization` and `has_offset` are conflicting options");
if (!cache.empty)
if (likely(!cache.empty) && cache.check(key))
{
if (cache.check(key))
{
++cache.stats.hits;
if constexpr (has_mapped)
return FindResult(&cache.value.second, cache.found, 0);
else
return FindResult(cache.found, 0);
}
if constexpr (has_mapped)
return FindResult(&cache.value.second, cache.found, 0);
else
{
++cache.stats.misses;
}
return FindResult(cache.found, 0);
}
}
@ -358,6 +349,7 @@ protected:
{
cache.found = it != nullptr;
cache.empty = false;
++cache.misses;
if constexpr (has_mapped)
{

View File

@ -1014,7 +1014,9 @@ void Aggregator::mergeOnBlockSmall(
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(result.aggregates_pool, *result.NAME, result.NAME->data, \
result.without_key, /* no_more_keys= */ false, \
result.without_key, \
result.consecutive_keys_cache_stats, \
/* no_more_keys= */ false, \
row_begin, row_end, \
aggregate_columns_data, key_columns, result.aggregates_pool);
@ -1067,10 +1069,7 @@ void NO_INLINE Aggregator::executeImpl(
{
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
executeImpl(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, no_more_keys, all_keys_are_const, overflow_row);
auto current_stats = state.getCacheStatsSinceLastReset();
consecutive_keys_cache_stats.hits += current_stats.hits;
consecutive_keys_cache_stats.misses += current_stats.misses;
consecutive_keys_cache_stats.update(row_end - row_begin, state.getCacheMissesSinceLastReset());
}
else
{
@ -2921,20 +2920,17 @@ ManyAggregatedDataVariants Aggregator::prepareVariantsToMerge(ManyAggregatedData
return non_empty_data;
}
template <bool no_more_keys, typename Method, typename Table>
template <bool no_more_keys, typename State, typename Table>
void NO_INLINE Aggregator::mergeStreamsImplCase(
Arena * aggregates_pool,
Method & method [[maybe_unused]],
State & state,
Table & data,
AggregateDataPtr overflow_row,
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data,
const ColumnRawPtrs & key_columns,
Arena * arena_for_keys) const
{
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[row_end]);
if (!arena_for_keys)
@ -2990,6 +2986,7 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
Method & method,
Table & data,
AggregateDataPtr overflow_row,
LastElementCacheStats & consecutive_keys_cache_stats,
bool no_more_keys,
Arena * arena_for_keys) const
{
@ -2997,15 +2994,17 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
const ColumnRawPtrs & key_columns = params.makeRawKeyColumns(block);
mergeStreamsImpl<Method, Table>(
aggregates_pool, method, data, overflow_row, no_more_keys, 0, block.rows(), aggregate_columns_data, key_columns, arena_for_keys);
aggregates_pool, method, data, overflow_row, consecutive_keys_cache_stats,
no_more_keys, 0, block.rows(), aggregate_columns_data, key_columns, arena_for_keys);
}
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeStreamsImpl(
Arena * aggregates_pool,
Method & method,
Method & method [[maybe_unused]],
Table & data,
AggregateDataPtr overflow_row,
LastElementCacheStats & consecutive_keys_cache_stats,
bool no_more_keys,
size_t row_begin,
size_t row_end,
@ -3013,12 +3012,30 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
const ColumnRawPtrs & key_columns,
Arena * arena_for_keys) const
{
if (!no_more_keys)
mergeStreamsImplCase<false>(
aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns, arena_for_keys);
UInt64 total_rows = consecutive_keys_cache_stats.hits + consecutive_keys_cache_stats.misses;
double cache_hit_rate = total_rows ? static_cast<double>(consecutive_keys_cache_stats.hits) / (total_rows) : 1.0;
bool use_cache = cache_hit_rate >= 0.5;
if (use_cache)
{
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
if (!no_more_keys)
mergeStreamsImplCase<false>(aggregates_pool, state, data, overflow_row, row_begin, row_end, aggregate_columns_data, arena_for_keys);
else
mergeStreamsImplCase<true>(aggregates_pool, state, data, overflow_row, row_begin, row_end, aggregate_columns_data, arena_for_keys);
consecutive_keys_cache_stats.update(row_end - row_begin, state.getCacheMissesSinceLastReset());
}
else
mergeStreamsImplCase<true>(
aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns, arena_for_keys);
{
typename Method::StateNoCache state(key_columns, key_sizes, aggregation_state_cache);
if (!no_more_keys)
mergeStreamsImplCase<false>(aggregates_pool, state, data, overflow_row, row_begin, row_end, aggregate_columns_data, arena_for_keys);
else
mergeStreamsImplCase<true>(aggregates_pool, state, data, overflow_row, row_begin, row_end, aggregate_columns_data, arena_for_keys);
}
}
@ -3078,7 +3095,7 @@ bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool
mergeBlockWithoutKeyStreamsImpl(std::move(block), result);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, result.consecutive_keys_cache_stats, no_more_keys);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
@ -3183,7 +3200,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
{
#define M(NAME) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(std::move(block), aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);
mergeStreamsImpl(std::move(block), aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, result.consecutive_keys_cache_stats, false);
if (false) {} // NOLINT
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
@ -3238,7 +3255,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, result.consecutive_keys_cache_stats, no_more_keys);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
@ -3316,7 +3333,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false, arena_for_keys.get());
mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, result.consecutive_keys_cache_stats, false, arena_for_keys.get());
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M

View File

@ -1467,16 +1467,15 @@ private:
bool final,
ThreadPool * thread_pool) const;
template <bool no_more_keys, typename Method, typename Table>
template <bool no_more_keys, typename State, typename Table>
void mergeStreamsImplCase(
Arena * aggregates_pool,
Method & method,
State & state,
Table & data,
AggregateDataPtr overflow_row,
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data,
const ColumnRawPtrs & key_columns,
Arena * arena_for_keys) const;
/// `arena_for_keys` used to store serialized aggregation keys (in methods like `serialized`) to save some space.
@ -1488,6 +1487,7 @@ private:
Method & method,
Table & data,
AggregateDataPtr overflow_row,
LastElementCacheStats & consecutive_keys_cache_stats,
bool no_more_keys,
Arena * arena_for_keys = nullptr) const;
@ -1497,6 +1497,7 @@ private:
Method & method,
Table & data,
AggregateDataPtr overflow_row,
LastElementCacheStats & consecutive_keys_cache_stats,
bool no_more_keys,
size_t row_begin,
size_t row_end,
@ -1507,6 +1508,7 @@ private:
void mergeBlockWithoutKeyStreamsImpl(
Block block,
AggregatedDataVariants & result) const;
void mergeWithoutKeyStreamsImpl(
AggregatedDataVariants & result,
size_t row_begin,

View File

@ -0,0 +1,8 @@
<test>
<query>SELECT toUInt64(intDiv(number, 1000000)) AS n, count(), sum(number) FROM numbers(10000000) GROUP BY n FORMAT Null</query>
<query>SELECT toString(intDiv(number, 1000000)) AS n, count(), sum(number) FROM numbers(10000000) GROUP BY n FORMAT Null</query>
<query>SELECT toUInt64(intDiv(number, 1000000)) AS n, count(), uniq(number) FROM numbers(10000000) GROUP BY n FORMAT Null</query>
<query>SELECT toUInt64(intDiv(number, 100000)) AS n, count(), sum(number) FROM numbers(10000000) GROUP BY n FORMAT Null</query>
<query>SELECT toUInt64(intDiv(number, 100)) AS n, count(), sum(number) FROM numbers(10000000) GROUP BY n FORMAT Null</query>
<query>SELECT toUInt64(intDiv(number, 10)) AS n, count(), sum(number) FROM numbers(10000000) GROUP BY n FORMAT Null</query>
</test>