Predict size of hash table for GROUP BY (#33439)
* use AggregationMethod ctor with reserve
* add new settings
* add HashTablesStatistics
* support queries with limit
* support distributed and with external aggregation
* add new profile events
* add some tests
* add perf test
* export cache stats through AsynchronousMetrics
* rm redundant trace
* fix style
* fix 02122_parallel_formatting test
* review fixes
* fix 02122_parallel_formatting test
* apply also to two-level HTs
* try simpler strategy
* increase max_size_to_preallocate_for_aggregation for experiment
* fixes
* Revert "increase max_size_to_preallocate_for_aggregation for experiment"
This reverts commit 6cf6f75704.
* fix test
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
parent 02574bd05b
commit 30f2a942c5
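
Editor's note (illustration, not part of the commit): the change remembers how many GROUP BY keys a query produced and, on the next execution of the same query, preallocates the aggregation hash tables accordingly, avoiding a cascade of intermediate resizes. A minimal self-contained sketch of that idea — names are simplified and a plain std::unordered_map stands in for the mutex-guarded LRUCache used in the real code below:

// Simplified sketch: remember the observed number of GROUP BY keys per query
// and reserve that many cells on the next run of the same query.
#include <cstddef>
#include <cstdint>
#include <optional>
#include <unordered_map>
#include <unordered_set>
#include <vector>

class SizeHintCache
{
public:
    std::optional<size_t> getHint(uint64_t query_key) const
    {
        if (auto it = stats.find(query_key); it != stats.end())
            return it->second;
        return std::nullopt;
    }

    void update(uint64_t query_key, size_t observed_keys) { stats[query_key] = observed_keys; }

private:
    std::unordered_map<uint64_t, size_t> stats; // the real code uses a mutex-guarded LRUCache keyed by a hash of the query
};

size_t aggregate(SizeHintCache & cache, uint64_t query_key, const std::vector<uint64_t> & keys)
{
    std::unordered_set<uint64_t> table;
    if (auto hint = cache.getHint(query_key))
        table.reserve(*hint);               // preallocate: no intermediate rehashes on a repeated run
    for (uint64_t k : keys)
        table.insert(k);
    cache.update(query_key, table.size());  // feed the observation back for the next execution
    return table.size();
}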
@@ -67,6 +67,9 @@ struct FixedHashTableCalculatedSize
{
    size_t getSize(const Cell * buf, const typename Cell::State & state, size_t num_cells) const
    {
        if (!buf)
            return 0;

        size_t res = 0;
        for (const Cell * end = buf + num_cells; buf != end; ++buf)
            if (!buf->isZero(state))
@@ -76,6 +79,9 @@ struct FixedHashTableCalculatedSize

    bool isEmpty(const Cell * buf, const typename Cell::State & state, size_t num_cells) const
    {
        if (!buf)
            return true;

        for (const Cell * end = buf + num_cells; buf != end; ++buf)
            if (!buf->isZero(state))
                return false;
@@ -94,6 +94,12 @@ public:

    TwoLevelHashTable() = default;

    explicit TwoLevelHashTable(size_t size_hint)
    {
        for (auto & impl : impls)
            impl.reserve(size_hint / NUM_BUCKETS);
    }

    /// Copy the data from another (normal) hash table. It should have the same hash function.
    template <typename Source>
    explicit TwoLevelHashTable(const Source & src)
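
Editor's note (illustration, not part of the commit): a two-level hash table is a fixed array of NUM_BUCKETS sub-tables, so the new constructor spreads one total size hint evenly across them. A rough standalone equivalent, with std::unordered_map sub-tables and an assumed bucket count of 256:

// Sketch only: split a total reserve hint across a fixed number of sub-tables.
#include <array>
#include <cstddef>
#include <unordered_map>

template <typename Key, typename Mapped, size_t NUM_BUCKETS = 256>
struct TwoLevelMapSketch
{
    std::array<std::unordered_map<Key, Mapped>, NUM_BUCKETS> impls;

    explicit TwoLevelMapSketch(size_t size_hint)
    {
        // e.g. size_hint = 1'000'000 and 256 buckets -> roughly 3906 cells reserved per bucket
        for (auto & impl : impls)
            impl.reserve(size_hint / NUM_BUCKETS);
    }
};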
@@ -285,6 +285,9 @@
    \
    M(MainConfigLoads, "Number of times the main configuration was reloaded.") \
    \
    M(AggregationPreallocatedElementsInHashTables, "How many elements were preallocated in hash tables for aggregation.") \
    M(AggregationHashTablesInitializedAsTwoLevel, "How many hash tables were inited as two-level for aggregation.") \
    \
    M(MergeTreeMetadataCacheGet, "Number of rocksdb reads(used for merge tree metadata cache)") \
    M(MergeTreeMetadataCachePut, "Number of rocksdb puts(used for merge tree metadata cache)") \
    M(MergeTreeMetadataCacheDelete, "Number of rocksdb deletes(used for merge tree metadata cache)") \
@@ -500,6 +500,10 @@ class IColumn;
    M(Bool, optimize_rewrite_sum_if_to_count_if, true, "Rewrite sumIf() and sum(if()) function countIf() function when logically equivalent", 0) \
    M(UInt64, insert_shard_id, 0, "If non zero, when insert into a distributed table, the data will be inserted into the shard `insert_shard_id` synchronously. Possible values range from 1 to `shards_number` of corresponding distributed table", 0) \
    \
    M(Bool, collect_hash_table_stats_during_aggregation, true, "Enable collecting hash table statistics to optimize memory allocation", 0) \
    M(UInt64, max_entries_for_hash_table_stats, 10'000, "How many entries hash table statistics collected during aggregation is allowed to have", 0) \
    M(UInt64, max_size_to_preallocate_for_aggregation, 10'000'000, "For how many elements it is allowed to preallocate space in all hash tables in total before aggregation", 0) \
    \
    /** Experimental feature for moving data between shards. */ \
    \
    M(Bool, allow_experimental_query_deduplication, false, "Experimental data deduplication for SELECT queries based on part UUIDs", 0) \
@@ -1,4 +1,6 @@
#include <algorithm>
#include <future>
#include <numeric>
#include <Poco/Util/Application.h>

#include <base/sort.h>
@@ -15,6 +17,7 @@
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Interpreters/Aggregator.h>
#include <Common/LRUCache.h>
#include <Common/MemoryTracker.h>
#include <Common/CurrentThread.h>
#include <Common/typeid_cast.h>
@@ -27,12 +30,236 @@
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Core/ProtocolDefines.h>

#include <Parsers/ASTSelectQuery.h>

namespace ProfileEvents
{
    extern const Event ExternalAggregationWritePart;
    extern const Event ExternalAggregationCompressedBytes;
    extern const Event ExternalAggregationUncompressedBytes;
    extern const Event ExternalAggregationWritePart;
    extern const Event ExternalAggregationCompressedBytes;
    extern const Event ExternalAggregationUncompressedBytes;
    extern const Event AggregationPreallocatedElementsInHashTables;
    extern const Event AggregationHashTablesInitializedAsTwoLevel;
}

namespace
{
/** Collects observed HashMap-s sizes to avoid redundant intermediate resizes.
  */
class HashTablesStatistics
{
public:
    struct Entry
    {
        size_t sum_of_sizes; // used to determine if it's better to convert aggregation to two-level from the beginning
        size_t median_size; // roughly the size we're going to preallocate on each thread
    };

    using Cache = DB::LRUCache<UInt64, Entry>;
    using CachePtr = std::shared_ptr<Cache>;
    using Params = DB::Aggregator::Params::StatsCollectingParams;

    /// Collection and use of the statistics should be enabled.
    std::optional<Entry> getSizeHint(const Params & params)
    {
        if (!params.isCollectionAndUseEnabled())
            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Collection and use of the statistics should be enabled.");

        std::lock_guard lock(mutex);
        const auto cache = getHashTableStatsCache(params, lock);
        if (const auto hint = cache->get(params.key))
        {
            LOG_DEBUG(
                &Poco::Logger::get("Aggregator"),
                "An entry for key={} found in cache: sum_of_sizes={}, median_size={}",
                params.key,
                hint->sum_of_sizes,
                hint->median_size);
            return *hint;
        }
        return std::nullopt;
    }

    /// Collection and use of the statistics should be enabled.
    void update(size_t sum_of_sizes, size_t median_size, const Params & params)
    {
        if (!params.isCollectionAndUseEnabled())
            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Collection and use of the statistics should be enabled.");

        std::lock_guard lock(mutex);
        const auto cache = getHashTableStatsCache(params, lock);
        const auto hint = cache->get(params.key);
        // We'll maintain the maximum among all the observed values until the next prediction turns out to be too wrong.
        if (!hint || sum_of_sizes < hint->sum_of_sizes / 2 || hint->sum_of_sizes < sum_of_sizes || median_size < hint->median_size / 2
            || hint->median_size < median_size)
        {
            LOG_DEBUG(
                &Poco::Logger::get("Aggregator"),
                "Statistics updated for key={}: new sum_of_sizes={}, median_size={}",
                params.key,
                sum_of_sizes,
                median_size);
            cache->set(params.key, std::make_shared<Entry>(Entry{.sum_of_sizes = sum_of_sizes, .median_size = median_size}));
        }
    }

    std::optional<DB::HashTablesCacheStatistics> getCacheStats() const
    {
        std::lock_guard lock(mutex);
        if (hash_table_stats)
        {
            size_t hits = 0, misses = 0;
            hash_table_stats->getStats(hits, misses);
            return DB::HashTablesCacheStatistics{.entries = hash_table_stats->count(), .hits = hits, .misses = misses};
        }
        return std::nullopt;
    }

    static size_t calculateCacheKey(const DB::ASTPtr & select_query)
    {
        if (!select_query)
            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Query ptr cannot be null");

        const auto & select = select_query->as<DB::ASTSelectQuery &>();

        // It may happen in some corner cases like `select 1 as num group by num`.
        if (!select.tables())
            return 0;

        SipHash hash;
        hash.update(select.tables()->getTreeHash());
        if (const auto where = select.where())
            hash.update(where->getTreeHash());
        if (const auto group_by = select.groupBy())
            hash.update(group_by->getTreeHash());
        return hash.get64();
    }

private:
    CachePtr getHashTableStatsCache(const Params & params, const std::lock_guard<std::mutex> &)
    {
        if (!hash_table_stats || hash_table_stats->maxSize() != params.max_entries_for_hash_table_stats)
            hash_table_stats = std::make_shared<Cache>(params.max_entries_for_hash_table_stats);
        return hash_table_stats;
    }

    mutable std::mutex mutex;
    CachePtr hash_table_stats;
};

HashTablesStatistics & getHashTablesStatistics()
{
    static HashTablesStatistics hash_tables_stats;
    return hash_tables_stats;
}
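
Editor's note (illustration, not part of the commit): the update() method above keeps the largest values seen so far and only rewrites a cache entry when a new observation is materially different — larger than the stored one, or smaller than half of it. The same decision in isolation, with a worked example in the comments:

// Standalone restatement of the cache update rule: keep the stored maximum
// until the prediction becomes too wrong (the new value grew, or fell below half).
#include <cstddef>
#include <optional>

struct Entry { size_t sum_of_sizes; size_t median_size; };

bool shouldOverwrite(const std::optional<Entry> & stored, size_t sum_of_sizes, size_t median_size)
{
    return !stored
        || sum_of_sizes < stored->sum_of_sizes / 2 || stored->sum_of_sizes < sum_of_sizes
        || median_size < stored->median_size / 2 || stored->median_size < median_size;
}

// Example with stored = {1000, 200}: an observation of {1100, 210} overwrites (it grew),
// {600, 150} does not (both values stay within [half, stored]), and {400, 150} overwrites
// (sum_of_sizes dropped below half of the stored value).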

bool worthConvertToTwoLevel(
    size_t group_by_two_level_threshold, size_t result_size, size_t group_by_two_level_threshold_bytes, auto result_size_bytes)
{
    // params.group_by_two_level_threshold will be equal to 0 if we have only one thread to execute aggregation (refer to AggregatingStep::transformPipeline).
    return (group_by_two_level_threshold && result_size >= group_by_two_level_threshold)
        || (group_by_two_level_threshold_bytes && result_size_bytes >= static_cast<Int64>(group_by_two_level_threshold_bytes));
}

DB::AggregatedDataVariants::Type convertToTwoLevelTypeIfPossible(DB::AggregatedDataVariants::Type type)
{
    using Type = DB::AggregatedDataVariants::Type;
    switch (type)
    {
#define M(NAME) \
    case Type::NAME: \
        return Type::NAME##_two_level;
        APPLY_FOR_VARIANTS_CONVERTIBLE_TO_TWO_LEVEL(M)
#undef M
        default:
            return type;
    }
    __builtin_unreachable();
}

void initDataVariantsWithSizeHint(
    DB::AggregatedDataVariants & result, DB::AggregatedDataVariants::Type method_chosen, const DB::Aggregator::Params & params)
{
    const auto & stats_collecting_params = params.stats_collecting_params;
    if (stats_collecting_params.isCollectionAndUseEnabled())
    {
        if (auto hint = getHashTablesStatistics().getSizeHint(stats_collecting_params))
        {
            const auto max_threads = params.group_by_two_level_threshold != 0 ? std::max(params.max_threads, 1ul) : 1;
            const auto lower_limit = hint->sum_of_sizes / max_threads;
            const auto upper_limit = stats_collecting_params.max_size_to_preallocate_for_aggregation / max_threads;
            const auto adjusted = std::min(std::max(lower_limit, hint->median_size), upper_limit);
            if (worthConvertToTwoLevel(
                    params.group_by_two_level_threshold,
                    hint->sum_of_sizes,
                    /*group_by_two_level_threshold_bytes*/ 0,
                    /*result_size_bytes*/ 0))
                method_chosen = convertToTwoLevelTypeIfPossible(method_chosen);
            result.init(method_chosen, adjusted);
            ProfileEvents::increment(ProfileEvents::AggregationHashTablesInitializedAsTwoLevel, result.isTwoLevel());
            return;
        }
    }
    result.init(method_chosen);
}
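
Editor's note (illustration, not part of the commit): the hint is clamped before it is used — at least an even share of the total observed keys per thread, at most max_size_to_preallocate_for_aggregation spread over the threads, preferring the observed per-thread median in between. A worked example with made-up numbers:

// Worked example of the clamping above; all numbers are illustrative assumptions.
#include <algorithm>
#include <cstddef>
#include <iostream>

int main()
{
    size_t sum_of_sizes = 1'200'000;                 // total keys observed on the previous run
    size_t median_size = 100'000;                    // typical per-thread table size observed
    size_t max_threads = 8;
    size_t max_size_to_preallocate = 10'000'000;     // the max_size_to_preallocate_for_aggregation setting

    size_t lower_limit = sum_of_sizes / max_threads;             // 150'000: an even split across threads
    size_t upper_limit = max_size_to_preallocate / max_threads;  // 1'250'000: cap on total preallocation
    size_t adjusted = std::min(std::max(lower_limit, median_size), upper_limit);

    std::cout << adjusted << "\n";  // 150'000 cells reserved in each thread's hash table
}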

/// Collection and use of the statistics should be enabled.
void updateStatistics(const DB::ManyAggregatedDataVariants & data_variants, const DB::Aggregator::Params::StatsCollectingParams & params)
{
    if (!params.isCollectionAndUseEnabled())
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Collection and use of the statistics should be enabled.");

    std::vector<size_t> sizes(data_variants.size());
    for (size_t i = 0; i < data_variants.size(); ++i)
        sizes[i] = data_variants[i]->size();
    const auto median_size = sizes.begin() + sizes.size() / 2; // not precisely though...
    std::nth_element(sizes.begin(), median_size, sizes.end());
    const auto sum_of_sizes = std::accumulate(sizes.begin(), sizes.end(), 0ull);
    getHashTablesStatistics().update(sum_of_sizes, *median_size, params);
}
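
Editor's note (illustration, not part of the commit): updateStatistics() collects the per-thread table sizes, picks an approximate median with std::nth_element (exact only for odd counts, hence the "not precisely" remark) and sums them. The same computation on a toy input:

// Standalone illustration of the median/sum computation used above.
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <numeric>
#include <vector>

int main()
{
    std::vector<size_t> sizes{120, 950, 400, 310, 780}; // per-thread hash table sizes (made up)

    auto median_it = sizes.begin() + sizes.size() / 2;  // middle position
    std::nth_element(sizes.begin(), median_it, sizes.end());
    size_t sum_of_sizes = std::accumulate(sizes.begin(), sizes.end(), 0ull);

    std::cout << *median_it << " " << sum_of_sizes << "\n"; // prints: 400 2560
}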

// The std::is_constructible trait isn't suitable here because some classes have template constructors with semantics different from providing size hints.
// Also string hash table variants are not supported due to the fact that both local perf tests and tests in CI showed slowdowns for them.
template <typename...>
struct HasConstructorOfNumberOfElements : std::false_type
{
};

template <typename... Ts>
struct HasConstructorOfNumberOfElements<HashMapTable<Ts...>> : std::true_type
{
};

template <typename Key, typename Cell, typename Hash, typename Grower, typename Allocator, template <typename...> typename ImplTable>
struct HasConstructorOfNumberOfElements<TwoLevelHashMapTable<Key, Cell, Hash, Grower, Allocator, ImplTable>> : std::true_type
{
};

template <typename... Ts>
struct HasConstructorOfNumberOfElements<HashTable<Ts...>> : std::true_type
{
};

template <typename... Ts>
struct HasConstructorOfNumberOfElements<TwoLevelHashTable<Ts...>> : std::true_type
{
};

template <template <typename> typename Method, typename Base>
struct HasConstructorOfNumberOfElements<Method<Base>> : HasConstructorOfNumberOfElements<Base>
{
};

template <typename Method>
auto constructWithReserveIfPossible(size_t size_hint)
{
    if constexpr (HasConstructorOfNumberOfElements<typename Method::Data>::value)
    {
        ProfileEvents::increment(ProfileEvents::AggregationPreallocatedElementsInHashTables, size_hint);
        return std::make_unique<Method>(size_hint);
    }
    else
        return std::make_unique<Method>();
}
}
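
Editor's note (illustration, not part of the commit): the trait above is an explicit whitelist — only hash table templates known to treat a single size_t constructor argument as a reserve hint get it, and everything else (including the string hash table variants that regressed in tests) silently falls back to default construction. The same compile-time dispatch in a self-contained form; the two Method types here are stand-ins, not the ClickHouse ones:

// Self-contained version of the "construct with reserve if possible" dispatch.
#include <cstddef>
#include <memory>
#include <type_traits>
#include <unordered_map>

struct PlainMethod { std::unordered_map<int, int> data; };   // no size-hint constructor

struct ReservingMethod
{
    std::unordered_map<int, int> data;
    ReservingMethod() = default;
    explicit ReservingMethod(size_t size_hint) { data.reserve(size_hint); }
};

// Opt-in trait: only explicitly listed types are treated as size-hint constructible.
template <typename> struct HasSizeHintCtor : std::false_type {};
template <> struct HasSizeHintCtor<ReservingMethod> : std::true_type {};

template <typename Method>
std::unique_ptr<Method> constructWithReserve(size_t size_hint)
{
    if constexpr (HasSizeHintCtor<Method>::value)
        return std::make_unique<Method>(size_hint);  // preallocates
    else
        return std::make_unique<Method>();           // silent fallback
}

int main()
{
    auto a = constructWithReserve<PlainMethod>(1000);      // default-constructed
    auto b = constructWithReserve<ReservingMethod>(1000);  // reserves 1000 cells
    return a && b ? 0 : 1;
}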

namespace DB
@@ -64,6 +291,10 @@ AggregatedDataVariants::~AggregatedDataVariants()
    }
}

std::optional<HashTablesCacheStatistics> getHashTablesCacheStatistics()
{
    return getHashTablesStatistics().getCacheStats();
}

void AggregatedDataVariants::convertToTwoLevel()
{
@@ -88,6 +319,47 @@ void AggregatedDataVariants::convertToTwoLevel()
    }
}

void AggregatedDataVariants::init(Type type_, std::optional<size_t> size_hint)
{
    switch (type_)
    {
        case Type::EMPTY:
            break;
        case Type::without_key:
            break;

#define M(NAME, IS_TWO_LEVEL) \
    case Type::NAME: \
        if (size_hint) \
            (NAME) = constructWithReserveIfPossible<decltype(NAME)::element_type>(*size_hint); \
        else \
            (NAME) = std::make_unique<decltype(NAME)::element_type>(); \
        break;
        APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
    }

    type = type_;
}

Aggregator::Params::StatsCollectingParams::StatsCollectingParams() = default;

Aggregator::Params::StatsCollectingParams::StatsCollectingParams(
    const ASTPtr & select_query_,
    bool collect_hash_table_stats_during_aggregation_,
    size_t max_entries_for_hash_table_stats_,
    size_t max_size_to_preallocate_for_aggregation_)
    : key(collect_hash_table_stats_during_aggregation_ ? HashTablesStatistics::calculateCacheKey(select_query_) : 0)
    , max_entries_for_hash_table_stats(max_entries_for_hash_table_stats_)
    , max_size_to_preallocate_for_aggregation(max_size_to_preallocate_for_aggregation_)
{
}

bool Aggregator::Params::StatsCollectingParams::isCollectionAndUseEnabled() const
{
    return key != 0;
}

Block Aggregator::getHeader(bool final) const
{
    return params.getHeader(final);
@@ -237,8 +509,7 @@ public:

#endif

Aggregator::Aggregator(const Params & params_)
    : params(params_)
Aggregator::Aggregator(const Params & params_) : params(params_)
{
    /// Use query-level memory tracker
    if (auto * memory_tracker_child = CurrentThread::getMemoryTracker())
@@ -292,7 +563,6 @@ Aggregator::Aggregator(const Params & params_)
#if USE_EMBEDDED_COMPILER
    compileAggregateFunctionsIfNeeded();
#endif

}

#if USE_EMBEDDED_COMPILER
@@ -958,7 +1228,7 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
    /// How to perform the aggregation?
    if (result.empty())
    {
        result.init(method_chosen);
        initDataVariantsWithSizeHint(result, method_chosen, params);
        result.keys_size = params.keys_size;
        result.key_sizes = key_sizes;
        LOG_TRACE(log, "Aggregation method: {}", result.getMethodName());
@@ -1038,9 +1308,8 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
    /// Here all the results in the sum are taken into account, from different threads.
    auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation;

    bool worth_convert_to_two_level
        = (params.group_by_two_level_threshold && result_size >= params.group_by_two_level_threshold)
        || (params.group_by_two_level_threshold_bytes && result_size_bytes >= static_cast<Int64>(params.group_by_two_level_threshold_bytes));
    bool worth_convert_to_two_level = worthConvertToTwoLevel(
        params.group_by_two_level_threshold, result_size, params.group_by_two_level_threshold_bytes, result_size_bytes);

    /** Converting to a two-level data structure.
      * It allows you to make, in the subsequent, an effective merge - either economical from memory or parallel.
@@ -1327,10 +1596,7 @@ void Aggregator::convertToBlockImpl(


template <typename Mapped>
inline void Aggregator::insertAggregatesIntoColumns(
    Mapped & mapped,
    MutableColumns & final_aggregate_columns,
    Arena * arena) const
inline void Aggregator::insertAggregatesIntoColumns(Mapped & mapped, MutableColumns & final_aggregate_columns, Arena * arena) const
{
    /** Final values of aggregate functions are inserted to columns.
      * Then states of aggregate functions, that are not longer needed, are destroyed.
@@ -2179,6 +2445,9 @@ ManyAggregatedDataVariants Aggregator::prepareVariantsToMerge(ManyAggregatedData

    LOG_TRACE(log, "Merging aggregated data");

    if (params.stats_collecting_params.isCollectionAndUseEnabled())
        updateStatistics(data_variants, params.stats_collecting_params);

    ManyAggregatedDataVariants non_empty_data;
    non_empty_data.reserve(data_variants.size());
    for (auto & data : data_variants)
@@ -2388,9 +2657,8 @@ bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool
    /// Here all the results in the sum are taken into account, from different threads.
    auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation;

    bool worth_convert_to_two_level
        = (params.group_by_two_level_threshold && result_size >= params.group_by_two_level_threshold)
        || (params.group_by_two_level_threshold_bytes && result_size_bytes >= static_cast<Int64>(params.group_by_two_level_threshold_bytes));
    bool worth_convert_to_two_level = worthConvertToTwoLevel(
        params.group_by_two_level_threshold, result_size, params.group_by_two_level_threshold_bytes, result_size_bytes);

    /** Converting to a two-level data structure.
      * It allows you to make, in the subsequent, an effective merge - either economical from memory or parallel.
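
Editor's note (illustration, not part of the commit): both call sites above now delegate the two-level decision to worthConvertToTwoLevel(). A small worked check of the thresholds with illustrative values:

// Worked check of the two-level conversion thresholds (values are illustrative).
#include <cstddef>
#include <cstdint>
#include <iostream>

bool worthConvertToTwoLevel(size_t threshold_rows, size_t result_size, size_t threshold_bytes, int64_t result_size_bytes)
{
    return (threshold_rows && result_size >= threshold_rows)
        || (threshold_bytes && result_size_bytes >= static_cast<int64_t>(threshold_bytes));
}

int main()
{
    // assume group_by_two_level_threshold = 100000 keys and group_by_two_level_threshold_bytes = 50 MB
    std::cout << worthConvertToTwoLevel(100000, 150000, 50000000, 10000000) << "\n"; // 1: row threshold reached
    std::cout << worthConvertToTwoLevel(100000, 80000, 50000000, 10000000) << "\n";  // 0: neither threshold reached
    std::cout << worthConvertToTwoLevel(0, 80000, 50000000, 60000000) << "\n";       // 1: byte threshold reached
}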

@@ -34,6 +34,7 @@
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnLowCardinality.h>

#include <Parsers/IAST_fwd.h>

namespace DB
{
@@ -129,6 +130,7 @@ private:
template <typename Base>
struct AggregationDataWithNullKeyTwoLevel : public Base
{
    using Base::Base;
    using Base::impls;

    AggregationDataWithNullKeyTwoLevel() = default;
@@ -183,6 +185,8 @@ struct AggregationMethodOneNumber

    AggregationMethodOneNumber() = default;

    explicit AggregationMethodOneNumber(size_t size_hint) : data(size_hint) { }

    template <typename Other>
    explicit AggregationMethodOneNumber(const Other & other) : data(other.data)
    {
@@ -225,6 +229,8 @@ struct AggregationMethodString
    {
    }

    explicit AggregationMethodString(size_t size_hint) : data(size_hint) { }

    using State = ColumnsHashing::HashMethodString<typename Data::value_type, Mapped>;

    static const bool low_cardinality_optimization = false;
@@ -250,6 +256,8 @@ struct AggregationMethodStringNoCache

    AggregationMethodStringNoCache() = default;

    explicit AggregationMethodStringNoCache(size_t size_hint) : data(size_hint) { }

    template <typename Other>
    explicit AggregationMethodStringNoCache(const Other & other) : data(other.data)
    {
@@ -280,6 +288,8 @@ struct AggregationMethodFixedString

    AggregationMethodFixedString() = default;

    explicit AggregationMethodFixedString(size_t size_hint) : data(size_hint) { }

    template <typename Other>
    explicit AggregationMethodFixedString(const Other & other) : data(other.data)
    {
@@ -309,6 +319,8 @@ struct AggregationMethodFixedStringNoCache

    AggregationMethodFixedStringNoCache() = default;

    explicit AggregationMethodFixedStringNoCache(size_t size_hint) : data(size_hint) { }

    template <typename Other>
    explicit AggregationMethodFixedStringNoCache(const Other & other) : data(other.data)
    {
@@ -382,6 +394,8 @@ struct AggregationMethodKeysFixed

    AggregationMethodKeysFixed() = default;

    explicit AggregationMethodKeysFixed(size_t size_hint) : data(size_hint) { }

    template <typename Other>
    explicit AggregationMethodKeysFixed(const Other & other) : data(other.data)
    {
@@ -473,6 +487,8 @@ struct AggregationMethodSerialized

    AggregationMethodSerialized() = default;

    explicit AggregationMethodSerialized(size_t size_hint) : data(size_hint) { }

    template <typename Other>
    explicit AggregationMethodSerialized(const Other & other) : data(other.data)
    {
@@ -652,21 +668,7 @@ struct AggregatedDataVariants : private boost::noncopyable

    ~AggregatedDataVariants();

    void init(Type type_)
    {
        switch (type_)
        {
            case Type::EMPTY: break;
            case Type::without_key: break;

        #define M(NAME, IS_TWO_LEVEL) \
            case Type::NAME: (NAME) = std::make_unique<decltype(NAME)::element_type>(); break;
        APPLY_FOR_AGGREGATED_VARIANTS(M)
        #undef M
        }

        type = type_;
    }
    void init(Type type_, std::optional<size_t> size_hint = std::nullopt);

    /// Number of rows (different keys).
    size_t size() const
@@ -929,29 +931,61 @@ public:
    bool compile_aggregate_expressions;
    size_t min_count_to_compile_aggregate_expression;

    struct StatsCollectingParams
    {
        StatsCollectingParams();

        StatsCollectingParams(
            const ASTPtr & select_query_,
            bool collect_hash_table_stats_during_aggregation_,
            size_t max_entries_for_hash_table_stats_,
            size_t max_size_to_preallocate_for_aggregation_);

        bool isCollectionAndUseEnabled() const;

        const UInt64 key = 0;
        const size_t max_entries_for_hash_table_stats = 0;
        const size_t max_size_to_preallocate_for_aggregation = 0;
    };
    StatsCollectingParams stats_collecting_params;

    Params(
        const Block & src_header_,
        const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_,
        bool overflow_row_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
        size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
        const ColumnNumbers & keys_,
        const AggregateDescriptions & aggregates_,
        bool overflow_row_,
        size_t max_rows_to_group_by_,
        OverflowMode group_by_overflow_mode_,
        size_t group_by_two_level_threshold_,
        size_t group_by_two_level_threshold_bytes_,
        size_t max_bytes_before_external_group_by_,
        bool empty_result_for_aggregation_by_empty_set_,
        VolumePtr tmp_volume_, size_t max_threads_,
        VolumePtr tmp_volume_,
        size_t max_threads_,
        size_t min_free_disk_space_,
        bool compile_aggregate_expressions_,
        size_t min_count_to_compile_aggregate_expression_,
        const Block & intermediate_header_ = {})
        : src_header(src_header_),
        intermediate_header(intermediate_header_),
        keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
        overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
        group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
        max_bytes_before_external_group_by(max_bytes_before_external_group_by_),
        empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_),
        tmp_volume(tmp_volume_), max_threads(max_threads_),
        min_free_disk_space(min_free_disk_space_),
        compile_aggregate_expressions(compile_aggregate_expressions_),
        min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_)
        const Block & intermediate_header_ = {},
        const StatsCollectingParams & stats_collecting_params_ = {})
        : src_header(src_header_)
        , intermediate_header(intermediate_header_)
        , keys(keys_)
        , aggregates(aggregates_)
        , keys_size(keys.size())
        , aggregates_size(aggregates.size())
        , overflow_row(overflow_row_)
        , max_rows_to_group_by(max_rows_to_group_by_)
        , group_by_overflow_mode(group_by_overflow_mode_)
        , group_by_two_level_threshold(group_by_two_level_threshold_)
        , group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_)
        , max_bytes_before_external_group_by(max_bytes_before_external_group_by_)
        , empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_)
        , tmp_volume(tmp_volume_)
        , max_threads(max_threads_)
        , min_free_disk_space(min_free_disk_space_)
        , compile_aggregate_expressions(compile_aggregate_expressions_)
        , min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_)
        , stats_collecting_params(stats_collecting_params_)
    {
    }

@@ -1350,4 +1384,13 @@ APPLY_FOR_AGGREGATED_VARIANTS(M)

#undef M


struct HashTablesCacheStatistics
{
    size_t entries = 0;
    size_t hits = 0;
    size_t misses = 0;
};

std::optional<HashTablesCacheStatistics> getHashTablesCacheStatistics();
}

@@ -1,3 +1,4 @@
#include <Interpreters/Aggregator.h>
#include <Interpreters/AsynchronousMetrics.h>
#include <Interpreters/AsynchronousMetricLog.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
@@ -630,6 +631,15 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti

    new_values["Uptime"] = getContext()->getUptimeSeconds();

    {
        if (const auto stats = getHashTablesCacheStatistics())
        {
            new_values["HashTableStatsCacheEntries"] = stats->entries;
            new_values["HashTableStatsCacheHits"] = stats->hits;
            new_values["HashTableStatsCacheMisses"] = stats->misses;
        }
    }

    /// Process process memory usage according to OS
#if defined(OS_LINUX) || defined(OS_FREEBSD)
    {
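
Editor's note (illustration, not part of the commit): AsynchronousMetrics above simply polls getHashTablesCacheStatistics() and exports the three counters. A hedged sketch of a consumer deriving a hit ratio from them — the struct mirrors HashTablesCacheStatistics declared earlier, the sampling itself is made up:

// Illustrative consumer of the exported cache statistics.
#include <cstddef>
#include <iostream>
#include <optional>

struct HashTablesCacheStatistics { size_t entries = 0; size_t hits = 0; size_t misses = 0; };

void report(const std::optional<HashTablesCacheStatistics> & stats)
{
    if (!stats)
        return; // statistics collection is disabled, nothing to export
    const double total = static_cast<double>(stats->hits + stats->misses);
    const double hit_ratio = total > 0 ? stats->hits / total : 0.0;
    std::cout << "entries=" << stats->entries << " hit_ratio=" << hit_ratio << "\n";
}

int main()
{
    report(HashTablesCacheStatistics{.entries = 12, .hits = 90, .misses = 10}); // entries=12 hit_ratio=0.9
}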

@@ -2082,6 +2082,12 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac

    const Settings & settings = context->getSettingsRef();

    const auto stats_collecting_params = Aggregator::Params::StatsCollectingParams(
        query_ptr,
        settings.collect_hash_table_stats_during_aggregation,
        settings.max_entries_for_hash_table_stats,
        settings.max_size_to_preallocate_for_aggregation);

    Aggregator::Params params(
        header_before_aggregation,
        keys,
@@ -2099,7 +2105,9 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
        settings.max_threads,
        settings.min_free_disk_space_for_temporary_data,
        settings.compile_aggregate_expressions,
        settings.min_count_to_compile_aggregate_expression);
        settings.min_count_to_compile_aggregate_expression,
        Block{},
        stats_collecting_params);

    SortDescription group_by_sort_description;

tests/performance/hash_table_sizes_stats.xml (new file, 29 lines)
@@ -0,0 +1,29 @@
<test>
    <preconditions>
        <table_exists>hits_10m_single</table_exists>
        <table_exists>hits_100m_single</table_exists>
    </preconditions>

    <settings>
        <max_size_to_preallocate_for_aggregation>1000000000</max_size_to_preallocate_for_aggregation>
    </settings>

    <query>SELECT number FROM numbers(5000000) GROUP BY number FORMAT Null</query>
    <query>SELECT number FROM numbers(10000000) GROUP BY number FORMAT Null</query>
    <query short="1">SELECT number FROM numbers_mt(500000) GROUP BY number FORMAT Null</query>
    <query short="1">SELECT number FROM numbers_mt(1000000) GROUP BY number FORMAT Null</query>
    <query>SELECT number FROM numbers_mt(10000000) GROUP BY number FORMAT Null</query>
    <query>SELECT number FROM numbers_mt(50000000) GROUP BY number FORMAT Null</query>
    <query>WITH number % 524289 AS k, toUInt64(k) AS k1, k1 + 1 AS k2 SELECT k1, k2, count() FROM numbers(100000000) GROUP BY k1, k2 FORMAT Null</query>
    <query>SELECT number FROM numbers_mt(10000000) GROUP BY number FORMAT Null SETTINGS group_by_two_level_threshold = 1e12, group_by_two_level_threshold_bytes = 1e12</query>
    <query>SELECT number FROM numbers_mt(50000000) GROUP BY number FORMAT Null SETTINGS group_by_two_level_threshold = 1e12, group_by_two_level_threshold_bytes = 1e12</query>

    <query>SELECT WatchID FROM hits_10m_single GROUP BY WatchID FORMAT Null</query>
    <query>SELECT WatchID FROM hits_100m_single GROUP BY WatchID FORMAT Null</query>
    <query>SELECT ClientIP AS x, x - 1, x - 2, x - 3, count() AS c FROM hits_10m_single GROUP BY x, x - 1, x - 2, x - 3 ORDER BY c DESC LIMIT 10</query>
    <query>SELECT ClientIP AS x, x - 1, x - 2, x - 3, count() AS c FROM hits_100m_single GROUP BY x, x - 1, x - 2, x - 3 ORDER BY c DESC LIMIT 10</query>
    <query>SELECT WatchID, ClientIP, count() AS c, sum(Refresh), avg(ResolutionWidth) FROM hits_10m_single WHERE SearchPhrase != '' GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10</query>
    <query>SELECT WatchID, ClientIP, count() AS c, sum(Refresh), avg(ResolutionWidth) FROM hits_100m_single WHERE SearchPhrase != '' GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10</query>
    <query>SELECT min(MobilePhoneModel) FROM hits_10m_single WHERE MobilePhoneModel != '' GROUP BY intHash32(UserID) % 1000000 FORMAT Null</query>
    <query>SELECT min(MobilePhoneModel) FROM hits_100m_single WHERE MobilePhoneModel != '' GROUP BY intHash32(UserID) % 1000000 FORMAT Null</query>
</test>

@@ -18,8 +18,8 @@ for format in ${formats}; do
    diff $non_parallel_file $parallel_file

    echo $format-2
    $CLICKHOUSE_CLIENT -q "select number, number + 1, concat('string: ', toString(number)) from numbers(200000) group by number with totals limit 190000 format $format" --extremes=1 --output_format_parallel_formatting=0 --output_format_pretty_max_rows=1000000 | grep -a -v "elapsed" > $non_parallel_file
    $CLICKHOUSE_CLIENT -q "select number, number + 1, concat('string: ', toString(number)) from numbers(200000) group by number with totals limit 190000 format $format" --extremes=1 --output_format_parallel_formatting=1 --output_format_pretty_max_rows=1000000 | grep -a -v "elapsed" > $parallel_file
    $CLICKHOUSE_CLIENT -q "select number, number + 1, concat('string: ', toString(number)) from numbers(200000) group by number with totals order by number limit 190000 format $format" --extremes=1 --output_format_parallel_formatting=0 --output_format_pretty_max_rows=1000000 | grep -a -v "elapsed" > $non_parallel_file
    $CLICKHOUSE_CLIENT -q "select number, number + 1, concat('string: ', toString(number)) from numbers(200000) group by number with totals order by number limit 190000 format $format" --extremes=1 --output_format_parallel_formatting=1 --output_format_pretty_max_rows=1000000 | grep -a -v "elapsed" > $parallel_file

    diff $non_parallel_file $parallel_file
done
@@ -33,15 +33,17 @@ $CLICKHOUSE_CLIENT -q "select number, number + 1, concat('string: ', toString(nu
diff $non_parallel_file $parallel_file

echo "CustomSeparated-2"
$CLICKHOUSE_CLIENT -q "select number, number + 1, concat('string: ', toString(number)) from numbers(200000) group by number with totals limit 190000 format CustomSeparated $CUSTOM_SETTINGS" --output_format_parallel_formatting=0 --extremes=1 > $non_parallel_file
$CLICKHOUSE_CLIENT -q "select number, number + 1, concat('string: ', toString(number)) from numbers(200000) group by number with totals limit 190000 format CustomSeparated $CUSTOM_SETTINGS" --output_format_parallel_formatting=1 --extremes=1 > $parallel_file
$CLICKHOUSE_CLIENT -q "select number, number + 1, concat('string: ', toString(number)) from numbers(200000) group by number with totals order by number limit 190000 format CustomSeparated $CUSTOM_SETTINGS" --output_format_parallel_formatting=0 --extremes=1 > $non_parallel_file
$CLICKHOUSE_CLIENT -q "select number, number + 1, concat('string: ', toString(number)) from numbers(200000) group by number with totals order by number limit 190000 format CustomSeparated $CUSTOM_SETTINGS" --output_format_parallel_formatting=1 --extremes=1 > $parallel_file

diff $non_parallel_file $parallel_file

echo -ne '{prefix} \n${data}\n $$ suffix $$\n' > "$CUR_DIR"/02122_template_format_resultset.tmp
echo -ne 'x:${x:Quoted}, y:${y:Quoted}, s:${s:Quoted}' > "$CUR_DIR"/02122_template_format_row.tmp
resultset_path=$CUR_DIR/$CLICKHOUSE_TEST_UNIQUE_NAME"_template_format_resultset.tmp"
echo -ne '{prefix} \n${data}\n $$ suffix $$\n' > $resultset_path
row_path=$CUR_DIR/$CLICKHOUSE_TEST_UNIQUE_NAME"_template_format_row.tmp"
echo -ne 'x:${x:Quoted}, y:${y:Quoted}, s:${s:Quoted}' > $row_path

TEMPLATE_SETTINGS="SETTINGS format_template_resultset = '$CUR_DIR/02122_template_format_resultset.tmp', format_template_row = '$CUR_DIR/02122_template_format_row.tmp', format_template_rows_between_delimiter = ';\n'"
TEMPLATE_SETTINGS="SETTINGS format_template_resultset = '$resultset_path', format_template_row = '$row_path', format_template_rows_between_delimiter = ';\n'"

echo "Template-1"
$CLICKHOUSE_CLIENT -q "select number as x, number + 1 as y, concat('string: ', toString(number)) as s from numbers(200000) format Template $TEMPLATE_SETTINGS" --output_format_parallel_formatting=0 > $non_parallel_file
@@ -49,14 +51,14 @@ $CLICKHOUSE_CLIENT -q "select number as x, number + 1 as y, concat('string: ', t

diff $non_parallel_file $parallel_file

echo -ne '{prefix} \n${data}\n $$ suffix $$\n${totals}\n${min}\n${max}\n${rows:Quoted}\n${rows_before_limit:Quoted}\n${rows_read:Quoted}\n${bytes_read:Quoted}\n' > "$CUR_DIR"/02122_template_format_resultset.tmp
echo -ne '{prefix} \n${data}\n $$ suffix $$\n${totals}\n${min}\n${max}\n${rows:Quoted}\n${rows_before_limit:Quoted}\n${rows_read:Quoted}\n${bytes_read:Quoted}\n' > $resultset_path

echo "Template-2"
$CLICKHOUSE_CLIENT -q "select number as x, number + 1 as y, concat('string: ', toString(number)) as s from numbers(200000) group by number with totals limit 190000 format Template $TEMPLATE_SETTINGS" --output_format_parallel_formatting=0 --extremes=1 > $non_parallel_file
$CLICKHOUSE_CLIENT -q "select number as x, number + 1 as y, concat('string: ', toString(number)) as s from numbers(200000) group by number with totals limit 190000 format Template $TEMPLATE_SETTINGS" --output_format_parallel_formatting=1 --extremes=1 > $parallel_file
$CLICKHOUSE_CLIENT -q "select number as x, number + 1 as y, concat('string: ', toString(number)) as s from numbers(200000) group by number with totals order by number limit 190000 format Template $TEMPLATE_SETTINGS" --output_format_parallel_formatting=0 --extremes=1 > $non_parallel_file
$CLICKHOUSE_CLIENT -q "select number as x, number + 1 as y, concat('string: ', toString(number)) as s from numbers(200000) group by number with totals order by number limit 190000 format Template $TEMPLATE_SETTINGS" --output_format_parallel_formatting=1 --extremes=1 > $parallel_file

diff $non_parallel_file $parallel_file

rm $non_parallel_file $parallel_file
rm "$CUR_DIR"/02122_template_format_resultset.tmp "$CUR_DIR"/02122_template_format_row.tmp
rm $resultset_path $row_path

@@ -0,0 +1,21 @@
1
--
1
--
1
--
1
--
1
1
--
1
--
1
1
--
1
--
1
1
--

tests/queries/0_stateless/02151_hash_table_sizes_stats.sh (new executable file, 90 lines)
@@ -0,0 +1,90 @@
#!/usr/bin/env bash
# Tags: long

# shellcheck disable=SC2154

unset CLICKHOUSE_LOG_COMMENT

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh


# tests rely on that all the rows are unique and max_threads divides table_size
table_size=10000
max_threads=5


prepare_table() {
    table_name="t_hash_table_sizes_stats_$RANDOM$RANDOM"
    $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS $table_name;"
    if [ -z "$1" ]; then
        $CLICKHOUSE_CLIENT -q "CREATE TABLE $table_name(number UInt64) Engine=MergeTree() ORDER BY tuple();"
    else
        $CLICKHOUSE_CLIENT -q "CREATE TABLE $table_name(number UInt64) Engine=MergeTree() ORDER BY $1;"
    fi
    $CLICKHOUSE_CLIENT -q "SYSTEM STOP MERGES $table_name;"
    for ((i = 1; i <= max_threads; i++)); do
        cnt=$((table_size / max_threads))
        from=$(((i - 1) * cnt))
        $CLICKHOUSE_CLIENT -q "INSERT INTO $table_name SELECT * FROM numbers($from, $cnt);"
    done
}

prepare_table_with_sorting_key() {
    prepare_table "$1"
}

run_query() {
    query_id="${CLICKHOUSE_DATABASE}_hash_table_sizes_stats_$RANDOM$RANDOM"
    $CLICKHOUSE_CLIENT --query_id="$query_id" --multiquery -q "
        SET max_block_size = $((table_size / 10));
        SET merge_tree_min_rows_for_concurrent_read = 1;
        SET max_untracked_memory = 0;
        SET max_size_to_preallocate_for_aggregation = 1e12;
        $query"
}

check_preallocated_elements() {
    $CLICKHOUSE_CLIENT -q "SYSTEM FLUSH LOGS"
    # rows may be distributed in any way including "everything goes to the one particular thread"
    min=$1
    if [ -z "$2" ]; then
        max=$1
    else
        max=$2
    fi
    $CLICKHOUSE_CLIENT --param_query_id="$query_id" -q "
        SELECT COUNT(*)
        FROM system.query_log
        WHERE event_date >= yesterday() AND query_id = {query_id:String} AND current_database = currentDatabase()
            AND ProfileEvents['AggregationPreallocatedElementsInHashTables'] BETWEEN $min AND $max"
}

check_convertion_to_two_level() {
    $CLICKHOUSE_CLIENT -q "SYSTEM FLUSH LOGS"
    # rows may be distributed in any way including "everything goes to the one particular thread"
    $CLICKHOUSE_CLIENT --param_query_id="$query_id" -q "
        SELECT SUM(ProfileEvents['AggregationHashTablesInitializedAsTwoLevel']) BETWEEN 1 AND $max_threads
        FROM system.query_log
        WHERE event_date >= yesterday() AND query_id = {query_id:String} AND current_database = currentDatabase()"
}

print_border() {
    echo "--"
}


# shellcheck source=./02151_hash_table_sizes_stats.testcases
source "$CURDIR"/02151_hash_table_sizes_stats.testcases


test_one_thread_simple_group_by
test_one_thread_simple_group_by_with_limit
test_one_thread_simple_group_by_with_join_and_subquery
test_several_threads_simple_group_by_with_limit_single_level_ht
test_several_threads_simple_group_by_with_limit_two_level_ht
test_several_threads_simple_group_by_with_limit_and_rollup_single_level_ht
test_several_threads_simple_group_by_with_limit_and_rollup_two_level_ht
test_several_threads_simple_group_by_with_limit_and_cube_single_level_ht
test_several_threads_simple_group_by_with_limit_and_cube_two_level_ht

tests/queries/0_stateless/02151_hash_table_sizes_stats.testcases (new file, 195 lines)
@@ -0,0 +1,195 @@
test_one_thread_simple_group_by() {
    expected_size_hint=$table_size
    prepare_table

    query="
        -- size_hint = $expected_size_hint --
        SELECT number
        FROM $table_name
        GROUP BY number
        SETTINGS max_threads = 1
        FORMAT Null;"

    run_query
    run_query
    check_preallocated_elements $expected_size_hint
    print_border
}

test_one_thread_simple_group_by_with_limit() {
    expected_size_hint=$table_size
    prepare_table

    query="
        -- size_hint = $expected_size_hint despite the presence of limit --
        SELECT number
        FROM $table_name
        GROUP BY number
        LIMIT 5
        SETTINGS max_threads = 1
        FORMAT Null;"

    run_query
    run_query
    check_preallocated_elements $expected_size_hint
    print_border
}

test_one_thread_simple_group_by_with_join_and_subquery() {
    expected_size_hint=$((table_size + table_size / 2))
    prepare_table

    query="
        -- expected two size_hints for different keys: for the inner ($table_size) and the outer aggregation ($((table_size / 2)))
        SELECT number
        FROM $table_name AS t1
        JOIN
        (
            SELECT number
            FROM $table_name AS t2
            GROUP BY number
            LIMIT $((table_size / 2))
        ) AS t3 USING(number)
        GROUP BY number
        SETTINGS max_threads = 1,
            distributed_product_mode = 'local'
        FORMAT Null;"

    run_query
    run_query
    check_preallocated_elements $expected_size_hint
    print_border
}

test_several_threads_simple_group_by_with_limit_single_level_ht() {
    expected_size_hint=$table_size
    prepare_table

    query="
        -- size_hint = $expected_size_hint despite the presence of limit --
        SELECT number
        FROM $table_name
        GROUP BY number
        LIMIT 5
        SETTINGS max_threads = $max_threads,
            group_by_two_level_threshold = $((expected_size_hint + 1)),
            group_by_two_level_threshold_bytes = $((table_size * 1000))
        FORMAT Null;"

    run_query
    run_query
    check_preallocated_elements $((expected_size_hint / max_threads)) $((expected_size_hint * max_threads))
    print_border
}

test_several_threads_simple_group_by_with_limit_two_level_ht() {
    expected_size_hint=$table_size
    prepare_table

    query="
        -- size_hint = $expected_size_hint despite the presence of limit --
        SELECT number
        FROM $table_name
        GROUP BY number
        LIMIT 5
        SETTINGS max_threads = $max_threads,
            group_by_two_level_threshold = $expected_size_hint,
            group_by_two_level_threshold_bytes = $((table_size * 1000))
        FORMAT Null;"

    run_query
    run_query
    check_convertion_to_two_level
    check_preallocated_elements $((expected_size_hint / max_threads)) $((expected_size_hint * max_threads))
    print_border
}

test_several_threads_simple_group_by_with_limit_and_rollup_single_level_ht() {
    expected_size_hint=$table_size
    prepare_table

    query="
        -- size_hint = $expected_size_hint despite the presence of limit --
        SELECT number
        FROM $table_name
        GROUP BY number
        WITH ROLLUP
        LIMIT 5
        SETTINGS max_threads = $max_threads,
            group_by_two_level_threshold = $((expected_size_hint + 1)),
            group_by_two_level_threshold_bytes = $((table_size * 1000))
        FORMAT Null;"

    run_query
    run_query
    check_preallocated_elements $((expected_size_hint / max_threads)) $((expected_size_hint * max_threads))
    print_border
}

test_several_threads_simple_group_by_with_limit_and_rollup_two_level_ht() {
    expected_size_hint=$table_size
    prepare_table

    query="
        -- size_hint = $expected_size_hint despite the presence of limit --
        SELECT number
        FROM $table_name
        GROUP BY number
        WITH ROLLUP
        LIMIT 5
        SETTINGS max_threads = $max_threads,
            group_by_two_level_threshold = $expected_size_hint,
            group_by_two_level_threshold_bytes = $((table_size * 1000))
        FORMAT Null;"

    run_query
    run_query
    check_convertion_to_two_level
    check_preallocated_elements $((expected_size_hint / max_threads)) $((expected_size_hint * max_threads))
    print_border
}

test_several_threads_simple_group_by_with_limit_and_cube_single_level_ht() {
    expected_size_hint=$table_size
    prepare_table

    query="
        -- size_hint = $expected_size_hint despite the presence of limit --
        SELECT number
        FROM $table_name
        GROUP BY number
        WITH CUBE
        LIMIT 5
        SETTINGS max_threads = $max_threads,
            group_by_two_level_threshold = $((expected_size_hint + 1)),
            group_by_two_level_threshold_bytes = $((table_size * 1000))
        FORMAT Null;"

    run_query
    run_query
    check_preallocated_elements $((expected_size_hint / max_threads)) $((expected_size_hint * max_threads))
    print_border
}

test_several_threads_simple_group_by_with_limit_and_cube_two_level_ht() {
    expected_size_hint=$table_size
    prepare_table

    query="
        -- size_hint = $expected_size_hint despite the presence of limit --
        SELECT number
        FROM $table_name
        GROUP BY number
        WITH CUBE
        LIMIT 5
        SETTINGS max_threads = $max_threads,
            group_by_two_level_threshold = $expected_size_hint,
            group_by_two_level_threshold_bytes = $((table_size * 1000))
        FORMAT Null;"

    run_query
    run_query
    check_convertion_to_two_level
    check_preallocated_elements $((expected_size_hint / max_threads)) $((expected_size_hint * max_threads))
    print_border
}

@@ -0,0 +1,33 @@
1
1
--
1
1
--
1
1
--
1
1
--
1
1
1
1
--
1
1
--
1
1
1
1
--
1
1
--
1
1
1
1
--

tests/queries/0_stateless/02151_hash_table_sizes_stats_distributed.sh (new executable file, 97 lines)
@@ -0,0 +1,97 @@
#!/usr/bin/env bash
# Tags: long, distributed

# These tests don't use `current_database = currentDatabase()` condition, because database name isn't propagated during remote queries.

# shellcheck disable=SC2154

unset CLICKHOUSE_LOG_COMMENT

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh


# tests rely on that all the rows are unique and max_threads divides table_size
table_size=10000
max_threads=5


prepare_table() {
    table_name="t_hash_table_sizes_stats_$RANDOM$RANDOM"
    $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS $table_name;"
    if [ -z "$1" ]; then
        $CLICKHOUSE_CLIENT -q "CREATE TABLE $table_name(number UInt64) Engine=MergeTree() ORDER BY tuple();"
    else
        $CLICKHOUSE_CLIENT -q "CREATE TABLE $table_name(number UInt64) Engine=MergeTree() ORDER BY $1;"
    fi
    $CLICKHOUSE_CLIENT -q "SYSTEM STOP MERGES $table_name;"
    for ((i = 1; i <= max_threads; i++)); do
        cnt=$((table_size / max_threads))
        from=$(((i - 1) * cnt))
        $CLICKHOUSE_CLIENT -q "INSERT INTO $table_name SELECT * FROM numbers($from, $cnt);"
    done
    $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS ${table_name}_d;"
    $CLICKHOUSE_CLIENT -q "CREATE TABLE ${table_name}_d AS $table_name ENGINE = Distributed(test_cluster_two_shards, currentDatabase(), $table_name);"
    table_name="${table_name}_d"
}

prepare_table_with_sorting_key() {
    prepare_table "$1"
}

run_query() {
    query_id="${CLICKHOUSE_DATABASE}_hash_table_sizes_stats_$RANDOM$RANDOM"
    $CLICKHOUSE_CLIENT --query_id="$query_id" --multiquery -q "
        SET max_block_size = $((table_size / 10));
        SET merge_tree_min_rows_for_concurrent_read = 1;
        SET max_untracked_memory = 0;
        SET prefer_localhost_replica = 1;
        $query"
}

check_preallocated_elements() {
    $CLICKHOUSE_CLIENT -q "SYSTEM FLUSH LOGS"
    # rows may be distributed in any way including "everything goes to the one particular thread"
    min=$1
    if [ -z "$2" ]; then
        max=$1
    else
        max=$2
    fi
    $CLICKHOUSE_CLIENT --param_query_id="$query_id" -q "
        SELECT COUNT(*)
        FROM system.query_log
        WHERE event_date >= yesterday() AND (query_id = {query_id:String} OR initial_query_id = {query_id:String})
            AND ProfileEvents['AggregationPreallocatedElementsInHashTables'] BETWEEN $min AND $max
        GROUP BY query_id"
}

check_convertion_to_two_level() {
    $CLICKHOUSE_CLIENT -q "SYSTEM FLUSH LOGS"
    # rows may be distributed in any way including "everything goes to the one particular thread"
    $CLICKHOUSE_CLIENT --param_query_id="$query_id" -q "
        SELECT SUM(ProfileEvents['AggregationHashTablesInitializedAsTwoLevel']) BETWEEN 1 AND $max_threads
        FROM system.query_log
        WHERE event_date >= yesterday() AND (query_id = {query_id:String} OR initial_query_id = {query_id:String})
        GROUP BY query_id"
}

print_border() {
    echo "--"
}


# shellcheck source=./02151_hash_table_sizes_stats.testcases
source "$CURDIR"/02151_hash_table_sizes_stats.testcases


test_one_thread_simple_group_by
test_one_thread_simple_group_by_with_limit
test_one_thread_simple_group_by_with_join_and_subquery
test_several_threads_simple_group_by_with_limit_single_level_ht
test_several_threads_simple_group_by_with_limit_two_level_ht
test_several_threads_simple_group_by_with_limit_and_rollup_single_level_ht
test_several_threads_simple_group_by_with_limit_and_rollup_two_level_ht
test_several_threads_simple_group_by_with_limit_and_cube_single_level_ht
test_several_threads_simple_group_by_with_limit_and_cube_two_level_ht