This commit is contained in:
Antonio Andelic 2024-07-18 16:51:49 +02:00
parent 05c7dc582a
commit 7d66f400b2
5 changed files with 45 additions and 11 deletions

View File

@ -65,6 +65,11 @@ struct JemallocMibCache
return value;
}
void run()
{
mallctlbymib(mib, mib_length, nullptr, nullptr, nullptr, 0);
}
private:
static constexpr size_t max_mib_length = 4;
size_t mib[max_mib_length];

View File

@ -242,6 +242,7 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed
* So, we allow over-allocations.
*/
Int64 will_be = size ? size + amount.fetch_add(size, std::memory_order_relaxed) : amount.load(std::memory_order_relaxed);
Int64 will_be_rss = size + rss.load(std::memory_order_relaxed);
auto metric_loaded = metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end() && size)
@ -290,7 +291,7 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed
}
}
if (unlikely(current_hard_limit && will_be > current_hard_limit))
if (unlikely(current_hard_limit && (will_be > current_hard_limit || will_be_rss > current_hard_limit)))
{
if (memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded)
{
@ -310,12 +311,13 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed
throw DB::Exception(
DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED,
"Memory limit{}{} exceeded: "
"would use {} (attempt to allocate chunk of {} bytes), maximum: {}."
"would use {} (attempt to allocate chunk of {} bytes), current RSS {}, maximum: {}."
"{}{}",
description ? " " : "",
description ? description : "",
formatReadableSizeWithBinarySuffix(will_be),
size,
formatReadableSizeWithBinarySuffix(rss.load(std::memory_order_relaxed)),
formatReadableSizeWithBinarySuffix(current_hard_limit),
overcommit_result == OvercommitResult::NONE ? "" : " OvercommitTracker decision: ",
toDescription(overcommit_result));
@ -496,17 +498,18 @@ void MemoryTracker::reset()
}
void MemoryTracker::setRSS(Int64 rss_)
void MemoryTracker::updateValues(Int64 rss_, Int64 allocated_)
{
Int64 new_amount = rss_;
Int64 new_amount = allocated_;
total_memory_tracker.amount.store(new_amount, std::memory_order_relaxed);
total_memory_tracker.rss.store(rss_, std::memory_order_relaxed);
auto metric_loaded = total_memory_tracker.metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end())
CurrentMetrics::set(metric_loaded, new_amount);
bool log_memory_usage = true;
total_memory_tracker.updatePeak(rss_, log_memory_usage);
total_memory_tracker.updatePeak(new_amount, log_memory_usage);
}

View File

@ -57,6 +57,8 @@ private:
std::atomic<Int64> hard_limit {0};
std::atomic<Int64> profiler_limit {0};
std::atomic<Int64> rss{0};
Int64 profiler_step = 0;
/// To test exception safety of calling code, memory tracker throws an exception on each memory allocation with specified probability.
@ -237,10 +239,8 @@ public:
/// Reset the accumulated data.
void reset();
/// Reset current counter to an RSS value.
/// Jemalloc may have pre-allocated arenas, they are accounted in RSS.
/// We can free this arenas in case of exception to avoid OOM.
static void setRSS(Int64 rss_);
/// update values based on external information (e.g. jemalloc's stat)
static void updateValues(Int64 rss_, Int64 allocated_);
/// Prints info about peak memory consumption into log.
void logPeakMemoryUsage();

View File

@ -3,11 +3,23 @@
#include <Common/Jemalloc.h>
#include <Common/MemoryTracker.h>
#include <Common/formatReadable.h>
#include <Common/ProfileEvents.h>
namespace ProfileEvents
{
extern const Event MemoryAllocatorPurge;
extern const Event MemoryAllocatorPurgeTimeMicroseconds;
extern const Event MemoryWorkerRun;
extern const Event MemoryWorkerRunElapsedMicroseconds;
}
namespace DB
{
#if USE_JEMALLOC
#define STRINGIFY_HELPER(x) #x
#define STRINGIFY(x) STRINGIFY_HELPER(x)
MemoryWorker::MemoryWorker(uint64_t period_ms_)
: period_ms(period_ms_)
{
@ -30,6 +42,8 @@ void MemoryWorker::backgroundThread()
{
JemallocMibCache<uint64_t> epoch_mib("epoch");
JemallocMibCache<size_t> resident_mib("stats.resident");
JemallocMibCache<size_t> allocated_mib("stats.allocated");
JemallocMibCache<size_t> purge_mib("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge");
std::unique_lock lock(mutex);
while (true)
{
@ -37,11 +51,20 @@ void MemoryWorker::backgroundThread()
if (shutdown)
return;
Stopwatch total_watch;
epoch_mib.setValue(0);
Int64 resident = resident_mib.getValue();
MemoryTracker::setRSS(resident);
if (resident > total_memory_tracker.getHardLimit())
purgeJemallocArenas();
{
Stopwatch purge_watch;
purge_mib.run();
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurge);
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurgeTimeMicroseconds, purge_watch.elapsedMicroseconds());
}
MemoryTracker::updateValues(resident, allocated_mib.getValue());
ProfileEvents::increment(ProfileEvents::MemoryWorkerRun);
ProfileEvents::increment(ProfileEvents::MemoryWorkerRunElapsedMicroseconds, total_watch.elapsedMicroseconds());
}
}
#endif

View File

@ -778,6 +778,9 @@ The server successfully detected this situation and will download merged part fr
M(GWPAsanAllocateSuccess, "Number of successful allocations done by GWPAsan") \
M(GWPAsanAllocateFailed, "Number of failed allocations done by GWPAsan (i.e. filled pool)") \
M(GWPAsanFree, "Number of free operations done by GWPAsan") \
\
M(MemoryWorkerRun, "Number of runs done by MemoryWorker in background") \
M(MemoryWorkerRunElapsedMicroseconds, "Total time spent by MemoryWorker for background work") \
#ifdef APPLY_FOR_EXTERNAL_EVENTS