Dedicated memory background thread

This commit is contained in:
Antonio Andelic 2024-07-18 09:01:08 +02:00
parent 97f4ec2adb
commit 21009577d8
7 changed files with 78 additions and 85 deletions

View File

@ -447,9 +447,12 @@ void checkForUsersNotInMainConfig(
}
}
namespace
{
/// Unused in other builds
#if defined(OS_LINUX)
static String readLine(const String & path)
String readLine(const String & path)
{
ReadBufferFromFile in(path);
String contents;
@ -457,7 +460,7 @@ static String readLine(const String & path)
return contents;
}
static int readNumber(const String & path)
int readNumber(const String & path)
{
ReadBufferFromFile in(path);
int result;
@ -467,7 +470,7 @@ static int readNumber(const String & path)
#endif
static void sanityChecks(Server & server)
void sanityChecks(Server & server)
{
std::string data_path = getCanonicalPath(server.config().getString("path", DBMS_DEFAULT_PATH));
std::string logs_path = server.config().getString("logger.log", "");
@ -588,6 +591,31 @@ static void sanityChecks(Server & server)
}
}
[[noreturn]] void backgroundMemoryThread()
{
std::mutex mutex;
std::condition_variable cv;
std::unique_lock lock(mutex);
while (true)
{
cv.wait_for(lock, std::chrono::microseconds(200));
uint64_t epoch = 0;
mallctl("epoch", nullptr, nullptr, &epoch, sizeof(epoch));
auto maybe_resident = getJemallocValue<size_t>("stats.resident");
if (!maybe_resident.has_value())
continue;
Int64 resident = *maybe_resident;
//LOG_INFO(getLogger("JEmalloc"), "Resident {}", ReadableSize(resident));
MemoryTracker::setRSS(resident, false);
if (resident > total_memory_tracker.getHardLimit())
purgeJemallocArenas();
}
}
}
void loadStartupScripts(const Poco::Util::AbstractConfiguration & config, ContextMutablePtr context, Poco::Logger * log)
{
try
@ -877,6 +905,11 @@ try
total_memory_tracker.setSampleMaxAllocationSize(server_settings.total_memory_profiler_sample_max_allocation_size);
}
ThreadFromGlobalPool background_memory_thread([]
{
backgroundMemoryThread();
});
Poco::ThreadPool server_pool(
/* minCapacity */3,
/* maxCapacity */server_settings.max_connections,

View File

@ -174,7 +174,7 @@ add_library (clickhouse_new_delete STATIC Common/new_delete.cpp)
target_link_libraries (clickhouse_new_delete PRIVATE clickhouse_common_io)
if (TARGET ch_contrib::jemalloc)
target_link_libraries (clickhouse_new_delete PRIVATE ch_contrib::jemalloc)
target_link_libraries (clickhouse_common_io PRIVATE ch_contrib::jemalloc)
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::jemalloc)
target_link_libraries (clickhouse_storages_system PRIVATE ch_contrib::jemalloc)
endif()

View File

@ -649,49 +649,6 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
"The amount of virtual memory mapped for the use of stack and for the allocated memory, in bytes."
" It is unspecified whether it includes the per-thread stacks and most of the allocated memory, that is allocated with the 'mmap' system call."
" This metric exists only for completeness reasons. I recommend to use the `MemoryResident` metric for monitoring."};
/// We must update the value of total_memory_tracker periodically.
/// Otherwise it might be calculated incorrectly - it can include a "drift" of memory amount.
/// See https://github.com/ClickHouse/ClickHouse/issues/10293
{
Int64 amount = total_memory_tracker.get();
Int64 peak = total_memory_tracker.getPeak();
Int64 rss = data.resident;
Int64 free_memory_in_allocator_arenas = 0;
#if USE_JEMALLOC
/// According to jemalloc man, pdirty is:
///
/// Number of pages within unused extents that are potentially
/// dirty, and for which madvise() or similar has not been called.
///
/// So they will be subtracted from RSS to make accounting more
/// accurate, since those pages are not really RSS but a memory
/// that can be used at anytime via jemalloc.
free_memory_in_allocator_arenas = je_malloc_pdirty * getPageSize();
#endif
if (cgroups_reader != nullptr)
{
rss = cgroups_reader->readMemoryUsage();
new_values["CgroupsMemoryUsage"] = { rss,
"The amount of physical memory used by the server process, reported by cgroups." };
}
Int64 difference = rss - amount;
/// Log only if difference is high. This is for convenience. The threshold is arbitrary.
if (difference >= 1048576 || difference <= -1048576)
LOG_TRACE(log,
"MemoryTracking: was {}, peak {}, free memory in arenas {}, will set to {} (RSS), difference: {}",
ReadableSize(amount),
ReadableSize(peak),
ReadableSize(free_memory_in_allocator_arenas),
ReadableSize(rss),
ReadableSize(difference));
MemoryTracker::setRSS(rss, /*has_free_memory_in_allocator_arenas_=*/free_memory_in_allocator_arenas > 0);
}
}
{

View File

@ -5,7 +5,6 @@
#include <Common/Exception.h>
#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
#include <jemalloc/jemalloc.h>
#define STRINGIFY_HELPER(x) #x
#define STRINGIFY(x) STRINGIFY_HELPER(x)
@ -26,7 +25,6 @@ namespace ErrorCodes
void purgeJemallocArenas()
{
LOG_TRACE(getLogger("SystemJemalloc"), "Purging unused memory");
Stopwatch watch;
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurge);
@ -46,20 +44,6 @@ void checkJemallocProfilingEnabled()
"set: MALLOC_CONF=background_thread:true,prof:true");
}
template <typename T>
void setJemallocValue(const char * name, T value)
{
T old_value;
size_t old_value_size = sizeof(T);
if (mallctl(name, &old_value, &old_value_size, reinterpret_cast<void*>(&value), sizeof(T)))
{
LOG_WARNING(getLogger("Jemalloc"), "mallctl for {} failed", name);
return;
}
LOG_INFO(getLogger("Jemalloc"), "Value for {} set to {} (from {})", name, value, old_value);
}
void setJemallocProfileActive(bool value)
{
checkJemallocProfilingEnabled();

View File

@ -5,6 +5,8 @@
#if USE_JEMALLOC
#include <string>
#include <Common/logger_useful.h>
#include <jemalloc/jemalloc.h>
namespace DB
{
@ -21,6 +23,33 @@ void setJemallocBackgroundThreads(bool enabled);
void setJemallocMaxBackgroundThreads(size_t max_threads);
template <typename T>
void setJemallocValue(const char * name, T value)
{
T old_value;
size_t old_value_size = sizeof(T);
if (mallctl(name, &old_value, &old_value_size, reinterpret_cast<void*>(&value), sizeof(T)))
{
LOG_WARNING(getLogger("Jemalloc"), "mallctl for {} failed", name);
return;
}
LOG_INFO(getLogger("Jemalloc"), "Value for {} set to {} (from {})", name, value, old_value);
}
template <typename T>
std::optional<T> getJemallocValue(const char * name)
{
T value;
size_t value_size = sizeof(T);
if (mallctl(name, &value, &value_size, nullptr, 0))
{
LOG_WARNING(getLogger("Jemalloc"), "mallctl for {} failed", name);
return std::nullopt;
}
return value;
}
}
#endif

View File

@ -20,9 +20,6 @@
#if USE_JEMALLOC
# include <jemalloc/jemalloc.h>
#define STRINGIFY_HELPER(x) #x
#define STRINGIFY(x) STRINGIFY_HELPER(x)
#endif
#include <atomic>
@ -126,11 +123,11 @@ std::atomic<bool> MemoryTracker::has_free_memory_in_allocator_arenas;
MemoryTracker::MemoryTracker(VariableContext level_) : parent(&total_memory_tracker), level(level_) {}
MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_) : parent(parent_), level(level_) {}
MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_, bool log_peak_memory_usage_in_destructor_)
: parent(parent_)
, log_peak_memory_usage_in_destructor(log_peak_memory_usage_in_destructor_)
, level(level_)
{}
: parent(parent_), log_peak_memory_usage_in_destructor(log_peak_memory_usage_in_destructor_), level(level_)
{
}
MemoryTracker::~MemoryTracker()
{
@ -200,8 +197,12 @@ void MemoryTracker::debugLogBigAllocationWithoutCheck(Int64 size [[maybe_unused]
return;
MemoryTrackerBlockerInThread blocker(VariableContext::Global);
LOG_TEST(getLogger("MemoryTracker"), "Too big allocation ({} bytes) without checking memory limits, "
"it may lead to OOM. Stack trace: {}", size, StackTrace().toString());
LOG_TEST(
getLogger("MemoryTracker"),
"Too big allocation ({} bytes) without checking memory limits, "
"it may lead to OOM. Stack trace: {}",
size,
StackTrace().toString());
#else
/// Avoid trash logging in release builds
#endif
@ -293,17 +294,6 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed
}
}
#if USE_JEMALLOC
if (level == VariableContext::Global && will_be > soft_limit.load(std::memory_order_relaxed)
&& has_free_memory_in_allocator_arenas.exchange(false))
{
Stopwatch watch;
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurge);
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurgeTimeMicroseconds, watch.elapsedMicroseconds());
}
#endif
if (unlikely(current_hard_limit && will_be > current_hard_limit))
{
if (memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded)
@ -513,7 +503,8 @@ void MemoryTracker::reset()
void MemoryTracker::setRSS(Int64 rss_, bool has_free_memory_in_allocator_arenas_)
{
Int64 new_amount = rss_;
total_memory_tracker.amount.store(new_amount, std::memory_order_relaxed);
if (rss_)
total_memory_tracker.amount.store(new_amount, std::memory_order_relaxed);
has_free_memory_in_allocator_arenas.store(has_free_memory_in_allocator_arenas_, std::memory_order_relaxed);
auto metric_loaded = total_memory_tracker.metric.load(std::memory_order_relaxed);

View File

@ -2,7 +2,6 @@
#include <atomic>
#include <chrono>
#include <optional>
#include <base/types.h>
#include <Common/CurrentMetrics.h>
#include <Common/VariableContext.h>