mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Merge pull request #48787 from ClickHouse/background-memory-tracker
Add MemoryTracker for the background tasks [Resubmit]
This commit is contained in:
commit
a8bdb20fc4
@ -1045,7 +1045,7 @@ Default value: `0`.
|
||||
|
||||
## background_pool_size {#background_pool_size}
|
||||
|
||||
Sets the number of threads performing background merges and mutations for tables with MergeTree engines. This setting is also could be applied at server startup from the `default` profile configuration for backward compatibility at the ClickHouse server start. You can only increase the number of threads at runtime. To lower the number of threads you have to restart the server. By adjusting this setting, you manage CPU and disk load. Smaller pool size utilizes less CPU and disk resources, but background processes advance slower which might eventually impact query performance.
|
||||
Sets the number of threads performing background merges and mutations for tables with MergeTree engines. This setting is also could be applied at server startup from the `default` profile configuration for backward compatibility at the ClickHouse server start. You can only increase the number of threads at runtime. To lower the number of threads you have to restart the server. By adjusting this setting, you manage CPU and disk load. Smaller pool size utilizes less CPU and disk resources, but background processes advance slower which might eventually impact query performance.
|
||||
|
||||
Before changing it, please also take a look at related MergeTree settings, such as [number_of_free_entries_in_pool_to_lower_max_size_of_merge](../../operations/settings/merge-tree-settings.md#number-of-free-entries-in-pool-to-lower-max-size-of-merge) and [number_of_free_entries_in_pool_to_execute_mutation](../../operations/settings/merge-tree-settings.md#number-of-free-entries-in-pool-to-execute-mutation).
|
||||
|
||||
@ -1063,8 +1063,8 @@ Default value: 16.
|
||||
|
||||
## background_merges_mutations_concurrency_ratio {#background_merges_mutations_concurrency_ratio}
|
||||
|
||||
Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example if the ratio equals to 2 and
|
||||
`background_pool_size` is set to 16 then ClickHouse can execute 32 background merges concurrently. This is possible, because background operation could be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime. To lower it you have to restart the server.
|
||||
Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example, if the ratio equals to 2 and
|
||||
`background_pool_size` is set to 16 then ClickHouse can execute 32 background merges concurrently. This is possible, because background operations could be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime. To lower it you have to restart the server.
|
||||
The same as for `background_pool_size` setting `background_merges_mutations_concurrency_ratio` could be applied from the `default` profile for backward compatibility.
|
||||
|
||||
Possible values:
|
||||
@ -1079,6 +1079,33 @@ Default value: 2.
|
||||
<background_merges_mutations_concurrency_ratio>3</background_merges_mutations_concurrency_ratio>
|
||||
```
|
||||
|
||||
## merges_mutations_memory_usage_soft_limit {#merges_mutations_memory_usage_soft_limit}
|
||||
|
||||
Sets the limit on how much RAM is allowed to use for performing merge and mutation operations.
|
||||
Zero means unlimited.
|
||||
If ClickHouse reaches this limit, it won't schedule any new background merge or mutation operations but will continue to execute already scheduled tasks.
|
||||
|
||||
Possible values:
|
||||
|
||||
- Any positive integer.
|
||||
|
||||
**Example**
|
||||
|
||||
```xml
|
||||
<merges_mutations_memory_usage_soft_limit>0</merges_mutations_memory_usage_soft_limit>
|
||||
```
|
||||
|
||||
## merges_mutations_memory_usage_to_ram_ratio {#merges_mutations_memory_usage_to_ram_ratio}
|
||||
|
||||
The default `merges_mutations_memory_usage_soft_limit` value is calculated as `memory_amount * merges_mutations_memory_usage_to_ram_ratio`.
|
||||
|
||||
Default value: `0.5`.
|
||||
|
||||
**See also**
|
||||
|
||||
- [max_memory_usage](../../operations/settings/query-complexity.md#settings_max_memory_usage)
|
||||
- [merges_mutations_memory_usage_soft_limit](#merges_mutations_memory_usage_soft_limit)
|
||||
|
||||
## background_merges_mutations_scheduling_policy {#background_merges_mutations_scheduling_policy}
|
||||
|
||||
Algorithm used to select next merge or mutation to be executed by background thread pool. Policy may be changed at runtime without server restart.
|
||||
|
@ -130,6 +130,7 @@ namespace CurrentMetrics
|
||||
extern const Metric Revision;
|
||||
extern const Metric VersionInteger;
|
||||
extern const Metric MemoryTracking;
|
||||
extern const Metric MergesMutationsMemoryTracking;
|
||||
extern const Metric MaxDDLEntryID;
|
||||
extern const Metric MaxPushedDDLEntryID;
|
||||
}
|
||||
@ -1225,6 +1226,25 @@ try
|
||||
total_memory_tracker.setDescription("(total)");
|
||||
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
|
||||
|
||||
size_t merges_mutations_memory_usage_soft_limit = server_settings_.merges_mutations_memory_usage_soft_limit;
|
||||
|
||||
size_t default_merges_mutations_server_memory_usage = static_cast<size_t>(memory_amount * server_settings_.merges_mutations_memory_usage_to_ram_ratio);
|
||||
if (merges_mutations_memory_usage_soft_limit == 0 || merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage)
|
||||
{
|
||||
merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage;
|
||||
LOG_WARNING(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}"
|
||||
" ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)",
|
||||
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit),
|
||||
formatReadableSizeWithBinarySuffix(memory_amount),
|
||||
server_settings_.merges_mutations_memory_usage_to_ram_ratio);
|
||||
}
|
||||
|
||||
LOG_INFO(log, "Merges and mutations memory limit is set to {}",
|
||||
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit));
|
||||
background_memory_tracker.setSoftLimit(merges_mutations_memory_usage_soft_limit);
|
||||
background_memory_tracker.setDescription("(background)");
|
||||
background_memory_tracker.setMetric(CurrentMetrics::MergesMutationsMemoryTracking);
|
||||
|
||||
total_memory_tracker.setAllowUseJemallocMemory(server_settings_.allow_use_jemalloc_memory);
|
||||
|
||||
auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker();
|
||||
|
@ -53,6 +53,7 @@
|
||||
M(QueryThread, "Number of query processing threads") \
|
||||
M(ReadonlyReplica, "Number of Replicated tables that are currently in readonly state due to re-initialization after ZooKeeper session loss or due to startup without ZooKeeper configured.") \
|
||||
M(MemoryTracking, "Total amount of memory (bytes) allocated by the server.") \
|
||||
M(MergesMutationsMemoryTracking, "Total amount of memory (bytes) allocated by background tasks (merges and mutations).") \
|
||||
M(EphemeralNode, "Number of ephemeral nodes hold in ZooKeeper.") \
|
||||
M(ZooKeeperSession, "Number of sessions (connections) to ZooKeeper. Should be no more than one, because using more than one connection to ZooKeeper may lead to bugs due to lack of linearizability (stale reads) that ZooKeeper consistency model allows.") \
|
||||
M(ZooKeeperWatch, "Number of watches (event subscriptions) in ZooKeeper.") \
|
||||
|
@ -96,12 +96,17 @@ using namespace std::chrono_literals;
|
||||
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
|
||||
|
||||
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
|
||||
MemoryTracker background_memory_tracker(&total_memory_tracker, VariableContext::User, false);
|
||||
|
||||
std::atomic<Int64> MemoryTracker::free_memory_in_allocator_arenas;
|
||||
|
||||
MemoryTracker::MemoryTracker(VariableContext level_) : parent(&total_memory_tracker), level(level_) {}
|
||||
MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_) : parent(parent_), level(level_) {}
|
||||
|
||||
MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_, bool log_peak_memory_usage_in_destructor_)
|
||||
: parent(parent_)
|
||||
, log_peak_memory_usage_in_destructor(log_peak_memory_usage_in_destructor_)
|
||||
, level(level_)
|
||||
{}
|
||||
|
||||
MemoryTracker::~MemoryTracker()
|
||||
{
|
||||
@ -528,3 +533,10 @@ void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
|
||||
while ((value == 0 || old_value < value) && !profiler_limit.compare_exchange_weak(old_value, value))
|
||||
;
|
||||
}
|
||||
|
||||
bool canEnqueueBackgroundTask()
|
||||
{
|
||||
auto limit = background_memory_tracker.getSoftLimit();
|
||||
auto amount = background_memory_tracker.get();
|
||||
return limit == 0 || amount < limit;
|
||||
}
|
||||
|
@ -98,6 +98,7 @@ public:
|
||||
|
||||
explicit MemoryTracker(VariableContext level_ = VariableContext::Thread);
|
||||
explicit MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread);
|
||||
MemoryTracker(MemoryTracker * parent_, VariableContext level_, bool log_peak_memory_usage_in_destructor_);
|
||||
|
||||
~MemoryTracker();
|
||||
|
||||
@ -110,6 +111,22 @@ public:
|
||||
return amount.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
// Merges and mutations may pass memory ownership to other threads thus in the end of execution
|
||||
// MemoryTracker for background task may have a non-zero counter.
|
||||
// This method is intended to fix the counter inside of background_memory_tracker.
|
||||
// NOTE: We can't use alloc/free methods to do it, because they also will change the value inside
|
||||
// of total_memory_tracker.
|
||||
void adjustOnBackgroundTaskEnd(const MemoryTracker * child)
|
||||
{
|
||||
auto background_memory_consumption = child->amount.load(std::memory_order_relaxed);
|
||||
amount.fetch_sub(background_memory_consumption, std::memory_order_relaxed);
|
||||
|
||||
// Also fix CurrentMetrics::MergesMutationsMemoryTracking
|
||||
auto metric_loaded = metric.load(std::memory_order_relaxed);
|
||||
if (metric_loaded != CurrentMetrics::end())
|
||||
CurrentMetrics::sub(metric_loaded, background_memory_consumption);
|
||||
}
|
||||
|
||||
Int64 getPeak() const
|
||||
{
|
||||
return peak.load(std::memory_order_relaxed);
|
||||
@ -220,3 +237,6 @@ public:
|
||||
};
|
||||
|
||||
extern MemoryTracker total_memory_tracker;
|
||||
extern MemoryTracker background_memory_tracker;
|
||||
|
||||
bool canEnqueueBackgroundTask();
|
||||
|
@ -42,6 +42,8 @@ namespace DB
|
||||
M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \
|
||||
M(UInt64, max_server_memory_usage, 0, "Limit on total memory usage. Zero means Unlimited.", 0) \
|
||||
M(Double, max_server_memory_usage_to_ram_ratio, 0.9, "Same as max_server_memory_usage but in to ram ratio. Allows to lower max memory on low-memory systems.", 0) \
|
||||
M(UInt64, merges_mutations_memory_usage_soft_limit, 0, "Limit on total memory usage for merges and mutations. Zero means Unlimited.", 0) \
|
||||
M(Double, merges_mutations_memory_usage_to_ram_ratio, 0.5, "Same as merges_mutations_memory_usage_soft_limit but in to ram ratio. Allows to lower memory limit on low-memory systems.", 0) \
|
||||
M(Bool, allow_use_jemalloc_memory, true, "Allows to use jemalloc memory.", 0) \
|
||||
\
|
||||
M(UInt64, max_concurrent_queries, 0, "Limit on total number of concurrently executed queries. Zero means Unlimited.", 0) \
|
||||
|
@ -84,6 +84,7 @@ ThreadGroupPtr ThreadGroup::createForBackgroundProcess(ContextPtr storage_contex
|
||||
group->memory_tracker.setProfilerStep(settings.memory_profiler_step);
|
||||
group->memory_tracker.setSampleProbability(settings.memory_profiler_sample_probability);
|
||||
group->memory_tracker.setSoftLimit(settings.memory_overcommit_ratio_denominator);
|
||||
group->memory_tracker.setParent(&background_memory_tracker);
|
||||
if (settings.memory_tracker_fault_probability > 0.0)
|
||||
group->memory_tracker.setFaultProbability(settings.memory_tracker_fault_probability);
|
||||
|
||||
|
@ -80,4 +80,9 @@ MergeInfo MergeListElement::getInfo() const
|
||||
return res;
|
||||
}
|
||||
|
||||
MergeListElement::~MergeListElement()
|
||||
{
|
||||
background_memory_tracker.adjustOnBackgroundTaskEnd(&getMemoryTracker());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -115,6 +115,8 @@ struct MergeListElement : boost::noncopyable
|
||||
MergeListElement * ptr() { return this; }
|
||||
|
||||
MergeListElement & ref() { return *this; }
|
||||
|
||||
~MergeListElement();
|
||||
};
|
||||
|
||||
/** Maintains a list of currently running merges.
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <base/sort.h>
|
||||
#include <Backups/BackupEntriesCollector.h>
|
||||
#include <Databases/IDatabase.h>
|
||||
#include <Common/MemoryTracker.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Common/ProfileEventsScope.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
@ -42,6 +43,7 @@
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
|
||||
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
|
||||
#include <fmt/core.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -918,7 +920,14 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
|
||||
|
||||
SelectPartsDecision select_decision = SelectPartsDecision::CANNOT_SELECT;
|
||||
|
||||
if (partition_id.empty())
|
||||
if (!canEnqueueBackgroundTask())
|
||||
{
|
||||
if (out_disable_reason)
|
||||
*out_disable_reason = fmt::format("Current background tasks memory usage ({}) is more than the limit ({})",
|
||||
formatReadableSizeWithBinarySuffix(background_memory_tracker.get()),
|
||||
formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()));
|
||||
}
|
||||
else if (partition_id.empty())
|
||||
{
|
||||
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
|
||||
bool merge_with_ttl_allowed = getTotalMergesWithTTLInMergeList() < data_settings->max_number_of_merges_with_ttl_in_pool;
|
||||
|
@ -5,6 +5,7 @@
|
||||
|
||||
#include <base/hex.h>
|
||||
#include <Common/Macros.h>
|
||||
#include <Common/MemoryTracker.h>
|
||||
#include <Common/ProfileEventsScope.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
#include <Common/ZooKeeper/KeeperException.h>
|
||||
@ -3250,7 +3251,14 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
|
||||
|
||||
auto merges_and_mutations_queued = queue.countMergesAndPartMutations();
|
||||
size_t merges_and_mutations_sum = merges_and_mutations_queued.merges + merges_and_mutations_queued.mutations;
|
||||
if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue)
|
||||
if (!canEnqueueBackgroundTask())
|
||||
{
|
||||
LOG_TRACE(log, "Reached memory limit for the background tasks ({}), so won't select new parts to merge or mutate."
|
||||
"Current background tasks memory usage: {}.",
|
||||
formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()),
|
||||
formatReadableSizeWithBinarySuffix(background_memory_tracker.get()));
|
||||
}
|
||||
else if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue)
|
||||
{
|
||||
LOG_TRACE(log, "Number of queued merges ({}) and part mutations ({})"
|
||||
" is greater than max_replicated_merges_in_queue ({}), so won't select new parts to merge or mutate.",
|
||||
|
39
tests/integration/test_merges_memory_limit/test.py
Normal file
39
tests/integration/test_merges_memory_limit/test.py
Normal file
@ -0,0 +1,39 @@
|
||||
import pytest
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node = cluster.add_instance("node")
|
||||
|
||||
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def start_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_memory_limit_success():
|
||||
node.query(
|
||||
"CREATE TABLE test_merge_oom ENGINE=AggregatingMergeTree ORDER BY id EMPTY AS SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(20000)"
|
||||
)
|
||||
node.query("SYSTEM STOP MERGES test_merge_oom")
|
||||
node.query(
|
||||
"INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(3000)"
|
||||
)
|
||||
node.query(
|
||||
"INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(3000)"
|
||||
)
|
||||
node.query(
|
||||
"INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(3000)"
|
||||
)
|
||||
|
||||
_, error = node.query_and_get_answer_with_error(
|
||||
"SYSTEM START MERGES test_merge_oom;SET optimize_throw_if_noop=1;OPTIMIZE TABLE test_merge_oom FINAL"
|
||||
)
|
||||
|
||||
assert not error
|
||||
node.query("DROP TABLE test_merge_oom")
|
Loading…
Reference in New Issue
Block a user