Merge pull request #64553 from nickitat/concurrent_hash_join_cache_ht_sizes

Collect hash table sizes statistics in ConcurrentHashJoin
This commit is contained in:
Nikita Taranov 2024-07-10 16:11:27 +00:00 committed by GitHub
commit e7d5992966
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 401 additions and 211 deletions

View File

@ -568,6 +568,7 @@ The server successfully detected this situation and will download merged part fr
M(AggregationPreallocatedElementsInHashTables, "How many elements were preallocated in hash tables for aggregation.") \
M(AggregationHashTablesInitializedAsTwoLevel, "How many hash tables were inited as two-level for aggregation.") \
M(AggregationOptimizedEqualRangesOfKeys, "For how many blocks optimization of equal ranges of keys was applied") \
M(HashJoinPreallocatedElementsInHashTables, "How many elements were preallocated in hash tables for hash join.") \
\
M(MetadataFromKeeperCacheHit, "Number of times an object storage metadata request was answered from cache without making request to Keeper") \
M(MetadataFromKeeperCacheMiss, "Number of times an object storage metadata request had to be answered from Keeper") \

View File

@ -151,6 +151,7 @@ namespace DB
M(UInt64, global_profiler_real_time_period_ns, 0, "Period for real clock timer of global profiler (in nanoseconds). Set 0 value to turn off the real clock global profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(UInt64, global_profiler_cpu_time_period_ns, 0, "Period for CPU clock timer of global profiler (in nanoseconds). Set 0 value to turn off the CPU clock global profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(Bool, enable_azure_sdk_logging, false, "Enables logging from Azure sdk", 0) \
M(UInt64, max_entries_for_hash_table_stats, 10'000, "How many entries hash table statistics collected during aggregation is allowed to have", 0) \
M(String, merge_workload, "default", "Name of workload to be used to access resources for all merges (may be overridden by a merge tree setting)", 0) \
M(String, mutation_workload, "default", "Name of workload to be used to access resources for all mutations (may be overridden by a merge tree setting)", 0) \
M(Bool, prepare_system_log_tables_on_startup, false, "If true, ClickHouse creates all configured `system.*_log` tables before the startup. It can be helpful if some startup scripts depend on these tables.", 0) \

View File

@ -680,9 +680,11 @@ class IColumn;
M(UInt64, insert_shard_id, 0, "If non zero, when insert into a distributed table, the data will be inserted into the shard `insert_shard_id` synchronously. Possible values range from 1 to `shards_number` of corresponding distributed table", 0) \
\
M(Bool, collect_hash_table_stats_during_aggregation, true, "Enable collecting hash table statistics to optimize memory allocation", 0) \
M(UInt64, max_entries_for_hash_table_stats, 10'000, "How many entries hash table statistics collected during aggregation is allowed to have", 0) \
M(UInt64, max_size_to_preallocate_for_aggregation, 100'000'000, "For how many elements it is allowed to preallocate space in all hash tables in total before aggregation", 0) \
\
M(Bool, collect_hash_table_stats_during_joins, true, "Enable collecting hash table statistics to optimize memory allocation", 0) \
M(UInt64, max_size_to_preallocate_for_joins, 100'000'000, "For how many elements it is allowed to preallocate space in all hash tables in total before join", 0) \
\
M(Bool, kafka_disable_num_consumers_limit, false, "Disable limit on kafka_num_consumers that depends on the number of available CPU cores", 0) \
M(Bool, enable_software_prefetch_in_aggregation, true, "Enable use of software prefetch in aggregation", 0) \
M(Bool, allow_aggregate_partitions_independently, false, "Enable independent aggregation of partitions on separate threads when partition key suits group by key. Beneficial when number of partitions close to number of cores and partitions have roughly the same size", 0) \
@ -1012,6 +1014,7 @@ class IColumn;
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, async_insert_threads, 16) \
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, max_replicated_fetches_network_bandwidth_for_server, 0) \
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, max_replicated_sends_network_bandwidth_for_server, 0) \
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, max_entries_for_hash_table_stats, 10'000) \
/* ---- */ \
MAKE_OBSOLETE(M, DefaultDatabaseEngine, default_database_engine, DefaultDatabaseEngine::Atomic) \
MAKE_OBSOLETE(M, UInt64, max_pipeline_depth, 0) \

View File

@ -67,6 +67,8 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"enable_named_columns_in_function_tuple", false, true, "Generate named tuples in function tuple() when all names are unique and can be treated as unquoted identifiers."},
{"input_format_json_ignore_key_case", false, false, "Ignore json key case while read json field from string."},
{"optimize_trivial_insert_select", true, false, "The optimization does not make sense in many cases."},
{"collect_hash_table_stats_during_joins", false, true, "New setting."},
{"max_size_to_preallocate_for_joins", 0, 100'000'000, "New setting."},
{"input_format_orc_read_use_writer_time_zone", false, false, "Whether use the writer's time zone in ORC stripe for ORC row reader, the default ORC row reader's time zone is GMT."},
{"lightweight_mutation_projection_mode", "throw", "throw", "When lightweight delete happens on a table with projection(s), the possible operations include throw the exception as projection exists, or drop all projection related to this table then do lightweight delete."},
{"database_replicated_allow_heavy_create", true, false, "Long-running DDL queries (CREATE AS SELECT and POPULATE) for Replicated database engine was forbidden"},

View File

@ -26,7 +26,6 @@
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Parsers/ASTSelectQuery.h>
#include <base/sort.h>
#include <Common/CacheBase.h>
#include <Common/CurrentMetrics.h>
#include <Common/CurrentThread.h>
#include <Common/JSONBuilder.h>
@ -78,115 +77,6 @@ namespace ErrorCodes
namespace
{
/** Collects observed HashMap-s sizes to avoid redundant intermediate resizes.
*/
class HashTablesStatistics
{
public:
struct Entry
{
size_t sum_of_sizes; // used to determine if it's better to convert aggregation to two-level from the beginning
size_t median_size; // roughly the size we're going to preallocate on each thread
};
using Cache = DB::CacheBase<UInt64, Entry>;
using CachePtr = std::shared_ptr<Cache>;
using Params = DB::Aggregator::Params::StatsCollectingParams;
/// Collection and use of the statistics should be enabled.
std::optional<Entry> getSizeHint(const Params & params)
{
if (!params.isCollectionAndUseEnabled())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Collection and use of the statistics should be enabled.");
std::lock_guard lock(mutex);
const auto cache = getHashTableStatsCache(params, lock);
if (const auto hint = cache->get(params.key))
{
LOG_TRACE(
getLogger("Aggregator"),
"An entry for key={} found in cache: sum_of_sizes={}, median_size={}",
params.key,
hint->sum_of_sizes,
hint->median_size);
return *hint;
}
return std::nullopt;
}
/// Collection and use of the statistics should be enabled.
void update(size_t sum_of_sizes, size_t median_size, const Params & params)
{
if (!params.isCollectionAndUseEnabled())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Collection and use of the statistics should be enabled.");
std::lock_guard lock(mutex);
const auto cache = getHashTableStatsCache(params, lock);
const auto hint = cache->get(params.key);
// We'll maintain the maximum among all the observed values until the next prediction turns out to be too wrong.
if (!hint || sum_of_sizes < hint->sum_of_sizes / 2 || hint->sum_of_sizes < sum_of_sizes || median_size < hint->median_size / 2
|| hint->median_size < median_size)
{
LOG_TRACE(
getLogger("Aggregator"),
"Statistics updated for key={}: new sum_of_sizes={}, median_size={}",
params.key,
sum_of_sizes,
median_size);
cache->set(params.key, std::make_shared<Entry>(Entry{.sum_of_sizes = sum_of_sizes, .median_size = median_size}));
}
}
std::optional<DB::HashTablesCacheStatistics> getCacheStats() const
{
std::lock_guard lock(mutex);
if (hash_table_stats)
{
size_t hits = 0, misses = 0;
hash_table_stats->getStats(hits, misses);
return DB::HashTablesCacheStatistics{.entries = hash_table_stats->count(), .hits = hits, .misses = misses};
}
return std::nullopt;
}
static size_t calculateCacheKey(const DB::ASTPtr & select_query)
{
if (!select_query)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Query ptr cannot be null");
const auto & select = select_query->as<DB::ASTSelectQuery &>();
// It may happen in some corner cases like `select 1 as num group by num`.
if (!select.tables())
return 0;
SipHash hash;
hash.update(select.tables()->getTreeHash(/*ignore_aliases=*/ true));
if (const auto where = select.where())
hash.update(where->getTreeHash(/*ignore_aliases=*/ true));
if (const auto group_by = select.groupBy())
hash.update(group_by->getTreeHash(/*ignore_aliases=*/ true));
return hash.get64();
}
private:
CachePtr getHashTableStatsCache(const Params & params, const std::lock_guard<std::mutex> &)
{
if (!hash_table_stats || hash_table_stats->maxSizeInBytes() != params.max_entries_for_hash_table_stats)
hash_table_stats = std::make_shared<Cache>(params.max_entries_for_hash_table_stats);
return hash_table_stats;
}
mutable std::mutex mutex;
CachePtr hash_table_stats;
};
HashTablesStatistics & getHashTablesStatistics()
{
static HashTablesStatistics hash_tables_stats;
return hash_tables_stats;
}
bool worthConvertToTwoLevel(
size_t group_by_two_level_threshold, size_t result_size, size_t group_by_two_level_threshold_bytes, auto result_size_bytes)
{
@ -215,49 +105,29 @@ void initDataVariantsWithSizeHint(
DB::AggregatedDataVariants & result, DB::AggregatedDataVariants::Type method_chosen, const DB::Aggregator::Params & params)
{
const auto & stats_collecting_params = params.stats_collecting_params;
if (stats_collecting_params.isCollectionAndUseEnabled())
const auto max_threads = params.group_by_two_level_threshold != 0 ? std::max(params.max_threads, 1ul) : 1;
if (auto hint = getSizeHint(stats_collecting_params, /*tables_cnt=*/max_threads))
{
if (auto hint = getHashTablesStatistics().getSizeHint(stats_collecting_params))
{
const auto max_threads = params.group_by_two_level_threshold != 0 ? std::max(params.max_threads, 1ul) : 1;
const auto lower_limit = hint->sum_of_sizes / max_threads;
const auto upper_limit = stats_collecting_params.max_size_to_preallocate_for_aggregation / max_threads;
if (hint->median_size > upper_limit)
{
/// Since we cannot afford to preallocate as much as we want, we will likely need to do resize anyway.
/// But we will also work with the big (i.e. not so cache friendly) HT from the beginning which may result in a slight slowdown.
/// So let's just do nothing.
LOG_TRACE(
getLogger("Aggregator"),
"No space were preallocated in hash tables because 'max_size_to_preallocate_for_aggregation' has too small value: {}, "
"should be at least {}",
stats_collecting_params.max_size_to_preallocate_for_aggregation,
hint->median_size * max_threads);
}
/// https://github.com/ClickHouse/ClickHouse/issues/44402#issuecomment-1359920703
else if ((max_threads > 1 && hint->sum_of_sizes > 100'000) || hint->sum_of_sizes > 500'000)
{
const auto adjusted = std::max(lower_limit, hint->median_size);
if (worthConvertToTwoLevel(
params.group_by_two_level_threshold,
hint->sum_of_sizes,
/*group_by_two_level_threshold_bytes*/ 0,
/*result_size_bytes*/ 0))
method_chosen = convertToTwoLevelTypeIfPossible(method_chosen);
result.init(method_chosen, adjusted);
ProfileEvents::increment(ProfileEvents::AggregationHashTablesInitializedAsTwoLevel, result.isTwoLevel());
return;
}
}
if (worthConvertToTwoLevel(
params.group_by_two_level_threshold,
hint->sum_of_sizes,
/*group_by_two_level_threshold_bytes*/ 0,
/*result_size_bytes*/ 0))
method_chosen = convertToTwoLevelTypeIfPossible(method_chosen);
result.init(method_chosen, hint->median_size);
}
result.init(method_chosen);
else
{
result.init(method_chosen);
}
ProfileEvents::increment(ProfileEvents::AggregationHashTablesInitializedAsTwoLevel, result.isTwoLevel());
}
/// Collection and use of the statistics should be enabled.
void updateStatistics(const DB::ManyAggregatedDataVariants & data_variants, const DB::Aggregator::Params::StatsCollectingParams & params)
void updateStatistics(const DB::ManyAggregatedDataVariants & data_variants, const DB::StatsCollectingParams & params)
{
if (!params.isCollectionAndUseEnabled())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Collection and use of the statistics should be enabled.");
return;
std::vector<size_t> sizes(data_variants.size());
for (size_t i = 0; i < data_variants.size(); ++i)
@ -265,7 +135,7 @@ void updateStatistics(const DB::ManyAggregatedDataVariants & data_variants, cons
const auto median_size = sizes.begin() + sizes.size() / 2; // not precisely though...
std::nth_element(sizes.begin(), median_size, sizes.end());
const auto sum_of_sizes = std::accumulate(sizes.begin(), sizes.end(), 0ull);
getHashTablesStatistics().update(sum_of_sizes, *median_size, params);
DB::getHashTablesStatistics().update(sum_of_sizes, *median_size, params);
}
DB::ColumnNumbers calculateKeysPositions(const DB::Block & header, const DB::Aggregator::Params & params)
@ -300,24 +170,6 @@ size_t getMinBytesForPrefetch()
namespace DB
{
std::optional<HashTablesCacheStatistics> getHashTablesCacheStatistics()
{
return getHashTablesStatistics().getCacheStats();
}
Aggregator::Params::StatsCollectingParams::StatsCollectingParams() = default;
Aggregator::Params::StatsCollectingParams::StatsCollectingParams(
const ASTPtr & select_query_,
bool collect_hash_table_stats_during_aggregation_,
size_t max_entries_for_hash_table_stats_,
size_t max_size_to_preallocate_for_aggregation_)
: key(collect_hash_table_stats_during_aggregation_ ? HashTablesStatistics::calculateCacheKey(select_query_) : 0)
, max_entries_for_hash_table_stats(max_entries_for_hash_table_stats_)
, max_size_to_preallocate_for_aggregation(max_size_to_preallocate_for_aggregation_)
{
}
Block Aggregator::getHeader(bool final) const
{
return params.getHeader(header, final);
@ -2783,8 +2635,7 @@ ManyAggregatedDataVariants Aggregator::prepareVariantsToMerge(ManyAggregatedData
LOG_TRACE(log, "Merging aggregated data");
if (params.stats_collecting_params.isCollectionAndUseEnabled())
updateStatistics(data_variants, params.stats_collecting_params);
updateStatistics(data_variants, params.stats_collecting_params);
ManyAggregatedDataVariants non_empty_data;
non_empty_data.reserve(data_variants.size());
@ -3486,4 +3337,23 @@ void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result) cons
throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown aggregated data variant.");
}
UInt64 calculateCacheKey(const DB::ASTPtr & select_query)
{
if (!select_query)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Query ptr cannot be null");
const auto & select = select_query->as<DB::ASTSelectQuery &>();
// It may happen in some corner cases like `select 1 as num group by num`.
if (!select.tables())
return 0;
SipHash hash;
hash.update(select.tables()->getTreeHash(/*ignore_aliases=*/true));
if (const auto where = select.where())
hash.update(where->getTreeHash(/*ignore_aliases=*/true));
if (const auto group_by = select.groupBy())
hash.update(group_by->getTreeHash(/*ignore_aliases=*/true));
return hash.get64();
}
}

View File

@ -39,9 +39,10 @@
#include <Parsers/IAST_fwd.h>
#include <Interpreters/AggregationMethod.h>
#include <Interpreters/AggregatedData.h>
#include <Interpreters/AggregatedDataVariants.h>
#include <Interpreters/AggregationMethod.h>
#include <Interpreters/HashTablesStatistics.h>
namespace DB
{
@ -128,24 +129,6 @@ public:
const double min_hit_rate_to_use_consecutive_keys_optimization;
struct StatsCollectingParams
{
StatsCollectingParams();
StatsCollectingParams(
const ASTPtr & select_query_,
bool collect_hash_table_stats_during_aggregation_,
size_t max_entries_for_hash_table_stats_,
size_t max_size_to_preallocate_for_aggregation_);
bool isCollectionAndUseEnabled() const { return key != 0; }
void disable() { key = 0; }
UInt64 key = 0;
const size_t max_entries_for_hash_table_stats = 0;
const size_t max_size_to_preallocate_for_aggregation = 0;
};
StatsCollectingParams stats_collecting_params;
Params(
@ -674,6 +657,7 @@ private:
Arena * arena);
};
UInt64 calculateCacheKey(const DB::ASTPtr & select_query);
/** Get the aggregation variant by its type. */
template <typename Method> Method & getDataVariant(AggregatedDataVariants & variants);
@ -685,13 +669,4 @@ APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
struct HashTablesCacheStatistics
{
size_t entries = 0;
size_t hits = 0;
size_t misses = 0;
};
std::optional<HashTablesCacheStatistics> getHashTablesCacheStatistics();
}

View File

@ -2,6 +2,7 @@
#include <Columns/FilterDescription.h>
#include <Columns/IColumn.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Core/Names.h>
#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Interpreters/ConcurrentHashJoin.h>
@ -16,12 +17,18 @@
#include <Parsers/parseQuery.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/ThreadPool.h>
#include <Common/WeakHash.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
namespace ProfileEvents
{
extern const Event HashJoinPreallocatedElementsInHashTables;
}
namespace CurrentMetrics
{
extern const Metric ConcurrentHashJoinPoolThreads;
@ -29,6 +36,25 @@ extern const Metric ConcurrentHashJoinPoolThreadsActive;
extern const Metric ConcurrentHashJoinPoolThreadsScheduled;
}
namespace
{
void updateStatistics(const auto & hash_joins, const DB::StatsCollectingParams & params)
{
if (!params.isCollectionAndUseEnabled())
return;
std::vector<size_t> sizes(hash_joins.size());
for (size_t i = 0; i < hash_joins.size(); ++i)
sizes[i] = hash_joins[i]->data->getTotalRowCount();
const auto median_size = sizes.begin() + sizes.size() / 2; // not precisely though...
std::nth_element(sizes.begin(), median_size, sizes.end());
if (auto sum_of_sizes = std::accumulate(sizes.begin(), sizes.end(), 0ull))
DB::getHashTablesStatistics().update(sum_of_sizes, *median_size, params);
}
}
namespace DB
{
@ -46,7 +72,12 @@ static UInt32 toPowerOfTwo(UInt32 x)
}
ConcurrentHashJoin::ConcurrentHashJoin(
ContextPtr context_, std::shared_ptr<TableJoin> table_join_, size_t slots_, const Block & right_sample_block, bool any_take_last_row_)
ContextPtr context_,
std::shared_ptr<TableJoin> table_join_,
size_t slots_,
const Block & right_sample_block,
const StatsCollectingParams & stats_collecting_params_,
bool any_take_last_row_)
: context(context_)
, table_join(table_join_)
, slots(toPowerOfTwo(std::min<UInt32>(static_cast<UInt32>(slots_), 256)))
@ -55,6 +86,7 @@ ConcurrentHashJoin::ConcurrentHashJoin(
CurrentMetrics::ConcurrentHashJoinPoolThreadsActive,
CurrentMetrics::ConcurrentHashJoinPoolThreadsScheduled,
slots))
, stats_collecting_params(stats_collecting_params_)
{
hash_joins.resize(slots);
@ -74,9 +106,14 @@ ConcurrentHashJoin::ConcurrentHashJoin(
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("ConcurrentJoin");
size_t reserve_size = 0;
if (auto hint = getSizeHint(stats_collecting_params, slots))
reserve_size = hint->median_size;
ProfileEvents::increment(ProfileEvents::HashJoinPreallocatedElementsInHashTables, reserve_size);
auto inner_hash_join = std::make_shared<InternalHashJoin>();
inner_hash_join->data = std::make_unique<HashJoin>(
table_join_, right_sample_block, any_take_last_row_, 0, fmt::format("concurrent{}", idx));
table_join_, right_sample_block, any_take_last_row_, reserve_size, fmt::format("concurrent{}", idx));
/// Non zero `max_joined_block_rows` allows to process block partially and return not processed part.
/// TODO: It's not handled properly in ConcurrentHashJoin case, so we set it to 0 to disable this feature.
inner_hash_join->data->setMaxJoinedBlockRows(0);
@ -97,6 +134,8 @@ ConcurrentHashJoin::~ConcurrentHashJoin()
{
try
{
updateStatistics(hash_joins, stats_collecting_params);
for (size_t i = 0; i < slots; ++i)
{
// Hash tables destruction may be very time-consuming.
@ -300,4 +339,16 @@ Blocks ConcurrentHashJoin::dispatchBlock(const Strings & key_columns_names, cons
return result;
}
UInt64 calculateCacheKey(std::shared_ptr<TableJoin> & table_join, const QueryTreeNodePtr & right_table_expression)
{
IQueryTreeNode::HashState hash;
chassert(right_table_expression);
hash.update(right_table_expression->getTreeHash());
chassert(table_join && table_join->oneDisjunct());
const auto keys
= NameOrderedSet{table_join->getClauses().at(0).key_names_right.begin(), table_join->getClauses().at(0).key_names_right.end()};
for (const auto & name : keys)
hash.update(name);
return hash.get64();
}
}

View File

@ -3,8 +3,10 @@
#include <condition_variable>
#include <memory>
#include <optional>
#include <Analyzer/IQueryTreeNode.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/HashTablesStatistics.h>
#include <Interpreters/HashJoin/HashJoin.h>
#include <Interpreters/IJoin.h>
#include <base/defines.h>
@ -38,6 +40,7 @@ public:
std::shared_ptr<TableJoin> table_join_,
size_t slots_,
const Block & right_sample_block,
const StatsCollectingParams & stats_collecting_params_,
bool any_take_last_row_ = false);
~ConcurrentHashJoin() override;
@ -70,6 +73,8 @@ private:
std::unique_ptr<ThreadPool> pool;
std::vector<std::shared_ptr<InternalHashJoin>> hash_joins;
StatsCollectingParams stats_collecting_params;
std::mutex totals_mutex;
Block totals;
@ -77,4 +82,5 @@ private:
Blocks dispatchBlock(const Strings & key_columns_names, const Block & from_block);
};
UInt64 calculateCacheKey(std::shared_ptr<TableJoin> & table_join, const QueryTreeNodePtr & right_table_expression);
}

View File

@ -986,7 +986,8 @@ static std::shared_ptr<IJoin> tryCreateJoin(
const auto & settings = context->getSettings();
if (analyzed_join->allowParallelHashJoin())
return std::make_shared<ConcurrentHashJoin>(context, analyzed_join, settings.max_threads, right_sample_block);
return std::make_shared<ConcurrentHashJoin>(
context, analyzed_join, settings.max_threads, right_sample_block, StatsCollectingParams{});
return std::make_shared<HashJoin>(analyzed_join, right_sample_block);
}

View File

@ -0,0 +1,117 @@
#include <Interpreters/HashTablesStatistics.h>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
std::optional<HashTablesStatistics::Entry> HashTablesStatistics::getSizeHint(const Params & params)
{
if (!params.isCollectionAndUseEnabled())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Collection and use of the statistics should be enabled.");
std::lock_guard lock(mutex);
const auto cache = getHashTableStatsCache(params, lock);
if (const auto hint = cache->get(params.key))
{
LOG_TRACE(
getLogger("HashTablesStatistics"),
"An entry for key={} found in cache: sum_of_sizes={}, median_size={}",
params.key,
hint->sum_of_sizes,
hint->median_size);
return *hint;
}
return std::nullopt;
}
/// Collection and use of the statistics should be enabled.
void HashTablesStatistics::update(size_t sum_of_sizes, size_t median_size, const Params & params)
{
if (!params.isCollectionAndUseEnabled())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Collection and use of the statistics should be enabled.");
std::lock_guard lock(mutex);
const auto cache = getHashTableStatsCache(params, lock);
const auto hint = cache->get(params.key);
// We'll maintain the maximum among all the observed values until another prediction is much lower (that should indicate some change)
if (!hint || sum_of_sizes < hint->sum_of_sizes / 2 || hint->sum_of_sizes < sum_of_sizes || median_size < hint->median_size / 2
|| hint->median_size < median_size)
{
LOG_TRACE(
getLogger("HashTablesStatistics"),
"Statistics updated for key={}: new sum_of_sizes={}, median_size={}",
params.key,
sum_of_sizes,
median_size);
cache->set(params.key, std::make_shared<Entry>(Entry{.sum_of_sizes = sum_of_sizes, .median_size = median_size}));
}
}
std::optional<HashTablesCacheStatistics> HashTablesStatistics::getCacheStats() const
{
std::lock_guard lock(mutex);
if (hash_table_stats)
{
size_t hits = 0, misses = 0;
hash_table_stats->getStats(hits, misses);
return DB::HashTablesCacheStatistics{.entries = hash_table_stats->count(), .hits = hits, .misses = misses};
}
return std::nullopt;
}
HashTablesStatistics::CachePtr HashTablesStatistics::getHashTableStatsCache(const Params & params, const std::lock_guard<std::mutex> &)
{
if (!hash_table_stats)
hash_table_stats = std::make_shared<Cache>(params.max_entries_for_hash_table_stats * sizeof(Entry));
return hash_table_stats;
}
HashTablesStatistics & getHashTablesStatistics()
{
static HashTablesStatistics hash_tables_stats;
return hash_tables_stats;
}
std::optional<HashTablesCacheStatistics> getHashTablesCacheStatistics()
{
return getHashTablesStatistics().getCacheStats();
}
std::optional<HashTablesStatistics::Entry> getSizeHint(const DB::StatsCollectingParams & stats_collecting_params, size_t tables_cnt)
{
if (stats_collecting_params.isCollectionAndUseEnabled())
{
if (auto hint = DB::getHashTablesStatistics().getSizeHint(stats_collecting_params))
{
const auto lower_limit = hint->sum_of_sizes / tables_cnt;
const auto upper_limit = stats_collecting_params.max_size_to_preallocate / tables_cnt;
if (hint->median_size > upper_limit)
{
/// Since we cannot afford to preallocate as much as needed, we would likely have to do at least one resize anyway.
/// Though it still sounds better than N resizes, but in actuality we saw that one big resize (remember, HT-s grow exponentially)
/// plus worse cache locality since we're dealing with big HT-s from the beginning yields worse performance.
/// So let's just do nothing.
LOG_TRACE(
getLogger("HashTablesStatistics"),
"No space were preallocated in hash tables because 'max_size_to_preallocate' has too small value: {}, "
"should be at least {}",
stats_collecting_params.max_size_to_preallocate,
hint->median_size * tables_cnt);
}
/// https://github.com/ClickHouse/ClickHouse/issues/44402#issuecomment-1359920703
else if ((tables_cnt > 1 && hint->sum_of_sizes > 100'000) || hint->sum_of_sizes > 500'000)
{
return HashTablesStatistics::Entry{hint->sum_of_sizes, std::max(lower_limit, hint->median_size)};
}
}
}
return std::nullopt;
}
}

View File

@ -0,0 +1,69 @@
#pragma once
#include <Common/CacheBase.h>
namespace DB
{
struct HashTablesCacheStatistics
{
size_t entries = 0;
size_t hits = 0;
size_t misses = 0;
};
struct StatsCollectingParams
{
StatsCollectingParams() = default;
StatsCollectingParams(UInt64 key_, bool enable_, size_t max_entries_for_hash_table_stats_, size_t max_size_to_preallocate_)
: key(enable_ ? key_ : 0)
, max_entries_for_hash_table_stats(max_entries_for_hash_table_stats_)
, max_size_to_preallocate(max_size_to_preallocate_)
{
}
bool isCollectionAndUseEnabled() const { return key != 0; }
void disable() { key = 0; }
UInt64 key = 0;
const size_t max_entries_for_hash_table_stats = 0;
const size_t max_size_to_preallocate = 0;
};
/** Collects observed HashTable-s sizes to avoid redundant intermediate resizes.
*/
class HashTablesStatistics
{
public:
struct Entry
{
size_t sum_of_sizes; // used to determine if it's better to convert aggregation to two-level from the beginning
size_t median_size; // roughly the size we're going to preallocate on each thread
};
using Cache = DB::CacheBase<UInt64, Entry>;
using CachePtr = std::shared_ptr<Cache>;
using Params = StatsCollectingParams;
/// Collection and use of the statistics should be enabled.
std::optional<Entry> getSizeHint(const Params & params);
/// Collection and use of the statistics should be enabled.
void update(size_t sum_of_sizes, size_t median_size, const Params & params);
std::optional<DB::HashTablesCacheStatistics> getCacheStats() const;
private:
CachePtr getHashTableStatsCache(const Params & params, const std::lock_guard<std::mutex> &);
mutable std::mutex mutex;
CachePtr hash_table_stats;
};
HashTablesStatistics & getHashTablesStatistics();
std::optional<HashTablesCacheStatistics> getHashTablesCacheStatistics();
std::optional<HashTablesStatistics::Entry> getSizeHint(const DB::StatsCollectingParams & stats_collecting_params, size_t tables_cnt);
}

View File

@ -85,16 +85,17 @@
#include <Core/Field.h>
#include <Core/ProtocolDefines.h>
#include <Interpreters/Aggregator.h>
#include <Interpreters/HashTablesStatistics.h>
#include <Interpreters/IJoin.h>
#include <QueryPipeline/SizeLimits.h>
#include <base/map.h>
#include <Common/FieldVisitorToString.h>
#include <Common/FieldVisitorsAccurateComparison.h>
#include <Common/NaNUtils.h>
#include <Common/ProfileEvents.h>
#include <Common/checkStackSize.h>
#include <Common/scope_guard_safe.h>
#include <Common/typeid_cast.h>
#include <Common/ProfileEvents.h>
#include <Common/NaNUtils.h>
namespace ProfileEvents
@ -2615,10 +2616,10 @@ static Aggregator::Params getAggregatorParams(
size_t group_by_two_level_threshold,
size_t group_by_two_level_threshold_bytes)
{
const auto stats_collecting_params = Aggregator::Params::StatsCollectingParams(
query_ptr,
const auto stats_collecting_params = StatsCollectingParams(
calculateCacheKey(query_ptr),
settings.collect_hash_table_stats_during_aggregation,
settings.max_entries_for_hash_table_stats,
context.getServerSettings().max_entries_for_hash_table_stats,
settings.max_size_to_preallocate_for_aggregation);
return Aggregator::Params

View File

@ -3,6 +3,7 @@
#include <Columns/ColumnConst.h>
#include <Columns/ColumnSet.h>
#include <Core/ProtocolDefines.h>
#include <Interpreters/HashTablesStatistics.h>
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
@ -370,10 +371,10 @@ Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context
const auto & query_context = planner_context->getQueryContext();
const Settings & settings = query_context->getSettingsRef();
const auto stats_collecting_params = Aggregator::Params::StatsCollectingParams(
select_query_info.query,
const auto stats_collecting_params = StatsCollectingParams(
calculateCacheKey(select_query_info.query),
settings.collect_hash_table_stats_during_aggregation,
settings.max_entries_for_hash_table_stats,
query_context->getServerSettings().max_entries_for_hash_table_stats,
settings.max_size_to_preallocate_for_aggregation);
auto aggregate_descriptions = aggregation_analysis_result.aggregate_descriptions;

View File

@ -43,6 +43,8 @@
#include <Planner/PlannerContext.h>
#include <Planner/Utils.h>
#include <Core/ServerSettings.h>
namespace DB
{
@ -768,10 +770,8 @@ std::shared_ptr<DirectKeyValueJoin> tryDirectJoin(const std::shared_ptr<TableJoi
return std::make_shared<DirectKeyValueJoin>(table_join, right_table_expression_header, storage, right_table_expression_header_with_storage_column_names);
}
}
static std::shared_ptr<IJoin> tryCreateJoin(JoinAlgorithm algorithm,
std::shared_ptr<TableJoin> & table_join,
const QueryTreeNodePtr & right_table_expression,
@ -803,9 +803,17 @@ static std::shared_ptr<IJoin> tryCreateJoin(JoinAlgorithm algorithm,
algorithm == JoinAlgorithm::DEFAULT)
{
auto query_context = planner_context->getQueryContext();
if (table_join->allowParallelHashJoin())
return std::make_shared<ConcurrentHashJoin>(query_context, table_join, query_context->getSettings().max_threads, right_table_expression_header);
{
const auto & settings = query_context->getSettingsRef();
StatsCollectingParams params{
calculateCacheKey(table_join, right_table_expression),
settings.collect_hash_table_stats_during_joins,
query_context->getServerSettings().max_entries_for_hash_table_stats,
settings.max_size_to_preallocate_for_joins};
return std::make_shared<ConcurrentHashJoin>(
query_context, table_join, query_context->getSettings().max_threads, right_table_expression_header, params);
}
return std::make_shared<HashJoin>(table_join, right_table_expression_header, query_context->getSettingsRef().join_any_take_last_row);
}

View File

@ -0,0 +1,10 @@
1
1
1
1
1
1
1
1
1
1

View File

@ -0,0 +1,74 @@
#!/usr/bin/env bash
# Tags: long, distributed, no-debug, no-tsan, no-msan, no-ubsan, no-asan, no-random-settings, no-random-merge-tree-settings
# shellcheck disable=SC2154
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
opts=(
--join_algorithm='parallel_hash'
)
$CLICKHOUSE_CLIENT -nq "
CREATE TABLE t1(a UInt32, b UInt32) ENGINE=MergeTree ORDER BY ();
INSERT INTO t1 SELECT number, number FROM numbers_mt(1e6);
CREATE TABLE t2(a UInt32, b UInt32) ENGINE=MergeTree ORDER BY ();
INSERT INTO t2 SELECT number, number FROM numbers_mt(1e6);
"
queries_without_preallocation=()
queries_with_preallocation=()
run_new_query() {
query_id1="hash_table_sizes_stats_joins_$RANDOM$RANDOM"
# when we see a query for the first time we only collect it stats when execution ends. preallocation will happen only on the next run
queries_without_preallocation+=("$query_id1")
$CLICKHOUSE_CLIENT "${opts[@]}" --query_id="$query_id1" -q "$1" --format Null
query_id2="hash_table_sizes_stats_joins_$RANDOM$RANDOM"
queries_with_preallocation+=("$query_id2")
$CLICKHOUSE_CLIENT "${opts[@]}" --query_id="$query_id2" -q "$1" --format Null
}
run_new_query "SELECT * FROM t1 AS x INNER JOIN t2 AS y ON x.a = y.a"
# it only matters what columns from the right table are part of the join key, as soon as we change them - it is a new cache entry
run_new_query "SELECT * FROM t1 AS x INNER JOIN t2 AS y ON x.a = y.b"
run_new_query "SELECT * FROM t1 AS x INNER JOIN t2 AS y USING (a, b)"
# we already had a join on t2.a, so cache should be populated
query_id="hash_table_sizes_stats_joins_$RANDOM$RANDOM"
queries_with_preallocation+=("$query_id")
$CLICKHOUSE_CLIENT "${opts[@]}" --query_id="$query_id" -q "SELECT * FROM t1 AS x INNER JOIN t2 AS y ON x.b = y.a" --format Null
# the same query with a different alias for the t2
query_id="hash_table_sizes_stats_joins_$RANDOM$RANDOM"
queries_with_preallocation+=("$query_id")
$CLICKHOUSE_CLIENT "${opts[@]}" --query_id="$query_id" -q "SELECT * FROM t1 AS x INNER JOIN t2 AS z ON x.b = z.a" --format Null
# now t1 is the right table
run_new_query "SELECT * FROM t2 AS x INNER JOIN t1 AS y ON x.a = y.a"
##################################
$CLICKHOUSE_CLIENT -q "SYSTEM FLUSH LOGS"
for i in "${!queries_without_preallocation[@]}"; do
$CLICKHOUSE_CLIENT --param_query_id="${queries_without_preallocation[$i]}" -q "
-- the old analyzer is not supported
SELECT sum(if(getSetting('allow_experimental_analyzer'), ProfileEvents['HashJoinPreallocatedElementsInHashTables'] = 0, 1))
FROM system.query_log
WHERE event_date >= yesterday() AND query_id = {query_id:String} AND current_database = currentDatabase() AND type = 'QueryFinish'
"
done
for i in "${!queries_with_preallocation[@]}"; do
$CLICKHOUSE_CLIENT --param_query_id="${queries_with_preallocation[$i]}" -q "
-- the old analyzer is not supported
SELECT sum(if(getSetting('allow_experimental_analyzer'), ProfileEvents['HashJoinPreallocatedElementsInHashTables'] > 0, 1))
FROM system.query_log
WHERE event_date >= yesterday() AND query_id = {query_id:String} AND current_database = currentDatabase() AND type = 'QueryFinish'
"
done