diff --git a/src/Common/AsynchronousMetrics.cpp b/src/Common/AsynchronousMetrics.cpp index a5c9875188b..dc2f687004b 100644 --- a/src/Common/AsynchronousMetrics.cpp +++ b/src/Common/AsynchronousMetrics.cpp @@ -638,6 +638,10 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update) "The amount of virtual memory mapped for the use of stack and for the allocated memory, in bytes." " It is unspecified whether it includes the per-thread stacks and most of the allocated memory, that is allocated with the 'mmap' system call." " This metric exists only for completeness reasons. I recommend to use the `MemoryResident` metric for monitoring."}; + +#if !USE_JEMALLOC + MemoryTracker::updateValues(data.resident, data.resident, /*force_update=*/true); +#endif } { diff --git a/src/Common/MemoryTracker.cpp b/src/Common/MemoryTracker.cpp index 49a3a6ef7ef..07d6ba98745 100644 --- a/src/Common/MemoryTracker.cpp +++ b/src/Common/MemoryTracker.cpp @@ -221,6 +221,7 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed { /// For global memory tracker always update memory usage. amount.fetch_add(size, std::memory_order_relaxed); + rss.fetch_add(size, std::memory_order_relaxed); auto metric_loaded = metric.load(std::memory_order_relaxed); if (metric_loaded != CurrentMetrics::end()) @@ -242,7 +243,7 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed * So, we allow over-allocations. */ Int64 will_be = size ? size + amount.fetch_add(size, std::memory_order_relaxed) : amount.load(std::memory_order_relaxed); - Int64 will_be_rss = size + rss.load(std::memory_order_relaxed); + Int64 will_be_rss = size ? size + rss.fetch_add(size, std::memory_order_relaxed) : rss.load(std::memory_order_relaxed); auto metric_loaded = metric.load(std::memory_order_relaxed); if (metric_loaded != CurrentMetrics::end() && size) @@ -269,6 +270,7 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed { /// Revert amount.fetch_sub(size, std::memory_order_relaxed); + rss.fetch_sub(size, std::memory_order_relaxed); /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global); @@ -291,7 +293,8 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed } } - if (unlikely(current_hard_limit && (will_be > current_hard_limit || will_be_rss > current_hard_limit))) + if (unlikely( + current_hard_limit && (will_be > current_hard_limit || (level == VariableContext::Global && will_be_rss > current_hard_limit)))) { if (memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded) { @@ -303,6 +306,7 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed { /// Revert amount.fetch_sub(size, std::memory_order_relaxed); + rss.fetch_sub(size, std::memory_order_relaxed); /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global); @@ -411,6 +415,7 @@ AllocationTrace MemoryTracker::free(Int64 size, double _sample_probability) { /// For global memory tracker always update memory usage. amount.fetch_sub(size, std::memory_order_relaxed); + rss.fetch_sub(size, std::memory_order_relaxed); auto metric_loaded = metric.load(std::memory_order_relaxed); if (metric_loaded != CurrentMetrics::end()) CurrentMetrics::sub(metric_loaded, size); @@ -424,7 +429,12 @@ AllocationTrace MemoryTracker::free(Int64 size, double _sample_probability) } Int64 accounted_size = size; - if (level == VariableContext::Thread || level == VariableContext::Global) + if (level == VariableContext::Global) + { + amount.fetch_sub(accounted_size, std::memory_order_relaxed); + rss.fetch_sub(accounted_size, std::memory_order_relaxed); + } + else if (level == VariableContext::Thread) { /// Could become negative if memory allocated in this thread is freed in another one amount.fetch_sub(accounted_size, std::memory_order_relaxed); @@ -498,12 +508,21 @@ void MemoryTracker::reset() } -void MemoryTracker::updateValues(Int64 rss_, Int64 allocated_) +void MemoryTracker::updateValues(Int64 rss_, Int64 allocated_, bool force_update) { - Int64 new_amount = allocated_; - total_memory_tracker.amount.store(new_amount, std::memory_order_relaxed); total_memory_tracker.rss.store(rss_, std::memory_order_relaxed); + if (likely(!force_update && total_memory_tracker.amount.load(std::memory_order_relaxed) >= 0)) + return; + + Int64 new_amount = allocated_; + LOG_INFO( + getLogger("MemoryTracker"), + "Correcting the value of global memory tracker from {} to {}", + ReadableSize(total_memory_tracker.amount.load(std::memory_order_relaxed)), + ReadableSize(allocated_)); + total_memory_tracker.amount.store(new_amount, std::memory_order_relaxed); + auto metric_loaded = total_memory_tracker.metric.load(std::memory_order_relaxed); if (metric_loaded != CurrentMetrics::end()) CurrentMetrics::set(metric_loaded, new_amount); diff --git a/src/Common/MemoryTracker.h b/src/Common/MemoryTracker.h index add8bcb43d2..4913be9781f 100644 --- a/src/Common/MemoryTracker.h +++ b/src/Common/MemoryTracker.h @@ -240,7 +240,7 @@ public: void reset(); /// update values based on external information (e.g. jemalloc's stat) - static void updateValues(Int64 rss_, Int64 allocated_); + static void updateValues(Int64 rss_, Int64 allocated_, bool force_update); /// Prints info about peak memory consumption into log. void logPeakMemoryUsage(); diff --git a/src/Common/MemoryWorker.cpp b/src/Common/MemoryWorker.cpp index ae488a47b67..23cd90178ff 100644 --- a/src/Common/MemoryWorker.cpp +++ b/src/Common/MemoryWorker.cpp @@ -44,6 +44,7 @@ void MemoryWorker::backgroundThread() JemallocMibCache resident_mib("stats.resident"); JemallocMibCache allocated_mib("stats.allocated"); JemallocMibCache purge_mib("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge"); + bool first_run = false; std::unique_lock lock(mutex); while (true) { @@ -62,9 +63,13 @@ void MemoryWorker::backgroundThread() ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurgeTimeMicroseconds, purge_watch.elapsedMicroseconds()); } - MemoryTracker::updateValues(resident, allocated_mib.getValue()); + /// force update the allocated stat from jemalloc for the first run to cover the allocations we missed + /// during initialization + MemoryTracker::updateValues(resident, allocated_mib.getValue(), first_run); ProfileEvents::increment(ProfileEvents::MemoryWorkerRun); ProfileEvents::increment(ProfileEvents::MemoryWorkerRunElapsedMicroseconds, total_watch.elapsedMicroseconds()); + + first_run = false; } } #endif diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 8c7e6405153..332662117d8 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -148,7 +148,13 @@ void KeeperDispatcher::requestThread() Int64 mem_soft_limit = keeper_context->getKeeperMemorySoftLimit(); if (configuration_and_settings->standalone_keeper && isExceedingMemorySoftLimit() && checkIfRequestIncreaseMem(request.request)) { - LOG_WARNING(log, "Processing requests refused because of max_memory_usage_soft_limit {}, the total used memory is {}, request type is {}", ReadableSize(mem_soft_limit), ReadableSize(total_memory_tracker.get()), request.request->getOpNum()); + LOG_WARNING( + log, + "Processing requests refused because of max_memory_usage_soft_limit {}, the total used memory is {}, request type " + "is {}", + ReadableSize(mem_soft_limit), + ReadableSize(total_memory_tracker.get()), + request.request->getOpNum()); addErrorResponses({request}, Coordination::Error::ZCONNECTIONLOSS); continue; } diff --git a/tests/integration/test_keeper_memory_soft_limit/configs/keeper_config2.xml b/tests/integration/test_keeper_memory_soft_limit/configs/keeper_config2.xml index 25ececea3e8..e71b93379d0 100644 --- a/tests/integration/test_keeper_memory_soft_limit/configs/keeper_config2.xml +++ b/tests/integration/test_keeper_memory_soft_limit/configs/keeper_config2.xml @@ -16,7 +16,7 @@ az-zoo2 1 - 20000000 + 200000000 10000 diff --git a/tests/integration/test_keeper_memory_soft_limit/configs/keeper_config3.xml b/tests/integration/test_keeper_memory_soft_limit/configs/keeper_config3.xml index 81e343b77c9..cf4a4686f2c 100644 --- a/tests/integration/test_keeper_memory_soft_limit/configs/keeper_config3.xml +++ b/tests/integration/test_keeper_memory_soft_limit/configs/keeper_config3.xml @@ -13,7 +13,7 @@ 2181 3 - 20000000 + 200000000 10000