Initial implementation

This commit is contained in:
Robert Schulze 2024-02-01 21:09:39 +00:00
parent 57ddf196cf
commit f6f1fd4d26
No known key found for this signature in database
GPG Key ID: 26703B55FB13728A
5 changed files with 458 additions and 5 deletions

View File

@ -32,11 +32,6 @@ std::optional<uint64_t> getCgroupsV2MemoryLimit()
/// - cgroup.subtree_control defines which controllers *are* enabled.
/// (see https://docs.kernel.org/admin-guide/cgroup-v2.html)
/// Caveat: nested groups may disable controllers. For simplicity, check only the top-level group.
/// ReadBufferFromFile subtree_control_file(default_cgroups_mount / "cgroup.subtree_control");
/// std::string subtree_control;
/// readString(subtree_control, subtree_control_file);
/// if (subtree_control.find("memory") == std::string::npos)
/// return {};
std::ifstream subtree_control_file(default_cgroups_mount / "cgroup.subtree_control");
std::stringstream subtree_control_buf;
subtree_control_buf << subtree_control_file.rdbuf();

View File

@ -24,6 +24,7 @@
#include <Common/MemoryTracker.h>
#include <Common/ClickHouseRevision.h>
#include <Common/DNSResolver.h>
#include <Common/CgroupsMemoryUsageObserver.h>
#include <Common/CurrentMetrics.h>
#include <Common/ConcurrencyControl.h>
#include <Common/Macros.h>
@ -1262,6 +1263,18 @@ try
SensitiveDataMasker::setInstance(std::make_unique<SensitiveDataMasker>(config(), "query_masking_rules"));
}
std::optional<CgroupsMemoryUsageObserver> cgroups_memory_usage_observer;
try
{
UInt64 wait_time = server_settings.cgroups_memory_usage_observer_wait_time;
if (wait_time != 0)
cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time));
}
catch (Exception &)
{
tryLogCurrentException(log, "Disabling cgroup memory observer because of an error during initialization");
}
const std::string cert_path = config().getString("openSSL.server.certificateFile", "");
const std::string key_path = config().getString("openSSL.server.privateKeyFile", "");
@ -1315,6 +1328,15 @@ try
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
if (cgroups_memory_usage_observer)
{
double hard_limit_ratio = new_server_settings.cgroup_memory_watcher_hard_limit_ratio;
double soft_limit_ratio = new_server_settings.cgroup_memory_watcher_soft_limit_ratio;
cgroups_memory_usage_observer->setLimits(
static_cast<uint64_t>(max_server_memory_usage * hard_limit_ratio),
static_cast<uint64_t>(max_server_memory_usage * soft_limit_ratio));
}
size_t merges_mutations_memory_usage_soft_limit = new_server_settings.merges_mutations_memory_usage_soft_limit;
size_t default_merges_mutations_server_memory_usage = static_cast<size_t>(current_physical_server_memory * new_server_settings.merges_mutations_memory_usage_to_ram_ratio);

View File

@ -0,0 +1,341 @@
#include <Common/CgroupsMemoryUsageObserver.h>
#if defined(OS_LINUX)
#include <Common/setThreadName.h>
#include <Common/logger_useful.h>
#include <Common/formatReadable.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <base/sleep.h>
#include <filesystem>
#include <optional>
#include "config.h"
#if USE_JEMALLOC
# include <jemalloc/jemalloc.h>
#define STRINGIFY_HELPER(x) #x
#define STRINGIFY(x) STRINGIFY_HELPER(x)
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_CLOSE_FILE;
extern const int CANNOT_OPEN_FILE;
extern const int FILE_DOESNT_EXIST;
extern const int INCORRECT_DATA;
}
CgroupsMemoryUsageObserver::CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_)
: log(getLogger("CgroupsMemoryUsageObserver"))
, wait_time(wait_time_)
, file(log)
{
LOG_INFO(log, "Initialized cgroups memory limit observer, wait time is {} sec", wait_time.count());
}
CgroupsMemoryUsageObserver::~CgroupsMemoryUsageObserver()
{
stopThread();
}
void CgroupsMemoryUsageObserver::setLimits(uint64_t hard_limit_, uint64_t soft_limit_)
{
if (hard_limit_ == hard_limit && soft_limit_ == soft_limit)
return;
stopThread();
hard_limit = hard_limit_;
soft_limit = soft_limit_;
on_hard_limit = [this, hard_limit_](bool up)
{
if (up)
{
LOG_WARNING(log, "Exceeded hard memory limit ({})", ReadableSize(hard_limit_));
/// Update current usage in memory tracker. Also reset free_memory_in_allocator_arenas to zero though we don't know if they are
/// really zero. Trying to avoid OOM ...
MemoryTracker::setRSS(hard_limit_, 0);
}
else
{
LOG_INFO(log, "Dropped below hard memory limit ({})", ReadableSize(hard_limit_));
}
};
on_soft_limit = [this, soft_limit_](bool up)
{
if (up)
{
LOG_WARNING(log, "Exceeded sort memory limit ({})", ReadableSize(soft_limit_));
#if USE_JEMALLOC
LOG_INFO(log, "Purging jemalloc arenas");
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
#endif
/// Reset current usage in memory tracker. Expect zero for free_memory_in_allocator_arenas as we just purged them.
uint64_t current_usage = readMemoryUsage();
MemoryTracker::setRSS(current_usage, 0);
LOG_INFO(log, "Purged jemalloc arenas. Current memory usage is {}", ReadableSize(current_usage));
}
else
{
LOG_INFO(log, "Dropped below soft memory limit ({})", ReadableSize(soft_limit_));
}
};
startThread();
LOG_INFO(log, "Set new limits, soft limit: {}, hard limit: {}", ReadableSize(soft_limit_), ReadableSize(hard_limit_));
}
uint64_t CgroupsMemoryUsageObserver::readMemoryUsage() const
{
return file.readMemoryUsage();
}
namespace
{
/// I think it is possible to mount the cgroups hierarchy somewhere else (e.g. when in containers).
/// /sys/fs/cgroup was still symlinked to the actual mount in the cases that I have seen.
const std::filesystem::path default_cgroups_mount = "/sys/fs/cgroup";
/// Caveats:
/// - All of the logic in this file assumes that the current process is the only process in the
/// containing cgroup (or more precisely: the only process with significant memory consumption).
/// If this is not the case, then other processe's memory consumption may affect the internal
/// memory tracker ...
/// - Cgroups v1 and v2 allow nested cgroup hierarchies. As v1 is deprecated for over half a
/// decade and will go away at some point, hierarchical detection is only implemented for v2.
/// - I did not test what happens if a host has v1 and v2 simultaneously enabled. I believe such
/// systems existed only for a short transition period.
std::optional<std::string> getCgroupsV2FileName()
{
/// This file exists iff the host has cgroups v2 enabled.
auto controllers_file_path = default_cgroups_mount / "cgroup.controllers";
if (!std::filesystem::exists(controllers_file_path))
return {};
/// Make sure that the memory controller is enabled.
/// - cgroup.controllers defines which controllers *can* be enabled.
/// - cgroup.subtree_control defines which controllers *are* enabled.
/// (see https://docs.kernel.org/admin-guide/cgroup-v2.html)
/// Caveat: child cgroups may disable controllers but such a situation should be very rare.
/// Therefore, only check the top-level cgroup for simplicity.
ReadBufferFromFile subtree_control_file(default_cgroups_mount / "cgroup.subtree_control");
std::string subtree_control;
readString(subtree_control, subtree_control_file);
if (subtree_control.find("memory") == std::string::npos)
return {};
/// Identify the cgroup the process belongs to.
/// All PIDs assigned to a cgroup are in /sys/fs/cgroups/{cgroup_name}/cgroup.procs
/// A simpler way to get the membership is:
ReadBufferFromFile cgroup_file("/proc/self/cgroup");
std::string cgroup;
readString(cgroup, cgroup_file);
/// With cgroups v2, there will be a *single* line with prefix "0::/"
const std::string v2_prefix = "0::/";
if (!cgroup.starts_with(v2_prefix))
return {};
cgroup = cgroup.substr(v2_prefix.length());
auto current_cgroup = cgroup.empty() ? default_cgroups_mount : (default_cgroups_mount / cgroup);
/// Return the bottom-most nested current memory file. If there is no such file at the current
/// level, try again at the parent level as memory settings are inherited.
while (current_cgroup != default_cgroups_mount.parent_path())
{
auto path = current_cgroup / "memory.current";
if (std::filesystem::exists(path))
return {path};
current_cgroup = current_cgroup.parent_path();
}
return {};
}
std::optional<std::string> getCgroupsV1FileName()
{
auto path = default_cgroups_mount / "memory/memory.stat";
if (!std::filesystem::exists(path))
return {};
return {path};
}
std::pair<std::string, CgroupsMemoryUsageObserver::CgroupsVersion> getCgroupsFileName()
{
auto v2_file_name = getCgroupsV2FileName();
if (v2_file_name.has_value())
return {*v2_file_name, CgroupsMemoryUsageObserver::CgroupsVersion::V2};
auto v1_file_name = getCgroupsV1FileName();
if (v1_file_name.has_value())
return {*v1_file_name, CgroupsMemoryUsageObserver::CgroupsVersion::V1};
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Cannot find cgroups v1 or v2 current memory file");
}
}
CgroupsMemoryUsageObserver::File::File(LoggerPtr log_)
: log(log_)
{
std::tie(file_name, version) = getCgroupsFileName();
LOG_INFO(log, "Will read the current memory usage from '{}' (cgroups version: {})", file_name, (version == CgroupsVersion::V1) ? "v1" : "v2");
fd = ::open(file_name.data(), O_RDONLY);
if (fd == -1)
ErrnoException::throwFromPath(
(errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE,
file_name, "Cannot open file '{}'", file_name);
}
CgroupsMemoryUsageObserver::File::~File()
{
assert(fd != -1);
if (::close(fd) != 0)
{
try
{
ErrnoException::throwFromPath(
ErrorCodes::CANNOT_CLOSE_FILE,
file_name, "Cannot close file '{}'", file_name);
}
catch (const ErrnoException &)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
}
}
uint64_t CgroupsMemoryUsageObserver::File::readMemoryUsage() const
{
/// File read is probably not read is thread-safe, just to be sure
std::lock_guard lock(mutex);
ReadBufferFromFileDescriptor buf(fd);
buf.rewind();
uint64_t mem_usage;
switch (version)
{
case CgroupsVersion::V1:
{
/// Format is
/// kernel 5
/// rss 15
/// [...]
std::string key;
while (!buf.eof())
{
readStringUntilWhitespace(key, buf);
if (key != "rss")
continue;
assertChar(' ', buf);
readIntText(mem_usage, buf);
assertChar('\n', buf);
break;
}
throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot find 'rss' in '{}'", file_name);
}
case CgroupsVersion::V2:
{
readIntText(mem_usage, buf);
break;
}
}
LOG_TRACE(log, "Read current memory usage {} from cgroups", ReadableSize(mem_usage));
return mem_usage;
}
void CgroupsMemoryUsageObserver::startThread()
{
if (!thread.joinable())
{
thread = ThreadFromGlobalPool(&CgroupsMemoryUsageObserver::runThread, this);
LOG_INFO(log, "Started cgroup current memory usage observer thread");
}
}
void CgroupsMemoryUsageObserver::stopThread()
{
{
std::lock_guard lock(thread_mutex);
if (!thread.joinable())
return;
quit = true;
}
cond.notify_one();
thread.join();
LOG_INFO(log, "Stopped cgroup current memory usage observer thread");
}
void CgroupsMemoryUsageObserver::runThread()
{
setThreadName("CgrpMemUsgObsr");
std::unique_lock lock(thread_mutex);
while (true)
{
if (cond.wait_for(lock, wait_time, [this] { return quit; }))
break;
try
{
uint64_t memory_usage = file.readMemoryUsage();
processMemoryUsage(memory_usage);
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
}
}
void CgroupsMemoryUsageObserver::processMemoryUsage(uint64_t current_usage)
{
if (current_usage > hard_limit)
{
if (last_usage <= hard_limit)
on_hard_limit(true);
}
else
{
if (last_usage > hard_limit)
on_hard_limit(false);
}
if (current_usage > soft_limit)
{
if (last_usage <= soft_limit)
on_soft_limit(true);
}
else
{
if (last_usage > soft_limit)
on_soft_limit(false);
}
last_usage = current_usage;
}
}
#endif

View File

@ -0,0 +1,92 @@
#pragma once
#include <Common/ThreadPool.h>
#include <atomic>
#include <chrono>
#include <mutex>
namespace DB
{
/// Periodically reads the current memory usage from Linux cgroups.
/// You can specify soft or hard memory limits:
/// - When the soft memory limit is hit, drop jemalloc cache.
/// - When the hard memory limit is hit, update MemoryTracking metric to throw memory exceptions faster.
#if defined(OS_LINUX)
class CgroupsMemoryUsageObserver
{
public:
enum class CgroupsVersion
{
V1,
V2
};
explicit CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_);
~CgroupsMemoryUsageObserver();
void setLimits(uint64_t hard_limit_, uint64_t soft_limit_);
size_t getHardLimit() const { return hard_limit; }
size_t getSoftLimit() const { return soft_limit; }
uint64_t readMemoryUsage() const;
private:
LoggerPtr log;
std::atomic<size_t> hard_limit = 0;
std::atomic<size_t> soft_limit = 0;
const std::chrono::seconds wait_time;
using CallbackFn = std::function<void(bool)>;
CallbackFn on_hard_limit;
CallbackFn on_soft_limit;
uint64_t last_usage = 0;
/// Represents the cgroup virtual file that shows the memory consumption of the process's cgroup.
struct File
{
public:
explicit File(LoggerPtr log_);
~File();
uint64_t readMemoryUsage() const;
private:
LoggerPtr log;
mutable std::mutex mutex;
int fd TSA_GUARDED_BY(mutex) = -1;
CgroupsVersion version;
std::string file_name;
};
File file;
void startThread();
void stopThread();
void runThread();
void processMemoryUsage(uint64_t usage);
std::mutex thread_mutex;
std::condition_variable cond;
ThreadFromGlobalPool thread;
bool quit = false;
};
#else
class CgroupsMemoryUsageObserver
{
public:
explicit CgroupsMemoryUsageObserver(std::chrono::seconds) {}
void setLimits(uint64_t, uint64_t) {}
size_t readMemoryUsage() { return 0; }
size_t getHardLimit() { return 0; }
size_t getSoftLimit() { return 0; }
};
#endif
}

View File

@ -54,6 +54,9 @@ namespace DB
M(UInt64, merges_mutations_memory_usage_soft_limit, 0, "Maximum total memory usage for merges and mutations in bytes. Zero means unlimited.", 0) \
M(Double, merges_mutations_memory_usage_to_ram_ratio, 0.5, "Same as merges_mutations_memory_usage_soft_limit but in to RAM ratio. Allows to lower memory limit on low-memory systems.", 0) \
M(Bool, allow_use_jemalloc_memory, true, "Allows to use jemalloc memory.", 0) \
M(UInt64, cgroups_memory_usage_observer_wait_time, 15, "Polling interval in seconds to read the current memory usage from cgroups. Zero means disabled.", 0) \
M(Double, cgroup_memory_watcher_hard_limit_ratio, 0.95, "Hard memory limit ratio for cgroup memory usage observer", 0) \
M(Double, cgroup_memory_watcher_soft_limit_ratio, 0.9, "Sort memory limit ratio limit for cgroup memory usage observer", 0) \
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \
\