From cf5d9a175ae7c61ad0a7293fd49a86a4f7e09f01 Mon Sep 17 00:00:00 2001
From: Dmitry Novik
Date: Fri, 14 Apr 2023 16:34:19 +0200
Subject: [PATCH 1/4] Revert "Merge pull request #48760 from
 ClickHouse/revert-46089-background-memory-tracker"

This reverts commit a61ed332239e4a35af1b9cd31479560da6f08cca, reversing
changes made to 5f01b8a2b59f6f340c65bfbc4ed0dd655d997ff9.
---
 .../settings.md                             | 33 ++++++++++++++--
 programs/server/Server.cpp                  | 20 ++++++++++
 src/Common/CurrentMetrics.cpp               |  1 +
 src/Common/MemoryTracker.cpp                |  8 ++++
 src/Common/MemoryTracker.h                  | 19 ++++++++++
 src/Core/ServerSettings.h                   |  2 +
 src/Interpreters/ThreadStatusExt.cpp        |  1 +
 src/Storages/MergeTree/MergeList.cpp        |  5 +++
 src/Storages/MergeTree/MergeList.h          |  2 +
 src/Storages/StorageMergeTree.cpp           | 11 +++++-
 src/Storages/StorageReplicatedMergeTree.cpp | 10 ++++-
 .../test_merges_memory_limit/__init__.py    |  0
 .../test_merges_memory_limit/test.py        | 38 +++++++++++++++++++
 13 files changed, 145 insertions(+), 5 deletions(-)
 create mode 100644 tests/integration/test_merges_memory_limit/__init__.py
 create mode 100644 tests/integration/test_merges_memory_limit/test.py

diff --git a/docs/en/operations/server-configuration-parameters/settings.md b/docs/en/operations/server-configuration-parameters/settings.md
index 7c97d0ab640..a9f0cc276ff 100644
--- a/docs/en/operations/server-configuration-parameters/settings.md
+++ b/docs/en/operations/server-configuration-parameters/settings.md
@@ -1045,7 +1045,7 @@ Default value: `0`.

## background_pool_size {#background_pool_size}

-Sets the number of threads performing background merges and mutations for tables with MergeTree engines. This setting is also could be applied at server startup from the `default` profile configuration for backward compatibility at the ClickHouse server start. You can only increase the number of threads at runtime. To lower the number of threads you have to restart the server. By adjusting this setting, you manage CPU and disk load. Smaller pool size utilizes less CPU and disk resources, but background processes advance slower which might eventually impact query performance.
+Sets the number of threads performing background merges and mutations for tables with MergeTree engines. This setting can also be applied at server startup from the `default` profile configuration for backward compatibility. You can only increase the number of threads at runtime; to lower it you have to restart the server. By adjusting this setting, you manage CPU and disk load. A smaller pool size uses less CPU and disk resources, but background processes advance more slowly, which might eventually impact query performance. Before changing it, please also take a look at related MergeTree settings, such as [number_of_free_entries_in_pool_to_lower_max_size_of_merge](../../operations/settings/merge-tree-settings.md#number-of-free-entries-in-pool-to-lower-max-size-of-merge) and [number_of_free_entries_in_pool_to_execute_mutation](../../operations/settings/merge-tree-settings.md#number-of-free-entries-in-pool-to-execute-mutation).

Possible values:

- Any positive integer.

Default value: 16.

@@ -1063,8 +1063,8 @@

## background_merges_mutations_concurrency_ratio {#background_merges_mutations_concurrency_ratio}

-Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example if the ratio equals to 2 and
-`background_pool_size` is set to 16 then ClickHouse can execute 32 background merges concurrently. This is possible, because background operation could be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime. To lower it you have to restart the server.
+Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example, if the ratio equals 2 and
+`background_pool_size` is set to 16 then ClickHouse can execute 32 background merges concurrently. This is possible because background operations can be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime; to lower it you have to restart the server. As with the `background_pool_size` setting, `background_merges_mutations_concurrency_ratio` can be applied from the `default` profile for backward compatibility.

Possible values:

@@ -1079,6 +1079,33 @@ Default value: 2.

```xml
<background_merges_mutations_concurrency_ratio>3</background_merges_mutations_concurrency_ratio>
```

+## merges_mutations_memory_usage_soft_limit {#merges_mutations_memory_usage_soft_limit}
+
+Sets the limit on how much RAM is allowed to be used for performing merge and mutation operations.
+Zero means unlimited.
+If ClickHouse reaches this limit, it won't schedule any new background merge or mutation operations but will continue to execute already scheduled tasks.
+
+Possible values:
+
+- Any positive integer.
+
+**Example**
+
+```xml
+<merges_mutations_memory_usage_soft_limit>0</merges_mutations_memory_usage_soft_limit>
+```
+
+## merges_mutations_memory_usage_to_ram_ratio {#merges_mutations_memory_usage_to_ram_ratio}
+
+The default `merges_mutations_memory_usage_soft_limit` value is calculated as `memory_amount * merges_mutations_memory_usage_to_ram_ratio`.
+
+Default value: `0.5`.
+
+**See also**
+
+- [max_memory_usage](../../operations/settings/query-complexity.md#settings_max_memory_usage)
+- [merges_mutations_memory_usage_soft_limit](#merges_mutations_memory_usage_soft_limit)
+
## background_merges_mutations_scheduling_policy {#background_merges_mutations_scheduling_policy}

Algorithm used to select next merge or mutation to be executed by background thread pool. Policy may be changed at runtime without server restart.
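Before the server-side changes below, it may help to see the docs' fallback rule in one place. The following is a minimal C++ sketch (not ClickHouse code; the function name and parameters are illustrative) of how the effective soft limit is derived from the two settings documented above, mirroring the Server.cpp hunk that follows:

```cpp
#include <cstddef>

// Minimal sketch of the fallback rule for the merges/mutations soft limit.
// Assumption: memory_amount is the host RAM in bytes; to_ram_ratio defaults to 0.5.
size_t effective_soft_limit(size_t configured_soft_limit, size_t memory_amount, double to_ram_ratio)
{
    const size_t derived = static_cast<size_t>(memory_amount * to_ram_ratio);
    // 0 means "unset"; values above the derived cap are clamped down to it.
    if (configured_soft_limit == 0 || configured_soft_limit > derived)
        return derived;
    return configured_soft_limit;
}
```

With the defaults (`merges_mutations_memory_usage_soft_limit = 0` and a ratio of `0.5`), a host with 64 GiB of RAM therefore ends up with an effective soft limit of 32 GiB.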
diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp
index 8c0d50bae55..cba7a4c4778 100644
--- a/programs/server/Server.cpp
+++ b/programs/server/Server.cpp
@@ -135,6 +135,7 @@ namespace CurrentMetrics
    extern const Metric Revision;
    extern const Metric VersionInteger;
    extern const Metric MemoryTracking;
+   extern const Metric MergesMutationsMemoryTracking;
    extern const Metric MaxDDLEntryID;
    extern const Metric MaxPushedDDLEntryID;
}
@@ -1225,6 +1226,25 @@ try
        total_memory_tracker.setDescription("(total)");
        total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);

+       size_t merges_mutations_memory_usage_soft_limit = server_settings_.merges_mutations_memory_usage_soft_limit;
+
+       size_t default_merges_mutations_server_memory_usage = static_cast<size_t>(memory_amount * server_settings_.merges_mutations_memory_usage_to_ram_ratio);
+       if (merges_mutations_memory_usage_soft_limit == 0 || merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage)
+       {
+           merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage;
+           LOG_WARNING(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}"
+               " ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)",
+               formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit),
+               formatReadableSizeWithBinarySuffix(memory_amount),
+               server_settings_.merges_mutations_memory_usage_to_ram_ratio);
+       }
+
+       LOG_INFO(log, "Merges and mutations memory limit is set to {}",
+           formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit));
+       background_memory_tracker.setSoftLimit(merges_mutations_memory_usage_soft_limit);
+       background_memory_tracker.setDescription("(background)");
+       background_memory_tracker.setMetric(CurrentMetrics::MergesMutationsMemoryTracking);
+
        total_memory_tracker.setAllowUseJemallocMemory(server_settings_.allow_use_jemalloc_memory);

        auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker();
diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp
index 81c1481e2de..ea248d996a7 100644
--- a/src/Common/CurrentMetrics.cpp
+++ b/src/Common/CurrentMetrics.cpp
@@ -53,6 +53,7 @@
    M(QueryThread, "Number of query processing threads") \
    M(ReadonlyReplica, "Number of Replicated tables that are currently in readonly state due to re-initialization after ZooKeeper session loss or due to startup without ZooKeeper configured.") \
    M(MemoryTracking, "Total amount of memory (bytes) allocated by the server.") \
+   M(MergesMutationsMemoryTracking, "Total amount of memory (bytes) allocated by background tasks (merges and mutations).") \
    M(EphemeralNode, "Number of ephemeral nodes hold in ZooKeeper.") \
    M(ZooKeeperSession, "Number of sessions (connections) to ZooKeeper. Should be no more than one, because using more than one connection to ZooKeeper may lead to bugs due to lack of linearizability (stale reads) that ZooKeeper consistency model allows.") \
    M(ZooKeeperWatch, "Number of watches (event subscriptions) in ZooKeeper.") \
diff --git a/src/Common/MemoryTracker.cpp b/src/Common/MemoryTracker.cpp
index 674d8d469af..9bff365483d 100644
--- a/src/Common/MemoryTracker.cpp
+++ b/src/Common/MemoryTracker.cpp
@@ -96,6 +96,7 @@ using namespace std::chrono_literals;
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;

MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
+MemoryTracker background_memory_tracker(&total_memory_tracker, VariableContext::User);

std::atomic<Int64> MemoryTracker::free_memory_in_allocator_arenas;
@@ -528,3 +529,10 @@ void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
    while ((value == 0 || old_value < value) && !profiler_limit.compare_exchange_weak(old_value, value))
        ;
}
+
+bool canEnqueueBackgroundTask()
+{
+    auto limit = background_memory_tracker.getSoftLimit();
+    auto amount = background_memory_tracker.get();
+    return limit == 0 || amount < limit;
+}
diff --git a/src/Common/MemoryTracker.h b/src/Common/MemoryTracker.h
index 0d7748856bd..260005fd536 100644
--- a/src/Common/MemoryTracker.h
+++ b/src/Common/MemoryTracker.h
@@ -110,6 +110,22 @@ public:
        return amount.load(std::memory_order_relaxed);
    }

+    // Merges and mutations may pass memory ownership to other threads, so at the end of execution
+    // the MemoryTracker of a background task may have a non-zero counter.
+    // This method is intended to fix the counter inside background_memory_tracker.
+    // NOTE: We can't use alloc/free methods to do it, because they would also change the value
+    // inside total_memory_tracker.
+    void adjustOnBackgroundTaskEnd(const MemoryTracker * child)
+    {
+        auto background_memory_consumption = child->amount.load(std::memory_order_relaxed);
+        amount.fetch_sub(background_memory_consumption, std::memory_order_relaxed);
+
+        // Also fix CurrentMetrics::MergesMutationsMemoryTracking
+        auto metric_loaded = metric.load(std::memory_order_relaxed);
+        if (metric_loaded != CurrentMetrics::end())
+            CurrentMetrics::sub(metric_loaded, background_memory_consumption);
+    }
+
    Int64 getPeak() const
    {
        return peak.load(std::memory_order_relaxed);
@@ -220,3 +236,6 @@ public:
};

extern MemoryTracker total_memory_tracker;
+extern MemoryTracker background_memory_tracker;
+
+bool canEnqueueBackgroundTask();
diff --git a/src/Core/ServerSettings.h b/src/Core/ServerSettings.h
index aabc89cc6d7..2d8c37783db 100644
--- a/src/Core/ServerSettings.h
+++ b/src/Core/ServerSettings.h
@@ -40,6 +40,8 @@ namespace DB
    M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \
    M(UInt64, max_server_memory_usage, 0, "Limit on total memory usage. Zero means Unlimited.", 0) \
    M(Double, max_server_memory_usage_to_ram_ratio, 0.9, "Same as max_server_memory_usage but in to ram ratio. Allows to lower max memory on low-memory systems.", 0) \
+   M(UInt64, merges_mutations_memory_usage_soft_limit, 0, "Limit on total memory usage for merges and mutations. Zero means Unlimited.", 0) \
+   M(Double, merges_mutations_memory_usage_to_ram_ratio, 0.5, "Same as merges_mutations_memory_usage_soft_limit but as a ratio to RAM. Allows lowering the memory limit on low-memory systems.", 0) \
    M(Bool, allow_use_jemalloc_memory, true, "Allows to use jemalloc memory.", 0) \
    \
    M(UInt64, max_concurrent_queries, 0, "Limit on total number of concurrently executed queries.
Zero means Unlimited.", 0) \ diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index fd4a6b5e996..559652fe56c 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -84,6 +84,7 @@ ThreadGroupPtr ThreadGroup::createForBackgroundProcess(ContextPtr storage_contex group->memory_tracker.setProfilerStep(settings.memory_profiler_step); group->memory_tracker.setSampleProbability(settings.memory_profiler_sample_probability); group->memory_tracker.setSoftLimit(settings.memory_overcommit_ratio_denominator); + group->memory_tracker.setParent(&background_memory_tracker); if (settings.memory_tracker_fault_probability > 0.0) group->memory_tracker.setFaultProbability(settings.memory_tracker_fault_probability); diff --git a/src/Storages/MergeTree/MergeList.cpp b/src/Storages/MergeTree/MergeList.cpp index 0bf662921ad..d54079bc7a5 100644 --- a/src/Storages/MergeTree/MergeList.cpp +++ b/src/Storages/MergeTree/MergeList.cpp @@ -78,4 +78,9 @@ MergeInfo MergeListElement::getInfo() const return res; } +MergeListElement::~MergeListElement() +{ + background_memory_tracker.adjustOnBackgroundTaskEnd(&getMemoryTracker()); +} + } diff --git a/src/Storages/MergeTree/MergeList.h b/src/Storages/MergeTree/MergeList.h index 9c8c2ebd1e4..d8271a66b45 100644 --- a/src/Storages/MergeTree/MergeList.h +++ b/src/Storages/MergeTree/MergeList.h @@ -113,6 +113,8 @@ struct MergeListElement : boost::noncopyable MergeListElement * ptr() { return this; } MergeListElement & ref() { return *this; } + + ~MergeListElement(); }; /** Maintains a list of currently running merges. diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 34bf5d55270..6cb3ce35e5b 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -41,6 +42,7 @@ #include #include #include +#include namespace DB { @@ -918,7 +920,14 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge( SelectPartsDecision select_decision = SelectPartsDecision::CANNOT_SELECT; - if (partition_id.empty()) + if (!canEnqueueBackgroundTask()) + { + if (out_disable_reason) + *out_disable_reason = fmt::format("Current background tasks memory usage ({}) is more than the limit ({})", + formatReadableSizeWithBinarySuffix(background_memory_tracker.get()), + formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit())); + } + else if (partition_id.empty()) { UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(); bool merge_with_ttl_allowed = getTotalMergesWithTTLInMergeList() < data_settings->max_number_of_merges_with_ttl_in_pool; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 5cd02c33d55..4a36cf03c2a 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -3223,7 +3224,14 @@ void StorageReplicatedMergeTree::mergeSelectingTask() auto merges_and_mutations_queued = queue.countMergesAndPartMutations(); size_t merges_and_mutations_sum = merges_and_mutations_queued.merges + merges_and_mutations_queued.mutations; - if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue) + if (!canEnqueueBackgroundTask()) + { + LOG_TRACE(log, "Reached memory limit for the background tasks ({}), so won't select new parts to merge or mutate." 
+            " Current background tasks memory usage: {}.",
+            formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()),
+            formatReadableSizeWithBinarySuffix(background_memory_tracker.get()));
+    }
+    else if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue)
    {
        LOG_TRACE(log, "Number of queued merges ({}) and part mutations ({})"
            " is greater than max_replicated_merges_in_queue ({}), so won't select new parts to merge or mutate.",
diff --git a/tests/integration/test_merges_memory_limit/__init__.py b/tests/integration/test_merges_memory_limit/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_merges_memory_limit/test.py b/tests/integration/test_merges_memory_limit/test.py
new file mode 100644
index 00000000000..04729f3a01c
--- /dev/null
+++ b/tests/integration/test_merges_memory_limit/test.py
@@ -0,0 +1,38 @@
+import pytest
+
+from helpers.cluster import ClickHouseCluster
+
+cluster = ClickHouseCluster(__file__)
+
+node = cluster.add_instance("node")
+
+
+@pytest.fixture(scope="module", autouse=True)
+def start_cluster():
+    try:
+        cluster.start()
+        yield cluster
+    finally:
+        cluster.shutdown()
+
+
+def test_memory_limit_success():
+    node.query(
+        "CREATE TABLE test_merge_oom ENGINE=AggregatingMergeTree ORDER BY id EMPTY AS SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(20000)"
+    )
+    node.query("SYSTEM STOP MERGES test_merge_oom")
+    node.query(
+        "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(10000)"
+    )
+    node.query(
+        "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(10000)"
+    )
+    node.query(
+        "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(10000)"
+    )
+    _, error = node.query_and_get_answer_with_error(
+        "SYSTEM START MERGES test_merge_oom;SET optimize_throw_if_noop=1;OPTIMIZE TABLE test_merge_oom FINAL"
+    )
+
+    assert not error
+    node.query("DROP TABLE test_merge_oom")

From a1f21a5fc427e860199aa9e1db3c1c4177a340f7 Mon Sep 17 00:00:00 2001
From: Dmitry Novik
Date: Fri, 14 Apr 2023 16:00:32 +0200
Subject: [PATCH 2/4] Do not log peak for background memory tracker

---
 src/Common/MemoryTracker.cpp | 8 ++++++--
 src/Common/MemoryTracker.h   | 1 +
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/src/Common/MemoryTracker.cpp b/src/Common/MemoryTracker.cpp
index 9bff365483d..81cac2617c5 100644
--- a/src/Common/MemoryTracker.cpp
+++ b/src/Common/MemoryTracker.cpp
@@ -96,13 +96,17 @@ using namespace std::chrono_literals;
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;

MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
-MemoryTracker background_memory_tracker(&total_memory_tracker, VariableContext::User);
+MemoryTracker background_memory_tracker(&total_memory_tracker, VariableContext::User, false);

std::atomic<Int64> MemoryTracker::free_memory_in_allocator_arenas;

MemoryTracker::MemoryTracker(VariableContext level_) : parent(&total_memory_tracker), level(level_) {}
MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_) : parent(parent_), level(level_) {}
-
+MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_, bool log_peak_memory_usage_in_destructor_)
+    : parent(parent_)
+    , log_peak_memory_usage_in_destructor(log_peak_memory_usage_in_destructor_)
+    , level(level_)
+{}

MemoryTracker::~MemoryTracker()
{
diff --git a/src/Common/MemoryTracker.h b/src/Common/MemoryTracker.h
index 260005fd536..4e29d40c953 100644
--- a/src/Common/MemoryTracker.h
+++ b/src/Common/MemoryTracker.h
@@ -98,6 +98,7 @@ public:

    explicit MemoryTracker(VariableContext level_ = VariableContext::Thread);
    explicit MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread);
+   MemoryTracker(MemoryTracker * parent_, VariableContext level_, bool log_peak_memory_usage_in_destructor_);

    ~MemoryTracker();

From 6f501f983ed0f9d63df4abdbbe25afe64283776b Mon Sep 17 00:00:00 2001
From: Dmitry Novik
Date: Mon, 17 Apr 2023 17:47:03 +0200
Subject: [PATCH 3/4] Update test.py

---
 tests/integration/test_merges_memory_limit/test.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/tests/integration/test_merges_memory_limit/test.py b/tests/integration/test_merges_memory_limit/test.py
index 04729f3a01c..22bf143f4f7 100644
--- a/tests/integration/test_merges_memory_limit/test.py
+++ b/tests/integration/test_merges_memory_limit/test.py
@@ -27,9 +27,6 @@ def test_memory_limit_success():
     node.query(
         "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(10000)"
     )
-    node.query(
-        "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(10000)"
-    )
     _, error = node.query_and_get_answer_with_error(
         "SYSTEM START MERGES test_merge_oom;SET optimize_throw_if_noop=1;OPTIMIZE TABLE test_merge_oom FINAL"
     )

From 93e8ecf4ffe6b5766792c2316fc024b4f9c05f84 Mon Sep 17 00:00:00 2001
From: Dmitry Novik
Date: Wed, 3 May 2023 01:33:26 +0200
Subject: [PATCH 4/4] Update test

---
 tests/integration/test_merges_memory_limit/test.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/tests/integration/test_merges_memory_limit/test.py b/tests/integration/test_merges_memory_limit/test.py
index 22bf143f4f7..e663f3280cc 100644
--- a/tests/integration/test_merges_memory_limit/test.py
+++ b/tests/integration/test_merges_memory_limit/test.py
@@ -22,11 +22,15 @@ def test_memory_limit_success():
     )
     node.query("SYSTEM STOP MERGES test_merge_oom")
     node.query(
-        "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(10000)"
+        "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(3000)"
     )
     node.query(
-        "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(10000)"
+        "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(3000)"
     )
+    node.query(
+        "INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(3000)"
+    )
+
     _, error = node.query_and_get_answer_with_error(
         "SYSTEM START MERGES test_merge_oom;SET optimize_throw_if_noop=1;OPTIMIZE TABLE test_merge_oom FINAL"
     )
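Taken together, the series implements a soft limit: reaching it stops new merges and mutations from being scheduled, but never aborts tasks that are already running. A toy C++ model (not ClickHouse code; the struct and values are illustrative) of that semantics, built around the same predicate as `canEnqueueBackgroundTask()`:

```cpp
#include <cstdint>
#include <iostream>

// Toy model of the soft-limit semantics: crossing the limit pauses scheduling
// of new background tasks, while tasks that already hold memory keep running.
struct ToyBackgroundTracker
{
    int64_t soft_limit; // 0 means unlimited, as in the patch
    int64_t amount;     // currently tracked background memory, in bytes

    // Same check as canEnqueueBackgroundTask() in src/Common/MemoryTracker.cpp.
    bool can_enqueue_background_task() const
    {
        return soft_limit == 0 || amount < soft_limit;
    }
};

int main()
{
    ToyBackgroundTracker tracker{/*soft_limit=*/100, /*amount=*/60}; // one merge running
    std::cout << tracker.can_enqueue_background_task() << '\n';      // 1: below the limit

    tracker.amount = 120; // the running merge grew past the soft limit
    std::cout << tracker.can_enqueue_background_task() << '\n';      // 0: no new merges are scheduled
}
```

This is presumably why the integration test can still complete `OPTIMIZE TABLE ... FINAL` without error: the limit gates scheduling rather than acting as a hard allocation cap.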