Merge pull request #71406 from azat/automatic-external-aggregation

Automatic GROUP/ORDER BY to disk based on the memory usage
This commit is contained in:
Mikhail Artemenko 2024-11-29 12:27:43 +00:00 committed by GitHub
commit b81ee27750
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 382 additions and 176 deletions

View File

@ -122,6 +122,19 @@ Default value: `0`.
Cloud default value: half the memory amount per replica.
## max_bytes_ratio_before_external_group_by {#settings-max_bytes_ratio_before_external_group_by}
The ratio of available memory that `GROUP BY` is allowed to use. Once this ratio is reached, external memory is used for aggregation.
For example, if set to `0.6`, `GROUP BY` is allowed to use `60%` of the memory available (to server/user/merges) at the beginning of the execution; after that, it starts using external aggregation.
Default value: `0.0`.
:::note
- You cannot use both `max_bytes_ratio_before_external_group_by` and `max_bytes_before_external_group_by`
- **The algorithm is experimental and subject to change**
:::
## max_bytes_before_external_sort {#settings-max_bytes_before_external_sort}
Enables or disables execution of `ORDER BY` clauses in external memory. See [ORDER BY Implementation Details](../../sql-reference/statements/select/order-by.md#implementation-details)
@ -133,6 +146,19 @@ Default value: 0.
Cloud default value: half the memory amount per replica.
## max_bytes_ratio_before_external_sort {#settings-max_bytes_ratio_before_external_sort}
The ratio of available memory that `ORDER BY` is allowed to use. Once this ratio is reached, external sort is used.
For example, if set to `0.6`, `ORDER BY` is allowed to use `60%` of the memory available (to server/user/merges) at the beginning of the execution; after that, it starts using external sort.
Default value: `0.0`.
:::note
- You cannot use both `max_bytes_ratio_before_external_sort` and `max_bytes_before_external_sort`
- **The algorithm is experimental and subject to change**
:::
## max_rows_to_sort {#max-rows-to-sort}
A maximum number of rows before sorting. This allows you to limit memory consumption when sorting.

View File

@ -374,8 +374,9 @@ The aggregation can be performed more effectively, if a table is sorted by some
You can enable dumping temporary data to the disk to restrict memory usage during `GROUP BY`.
The [max_bytes_before_external_group_by](../../../operations/settings/query-complexity.md#settings-max_bytes_before_external_group_by) setting determines the threshold RAM consumption for dumping `GROUP BY` temporary data to the file system. If set to 0 (the default), it is disabled.
Alternatively, you can set [max_bytes_ratio_before_external_group_by](../../../operations/settings/query-complexity.md#settings-max_bytes_ratio_before_external_group_by), which makes `GROUP BY` use external memory only once the query reaches a certain threshold of used memory.
When using `max_bytes_before_external_group_by`, we recommend that you set `max_memory_usage` about twice as high. This is necessary because there are two stages to aggregation: reading the data and forming intermediate data (1) and merging the intermediate data (2). Dumping data to the file system can only occur during stage 1. If the temporary data wasnt dumped, then stage 2 might require up to the same amount of memory as in stage 1.
When using `max_bytes_before_external_group_by`, we recommend that you set `max_memory_usage` about twice as high (or `max_bytes_ratio_before_external_group_by=0.5`). This is necessary because there are two stages to aggregation: reading the data and forming intermediate data (1) and merging the intermediate data (2). Dumping data to the file system can only occur during stage 1. If the temporary data wasn't dumped, then stage 2 might require up to the same amount of memory as in stage 1.
For example, if [max_memory_usage](../../../operations/settings/query-complexity.md#settings_max_memory_usage) was set to 10000000000 and you want to use external aggregation, it makes sense to set `max_bytes_before_external_group_by` to 10000000000, and `max_memory_usage` to 20000000000. When external aggregation is triggered (if there was at least one dump of temporary data), maximum consumption of RAM is only slightly more than `max_bytes_before_external_group_by`.

View File

@ -164,7 +164,9 @@ If the query omits the `DISTINCT`, `GROUP BY` and `ORDER BY` clauses and the `IN
- `max_rows_in_join`
- `max_bytes_in_join`
- `max_bytes_before_external_sort`
- `max_bytes_ratio_before_external_sort`
- `max_bytes_before_external_group_by`
- `max_bytes_ratio_before_external_group_by`
For more information, see the section “Settings”. It is possible to use external sorting (saving temporary tables to a disk) and external aggregation.

View File

@ -0,0 +1,39 @@
#include <algorithm>
#include <limits>
#include <Common/CurrentThread.h>
#include <Common/MemoryTrackerUtils.h>
std::optional<UInt64> getMostStrictAvailableSystemMemory()
{
MemoryTracker * query_memory_tracker;
if (query_memory_tracker = DB::CurrentThread::getMemoryTracker(); !query_memory_tracker)
return {};
/// query-level memory tracker
if (query_memory_tracker = query_memory_tracker->getParent(); !query_memory_tracker)
return {};
Int64 available = std::numeric_limits<Int64>::max();
MemoryTracker * system_memory_tracker = query_memory_tracker->getParent();
while (system_memory_tracker)
{
if (Int64 tracker_limit = system_memory_tracker->getHardLimit(); tracker_limit > 0)
{
Int64 tracker_used = system_memory_tracker->get();
Int64 tracker_available = std::clamp<Int64>(tracker_limit - tracker_used, 0, std::numeric_limits<Int64>::max());
available = std::min(available, tracker_available);
}
system_memory_tracker = system_memory_tracker->getParent();
}
if (available == std::numeric_limits<Int64>::max())
return {};
return available;
}
/// Returns the amount tracked by the parent of the current thread's memory tracker
/// (the query-level tracker), or 0 when either tracker is missing.
Int64 getCurrentQueryMemoryUsage()
{
    auto * thread_tracker = DB::CurrentThread::getMemoryTracker();
    if (!thread_tracker)
        return 0;

    /// Use query-level memory tracker
    auto * query_tracker = thread_tracker->getParent();
    if (!query_tracker)
        return 0;

    return query_tracker->get();
}

View File

@ -0,0 +1,10 @@
#pragma once
#include <optional>
#include <base/types.h>
/// Return most strict (by hard limit) system (non query-level, i.e. server/user/merges/...) memory limit
///
/// The value is the smallest "hard limit minus current usage" (clamped to be non-negative)
/// over the memory trackers above the query-level one that have a positive hard limit.
/// Returns an empty optional if the current thread has no memory tracker, that tracker
/// has no parent, or none of the trackers above the query level has a hard limit.
std::optional<UInt64> getMostStrictAvailableSystemMemory();
/// Return current query tracked memory usage
///
/// The value is read from the parent of the current thread's memory tracker;
/// 0 is returned if either the thread tracker or its parent is absent.
Int64 getCurrentQueryMemoryUsage();

View File

@ -2314,6 +2314,9 @@ What to do when the limit is exceeded.
)", 0) \
DECLARE(UInt64, max_bytes_before_external_group_by, 0, R"(
If memory usage during GROUP BY operation is exceeding this threshold in bytes, activate the 'external aggregation' mode (spill data to disk). Recommended value is half of the available system memory.
)", 0) \
DECLARE(Double, max_bytes_ratio_before_external_group_by, 0., R"(
Ratio of used memory before enabling external GROUP BY. If you set it to 0.6 the external GROUP BY will be used once the memory usage will reach 60% of allowed memory for query.
)", 0) \
\
DECLARE(UInt64, max_rows_to_sort, 0, R"(
@ -2330,6 +2333,9 @@ Prefer maximum block bytes for external sort, reduce the memory usage during mer
)", 0) \
DECLARE(UInt64, max_bytes_before_external_sort, 0, R"(
If memory usage during ORDER BY operation is exceeding this threshold in bytes, activate the 'external sorting' mode (spill data to disk). Recommended value is half of the available system memory.
)", 0) \
DECLARE(Double, max_bytes_ratio_before_external_sort, 0., R"(
Ratio of used memory before enabling external ORDER BY. If you set it to 0.6 the external ORDER BY will be used once the memory usage will reach 60% of allowed memory for query.
)", 0) \
DECLARE(UInt64, max_bytes_before_remerge_sort, 1000000000, R"(
In case of ORDER BY with LIMIT, when memory usage is higher than specified threshold, perform additional steps of merging blocks before final merge to keep just top LIMIT rows.

View File

@ -63,6 +63,8 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"max_size_to_preallocate_for_aggregation", 100'000'000, 1'000'000'000'000, "Enable optimisation for bigger tables."},
{"max_size_to_preallocate_for_joins", 100'000'000, 1'000'000'000'000, "Enable optimisation for bigger tables."},
{"parallel_replicas_index_analysis_only_on_coordinator", false, true, "Index analysis done only on replica-coordinator and skipped on other replicas. Effective only with enabled parallel_replicas_local_plan"},
{"max_bytes_ratio_before_external_group_by", 0., 0., "New setting."},
{"max_bytes_ratio_before_external_sort", 0., 0., "New setting."},
{"use_async_executor_for_materialized_views", false, false, "New setting."},
}
},

View File

@ -2,6 +2,7 @@
#include <future>
#include <numeric>
#include <Poco/Util/Application.h>
#include <Core/Settings.h>
#ifdef OS_LINUX
# include <unistd.h>
@ -30,6 +31,7 @@
#include <Common/CurrentThread.h>
#include <Common/JSONBuilder.h>
#include <Common/MemoryTracker.h>
#include <Common/MemoryTrackerUtils.h>
#include <Common/Stopwatch.h>
#include <Common/assert_cast.h>
#include <Common/formatReadable.h>
@ -71,8 +73,25 @@ namespace ErrorCodes
extern const int EMPTY_DATA_PASSED;
extern const int CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
/// Settings that Aggregator::Params reads when it is constructed from a Settings object.
namespace Setting
{
extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
extern const SettingsUInt64 max_rows_to_group_by;
extern const SettingsOverflowModeGroupBy group_by_overflow_mode;
/// Absolute threshold and memory-ratio threshold for external aggregation (mutually exclusive, see Aggregator::Params).
extern const SettingsUInt64 max_bytes_before_external_group_by;
extern const SettingsDouble max_bytes_ratio_before_external_group_by;
extern const SettingsMaxThreads max_threads;
extern const SettingsUInt64 min_free_disk_space_for_temporary_data;
extern const SettingsBool compile_aggregate_expressions;
extern const SettingsUInt64 min_count_to_compile_aggregate_expression;
extern const SettingsUInt64 max_block_size;
extern const SettingsBool enable_software_prefetch_in_aggregation;
extern const SettingsBool optimize_group_by_constant_keys;
};
}
namespace
@ -175,6 +194,84 @@ Block Aggregator::getHeader(bool final) const
return params.getHeader(header, final);
}
/// Builds aggregation parameters, reading the tunables from `settings`.
///
/// Passed explicitly (not read from `settings`): keys, aggregates, overflow_row, both
/// two-level thresholds, empty_result_for_aggregation_by_empty_set, tmp_data_scope,
/// only_merge and stats_collecting_params.
///
/// If `max_bytes_ratio_before_external_group_by` is non-zero, the external aggregation
/// threshold is computed as `ratio * (most strict available system memory)` at
/// construction time and overrides `max_bytes_before_external_group_by`.
///
/// Throws BAD_ARGUMENTS if both the ratio and the absolute threshold are set,
/// or if the ratio is not within [0, 1) (this includes NaN).
Aggregator::Params::Params(
    const Settings & settings,
    const Names & keys_,
    const AggregateDescriptions & aggregates_,
    bool overflow_row_,
    size_t group_by_two_level_threshold_,
    size_t group_by_two_level_threshold_bytes_,
    bool empty_result_for_aggregation_by_empty_set_,
    TemporaryDataOnDiskScopePtr tmp_data_scope_,
    bool only_merge_, // true for projections
    const StatsCollectingParams & stats_collecting_params_)
    : keys(keys_)
    , keys_size(keys.size())
    , aggregates(aggregates_)
    , aggregates_size(aggregates.size())
    , overflow_row(overflow_row_)
    , max_rows_to_group_by(settings[Setting::max_rows_to_group_by])
    , group_by_overflow_mode(settings[Setting::group_by_overflow_mode])
    , group_by_two_level_threshold(group_by_two_level_threshold_)
    , group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_)
    , max_bytes_before_external_group_by(settings[Setting::max_bytes_before_external_group_by])
    , empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_)
    , tmp_data_scope(std::move(tmp_data_scope_))
    , max_threads(settings[Setting::max_threads])
    , min_free_disk_space(settings[Setting::min_free_disk_space_for_temporary_data])
    , compile_aggregate_expressions(settings[Setting::compile_aggregate_expressions])
    , min_count_to_compile_aggregate_expression(settings[Setting::min_count_to_compile_aggregate_expression])
    , max_block_size(settings[Setting::max_block_size])
    , only_merge(only_merge_)
    , enable_prefetch(settings[Setting::enable_software_prefetch_in_aggregation])
    , optimize_group_by_constant_keys(settings[Setting::optimize_group_by_constant_keys])
    , min_hit_rate_to_use_consecutive_keys_optimization(settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization])
    , stats_collecting_params(stats_collecting_params_)
{
    if (settings[Setting::max_bytes_ratio_before_external_group_by] != 0.)
    {
        if (settings[Setting::max_bytes_before_external_group_by] > 0)
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Settings max_bytes_ratio_before_external_group_by and max_bytes_before_external_group_by cannot be set simultaneously");

        double ratio = settings[Setting::max_bytes_ratio_before_external_group_by];
        /// Negated in-range check on purpose: every comparison with NaN is false, so
        /// `ratio < 0 || ratio >= 1.` would let NaN through to the float -> integer
        /// conversion below, which is undefined behaviour for NaN.
        if (!(ratio >= 0 && ratio < 1.))
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting max_bytes_ratio_before_external_group_by should be >= 0 and < 1 ({})", ratio);

        auto available_system_memory = getMostStrictAvailableSystemMemory();
        if (available_system_memory.has_value())
        {
            /// ratio is in (0, 1) here, so the product fits into size_t.
            max_bytes_before_external_group_by = static_cast<size_t>(*available_system_memory * ratio);
            LOG_TEST(getLogger("Aggregator"), "Set max_bytes_before_external_group_by={} (ratio: {}, available system memory: {})",
                formatReadableSizeWithBinarySuffix(max_bytes_before_external_group_by),
                ratio,
                formatReadableSizeWithBinarySuffix(*available_system_memory));
        }
        else
        {
            /// No tracker above the query level has a hard limit, so there is nothing to take a ratio of.
            LOG_WARNING(getLogger("Aggregator"), "No system memory limits configured. Ignoring max_bytes_ratio_before_external_group_by");
        }
    }
}
/// Reduced constructor that takes only the parameters that matter during merge.
/// It always sets only_merge to true; members not listed in the initializer list
/// are left at their in-class default values.
Aggregator::Params::Params(
const Names & keys_,
const AggregateDescriptions & aggregates_,
bool overflow_row_,
size_t max_threads_,
size_t max_block_size_,
double min_hit_rate_to_use_consecutive_keys_optimization_)
: keys(keys_)
, keys_size(keys.size())
, aggregates(aggregates_)
, aggregates_size(aggregates_.size())
, overflow_row(overflow_row_)
, max_threads(max_threads_)
, max_block_size(max_block_size_)
, only_merge(true)
, min_hit_rate_to_use_consecutive_keys_optimization(min_hit_rate_to_use_consecutive_keys_optimization_)
{
}
Block Aggregator::Params::getHeader(
const Block & header, bool only_merge, const Names & keys, const AggregateDescriptions & aggregates, bool final)
{
@ -338,10 +435,7 @@ Aggregator::Aggregator(const Block & header_, const Params & params_)
, tmp_data(params.tmp_data_scope ? params.tmp_data_scope->childScope(CurrentMetrics::TemporaryFilesForAggregation) : nullptr)
, min_bytes_for_prefetch(getMinBytesForPrefetch())
{
/// Use query-level memory tracker
if (auto * memory_tracker_child = CurrentThread::getMemoryTracker())
if (auto * memory_tracker = memory_tracker_child->getParent())
memory_usage_before_aggregation = memory_tracker->get();
memory_usage_before_aggregation = getCurrentQueryMemoryUsage();
aggregate_functions.resize(params.aggregates_size);
for (size_t i = 0; i < params.aggregates_size; ++i)
@ -1474,10 +1568,7 @@ bool Aggregator::executeOnBlock(Columns columns,
}
size_t result_size = result.sizeWithoutOverflowRow();
Int64 current_memory_usage = 0;
if (auto * memory_tracker_child = CurrentThread::getMemoryTracker())
if (auto * memory_tracker = memory_tracker_child->getParent())
current_memory_usage = memory_tracker->get();
Int64 current_memory_usage = getCurrentQueryMemoryUsage();
/// Here all the results in the sum are taken into account, from different threads.
auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation;
@ -2906,10 +2997,7 @@ bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool
throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown aggregated data variant.");
size_t result_size = result.sizeWithoutOverflowRow();
Int64 current_memory_usage = 0;
if (auto * memory_tracker_child = CurrentThread::getMemoryTracker())
if (auto * memory_tracker = memory_tracker_child->getParent())
current_memory_usage = memory_tracker->get();
Int64 current_memory_usage = getCurrentQueryMemoryUsage();
/// Here all the results in the sum are taken into account, from different threads.
auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation;

View File

@ -96,103 +96,72 @@ public:
struct Params
{
/// What to count.
const Names keys;
Names keys;
size_t keys_size = 0;
const AggregateDescriptions aggregates;
const size_t keys_size;
const size_t aggregates_size;
const size_t aggregates_size = 0;
///
/// The settings of approximate calculation of GROUP BY.
const bool overflow_row; /// Do we need to put into AggregatedDataVariants::without_key aggregates for keys that are not in max_rows_to_group_by.
const size_t max_rows_to_group_by;
const OverflowMode group_by_overflow_mode;
///
/// Do we need to put into AggregatedDataVariants::without_key aggregates for keys that are not in max_rows_to_group_by.
const bool overflow_row = false;
const size_t max_rows_to_group_by = 0;
const OverflowMode group_by_overflow_mode = OverflowMode::THROW;
/// Two-level aggregation settings (used for a large number of keys).
/** With how many keys or the size of the aggregation state in bytes,
* two-level aggregation begins to be used. Enough to reach of at least one of the thresholds.
* 0 - the corresponding threshold is not specified.
*/
size_t group_by_two_level_threshold;
size_t group_by_two_level_threshold_bytes;
/// With how many keys or the size of the aggregation state in bytes,
/// two-level aggregation begins to be used. Enough to reach of at least one of the thresholds.
/// 0 - the corresponding threshold is not specified.
size_t group_by_two_level_threshold = 0;
size_t group_by_two_level_threshold_bytes = 0;
/// Settings to flush temporary data to the filesystem (external aggregation).
const size_t max_bytes_before_external_group_by; /// 0 - do not use external aggregation.
/// 0 - do not use external aggregation.
size_t max_bytes_before_external_group_by = 0;
/// Return empty result when aggregating without keys on empty set.
bool empty_result_for_aggregation_by_empty_set;
bool empty_result_for_aggregation_by_empty_set = false;
TemporaryDataOnDiskScopePtr tmp_data_scope;
/// Settings is used to determine cache size. No threads are created.
size_t max_threads;
const size_t min_free_disk_space;
bool compile_aggregate_expressions;
size_t min_count_to_compile_aggregate_expression;
size_t max_block_size;
bool only_merge;
bool enable_prefetch;
bool optimize_group_by_constant_keys;
const double min_hit_rate_to_use_consecutive_keys_optimization;
size_t max_threads = 0;
const size_t min_free_disk_space = 0;
bool compile_aggregate_expressions = false;
size_t min_count_to_compile_aggregate_expression = 0;
size_t max_block_size = 0;
bool only_merge = false;
bool enable_prefetch = false;
bool optimize_group_by_constant_keys = false;
const double min_hit_rate_to_use_consecutive_keys_optimization = 0.;
StatsCollectingParams stats_collecting_params;
Params(
const Settings & settings,
const Names & keys_,
const AggregateDescriptions & aggregates_,
bool overflow_row_,
size_t group_by_two_level_threshold_,
size_t group_by_two_level_threshold_bytes_,
bool empty_result_for_aggregation_by_empty_set_,
TemporaryDataOnDiskScopePtr tmp_data_scope_,
bool only_merge_, // true for projections
const StatsCollectingParams & stats_collecting_params_);
/// Only parameters that matter during merge.
Params(
const Names & keys_,
const AggregateDescriptions & aggregates_,
bool overflow_row_,
size_t max_rows_to_group_by_,
OverflowMode group_by_overflow_mode_,
size_t group_by_two_level_threshold_,
size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_,
TemporaryDataOnDiskScopePtr tmp_data_scope_,
size_t max_threads_,
size_t min_free_disk_space_,
bool compile_aggregate_expressions_,
size_t min_count_to_compile_aggregate_expression_,
size_t max_block_size_,
bool enable_prefetch_,
bool only_merge_, // true for projections
bool optimize_group_by_constant_keys_,
double min_hit_rate_to_use_consecutive_keys_optimization_,
const StatsCollectingParams & stats_collecting_params_)
: keys(keys_)
, aggregates(aggregates_)
, keys_size(keys.size())
, aggregates_size(aggregates.size())
, overflow_row(overflow_row_)
, max_rows_to_group_by(max_rows_to_group_by_)
, group_by_overflow_mode(group_by_overflow_mode_)
, group_by_two_level_threshold(group_by_two_level_threshold_)
, group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_)
, max_bytes_before_external_group_by(max_bytes_before_external_group_by_)
, empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_)
, tmp_data_scope(std::move(tmp_data_scope_))
, max_threads(max_threads_)
, min_free_disk_space(min_free_disk_space_)
, compile_aggregate_expressions(compile_aggregate_expressions_)
, min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_)
, max_block_size(max_block_size_)
, only_merge(only_merge_)
, enable_prefetch(enable_prefetch_)
, optimize_group_by_constant_keys(optimize_group_by_constant_keys_)
, min_hit_rate_to_use_consecutive_keys_optimization(min_hit_rate_to_use_consecutive_keys_optimization_)
, stats_collecting_params(stats_collecting_params_)
{
}
double min_hit_rate_to_use_consecutive_keys_optimization_);
/// Only parameters that matter during merge.
Params(const Names & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_, size_t max_block_size_, double min_hit_rate_to_use_consecutive_keys_optimization_)
: Params(
keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0, false, 0, max_block_size_, false, true, false, min_hit_rate_to_use_consecutive_keys_optimization_, {})
/// Returns a copy of these parameters with the grouping keys replaced by `keys_`
/// (keys_size is updated to match) and only_merge set to `only_merge_`.
/// All other fields are copied unchanged. The method does not modify `*this`,
/// hence it is const-qualified and usable on const Params.
Params cloneWithKeys(const Names & keys_, bool only_merge_ = false) const
{
    Params new_params = *this;
    new_params.keys = keys_;
    new_params.keys_size = keys_.size();
    new_params.only_merge = only_merge_;
    return new_params;
}
static Block

View File

@ -117,7 +117,6 @@ namespace Setting
extern const SettingsBool allow_experimental_query_deduplication;
extern const SettingsBool async_socket_for_remote;
extern const SettingsBool collect_hash_table_stats_during_aggregation;
extern const SettingsBool compile_aggregate_expressions;
extern const SettingsBool compile_sort_description;
extern const SettingsBool count_distinct_optimization;
extern const SettingsUInt64 cross_to_inner_join_rewrite;
@ -127,7 +126,6 @@ namespace Setting
extern const SettingsBool empty_result_for_aggregation_by_empty_set;
extern const SettingsBool enable_global_with_statement;
extern const SettingsBool enable_memory_bound_merging_of_aggregation_results;
extern const SettingsBool enable_software_prefetch_in_aggregation;
extern const SettingsBool exact_rows_before_limit;
extern const SettingsBool enable_unaligned_array_join;
extern const SettingsBool extremes;
@ -140,7 +138,6 @@ namespace Setting
extern const SettingsSeconds lock_acquire_timeout;
extern const SettingsUInt64 max_analyze_depth;
extern const SettingsUInt64 max_block_size;
extern const SettingsUInt64 max_bytes_before_external_group_by;
extern const SettingsUInt64 max_bytes_in_distinct;
extern const SettingsUInt64 max_columns_to_read;
extern const SettingsUInt64 max_distributed_connections;
@ -158,17 +155,12 @@ namespace Setting
extern const SettingsFloat max_streams_to_max_threads_ratio;
extern const SettingsUInt64 max_subquery_depth;
extern const SettingsMaxThreads max_threads;
extern const SettingsUInt64 min_count_to_compile_aggregate_expression;
extern const SettingsUInt64 min_count_to_compile_sort_description;
extern const SettingsUInt64 min_free_disk_space_for_temporary_data;
extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
extern const SettingsBool multiple_joins_try_to_keep_original_names;
extern const SettingsBool optimize_aggregation_in_order;
extern const SettingsBool optimize_distinct_in_order;
extern const SettingsBool optimize_group_by_constant_keys;
extern const SettingsBool optimize_move_to_prewhere;
extern const SettingsBool optimize_move_to_prewhere_if_final;
extern const SettingsBool optimize_sorting_by_input_stream_properties;
extern const SettingsBool optimize_uniq_to_count;
extern const SettingsUInt64 parallel_replicas_count;
extern const SettingsString parallel_replicas_custom_key;
@ -2757,27 +2749,15 @@ static Aggregator::Params getAggregatorParams(
return Aggregator::Params
{
settings,
keys,
aggregates,
overflow_row,
settings[Setting::max_rows_to_group_by],
settings[Setting::group_by_overflow_mode],
group_by_two_level_threshold,
group_by_two_level_threshold_bytes,
settings[Setting::max_bytes_before_external_group_by],
settings[Setting::empty_result_for_aggregation_by_empty_set]
|| (settings[Setting::empty_result_for_aggregation_by_constant_keys_on_empty_set] && keys.empty()
&& query_analyzer.hasConstAggregationKeys()),
settings[Setting::empty_result_for_aggregation_by_empty_set] || (settings[Setting::empty_result_for_aggregation_by_constant_keys_on_empty_set] && keys.empty() && query_analyzer.hasConstAggregationKeys()),
context.getTempDataOnDisk(),
settings[Setting::max_threads],
settings[Setting::min_free_disk_space_for_temporary_data],
settings[Setting::compile_aggregate_expressions],
settings[Setting::min_count_to_compile_aggregate_expression],
settings[Setting::max_block_size],
settings[Setting::enable_software_prefetch_in_aggregation],
/* only_merge */ false,
settings[Setting::optimize_group_by_constant_keys],
settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization],
/* only_merge_= */ false,
stats_collecting_params};
}

View File

@ -603,7 +603,17 @@ void MergeJoin::mergeInMemoryRightBlocks()
/// TODO: there should be no split keys by blocks for RIGHT|FULL JOIN
builder.addTransform(std::make_shared<MergeSortingTransform>(
builder.getHeader(), right_sort_description, max_rows_in_right_block, 0, 0, false, 0, 0, 0, nullptr, 0));
builder.getHeader(),
right_sort_description,
max_rows_in_right_block,
/*max_block_bytes=*/0,
/*limit_=*/0,
/*increase_sort_description_compile_attempts=*/false,
/*max_bytes_before_remerge_*/0,
/*remerge_lowered_memory_bytes_ratio_*/0,
/*max_bytes_before_external_sort_*/0,
/*tmp_data_*/nullptr,
/*min_free_disk_space_*/0));
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingPipelineExecutor executor(pipeline);

View File

@ -98,11 +98,9 @@ namespace Setting
extern const SettingsUInt64 aggregation_memory_efficient_merge_threads;
extern const SettingsUInt64 allow_experimental_parallel_reading_from_replicas;
extern const SettingsBool collect_hash_table_stats_during_aggregation;
extern const SettingsBool compile_aggregate_expressions;
extern const SettingsOverflowMode distinct_overflow_mode;
extern const SettingsBool distributed_aggregation_memory_efficient;
extern const SettingsBool enable_memory_bound_merging_of_aggregation_results;
extern const SettingsBool enable_software_prefetch_in_aggregation;
extern const SettingsBool empty_result_for_aggregation_by_constant_keys_on_empty_set;
extern const SettingsBool empty_result_for_aggregation_by_empty_set;
extern const SettingsBool exact_rows_before_limit;
@ -112,7 +110,6 @@ namespace Setting
extern const SettingsUInt64 group_by_two_level_threshold;
extern const SettingsUInt64 group_by_two_level_threshold_bytes;
extern const SettingsBool group_by_use_nulls;
extern const SettingsUInt64 max_bytes_before_external_group_by;
extern const SettingsUInt64 max_bytes_in_distinct;
extern const SettingsUInt64 max_block_size;
extern const SettingsUInt64 max_size_to_preallocate_for_aggregation;
@ -120,12 +117,7 @@ namespace Setting
extern const SettingsUInt64 max_rows_in_distinct;
extern const SettingsUInt64 max_rows_to_group_by;
extern const SettingsMaxThreads max_threads;
extern const SettingsUInt64 min_count_to_compile_aggregate_expression;
extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
extern const SettingsUInt64 min_free_disk_space_for_temporary_data;
extern const SettingsBool optimize_distinct_in_order;
extern const SettingsBool optimize_group_by_constant_keys;
extern const SettingsBool optimize_sorting_by_input_stream_properties;
extern const SettingsBool parallel_replicas_allow_in_with_subquery;
extern const SettingsString parallel_replicas_custom_key;
extern const SettingsUInt64 parallel_replicas_min_number_of_rows_per_replica;
@ -437,27 +429,15 @@ Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context
}
Aggregator::Params aggregator_params = Aggregator::Params(
settings,
aggregation_analysis_result.aggregation_keys,
aggregate_descriptions,
query_analysis_result.aggregate_overflow_row,
settings[Setting::max_rows_to_group_by],
settings[Setting::group_by_overflow_mode],
settings[Setting::group_by_two_level_threshold],
settings[Setting::group_by_two_level_threshold_bytes],
settings[Setting::max_bytes_before_external_group_by],
settings[Setting::empty_result_for_aggregation_by_empty_set]
|| (settings[Setting::empty_result_for_aggregation_by_constant_keys_on_empty_set] && aggregation_analysis_result.aggregation_keys.empty()
&& aggregation_analysis_result.group_by_with_constant_keys),
settings[Setting::empty_result_for_aggregation_by_empty_set] || (settings[Setting::empty_result_for_aggregation_by_constant_keys_on_empty_set] && aggregation_analysis_result.aggregation_keys.empty() && aggregation_analysis_result.group_by_with_constant_keys),
query_context->getTempDataOnDisk(),
settings[Setting::max_threads],
settings[Setting::min_free_disk_space_for_temporary_data],
settings[Setting::compile_aggregate_expressions],
settings[Setting::min_count_to_compile_aggregate_expression],
settings[Setting::max_block_size],
settings[Setting::enable_software_prefetch_in_aggregation],
/* only_merge */ false,
settings[Setting::optimize_group_by_constant_keys],
settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization],
stats_collecting_params);
return aggregator_params;

View File

@ -266,30 +266,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
Processors processors;
for (size_t i = 0; i < grouping_sets_size; ++i)
{
Aggregator::Params params_for_set
{
grouping_sets_params[i].used_keys,
transform_params->params.aggregates,
transform_params->params.overflow_row,
transform_params->params.max_rows_to_group_by,
transform_params->params.group_by_overflow_mode,
transform_params->params.group_by_two_level_threshold,
transform_params->params.group_by_two_level_threshold_bytes,
transform_params->params.max_bytes_before_external_group_by,
transform_params->params.empty_result_for_aggregation_by_empty_set,
transform_params->params.tmp_data_scope,
transform_params->params.max_threads,
transform_params->params.min_free_disk_space,
transform_params->params.compile_aggregate_expressions,
transform_params->params.min_count_to_compile_aggregate_expression,
transform_params->params.max_block_size,
transform_params->params.enable_prefetch,
/* only_merge */ false,
transform_params->params.optimize_group_by_constant_keys,
transform_params->params.min_hit_rate_to_use_consecutive_keys_optimization,
transform_params->params.stats_collecting_params,
};
Aggregator::Params params_for_set = transform_params->params.cloneWithKeys(grouping_sets_params[i].used_keys, false);
auto transform_params_for_set = std::make_shared<AggregatingTransformParams>(src_header, std::move(params_for_set), final);
if (streams > 1)

View File

@ -8,6 +8,7 @@
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/QueryPlan/BufferChunksTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/MemoryTrackerUtils.h>
#include <Common/JSONBuilder.h>
#include <Core/Settings.h>
@ -27,6 +28,7 @@ namespace Setting
{
extern const SettingsUInt64 max_block_size;
extern const SettingsUInt64 max_bytes_before_external_sort;
extern const SettingsDouble max_bytes_ratio_before_external_sort;
extern const SettingsUInt64 max_bytes_before_remerge_sort;
extern const SettingsUInt64 max_bytes_to_sort;
extern const SettingsUInt64 max_rows_to_sort;
@ -40,6 +42,7 @@ namespace Setting
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
SortingStep::Settings::Settings(const Context & context)
@ -54,6 +57,30 @@ SortingStep::Settings::Settings(const Context & context)
min_free_disk_space = settings[Setting::min_free_disk_space_for_temporary_data];
max_block_bytes = settings[Setting::prefer_external_sort_block_bytes];
read_in_order_use_buffering = settings[Setting::read_in_order_use_buffering];
if (settings[Setting::max_bytes_ratio_before_external_sort] != 0.)
{
if (settings[Setting::max_bytes_before_external_sort] > 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Settings max_bytes_ratio_before_external_sort and max_bytes_before_external_sort cannot be set simultaneously");
double ratio = settings[Setting::max_bytes_ratio_before_external_sort];
if (ratio < 0 || ratio >= 1.)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting max_bytes_ratio_before_external_sort should be >= 0 and < 1 ({})", ratio);
auto available_system_memory = getMostStrictAvailableSystemMemory();
if (available_system_memory.has_value())
{
max_bytes_before_external_sort = static_cast<size_t>(*available_system_memory * ratio);
LOG_TEST(getLogger("SortingStep"), "Set max_bytes_before_external_sort={} (ratio: {}, available system memory: {})",
formatReadableSizeWithBinarySuffix(max_bytes_before_external_sort),
ratio,
formatReadableSizeWithBinarySuffix(*available_system_memory));
}
else
{
LOG_WARNING(getLogger("SortingStep"), "No system memory limits configured. Ignoring max_bytes_ratio_before_external_sort");
}
}
}
SortingStep::Settings::Settings(size_t max_block_size_)

View File

@ -12,6 +12,7 @@ namespace Setting
extern const SettingsOverflowModeGroupBy group_by_overflow_mode;
extern const SettingsUInt64 max_block_size;
extern const SettingsUInt64 max_bytes_before_external_group_by;
extern const SettingsDouble max_bytes_ratio_before_external_group_by;
extern const SettingsUInt64 max_rows_to_group_by;
extern const SettingsMaxThreads max_threads;
extern const SettingsUInt64 min_chunk_bytes_for_parallel_parsing;
@ -42,26 +43,16 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm(
const Settings & settings = storage_.getContext()->getSettingsRef();
Aggregator::Params params(
settings,
keys,
aggregates,
false,
settings[Setting::max_rows_to_group_by],
settings[Setting::group_by_overflow_mode],
/*group_by_two_level_threshold*/ 0,
/*group_by_two_level_threshold_bytes*/ 0,
settings[Setting::max_bytes_before_external_group_by],
/*overflow_row_=*/ false,
/*group_by_two_level_threshold_=*/ 0,
/*group_by_two_level_threshold_bytes_=*/ 0,
settings[Setting::empty_result_for_aggregation_by_empty_set],
storage_.getContext()->getTempDataOnDisk(),
settings[Setting::max_threads],
settings[Setting::min_free_disk_space_for_temporary_data],
settings[Setting::compile_aggregate_expressions],
settings[Setting::min_count_to_compile_aggregate_expression],
settings[Setting::max_block_size],
settings[Setting::enable_software_prefetch_in_aggregation],
/*only_merge=*/false,
settings[Setting::optimize_group_by_constant_keys],
settings[Setting::min_chunk_bytes_for_parallel_parsing],
/*stats_collecting_params=*/{});
/*only_merge_=*/false,
/*stats_collecting_params_=*/{});
aggregator = std::make_unique<Aggregator>(header, params);

View File

@ -0,0 +1,2 @@
---
max_server_memory_usage: 10Gi

View File

@ -0,0 +1,87 @@
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
# Integration-test cluster; both instances below are registered on it.
cluster = ClickHouseCluster(__file__)
# Instance whose override file is passed via main_configs
# (presumably the server-level memory limit -- confirm against the yaml).
node_server = cluster.add_instance(
"node_server", main_configs=["config.d/memory_overrides.yaml"]
)
# Instance whose override file is passed via user_configs
# (presumably the per-user memory limit -- confirm against the yaml).
node_user = cluster.add_instance(
"node_user", user_configs=["users.d/memory_overrides.yaml"]
)
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    """Module-scoped, auto-used fixture: start the cluster, hand it to the tests, then shut it down.

    `cluster.start()` is deliberately inside the `try`, so `cluster.shutdown()`
    runs even when start-up itself raises.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
@pytest.mark.parametrize(
    "node",
    [
        pytest.param(node_server, id="server"),
        pytest.param(node_user, id="user"),
    ],
)
def test_max_bytes_ratio_before_external_group_by(node):
    """Run the same heavy GROUP BY twice, once with the ratio setting and once without.

    With max_bytes_ratio_before_external_group_by=0.3 the query is expected to
    succeed; with the ratio set to 0 it is expected to raise QueryRuntimeException.
    Skipped on ThreadSanitizer builds.
    """
    if node.is_built_with_thread_sanitizer():
        pytest.skip("TSan build is skipped due to memory overhead")

    # Peak memory usage: 15-16GiB
    query = """
SELECT
uniqExact(number::String),
uniqExact((number,number))
FROM numbers(100e6) GROUP BY (number%1000)::String FORMAT Null
"""
    ratio_key = "max_bytes_ratio_before_external_group_by"
    common_settings = {
        "max_bytes_before_external_group_by": 0,
        "max_memory_usage": "0",
    }

    # Ratio enabled: the query must complete.
    node.query(query, settings={**common_settings, ratio_key: 0.3})

    # Ratio disabled: the very same query must fail.
    with pytest.raises(QueryRuntimeException):
        node.query(query, settings={**common_settings, ratio_key: 0})
@pytest.mark.parametrize(
    "node",
    [
        pytest.param(node_server, id="server"),
        pytest.param(node_user, id="user"),
    ],
)
def test_max_bytes_ratio_before_external_sort(node):
    """Run the same heavy ORDER BY twice, once with the ratio setting and once without.

    With max_bytes_ratio_before_external_sort=0.3 the query is expected to
    succeed; with the ratio set to 0 it is expected to raise QueryRuntimeException.
    Skipped on ThreadSanitizer builds.
    """
    if node.is_built_with_thread_sanitizer():
        pytest.skip("TSan build is skipped due to memory overhead")

    # Peak memory usage: 12GiB (each column in ORDER BY eats ~2GiB)
    query = """
SELECT number FROM numbers(100e6) ORDER BY (
number::String,
(number+1)::String,
(number+2)::String,
(number+3)::String,
(number+4)::String
) FORMAT Null
"""
    ratio_key = "max_bytes_ratio_before_external_sort"
    common_settings = {
        "max_bytes_before_external_sort": 0,
        "max_memory_usage": "0",
    }

    # Ratio enabled: the query must complete.
    node.query(query, settings={**common_settings, ratio_key: 0.3})

    # Ratio disabled: the very same query must fail.
    with pytest.raises(QueryRuntimeException):
        node.query(query, settings={**common_settings, ratio_key: 0})

View File

@ -0,0 +1,3 @@
profiles:
default:
max_memory_usage_for_user: 10Gi

View File

@ -0,0 +1,3 @@
SELECT uniqExact(number::String) FROM numbers(10e6) GROUP BY (number%100)::String FORMAT Null SETTINGS max_bytes_ratio_before_external_group_by=-0.1; -- { serverError BAD_ARGUMENTS }
SELECT uniqExact(number::String) FROM numbers(10e6) GROUP BY (number%100)::String FORMAT Null SETTINGS max_bytes_ratio_before_external_group_by=1; -- { serverError BAD_ARGUMENTS }
SELECT uniqExact(number::String) FROM numbers(10e6) GROUP BY (number%100)::String FORMAT Null SETTINGS max_bytes_ratio_before_external_group_by=0.5, max_bytes_before_external_group_by=1; -- { serverError BAD_ARGUMENTS }

View File

@ -0,0 +1,3 @@
select number from numbers(100e6) order by number format Null settings max_bytes_ratio_before_external_sort=-0.1; -- { serverError BAD_ARGUMENTS }
select number from numbers(100e6) order by number format Null settings max_bytes_ratio_before_external_sort=1; -- { serverError BAD_ARGUMENTS }
select number from numbers(100e6) order by number format Null settings max_bytes_ratio_before_external_sort=0.5, max_bytes_before_external_sort=1; -- { serverError BAD_ARGUMENTS }