Use cgroups as source

This commit is contained in:
Antonio Andelic 2024-07-22 21:47:46 +02:00
parent 1c3f7d0fd0
commit d78cfd030f
15 changed files with 367 additions and 243 deletions

View File

@ -376,7 +376,8 @@ try
LOG_INFO(log, "Background threads finished in {} ms", watch.elapsedMilliseconds()); LOG_INFO(log, "Background threads finished in {} ms", watch.elapsedMilliseconds());
}); });
MemoryWorker memory_worker(config().getUInt64("memory_worker_period_ms", 100)); MemoryWorker memory_worker(config().getUInt64("memory_worker_period_ms", 0));
memory_worker.start();
static ServerErrorHandler error_handler; static ServerErrorHandler error_handler;
Poco::ErrorHandler::set(&error_handler); Poco::ErrorHandler::set(&error_handler);
@ -419,8 +420,9 @@ try
for (const auto & server : *servers) for (const auto & server : *servers)
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()}); metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()});
return metrics; return metrics;
} },
); /*update_jemalloc_epoch_=*/memory_worker.getSource() != MemoryWorker::MemoryUsageSource::Jemalloc,
/*update_rss_=*/memory_worker.getSource() == MemoryWorker::MemoryUsageSource::None);
std::vector<std::string> listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host"); std::vector<std::string> listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host");

View File

@ -904,6 +904,8 @@ try
LOG_INFO(log, "Background threads finished in {} ms", watch.elapsedMilliseconds()); LOG_INFO(log, "Background threads finished in {} ms", watch.elapsedMilliseconds());
}); });
MemoryWorker memory_worker(global_context->getServerSettings().memory_worker_period_ms);
/// This object will periodically calculate some metrics. /// This object will periodically calculate some metrics.
ServerAsynchronousMetrics async_metrics( ServerAsynchronousMetrics async_metrics(
global_context, global_context,
@ -922,8 +924,9 @@ try
for (const auto & server : servers) for (const auto & server : servers)
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()}); metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()});
return metrics; return metrics;
} },
); /*update_jemalloc_epoch_=*/memory_worker.getSource() != MemoryWorker::MemoryUsageSource::Jemalloc,
/*update_rss_=*/memory_worker.getSource() == MemoryWorker::MemoryUsageSource::None);
/// NOTE: global context should be destroyed *before* GlobalThreadPool::shutdown() /// NOTE: global context should be destroyed *before* GlobalThreadPool::shutdown()
/// Otherwise GlobalThreadPool::shutdown() will hang, since Context holds some threads. /// Otherwise GlobalThreadPool::shutdown() will hang, since Context holds some threads.
@ -1196,7 +1199,7 @@ try
FailPointInjection::enableFromGlobalConfig(config()); FailPointInjection::enableFromGlobalConfig(config());
MemoryWorker memory_worker(global_context->getServerSettings().memory_worker_period_ms); memory_worker.start();
int default_oom_score = 0; int default_oom_score = 0;

View File

@ -58,10 +58,14 @@ static std::unique_ptr<ReadBufferFromFilePRead> openFileIfExists(const std::stri
AsynchronousMetrics::AsynchronousMetrics( AsynchronousMetrics::AsynchronousMetrics(
unsigned update_period_seconds, unsigned update_period_seconds,
const ProtocolServerMetricsFunc & protocol_server_metrics_func_) const ProtocolServerMetricsFunc & protocol_server_metrics_func_,
bool update_jemalloc_epoch_,
bool update_rss_)
: update_period(update_period_seconds) : update_period(update_period_seconds)
, log(getLogger("AsynchronousMetrics")) , log(getLogger("AsynchronousMetrics"))
, protocol_server_metrics_func(protocol_server_metrics_func_) , protocol_server_metrics_func(protocol_server_metrics_func_)
, update_jemalloc_epoch(update_jemalloc_epoch_)
, update_rss(update_rss_)
{ {
#if defined(OS_LINUX) #if defined(OS_LINUX)
openFileIfExists("/proc/meminfo", meminfo); openFileIfExists("/proc/meminfo", meminfo);
@ -377,6 +381,14 @@ void AsynchronousMetrics::run()
namespace namespace
{ {
uint64_t updateJemallocEpoch()
{
uint64_t value = 0;
size_t size = sizeof(value);
mallctl("epoch", &value, &size, &value, size);
return value;
}
template <typename Value> template <typename Value>
Value saveJemallocMetricImpl( Value saveJemallocMetricImpl(
AsynchronousMetricValues & values, AsynchronousMetricValues & values,
@ -593,8 +605,11 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
// 'epoch' is a special mallctl -- it updates the statistics. Without it, all // 'epoch' is a special mallctl -- it updates the statistics. Without it, all
// the following calls will return stale values. It increments and returns // the following calls will return stale values. It increments and returns
// the current epoch number, which might be useful to log as a sanity check. // the current epoch number, which might be useful to log as a sanity check.
auto epoch = getJemallocValue<uint64_t>("epoch"); auto epoch = update_jemalloc_epoch ? updateJemallocEpoch() : getJemallocValue<uint64_t>("epoch");
new_values["jemalloc.epoch"] = { epoch, "An internal incremental update number of the statistics of jemalloc (Jason Evans' memory allocator), used in all other `jemalloc` metrics." }; new_values["jemalloc.epoch"]
= {epoch,
"An internal incremental update number of the statistics of jemalloc (Jason Evans' memory allocator), used in all other "
"`jemalloc` metrics."};
// Collect the statistics themselves. // Collect the statistics themselves.
saveJemallocMetric<size_t>(new_values, "allocated"); saveJemallocMetric<size_t>(new_values, "allocated");
@ -607,10 +622,10 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
saveJemallocMetric<size_t>(new_values, "background_thread.num_threads"); saveJemallocMetric<size_t>(new_values, "background_thread.num_threads");
saveJemallocMetric<uint64_t>(new_values, "background_thread.num_runs"); saveJemallocMetric<uint64_t>(new_values, "background_thread.num_runs");
saveJemallocMetric<uint64_t>(new_values, "background_thread.run_intervals"); saveJemallocMetric<uint64_t>(new_values, "background_thread.run_intervals");
saveJemallocProf<size_t>(new_values, "active"); saveJemallocProf<bool>(new_values, "active");
saveAllArenasMetric<size_t>(new_values, "pactive"); saveAllArenasMetric<size_t>(new_values, "pactive");
[[maybe_unused]] size_t je_malloc_pdirty = saveAllArenasMetric<size_t>(new_values, "pdirty"); saveAllArenasMetric<size_t>(new_values, "pdirty");
[[maybe_unused]] size_t je_malloc_pmuzzy = saveAllArenasMetric<size_t>(new_values, "pmuzzy"); saveAllArenasMetric<size_t>(new_values, "pmuzzy");
saveAllArenasMetric<size_t>(new_values, "dirty_purged"); saveAllArenasMetric<size_t>(new_values, "dirty_purged");
saveAllArenasMetric<size_t>(new_values, "muzzy_purged"); saveAllArenasMetric<size_t>(new_values, "muzzy_purged");
#endif #endif
@ -639,9 +654,8 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
" It is unspecified whether it includes the per-thread stacks and most of the allocated memory, that is allocated with the 'mmap' system call." " It is unspecified whether it includes the per-thread stacks and most of the allocated memory, that is allocated with the 'mmap' system call."
" This metric exists only for completeness reasons. I recommend to use the `MemoryResident` metric for monitoring."}; " This metric exists only for completeness reasons. I recommend to use the `MemoryResident` metric for monitoring."};
#if !USE_JEMALLOC if (update_rss)
MemoryTracker::updateValues(data.resident, data.resident, /*force_update=*/true); MemoryTracker::updateRSS(data.resident);
#endif
} }
{ {

View File

@ -7,10 +7,8 @@
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <condition_variable> #include <condition_variable>
#include <map>
#include <mutex> #include <mutex>
#include <string> #include <string>
#include <thread>
#include <vector> #include <vector>
#include <optional> #include <optional>
#include <unordered_map> #include <unordered_map>
@ -69,7 +67,9 @@ public:
AsynchronousMetrics( AsynchronousMetrics(
unsigned update_period_seconds, unsigned update_period_seconds,
const ProtocolServerMetricsFunc & protocol_server_metrics_func_); const ProtocolServerMetricsFunc & protocol_server_metrics_func_,
bool update_jemalloc_epoch_,
bool update_rss_);
virtual ~AsynchronousMetrics(); virtual ~AsynchronousMetrics();
@ -92,7 +92,6 @@ private:
virtual void logImpl(AsynchronousMetricValues &) {} virtual void logImpl(AsynchronousMetricValues &) {}
ProtocolServerMetricsFunc protocol_server_metrics_func; ProtocolServerMetricsFunc protocol_server_metrics_func;
std::shared_ptr<ICgroupsReader> cgroups_reader;
std::unique_ptr<ThreadFromGlobalPool> thread; std::unique_ptr<ThreadFromGlobalPool> thread;
@ -113,6 +112,9 @@ private:
MemoryStatisticsOS memory_stat TSA_GUARDED_BY(data_mutex); MemoryStatisticsOS memory_stat TSA_GUARDED_BY(data_mutex);
#endif #endif
const bool update_jemalloc_epoch;
const bool update_rss;
#if defined(OS_LINUX) #if defined(OS_LINUX)
std::optional<ReadBufferFromFilePRead> meminfo TSA_GUARDED_BY(data_mutex); std::optional<ReadBufferFromFilePRead> meminfo TSA_GUARDED_BY(data_mutex);
std::optional<ReadBufferFromFilePRead> loadavg TSA_GUARDED_BY(data_mutex); std::optional<ReadBufferFromFilePRead> loadavg TSA_GUARDED_BY(data_mutex);

View File

@ -13,12 +13,8 @@
#include <base/sleep.h> #include <base/sleep.h>
#include <cstdint> #include <cstdint>
#include <filesystem>
#include <memory>
#include <optional>
using namespace DB; using namespace DB;
namespace fs = std::filesystem;
namespace DB namespace DB
{ {
@ -29,170 +25,8 @@ extern const int FILE_DOESNT_EXIST;
extern const int INCORRECT_DATA; extern const int INCORRECT_DATA;
} }
}
namespace
{
/// Format is
/// kernel 5
/// rss 15
/// [...]
uint64_t readMetricFromStatFile(ReadBufferFromFile & buf, const std::string & key)
{
while (!buf.eof())
{
std::string current_key;
readStringUntilWhitespace(current_key, buf);
if (current_key != key)
{
std::string dummy;
readStringUntilNewlineInto(dummy, buf);
buf.ignore();
continue;
}
assertChar(' ', buf);
uint64_t value = 0;
readIntText(value, buf);
return value;
}
throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot find '{}' in '{}'", key, buf.getFileName());
}
struct CgroupsV1Reader : ICgroupsReader
{
explicit CgroupsV1Reader(const fs::path & stat_file_dir) : buf(stat_file_dir / "memory.stat") { }
uint64_t readMemoryUsage() override
{
std::lock_guard lock(mutex);
buf.rewind();
return readMetricFromStatFile(buf, "rss");
}
private:
std::mutex mutex;
ReadBufferFromFile buf TSA_GUARDED_BY(mutex);
};
struct CgroupsV2Reader : ICgroupsReader
{
explicit CgroupsV2Reader(const fs::path & stat_file_dir)
: current_buf(stat_file_dir / "memory.current"), stat_buf(stat_file_dir / "memory.stat")
{
}
uint64_t readMemoryUsage() override
{
std::lock_guard lock(mutex);
current_buf.rewind();
stat_buf.rewind();
int64_t mem_usage = 0;
/// memory.current contains a single number
/// the reason why we subtract it described here: https://github.com/ClickHouse/ClickHouse/issues/64652#issuecomment-2149630667
readIntText(mem_usage, current_buf);
mem_usage -= readMetricFromStatFile(stat_buf, "inactive_file");
chassert(mem_usage >= 0, "Negative memory usage");
return mem_usage;
}
private:
std::mutex mutex;
ReadBufferFromFile current_buf TSA_GUARDED_BY(mutex);
ReadBufferFromFile stat_buf TSA_GUARDED_BY(mutex);
};
/// Caveats:
/// - All of the logic in this file assumes that the current process is the only process in the
/// containing cgroup (or more precisely: the only process with significant memory consumption).
/// If this is not the case, then other processe's memory consumption may affect the internal
/// memory tracker ...
/// - Cgroups v1 and v2 allow nested cgroup hierarchies. As v1 is deprecated for over half a
/// decade and will go away at some point, hierarchical detection is only implemented for v2.
/// - I did not test what happens if a host has v1 and v2 simultaneously enabled. I believe such
/// systems existed only for a short transition period.
std::optional<std::string> getCgroupsV2Path()
{
if (!cgroupsV2Enabled())
return {};
if (!cgroupsV2MemoryControllerEnabled())
return {};
fs::path current_cgroup = cgroupV2PathOfProcess();
if (current_cgroup.empty())
return {};
/// Return the bottom-most nested current memory file. If there is no such file at the current
/// level, try again at the parent level as memory settings are inherited.
while (current_cgroup != default_cgroups_mount.parent_path())
{
const auto current_path = current_cgroup / "memory.current";
const auto stat_path = current_cgroup / "memory.stat";
if (fs::exists(current_path) && fs::exists(stat_path))
return {current_cgroup};
current_cgroup = current_cgroup.parent_path();
}
return {};
}
std::optional<std::string> getCgroupsV1Path()
{
auto path = default_cgroups_mount / "memory/memory.stat";
if (!fs::exists(path))
return {};
return {default_cgroups_mount / "memory"};
}
enum class CgroupsVersion : uint8_t
{
V1,
V2
};
std::pair<std::string, CgroupsVersion> getCgroupsPath()
{
auto v2_path = getCgroupsV2Path();
if (v2_path.has_value())
return {*v2_path, CgroupsVersion::V2};
auto v1_path = getCgroupsV1Path();
if (v1_path.has_value())
return {*v1_path, CgroupsVersion::V1};
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Cannot find cgroups v1 or v2 current memory file");
}
}
namespace DB
{
std::shared_ptr<ICgroupsReader> createCgroupsReader()
{
const auto [cgroup_path, version] = getCgroupsPath();
LOG_INFO(
getLogger("CgroupsReader"),
"Will create cgroup reader from '{}' (cgroups version: {})",
cgroup_path,
(version == CgroupsVersion::V1) ? "v1" : "v2");
if (version == CgroupsVersion::V2)
return std::make_shared<CgroupsV2Reader>(cgroup_path);
else
{
chassert(version == CgroupsVersion::V1);
return std::make_shared<CgroupsV1Reader>(cgroup_path);
}
}
CgroupsMemoryUsageObserver::CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_) CgroupsMemoryUsageObserver::CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_)
: log(getLogger("CgroupsMemoryUsageObserver")), wait_time(wait_time_), cgroups_reader(createCgroupsReader()) : log(getLogger("CgroupsMemoryUsageObserver")), wait_time(wait_time_)
{} {}
CgroupsMemoryUsageObserver::~CgroupsMemoryUsageObserver() CgroupsMemoryUsageObserver::~CgroupsMemoryUsageObserver()

View File

@ -3,21 +3,11 @@
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <chrono> #include <chrono>
#include <memory>
#include <mutex> #include <mutex>
namespace DB namespace DB
{ {
struct ICgroupsReader
{
virtual ~ICgroupsReader() = default;
virtual uint64_t readMemoryUsage() = 0;
};
std::shared_ptr<ICgroupsReader> createCgroupsReader();
/// Periodically reads the the maximum memory available to the process (which can change due to cgroups settings). /// Periodically reads the the maximum memory available to the process (which can change due to cgroups settings).
/// You can specify a callback to react on changes. The callback typically reloads the configuration, i.e. Server /// You can specify a callback to react on changes. The callback typically reloads the configuration, i.e. Server
/// or Keeper configuration file. This reloads settings 'max_server_memory_usage' (Server) and 'max_memory_usage_soft_limit' /// or Keeper configuration file. This reloads settings 'max_server_memory_usage' (Server) and 'max_memory_usage_soft_limit'
@ -54,8 +44,6 @@ private:
void runThread(); void runThread();
std::shared_ptr<ICgroupsReader> cgroups_reader;
std::mutex thread_mutex; std::mutex thread_mutex;
std::condition_variable cond; std::condition_variable cond;
ThreadFromGlobalPool thread; ThreadFromGlobalPool thread;

View File

@ -508,13 +508,13 @@ void MemoryTracker::reset()
} }
void MemoryTracker::updateValues(Int64 rss_, Int64 allocated_, bool force_update) void MemoryTracker::updateRSS(Int64 rss_)
{ {
total_memory_tracker.rss.store(rss_, std::memory_order_relaxed); total_memory_tracker.rss.store(rss_, std::memory_order_relaxed);
}
if (likely(!force_update && total_memory_tracker.amount.load(std::memory_order_relaxed) >= 0)) void MemoryTracker::updateAllocated(Int64 allocated_)
return; {
Int64 new_amount = allocated_; Int64 new_amount = allocated_;
LOG_INFO( LOG_INFO(
getLogger("MemoryTracker"), getLogger("MemoryTracker"),
@ -531,7 +531,6 @@ void MemoryTracker::updateValues(Int64 rss_, Int64 allocated_, bool force_update
total_memory_tracker.updatePeak(new_amount, log_memory_usage); total_memory_tracker.updatePeak(new_amount, log_memory_usage);
} }
void MemoryTracker::setSoftLimit(Int64 value) void MemoryTracker::setSoftLimit(Int64 value)
{ {
soft_limit.store(value, std::memory_order_relaxed); soft_limit.store(value, std::memory_order_relaxed);

View File

@ -240,7 +240,8 @@ public:
void reset(); void reset();
/// update values based on external information (e.g. jemalloc's stat) /// update values based on external information (e.g. jemalloc's stat)
static void updateValues(Int64 rss_, Int64 allocated_, bool force_update); static void updateRSS(Int64 rss_);
static void updateAllocated(Int64 allocated_);
/// Prints info about peak memory consumption into log. /// Prints info about peak memory consumption into log.
void logPeakMemoryUsage(); void logPeakMemoryUsage();

View File

@ -1,11 +1,21 @@
#include <Common/MemoryWorker.h> #include <Common/MemoryWorker.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <base/cgroupsv2.h>
#include <Common/Jemalloc.h> #include <Common/Jemalloc.h>
#include <Common/MemoryTracker.h> #include <Common/MemoryTracker.h>
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/formatReadable.h> #include <Common/formatReadable.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <filesystem>
#include <memory>
#include <optional>
namespace fs = std::filesystem;
namespace ProfileEvents namespace ProfileEvents
{ {
extern const Event MemoryAllocatorPurge; extern const Event MemoryAllocatorPurge;
@ -17,14 +27,227 @@ namespace ProfileEvents
namespace DB namespace DB
{ {
#if USE_JEMALLOC namespace ErrorCodes
#define STRINGIFY_HELPER(x) #x
#define STRINGIFY(x) STRINGIFY_HELPER(x)
MemoryWorker::MemoryWorker(uint64_t period_ms_)
: period_ms(period_ms_)
{ {
LOG_INFO(getLogger("MemoryWorker"), "Starting background memory thread with period of {}ms", period_ms.count()); extern const int FILE_DOESNT_EXIST;
extern const int INCORRECT_DATA;
}
#if defined(OS_LINUX)
struct ICgroupsReader
{
virtual ~ICgroupsReader() = default;
virtual uint64_t readMemoryUsage() = 0;
};
namespace
{
/// Format is
/// kernel 5
/// rss 15
/// [...]
uint64_t readMetricFromStatFile(ReadBufferFromFile & buf, const std::string & key)
{
while (!buf.eof())
{
std::string current_key;
readStringUntilWhitespace(current_key, buf);
if (current_key != key)
{
std::string dummy;
readStringUntilNewlineInto(dummy, buf);
buf.ignore();
continue;
}
assertChar(' ', buf);
uint64_t value = 0;
readIntText(value, buf);
return value;
}
LOG_ERROR(getLogger("CgroupsReader"), "Cannot find '{}' in '{}'", key, buf.getFileName());
return 0;
}
struct CgroupsV1Reader : ICgroupsReader
{
explicit CgroupsV1Reader(const fs::path & stat_file_dir) : buf(stat_file_dir / "memory.stat") { }
uint64_t readMemoryUsage() override
{
std::lock_guard lock(mutex);
buf.rewind();
return readMetricFromStatFile(buf, "rss");
}
private:
std::mutex mutex;
ReadBufferFromFile buf TSA_GUARDED_BY(mutex);
};
struct CgroupsV2Reader : ICgroupsReader
{
explicit CgroupsV2Reader(const fs::path & stat_file_dir) : stat_buf(stat_file_dir / "memory.stat") { }
uint64_t readMemoryUsage() override
{
std::lock_guard lock(mutex);
stat_buf.rewind();
return readMetricFromStatFile(stat_buf, "anon");
}
private:
std::mutex mutex;
ReadBufferFromFile stat_buf TSA_GUARDED_BY(mutex);
};
/// Caveats:
/// - All of the logic in this file assumes that the current process is the only process in the
/// containing cgroup (or more precisely: the only process with significant memory consumption).
/// If this is not the case, then other processe's memory consumption may affect the internal
/// memory tracker ...
/// - Cgroups v1 and v2 allow nested cgroup hierarchies. As v1 is deprecated for over half a
/// decade and will go away at some point, hierarchical detection is only implemented for v2.
/// - I did not test what happens if a host has v1 and v2 simultaneously enabled. I believe such
/// systems existed only for a short transition period.
std::optional<std::string> getCgroupsV2Path()
{
if (!cgroupsV2Enabled())
return {};
if (!cgroupsV2MemoryControllerEnabled())
return {};
fs::path current_cgroup = cgroupV2PathOfProcess();
if (current_cgroup.empty())
return {};
/// Return the bottom-most nested current memory file. If there is no such file at the current
/// level, try again at the parent level as memory settings are inherited.
while (current_cgroup != default_cgroups_mount.parent_path())
{
const auto current_path = current_cgroup / "memory.current";
const auto stat_path = current_cgroup / "memory.stat";
if (fs::exists(current_path) && fs::exists(stat_path))
return {current_cgroup};
current_cgroup = current_cgroup.parent_path();
}
return {};
}
std::optional<std::string> getCgroupsV1Path()
{
auto path = default_cgroups_mount / "memory/memory.stat";
if (!fs::exists(path))
return {};
return {default_cgroups_mount / "memory"};
}
enum class CgroupsVersion : uint8_t
{
V1,
V2
};
std::pair<std::string, CgroupsVersion> getCgroupsPath()
{
auto v2_path = getCgroupsV2Path();
if (v2_path.has_value())
return {*v2_path, CgroupsVersion::V2};
auto v1_path = getCgroupsV1Path();
if (v1_path.has_value())
return {*v1_path, CgroupsVersion::V1};
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Cannot find cgroups v1 or v2 current memory file");
}
std::shared_ptr<ICgroupsReader> createCgroupsReader()
{
const auto [cgroup_path, version] = getCgroupsPath();
LOG_INFO(
getLogger("CgroupsReader"),
"Will create cgroup reader from '{}' (cgroups version: {})",
cgroup_path,
(version == CgroupsVersion::V1) ? "v1" : "v2");
if (version == CgroupsVersion::V2)
return std::make_shared<CgroupsV2Reader>(cgroup_path);
else
{
chassert(version == CgroupsVersion::V1);
return std::make_shared<CgroupsV1Reader>(cgroup_path);
}
}
#endif
constexpr uint64_t cgroups_memory_usage_tick_ms{50};
constexpr uint64_t jemalloc_memory_usage_tick_ms{100};
std::string_view sourceToString(MemoryWorker::MemoryUsageSource source)
{
switch (source)
{
case MemoryWorker::MemoryUsageSource::Cgroups: return "Cgroups";
case MemoryWorker::MemoryUsageSource::Jemalloc: return "Jemalloc";
case MemoryWorker::MemoryUsageSource::None: return "None";
}
}
}
/// We try to pick the best possible supported source for reading memory usage.
/// Supported sources in order of priority
/// - reading from cgroups' pseudo-files (fastest and most accurate)
/// - reading jemalloc's resident stat (doesn't take into account allocations that didn't use jemalloc)
/// Also, different tick rates are used because not all options are equally fast
MemoryWorker::MemoryWorker(uint64_t period_ms_)
: log(getLogger("MemoryWorker"))
, period_ms(period_ms_)
{
#if defined(OS_LINUX)
try
{
cgroups_reader = createCgroupsReader();
source = MemoryUsageSource::Cgroups;
if (period_ms == 0)
period_ms = cgroups_memory_usage_tick_ms;
return;
}
catch (...)
{
tryLogCurrentException(log, "Cannot use cgroups reader");
}
#endif
#if USE_JEMALLOC
source = MemoryUsageSource::Jemalloc;
if (period_ms == 0)
period_ms = jemalloc_memory_usage_tick_ms;
#endif
}
MemoryWorker::MemoryUsageSource MemoryWorker::getSource()
{
return source;
}
void MemoryWorker::start()
{
if (source == MemoryUsageSource::None)
return;
LOG_INFO(
getLogger("MemoryWorker"),
"Starting background memory thread with period of {}ms, using {} as source",
period_ms,
sourceToString(source));
background_thread = ThreadFromGlobalPool([this] { backgroundThread(); }); background_thread = ThreadFromGlobalPool([this] { backgroundThread(); });
} }
@ -40,29 +263,39 @@ MemoryWorker::~MemoryWorker()
background_thread.join(); background_thread.join();
} }
uint64_t MemoryWorker::getMemoryUsage()
{
switch (source)
{
case MemoryUsageSource::Cgroups:
return cgroups_reader->readMemoryUsage();
case MemoryUsageSource::Jemalloc:
return resident_mib.getValue();
case MemoryUsageSource::None:
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Trying to fetch memory usage while no memory source can be used");
}
}
void MemoryWorker::backgroundThread() void MemoryWorker::backgroundThread()
{ {
JemallocMibCache<uint64_t> epoch_mib("epoch"); std::chrono::milliseconds chrono_period_ms{period_ms};
JemallocMibCache<size_t> resident_mib("stats.resident");
JemallocMibCache<size_t> active_mib("stats.active");
JemallocMibCache<size_t> allocated_mib("stats.allocated");
JemallocMibCache<size_t> purge_mib("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge");
bool first_run = true; bool first_run = true;
std::unique_lock lock(mutex); std::unique_lock lock(mutex);
while (true) while (true)
{ {
cv.wait_for(lock, period_ms, [this] { return shutdown; }); cv.wait_for(lock, chrono_period_ms, [this] { return shutdown; });
if (shutdown) if (shutdown)
return; return;
Stopwatch total_watch; Stopwatch total_watch;
epoch_mib.setValue(0);
Int64 resident = resident_mib.getValue();
/// force update the allocated stat from jemalloc for the first run to cover the allocations we missed if (source == MemoryUsageSource::Jemalloc)
/// during initialization epoch_mib.setValue(0);
MemoryTracker::updateValues(resident, allocated_mib.getValue(), first_run);
Int64 resident = getMemoryUsage();
MemoryTracker::updateRSS(resident);
#if USE_JEMALLOC
if (resident > total_memory_tracker.getHardLimit()) if (resident > total_memory_tracker.getHardLimit())
{ {
Stopwatch purge_watch; Stopwatch purge_watch;
@ -71,12 +304,19 @@ void MemoryWorker::backgroundThread()
ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurgeTimeMicroseconds, purge_watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurgeTimeMicroseconds, purge_watch.elapsedMicroseconds());
} }
if (unlikely(first_run || total_memory_tracker.get() < 0))
{
if (source != MemoryUsageSource::Jemalloc)
epoch_mib.setValue(0);
MemoryTracker::updateAllocated(allocated_mib.getValue());
}
#endif
ProfileEvents::increment(ProfileEvents::MemoryWorkerRun); ProfileEvents::increment(ProfileEvents::MemoryWorkerRun);
ProfileEvents::increment(ProfileEvents::MemoryWorkerRunElapsedMicroseconds, total_watch.elapsedMicroseconds()); ProfileEvents::increment(ProfileEvents::MemoryWorkerRunElapsedMicroseconds, total_watch.elapsedMicroseconds());
first_run = false; first_run = false;
} }
} }
#endif
} }

View File

@ -1,13 +1,14 @@
#pragma once #pragma once
#include <Common/CgroupsMemoryUsageObserver.h>
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Common/Jemalloc.h>
#include "config.h"
namespace DB namespace DB
{ {
#if USE_JEMALLOC struct ICgroupsReader;
/// Correct MemoryTracker based on stats.resident read from jemalloc. /// Correct MemoryTracker based on stats.resident read from jemalloc.
/// This requires jemalloc built with --enable-stats which we use. /// This requires jemalloc built with --enable-stats which we use.
/// The worker spawns a background thread which moves the jemalloc epoch (updates internal stats), /// The worker spawns a background thread which moves the jemalloc epoch (updates internal stats),
@ -19,8 +20,21 @@ class MemoryWorker
public: public:
explicit MemoryWorker(uint64_t period_ms_); explicit MemoryWorker(uint64_t period_ms_);
enum class MemoryUsageSource : uint8_t
{
None,
Cgroups,
Jemalloc
};
MemoryUsageSource getSource();
void start();
~MemoryWorker(); ~MemoryWorker();
private: private:
uint64_t getMemoryUsage();
void backgroundThread(); void backgroundThread();
ThreadFromGlobalPool background_thread; ThreadFromGlobalPool background_thread;
@ -29,14 +43,27 @@ private:
std::condition_variable cv; std::condition_variable cv;
bool shutdown = false; bool shutdown = false;
std::chrono::milliseconds period_ms; LoggerPtr log;
};
#else uint64_t period_ms;
class MemoryWorker
{ MemoryUsageSource source{MemoryUsageSource::None};
public:
explicit MemoryWorker(uint64_t /*period_ms_*/) {} #if defined(OS_LINUX)
}; std::shared_ptr<ICgroupsReader> cgroups_reader;
#endif #endif
#if USE_JEMALLOC
JemallocMibCache<uint64_t> epoch_mib{"epoch"};
JemallocMibCache<size_t> resident_mib{"stats.resident"};
JemallocMibCache<size_t> allocated_mib{"stats.allocated"};
#define STRINGIFY_HELPER(x) #x
#define STRINGIFY(x) STRINGIFY_HELPER(x)
JemallocMibCache<size_t> purge_mib{"arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge"};
#undef STRINGIFY
#undef STRINGIFY_HELPER
#endif
};
} }

View File

@ -114,8 +114,13 @@ void updateKeeperInformation(KeeperDispatcher & keeper_dispatcher, AsynchronousM
} }
KeeperAsynchronousMetrics::KeeperAsynchronousMetrics( KeeperAsynchronousMetrics::KeeperAsynchronousMetrics(
ContextPtr context_, unsigned update_period_seconds, const ProtocolServerMetricsFunc & protocol_server_metrics_func_) ContextPtr context_,
: AsynchronousMetrics(update_period_seconds, protocol_server_metrics_func_), context(std::move(context_)) unsigned update_period_seconds,
const ProtocolServerMetricsFunc & protocol_server_metrics_func_,
bool update_jemalloc_epoch_,
bool update_rss_)
: AsynchronousMetrics(update_period_seconds, protocol_server_metrics_func_, update_jemalloc_epoch_, update_rss_)
, context(std::move(context_))
{ {
} }

View File

@ -13,9 +13,13 @@ class KeeperAsynchronousMetrics : public AsynchronousMetrics
{ {
public: public:
KeeperAsynchronousMetrics( KeeperAsynchronousMetrics(
ContextPtr context_, unsigned update_period_seconds, const ProtocolServerMetricsFunc & protocol_server_metrics_func_); ContextPtr context_,
~KeeperAsynchronousMetrics() override; unsigned update_period_seconds,
const ProtocolServerMetricsFunc & protocol_server_metrics_func_,
bool update_jemalloc_epoch_,
bool update_rss_);
~KeeperAsynchronousMetrics() override;
private: private:
ContextPtr context; ContextPtr context;

View File

@ -157,7 +157,7 @@ namespace DB
M(Bool, prepare_system_log_tables_on_startup, false, "If true, ClickHouse creates all configured `system.*_log` tables before the startup. It can be helpful if some startup scripts depend on these tables.", 0) \ M(Bool, prepare_system_log_tables_on_startup, false, "If true, ClickHouse creates all configured `system.*_log` tables before the startup. It can be helpful if some startup scripts depend on these tables.", 0) \
M(Double, gwp_asan_force_sample_probability, 0.0003, "Probability that an allocation from specific places will be sampled by GWP Asan (i.e. PODArray allocations)", 0) \ M(Double, gwp_asan_force_sample_probability, 0.0003, "Probability that an allocation from specific places will be sampled by GWP Asan (i.e. PODArray allocations)", 0) \
M(UInt64, config_reload_interval_ms, 2000, "How often clickhouse will reload config and check for new changes", 0) \ M(UInt64, config_reload_interval_ms, 2000, "How often clickhouse will reload config and check for new changes", 0) \
M(UInt64, memory_worker_period_ms, 100, "Period of background memory worker which corrects memory tracker memory usages and cleans up unused pages during higher memory usage.", 0) \ M(UInt64, memory_worker_period_ms, 0, "Tick period of background memory worker which corrects memory tracker memory usages and cleans up unused pages during higher memory usage. If set to 0, default value will be used depending on the memory usage source", 0) \
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in StorageSystemServerSettings.cpp /// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in StorageSystemServerSettings.cpp

View File

@ -55,9 +55,11 @@ ServerAsynchronousMetrics::ServerAsynchronousMetrics(
ContextPtr global_context_, ContextPtr global_context_,
unsigned update_period_seconds, unsigned update_period_seconds,
unsigned heavy_metrics_update_period_seconds, unsigned heavy_metrics_update_period_seconds,
const ProtocolServerMetricsFunc & protocol_server_metrics_func_) const ProtocolServerMetricsFunc & protocol_server_metrics_func_,
bool update_jemalloc_epoch_,
bool update_rss_)
: WithContext(global_context_) : WithContext(global_context_)
, AsynchronousMetrics(update_period_seconds, protocol_server_metrics_func_) , AsynchronousMetrics(update_period_seconds, protocol_server_metrics_func_, update_jemalloc_epoch_, update_rss_)
, heavy_metric_update_period(heavy_metrics_update_period_seconds) , heavy_metric_update_period(heavy_metrics_update_period_seconds)
{ {
/// sanity check /// sanity check

View File

@ -14,7 +14,10 @@ public:
ContextPtr global_context_, ContextPtr global_context_,
unsigned update_period_seconds, unsigned update_period_seconds,
unsigned heavy_metrics_update_period_seconds, unsigned heavy_metrics_update_period_seconds,
const ProtocolServerMetricsFunc & protocol_server_metrics_func_); const ProtocolServerMetricsFunc & protocol_server_metrics_func_,
bool update_jemalloc_epoch_,
bool update_rss_);
~ServerAsynchronousMetrics() override; ~ServerAsynchronousMetrics() override;
private: private: