Antonio Andelic 2024-07-21 11:32:57 +02:00
parent 7d66f400b2
commit 2147a96475
7 changed files with 45 additions and 11 deletions

View File

@@ -638,6 +638,10 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
"The amount of virtual memory mapped for the use of stack and for the allocated memory, in bytes."
" It is unspecified whether it includes the per-thread stacks and most of the allocated memory, that is allocated with the 'mmap' system call."
" This metric exists only for completeness reasons. I recommend to use the `MemoryResident` metric for monitoring."};
#if !USE_JEMALLOC
MemoryTracker::updateValues(data.resident, data.resident, /*force_update=*/true);
#endif
}
{

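Without jemalloc there is no allocator statistic to reconcile against, so the asynchronous-metrics path above corrects the global tracker straight from the OS-reported resident size, forcing the update on every pass. A minimal sketch of that idea, using hypothetical names (TrackerState, syncToResident) rather than the real MemoryTracker interface:

#include <atomic>
#include <cstdint>
#include <cstdio>

// Hypothetical stand-in for the global tracker state.
struct TrackerState
{
    std::atomic<int64_t> amount{0};   // bytes accounted through the allocation hooks
    std::atomic<int64_t> rss{0};      // last externally observed resident size
};

// Overwrite the tracked amount with an externally measured resident size,
// mirroring the effect of updateValues(resident, resident, /*force_update=*/true).
void syncToResident(TrackerState & state, int64_t resident_bytes)
{
    state.rss.store(resident_bytes, std::memory_order_relaxed);
    state.amount.store(resident_bytes, std::memory_order_relaxed);
}

int main()
{
    TrackerState tracker;
    tracker.amount.store(123, std::memory_order_relaxed); // drifted bookkeeping value
    syncToResident(tracker, 1'000'000);                    // pretend the OS reports ~1 MB resident
    std::printf("amount=%lld rss=%lld\n",
                static_cast<long long>(tracker.amount.load(std::memory_order_relaxed)),
                static_cast<long long>(tracker.rss.load(std::memory_order_relaxed)));
    return 0;
}
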
View File

@@ -221,6 +221,7 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed
{
/// For global memory tracker always update memory usage.
amount.fetch_add(size, std::memory_order_relaxed);
rss.fetch_add(size, std::memory_order_relaxed);
auto metric_loaded = metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end())
@@ -242,7 +243,7 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed
* So, we allow over-allocations.
*/
Int64 will_be = size ? size + amount.fetch_add(size, std::memory_order_relaxed) : amount.load(std::memory_order_relaxed);
Int64 will_be_rss = size + rss.load(std::memory_order_relaxed);
Int64 will_be_rss = size ? size + rss.fetch_add(size, std::memory_order_relaxed) : rss.load(std::memory_order_relaxed);
auto metric_loaded = metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end() && size)
@@ -269,6 +270,7 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed
{
/// Revert
amount.fetch_sub(size, std::memory_order_relaxed);
rss.fetch_sub(size, std::memory_order_relaxed);
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
@@ -291,7 +293,8 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed
}
}
if (unlikely(current_hard_limit && (will_be > current_hard_limit || will_be_rss > current_hard_limit)))
if (unlikely(
current_hard_limit && (will_be > current_hard_limit || (level == VariableContext::Global && will_be_rss > current_hard_limit))))
{
if (memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded)
{
@@ -303,6 +306,7 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed
{
/// Revert
amount.fetch_sub(size, std::memory_order_relaxed);
rss.fetch_sub(size, std::memory_order_relaxed);
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
@@ -411,6 +415,7 @@ AllocationTrace MemoryTracker::free(Int64 size, double _sample_probability)
{
/// For global memory tracker always update memory usage.
amount.fetch_sub(size, std::memory_order_relaxed);
rss.fetch_sub(size, std::memory_order_relaxed);
auto metric_loaded = metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end())
CurrentMetrics::sub(metric_loaded, size);
@@ -424,7 +429,12 @@ AllocationTrace MemoryTracker::free(Int64 size, double _sample_probability)
}
Int64 accounted_size = size;
if (level == VariableContext::Thread || level == VariableContext::Global)
if (level == VariableContext::Global)
{
amount.fetch_sub(accounted_size, std::memory_order_relaxed);
rss.fetch_sub(accounted_size, std::memory_order_relaxed);
}
else if (level == VariableContext::Thread)
{
/// Could become negative if memory allocated in this thread is freed in another one
amount.fetch_sub(accounted_size, std::memory_order_relaxed);
@@ -498,12 +508,21 @@ void MemoryTracker::reset()
}
void MemoryTracker::updateValues(Int64 rss_, Int64 allocated_)
void MemoryTracker::updateValues(Int64 rss_, Int64 allocated_, bool force_update)
{
Int64 new_amount = allocated_;
total_memory_tracker.amount.store(new_amount, std::memory_order_relaxed);
total_memory_tracker.rss.store(rss_, std::memory_order_relaxed);
if (likely(!force_update && total_memory_tracker.amount.load(std::memory_order_relaxed) >= 0))
return;
Int64 new_amount = allocated_;
LOG_INFO(
getLogger("MemoryTracker"),
"Correcting the value of global memory tracker from {} to {}",
ReadableSize(total_memory_tracker.amount.load(std::memory_order_relaxed)),
ReadableSize(allocated_));
total_memory_tracker.amount.store(new_amount, std::memory_order_relaxed);
auto metric_loaded = total_memory_tracker.metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end())
CurrentMetrics::set(metric_loaded, new_amount);

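The allocation path above now keeps the rss counter in step with amount and, for the global tracker, fails an allocation when either projected value crosses the hard limit, reverting both counters on failure. A condensed sketch of that dual-counter check, assuming a simplified DualTracker type; the real allocImpl additionally handles sampling, CurrentMetrics, overcommit and the per-level rules:

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical simplified tracker: `amount` follows the allocation hooks,
// `rss` follows the externally corrected resident size plus in-flight allocations.
struct DualTracker
{
    std::atomic<int64_t> amount{0};
    std::atomic<int64_t> rss{0};
    int64_t hard_limit = 0; // 0 means "no limit"

    void alloc(int64_t size)
    {
        int64_t will_be = size + amount.fetch_add(size, std::memory_order_relaxed);
        int64_t will_be_rss = size + rss.fetch_add(size, std::memory_order_relaxed);

        if (hard_limit && (will_be > hard_limit || will_be_rss > hard_limit))
        {
            // Revert both counters before reporting the failure, as the patched allocImpl does.
            amount.fetch_sub(size, std::memory_order_relaxed);
            rss.fetch_sub(size, std::memory_order_relaxed);
            throw std::runtime_error("memory limit exceeded: would use "
                + std::to_string(std::max(will_be, will_be_rss)) + " bytes");
        }
    }

    void free(int64_t size)
    {
        amount.fetch_sub(size, std::memory_order_relaxed);
        rss.fetch_sub(size, std::memory_order_relaxed);
    }
};

int main()
{
    DualTracker tracker;
    tracker.hard_limit = 1 << 20;   // 1 MiB, purely for the example
    tracker.alloc(512 * 1024);      // fits under the limit
    try
    {
        tracker.alloc(600 * 1024);  // pushes both projections past the limit
    }
    catch (const std::exception &)
    {
        // counters were reverted, so the tracker stays consistent
    }
    tracker.free(512 * 1024);
    return 0;
}
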
View File

@@ -240,7 +240,7 @@ public:
void reset();
/// update values based on external information (e.g. jemalloc's stat)
static void updateValues(Int64 rss_, Int64 allocated_);
static void updateValues(Int64 rss_, Int64 allocated_, bool force_update);
/// Prints info about peak memory consumption into log.
void logPeakMemoryUsage();

View File

@@ -44,6 +44,7 @@ void MemoryWorker::backgroundThread()
JemallocMibCache<size_t> resident_mib("stats.resident");
JemallocMibCache<size_t> allocated_mib("stats.allocated");
JemallocMibCache<size_t> purge_mib("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge");
bool first_run = true;
std::unique_lock lock(mutex);
while (true)
{
@@ -62,9 +63,13 @@ void MemoryWorker::backgroundThread()
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurgeTimeMicroseconds, purge_watch.elapsedMicroseconds());
}
MemoryTracker::updateValues(resident, allocated_mib.getValue());
/// force update the allocated stat from jemalloc for the first run to cover the allocations we missed
/// during initialization
MemoryTracker::updateValues(resident, allocated_mib.getValue(), first_run);
ProfileEvents::increment(ProfileEvents::MemoryWorkerRun);
ProfileEvents::increment(ProfileEvents::MemoryWorkerRunElapsedMicroseconds, total_watch.elapsedMicroseconds());
first_run = false;
}
}
#endif

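The worker forces the correction only on its first iteration so that allocations made before the tracker was wired up are not lost; every later pass takes the cheap non-forced path. A hedged sketch of that loop shape, with stand-in functions (readResident, readAllocated) in place of the jemalloc mib reads:

#include <atomic>
#include <chrono>
#include <cstdint>
#include <thread>

// Stand-ins for the JemallocMibCache reads of "stats.resident" and "stats.allocated".
int64_t readResident()  { return 100'000'000; }
int64_t readAllocated() { return  90'000'000; }

std::atomic<int64_t> tracked_amount{0};
std::atomic<int64_t> tracked_rss{0};

// Same shape as MemoryTracker::updateValues(rss_, allocated_, force_update):
// the tracked amount is only overwritten when forced or when it has drifted negative.
void updateValues(int64_t rss, int64_t allocated, bool force_update)
{
    tracked_rss.store(rss, std::memory_order_relaxed);
    if (!force_update && tracked_amount.load(std::memory_order_relaxed) >= 0)
        return;
    tracked_amount.store(allocated, std::memory_order_relaxed);
}

int main()
{
    // Force the correction only on the first iteration so allocations made before
    // the worker started are picked up; later iterations leave healthy values alone.
    bool first_run = true;
    for (int i = 0; i < 3; ++i)
    {
        updateValues(readResident(), readAllocated(), first_run);
        first_run = false;
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    return 0;
}
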
View File

@@ -148,7 +148,13 @@ void KeeperDispatcher::requestThread()
Int64 mem_soft_limit = keeper_context->getKeeperMemorySoftLimit();
if (configuration_and_settings->standalone_keeper && isExceedingMemorySoftLimit() && checkIfRequestIncreaseMem(request.request))
{
LOG_WARNING(log, "Processing requests refused because of max_memory_usage_soft_limit {}, the total used memory is {}, request type is {}", ReadableSize(mem_soft_limit), ReadableSize(total_memory_tracker.get()), request.request->getOpNum());
LOG_WARNING(
log,
"Processing requests refused because of max_memory_usage_soft_limit {}, the total used memory is {}, request type "
"is {}",
ReadableSize(mem_soft_limit),
ReadableSize(total_memory_tracker.get()),
request.request->getOpNum());
addErrorResponses({request}, Coordination::Error::ZCONNECTIONLOSS);
continue;
}

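Functionally this hunk only reflows the warning, but the surrounding logic is worth spelling out: a standalone Keeper above its memory soft limit refuses requests that would grow memory while still serving the rest. A rough sketch of that gate, with hypothetical Request and shouldRejectRequest names standing in for the real dispatcher types:

#include <cstdint>
#include <iostream>
#include <string>

// Hypothetical request descriptor; the real code inspects the Coordination op num.
struct Request
{
    bool increases_memory = false;
    std::string name;
};

// Reject only when a soft limit is configured, the process is already above it,
// and the request would allocate more (create/set style operations).
bool shouldRejectRequest(const Request & request, int64_t used_bytes, int64_t soft_limit_bytes)
{
    return soft_limit_bytes > 0 && used_bytes > soft_limit_bytes && request.increases_memory;
}

int main()
{
    Request create{true, "create"};
    Request get{false, "get"};
    int64_t used = 250'000'000;    // pretend current usage
    int64_t limit = 200'000'000;   // matches the raised test setting
    std::cout << create.name << " rejected: " << shouldRejectRequest(create, used, limit) << '\n';
    std::cout << get.name << " rejected: " << shouldRejectRequest(get, used, limit) << '\n';
    return 0;
}
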
View File

@@ -16,7 +16,7 @@
<value>az-zoo2</value>
<enable_auto_detection_on_cloud>1</enable_auto_detection_on_cloud>
</availability_zone>
<max_memory_usage_soft_limit>20000000</max_memory_usage_soft_limit>
<max_memory_usage_soft_limit>200000000</max_memory_usage_soft_limit>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>

View File

@@ -13,7 +13,7 @@
<tcp_port>2181</tcp_port>
<server_id>3</server_id>
<max_memory_usage_soft_limit>20000000</max_memory_usage_soft_limit>
<max_memory_usage_soft_limit>200000000</max_memory_usage_soft_limit>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>