Revert "Merge pull request #48760 from ClickHouse/revert-46089-background-memory-tracker"

This reverts commit a61ed33223, reversing
changes made to 5f01b8a2b5.
Dmitry Novik 2023-04-14 16:34:19 +02:00
parent 352a240a20
commit cf5d9a175a
13 changed files with 145 additions and 5 deletions

View File

@@ -1063,8 +1063,8 @@ Default value: 16.
## background_merges_mutations_concurrency_ratio {#background_merges_mutations_concurrency_ratio}
Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example if the ratio equals to 2 and
`background_pool_size` is set to 16 then ClickHouse can execute 32 background merges concurrently. This is possible, because background operation could be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime. To lower it you have to restart the server.
Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example, if the ratio equals 2 and
`background_pool_size` is set to 16, then ClickHouse can execute 32 background merges concurrently. This is possible because background operations can be suspended and postponed, which gives small merges more execution priority. You can only increase this ratio at runtime; to lower it you have to restart the server.
As with the `background_pool_size` setting, `background_merges_mutations_concurrency_ratio` can be applied from the `default` profile for backward compatibility.
Possible values:
@@ -1079,6 +1079,33 @@ Default value: 2.
<background_merges_mutations_concurrency_ratio>3</background_merges_mutations_concurrency_ratio>
```
## merges_mutations_memory_usage_soft_limit {#merges_mutations_memory_usage_soft_limit}
Sets the limit on how much RAM is allowed to be used for performing merge and mutation operations.
Zero means unlimited.
If ClickHouse reaches this limit, it won't schedule any new background merge or mutation operations but will continue to execute already scheduled tasks.
Possible values:
- Any positive integer.
**Example**
```xml
<merges_mutations_memory_usage_soft_limit>0</merges_mutations_memory_usage_soft_limit>
```
## merges_mutations_memory_usage_to_ram_ratio {#merges_mutations_memory_usage_to_ram_ratio}
The default `merges_mutations_memory_usage_soft_limit` value is calculated as `memory_amount * merges_mutations_memory_usage_to_ram_ratio`. For example, on a server with 64 GiB of RAM and the default ratio of `0.5`, the soft limit defaults to 32 GiB.
Default value: `0.5`.
**See also**
- [max_memory_usage](../../operations/settings/query-complexity.md#settings_max_memory_usage)
- [merges_mutations_memory_usage_soft_limit](#merges_mutations_memory_usage_soft_limit)
## background_merges_mutations_scheduling_policy {#background_merges_mutations_scheduling_policy}
The algorithm used to select the next merge or mutation to be executed by the background thread pool. The policy may be changed at runtime without a server restart.

View File

@@ -135,6 +135,7 @@ namespace CurrentMetrics
extern const Metric Revision;
extern const Metric VersionInteger;
extern const Metric MemoryTracking;
extern const Metric MergesMutationsMemoryTracking;
extern const Metric MaxDDLEntryID;
extern const Metric MaxPushedDDLEntryID;
}
@@ -1225,6 +1226,25 @@ try
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
size_t merges_mutations_memory_usage_soft_limit = server_settings_.merges_mutations_memory_usage_soft_limit;
size_t default_merges_mutations_server_memory_usage = static_cast<size_t>(memory_amount * server_settings_.merges_mutations_memory_usage_to_ram_ratio);
if (merges_mutations_memory_usage_soft_limit == 0 || merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage)
{
merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage;
LOG_WARNING(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}"
" ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit),
formatReadableSizeWithBinarySuffix(memory_amount),
server_settings_.merges_mutations_memory_usage_to_ram_ratio);
}
LOG_INFO(log, "Merges and mutations memory limit is set to {}",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit));
background_memory_tracker.setSoftLimit(merges_mutations_memory_usage_soft_limit);
background_memory_tracker.setDescription("(background)");
background_memory_tracker.setMetric(CurrentMetrics::MergesMutationsMemoryTracking);
total_memory_tracker.setAllowUseJemallocMemory(server_settings_.allow_use_jemalloc_memory);
auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker();
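
The start-up logic above clamps the configured soft limit to a RAM-based default. A minimal standalone sketch of that fallback rule, assuming plain standard C++ and illustrative names (`effectiveMergesMutationsSoftLimit` is not a real ClickHouse function): a configured value of zero, or one above `memory_amount * ratio`, is replaced by the computed default.
```cpp
#include <cstdint>
#include <cstdio>

// Illustrative helper mirroring the fallback rule above: a configured limit of 0,
// or one larger than memory_amount * ratio, is replaced by the RAM-based default.
static uint64_t effectiveMergesMutationsSoftLimit(
    uint64_t configured_limit, uint64_t memory_amount, double ratio)
{
    const auto ram_based_default = static_cast<uint64_t>(memory_amount * ratio);
    if (configured_limit == 0 || configured_limit > ram_based_default)
        return ram_based_default;
    return configured_limit;
}

int main()
{
    const uint64_t ram = 64ULL << 30; // 64 GiB of physical memory
    // Unset (0) falls back to ram * 0.5 = 32 GiB.
    std::printf("%llu\n", (unsigned long long) effectiveMergesMutationsSoftLimit(0, ram, 0.5));
    // An explicit 8 GiB limit below the default is kept as configured.
    std::printf("%llu\n", (unsigned long long) effectiveMergesMutationsSoftLimit(8ULL << 30, ram, 0.5));
}
```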

View File

@@ -53,6 +53,7 @@
M(QueryThread, "Number of query processing threads") \
M(ReadonlyReplica, "Number of Replicated tables that are currently in readonly state due to re-initialization after ZooKeeper session loss or due to startup without ZooKeeper configured.") \
M(MemoryTracking, "Total amount of memory (bytes) allocated by the server.") \
M(MergesMutationsMemoryTracking, "Total amount of memory (bytes) allocated by background tasks (merges and mutations).") \
M(EphemeralNode, "Number of ephemeral nodes hold in ZooKeeper.") \
M(ZooKeeperSession, "Number of sessions (connections) to ZooKeeper. Should be no more than one, because using more than one connection to ZooKeeper may lead to bugs due to lack of linearizability (stale reads) that ZooKeeper consistency model allows.") \
M(ZooKeeperWatch, "Number of watches (event subscriptions) in ZooKeeper.") \
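
Each `M(...)` row here registers a process-wide gauge that other code adjusts through `CurrentMetrics::add`/`CurrentMetrics::sub` (as the memory tracker does for `MergesMutationsMemoryTracking`). A reduced, self-contained sketch of that idea with a hypothetical metric enum, not the real CurrentMetrics implementation:
```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical reduced registry in the spirit of CurrentMetrics:
// a fixed set of process-wide atomic gauges addressed by an enum index.
namespace toy_metrics
{
    enum Metric : size_t { MemoryTracking, MergesMutationsMemoryTracking, END };

    std::atomic<int64_t> values[END];

    inline void add(Metric m, int64_t v) { values[m].fetch_add(v, std::memory_order_relaxed); }
    inline void sub(Metric m, int64_t v) { values[m].fetch_sub(v, std::memory_order_relaxed); }
}

int main()
{
    using namespace toy_metrics;
    add(MergesMutationsMemoryTracking, 1 << 20); // a merge allocates 1 MiB
    sub(MergesMutationsMemoryTracking, 1 << 20); // ...and releases it
    assert(values[MergesMutationsMemoryTracking].load() == 0);
}
```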

View File

@@ -96,6 +96,7 @@ using namespace std::chrono_literals;
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
MemoryTracker background_memory_tracker(&total_memory_tracker, VariableContext::User);
std::atomic<Int64> MemoryTracker::free_memory_in_allocator_arenas;
@@ -528,3 +529,10 @@ void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
while ((value == 0 || old_value < value) && !profiler_limit.compare_exchange_weak(old_value, value))
;
}
bool canEnqueueBackgroundTask()
{
auto limit = background_memory_tracker.getSoftLimit();
auto amount = background_memory_tracker.get();
return limit == 0 || amount < limit;
}
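
The new check keeps the limit "soft": it only gates the admission of new merges and mutations and never aborts running ones. A small sketch of the same predicate using plain atomics instead of MemoryTracker (names are illustrative):
```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Illustrative stand-ins for the soft limit and current consumption of
// background_memory_tracker; in ClickHouse these live inside MemoryTracker.
static std::atomic<int64_t> soft_limit{0};
static std::atomic<int64_t> tracked_amount{0};

// Same rule as canEnqueueBackgroundTask(): a zero limit disables the check;
// otherwise new tasks are admitted only while consumption is below the limit.
static bool canEnqueue()
{
    const int64_t limit = soft_limit.load(std::memory_order_relaxed);
    const int64_t amount = tracked_amount.load(std::memory_order_relaxed);
    return limit == 0 || amount < limit;
}

int main()
{
    assert(canEnqueue());   // limit == 0: unlimited
    soft_limit = 100;
    tracked_amount = 99;
    assert(canEnqueue());   // still below the limit
    tracked_amount = 120;
    assert(!canEnqueue());  // over the limit: stop admitting new background tasks
}
```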

View File

@@ -110,6 +110,22 @@ public:
return amount.load(std::memory_order_relaxed);
}
// Merges and mutations may pass memory ownership to other threads, thus at the end of execution
// the MemoryTracker for a background task may have a non-zero counter.
// This method is intended to fix the counter inside background_memory_tracker.
// NOTE: We can't use the alloc/free methods to do it, because they would also change the value inside
// total_memory_tracker.
void adjustOnBackgroundTaskEnd(const MemoryTracker * child)
{
auto background_memory_consumption = child->amount.load(std::memory_order_relaxed);
amount.fetch_sub(background_memory_consumption, std::memory_order_relaxed);
// Also fix CurrentMetrics::MergesMutationsMemoryTracking
auto metric_loaded = metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end())
CurrentMetrics::sub(metric_loaded, background_memory_consumption);
}
Int64 getPeak() const
{
return peak.load(std::memory_order_relaxed);
@@ -220,3 +236,6 @@ public:
};
extern MemoryTracker total_memory_tracker;
extern MemoryTracker background_memory_tracker;
bool canEnqueueBackgroundTask();
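
The header comment explains the subtle part: when a merge hands allocated memory to another thread, the finished task's tracker still shows that memory, so only the background subtotal must be corrected — the bytes are still alive and still belong in the server-wide total. A toy two-level model of that bookkeeping (hypothetical `ToyTracker`, not ClickHouse's MemoryTracker):
```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Toy accounting model: allocations recorded on a child propagate to its parent,
// like task trackers chained under background_memory_tracker and total_memory_tracker.
struct ToyTracker
{
    std::atomic<int64_t> amount{0};
    ToyTracker * parent = nullptr;

    void alloc(int64_t size)
    {
        amount.fetch_add(size, std::memory_order_relaxed);
        if (parent)
            parent->alloc(size);
    }

    // Mirrors adjustOnBackgroundTaskEnd(): subtract the child's leftover counter from
    // this tracker only, leaving the parent untouched, because the memory is still
    // alive and still accounted for at the (total) parent level.
    void adjustOnTaskEnd(const ToyTracker & child)
    {
        amount.fetch_sub(child.amount.load(std::memory_order_relaxed), std::memory_order_relaxed);
    }
};

int main()
{
    ToyTracker total, background, merge_task;
    background.parent = &total;
    merge_task.parent = &background;

    merge_task.alloc(100);           // a merge allocates; all three levels count it
    // The merge finishes but its memory was handed to another thread, so merge_task
    // still shows 100. Correct the background subtotal without touching the total.
    background.adjustOnTaskEnd(merge_task);

    assert(background.amount == 0);  // background accounting is fixed up
    assert(total.amount == 100);     // server-wide total still counts the live memory
}
```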

View File

@@ -40,6 +40,8 @@ namespace DB
M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \
M(UInt64, max_server_memory_usage, 0, "Limit on total memory usage. Zero means Unlimited.", 0) \
M(Double, max_server_memory_usage_to_ram_ratio, 0.9, "Same as max_server_memory_usage but in to ram ratio. Allows to lower max memory on low-memory systems.", 0) \
M(UInt64, merges_mutations_memory_usage_soft_limit, 0, "Limit on total memory usage for merges and mutations. Zero means Unlimited.", 0) \
M(Double, merges_mutations_memory_usage_to_ram_ratio, 0.5, "Same as merges_mutations_memory_usage_soft_limit but in to ram ratio. Allows to lower memory limit on low-memory systems.", 0) \
M(Bool, allow_use_jemalloc_memory, true, "Allows to use jemalloc memory.", 0) \
\
M(UInt64, max_concurrent_queries, 0, "Limit on total number of concurrently executed queries. Zero means Unlimited.", 0) \
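
The new rows follow the X-macro idiom this list is built on: the same `M(type, name, default, description, flags)` entries are expanded in several places to produce fields, defaults, and descriptions. A generic, self-contained sketch of that idiom with made-up setting names (not ClickHouse's actual ServerSettings machinery):
```cpp
#include <cstdint>
#include <cstdio>

// Hypothetical, reduced settings list in the spirit of the M(...) rows above.
#define EXAMPLE_SERVER_SETTINGS(M) \
    M(uint64_t, merges_memory_soft_limit, 0, "Limit for merges and mutations; zero means unlimited.") \
    M(double, merges_memory_to_ram_ratio, 0.5, "Fallback ratio of total RAM used when the limit is unset.")

struct ExampleServerSettings
{
    // One expansion declares a member per row, initialised to its default value.
#define DECLARE_FIELD(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME = DEFAULT;
    EXAMPLE_SERVER_SETTINGS(DECLARE_FIELD)
#undef DECLARE_FIELD

    // Another expansion reuses the same list, here to print name/description pairs.
    static void describe()
    {
#define PRINT_ROW(TYPE, NAME, DEFAULT, DESCRIPTION) std::printf("%s: %s\n", #NAME, DESCRIPTION);
        EXAMPLE_SERVER_SETTINGS(PRINT_ROW)
#undef PRINT_ROW
    }
};

int main()
{
    ExampleServerSettings settings;
    std::printf("soft limit default = %llu\n", (unsigned long long) settings.merges_memory_soft_limit);
    ExampleServerSettings::describe();
}
```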

View File

@@ -84,6 +84,7 @@ ThreadGroupPtr ThreadGroup::createForBackgroundProcess(ContextPtr storage_contex
group->memory_tracker.setProfilerStep(settings.memory_profiler_step);
group->memory_tracker.setSampleProbability(settings.memory_profiler_sample_probability);
group->memory_tracker.setSoftLimit(settings.memory_overcommit_ratio_denominator);
group->memory_tracker.setParent(&background_memory_tracker);
if (settings.memory_tracker_fault_probability > 0.0)
group->memory_tracker.setFaultProbability(settings.memory_tracker_fault_probability);

View File

@@ -78,4 +78,9 @@ MergeInfo MergeListElement::getInfo() const
return res;
}
MergeListElement::~MergeListElement()
{
background_memory_tracker.adjustOnBackgroundTaskEnd(&getMemoryTracker());
}
}

View File

@@ -113,6 +113,8 @@ struct MergeListElement : boost::noncopyable
MergeListElement * ptr() { return this; }
MergeListElement & ref() { return *this; }
~MergeListElement();
};
/** Maintains a list of currently running merges.

View File

@@ -7,6 +7,7 @@
#include <base/sort.h>
#include <Backups/BackupEntriesCollector.h>
#include <Databases/IDatabase.h>
#include <Common/MemoryTracker.h>
#include <Common/escapeForFileName.h>
#include <Common/ProfileEventsScope.h>
#include <Common/typeid_cast.h>
@@ -41,6 +42,7 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <fmt/core.h>
namespace DB
{
@@ -918,7 +920,14 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
SelectPartsDecision select_decision = SelectPartsDecision::CANNOT_SELECT;
if (partition_id.empty())
if (!canEnqueueBackgroundTask())
{
if (out_disable_reason)
*out_disable_reason = fmt::format("Current background tasks memory usage ({}) is more than the limit ({})",
formatReadableSizeWithBinarySuffix(background_memory_tracker.get()),
formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()));
}
else if (partition_id.empty())
{
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
bool merge_with_ttl_allowed = getTotalMergesWithTTLInMergeList() < data_settings->max_number_of_merges_with_ttl_in_pool;
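
The hunk above puts the memory gate in front of part selection: when the gate is closed, no new merge is scheduled and the reason is reported through `out_disable_reason`, while merges that are already running continue. A condensed model of that control flow with hypothetical names and simplified types:
```cpp
#include <cstdint>
#include <cstdio>
#include <optional>
#include <string>

// Simplified gate, same rule as canEnqueueBackgroundTask().
static bool backgroundGateOpen(int64_t bg_amount, int64_t bg_soft_limit)
{
    return bg_soft_limit == 0 || bg_amount < bg_soft_limit;
}

struct Selection { std::string parts; };

// Hypothetical, condensed version of the selection step: either pick parts to merge
// or explain why nothing was scheduled.
static std::optional<Selection> trySelectMerge(int64_t bg_amount, int64_t bg_soft_limit, std::string * disable_reason)
{
    if (!backgroundGateOpen(bg_amount, bg_soft_limit))
    {
        if (disable_reason)
            *disable_reason = "Current background tasks memory usage (" + std::to_string(bg_amount)
                + ") is more than the limit (" + std::to_string(bg_soft_limit) + ")";
        return std::nullopt; // nothing new is scheduled; running merges keep going
    }
    return Selection{"all_1_1_0, all_2_2_0"}; // placeholder for the real part selection logic
}

int main()
{
    std::string reason;
    if (!trySelectMerge(/*bg_amount=*/200, /*bg_soft_limit=*/100, &reason))
        std::printf("merge not scheduled: %s\n", reason.c_str());
}
```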

View File

@@ -5,6 +5,7 @@
#include <base/hex.h>
#include <Common/Macros.h>
#include <Common/MemoryTracker.h>
#include <Common/ProfileEventsScope.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ZooKeeper/KeeperException.h>
@@ -3223,7 +3224,14 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
auto merges_and_mutations_queued = queue.countMergesAndPartMutations();
size_t merges_and_mutations_sum = merges_and_mutations_queued.merges + merges_and_mutations_queued.mutations;
if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue)
if (!canEnqueueBackgroundTask())
{
LOG_TRACE(log, "Reached memory limit for the background tasks ({}), so won't select new parts to merge or mutate. "
"Current background tasks memory usage: {}.",
formatReadableSizeWithBinarySuffix(background_memory_tracker.getSoftLimit()),
formatReadableSizeWithBinarySuffix(background_memory_tracker.get()));
}
else if (merges_and_mutations_sum >= storage_settings_ptr->max_replicated_merges_in_queue)
{
LOG_TRACE(log, "Number of queued merges ({}) and part mutations ({})"
" is greater than max_replicated_merges_in_queue ({}), so won't select new parts to merge or mutate.",

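The sizes in these messages are rendered by `formatReadableSizeWithBinarySuffix`. As a rough stand-in only (not ClickHouse's implementation), the idea is to scale by powers of 1024 and append the matching binary suffix:
```cpp
#include <cstddef>
#include <cstdio>
#include <string>

// Rough stand-in for formatReadableSizeWithBinarySuffix: scale by 1024 until the
// value fits, then append the matching binary suffix.
static std::string readableSize(double value)
{
    static const char * suffixes[] = {"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"};
    size_t i = 0;
    while (value >= 1024.0 && i + 1 < sizeof(suffixes) / sizeof(suffixes[0]))
    {
        value /= 1024.0;
        ++i;
    }
    char buf[64];
    std::snprintf(buf, sizeof(buf), "%.2f %s", value, suffixes[i]);
    return buf;
}

int main()
{
    // e.g. a 32 GiB soft limit and 35 GiB of current background usage
    std::printf("limit: %s, usage: %s\n",
                readableSize(34359738368.0).c_str(),
                readableSize(37580963840.0).c_str());
}
```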
View File

@@ -0,0 +1,38 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node")
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_memory_limit_success():
node.query(
"CREATE TABLE test_merge_oom ENGINE=AggregatingMergeTree ORDER BY id EMPTY AS SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(20000)"
)
node.query("SYSTEM STOP MERGES test_merge_oom")
node.query(
"INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(10000)"
)
node.query(
"INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(10000)"
)
node.query(
"INSERT INTO test_merge_oom SELECT number%1024 AS id, arrayReduce( 'groupArrayState', arrayMap( x-> randomPrintableASCII(100), range(8192))) fat_state FROM numbers(10000)"
)
_, error = node.query_and_get_answer_with_error(
"SYSTEM START MERGES test_merge_oom;SET optimize_throw_if_noop=1;OPTIMIZE TABLE test_merge_oom FINAL"
)
assert not error
node.query("DROP TABLE test_merge_oom")