diff --git a/src/Common/CgroupsMemoryUsageObserver.cpp b/src/Common/CgroupsMemoryUsageObserver.cpp index 39dd51bb5e4..4d4a2f86bab 100644 --- a/src/Common/CgroupsMemoryUsageObserver.cpp +++ b/src/Common/CgroupsMemoryUsageObserver.cpp @@ -12,7 +12,9 @@ #include #include +#include #include +#include #include #include "config.h" @@ -22,24 +24,170 @@ #define STRINGIFY(x) STRINGIFY_HELPER(x) #endif +using namespace DB; namespace DB { namespace ErrorCodes { - extern const int CANNOT_CLOSE_FILE; - extern const int CANNOT_OPEN_FILE; - extern const int FILE_DOESNT_EXIST; - extern const int INCORRECT_DATA; +extern const int FILE_DOESNT_EXIST; +extern const int INCORRECT_DATA; } -CgroupsMemoryUsageObserver::CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_) - : log(getLogger("CgroupsMemoryUsageObserver")) - , wait_time(wait_time_) - , memory_usage_file(log) +} + +namespace { - LOG_INFO(log, "Initialized cgroups memory limit observer, wait time is {} sec", wait_time.count()); + +/// Format is +/// kernel 5 +/// rss 15 +/// [...] +uint64_t readMetricFromStatFile(ReadBufferFromFile & buf, const std::string & key) +{ + while (!buf.eof()) + { + std::string current_key; + readStringUntilWhitespace(current_key, buf); + if (current_key != key) + { + std::string dummy; + readStringUntilNewlineInto(dummy, buf); + buf.ignore(); + continue; + } + + assertChar(' ', buf); + uint64_t value = 0; + readIntText(value, buf); + return value; + } + + throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot find '{}' in '{}'", key, buf.getFileName()); +} + +struct CgroupsV1Reader : ICgroupsReader +{ + explicit CgroupsV1Reader(const std::filesystem::path & stat_file_dir) : buf(stat_file_dir / "memory.stat") { } + + uint64_t readMemoryUsage() override + { + std::lock_guard lock(mutex); + buf.rewind(); + return readMetricFromStatFile(buf, "rss"); + } + +private: + std::mutex mutex; + ReadBufferFromFile buf TSA_GUARDED_BY(mutex); +}; + +struct CgroupsV2Reader : ICgroupsReader +{ + explicit CgroupsV2Reader(const std::filesystem::path & stat_file_dir) + : current_buf(stat_file_dir / "memory.current"), stat_buf(stat_file_dir / "memory.stat") + { + } + + uint64_t readMemoryUsage() override + { + std::lock_guard lock(mutex); + current_buf.rewind(); + stat_buf.rewind(); + + int64_t mem_usage = 0; + /// memory.current contains a single number + /// the reason why we subtract it described here: https://github.com/ClickHouse/ClickHouse/issues/64652#issuecomment-2149630667 + readIntText(mem_usage, current_buf); + mem_usage -= readMetricFromStatFile(stat_buf, "inactive_file"); + chassert(mem_usage >= 0, "Negative memory usage"); + return mem_usage; + } + +private: + std::mutex mutex; + ReadBufferFromFile current_buf TSA_GUARDED_BY(mutex); + ReadBufferFromFile stat_buf TSA_GUARDED_BY(mutex); +}; + +/// Caveats: +/// - All of the logic in this file assumes that the current process is the only process in the +/// containing cgroup (or more precisely: the only process with significant memory consumption). +/// If this is not the case, then other processe's memory consumption may affect the internal +/// memory tracker ... +/// - Cgroups v1 and v2 allow nested cgroup hierarchies. As v1 is deprecated for over half a +/// decade and will go away at some point, hierarchical detection is only implemented for v2. +/// - I did not test what happens if a host has v1 and v2 simultaneously enabled. I believe such +/// systems existed only for a short transition period. + +std::optional getCgroupsV2Path() +{ + if (!cgroupsV2Enabled()) + return {}; + + if (!cgroupsV2MemoryControllerEnabled()) + return {}; + + std::filesystem::path current_cgroup = cgroupV2PathOfProcess(); + if (current_cgroup.empty()) + return {}; + + /// Return the bottom-most nested current memory file. If there is no such file at the current + /// level, try again at the parent level as memory settings are inherited. + while (current_cgroup != default_cgroups_mount.parent_path()) + { + const auto current_path = current_cgroup / "memory.current"; + const auto stat_path = current_cgroup / "memory.stat"; + if (std::filesystem::exists(current_path) && std::filesystem::exists(stat_path)) + return {current_cgroup}; + current_cgroup = current_cgroup.parent_path(); + } + return {}; +} + +std::optional getCgroupsV1Path() +{ + auto path = default_cgroups_mount / "memory/memory.stat"; + if (!std::filesystem::exists(path)) + return {}; + return {default_cgroups_mount / "memory"}; +} + +std::pair getCgroupsPath() +{ + auto v2_path = getCgroupsV2Path(); + if (v2_path.has_value()) + return {*v2_path, CgroupsMemoryUsageObserver::CgroupsVersion::V2}; + + auto v1_path = getCgroupsV1Path(); + if (v1_path.has_value()) + return {*v1_path, CgroupsMemoryUsageObserver::CgroupsVersion::V1}; + + throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Cannot find cgroups v1 or v2 current memory file"); +} + +} + +namespace DB +{ + +CgroupsMemoryUsageObserver::CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_) + : log(getLogger("CgroupsMemoryUsageObserver")), wait_time(wait_time_) +{ + const auto [cgroup_path, version] = getCgroupsPath(); + + if (version == CgroupsVersion::V2) + cgroup_reader = std::make_unique(cgroup_path); + else + cgroup_reader = std::make_unique(cgroup_path); + + LOG_INFO( + log, + "Will read the current memory usage from '{}' (cgroups version: {}), wait time is {} sec", + cgroup_path, + (version == CgroupsVersion::V1) ? "v1" : "v2", + wait_time.count()); } CgroupsMemoryUsageObserver::~CgroupsMemoryUsageObserver() @@ -77,14 +225,15 @@ void CgroupsMemoryUsageObserver::setMemoryUsageLimits(uint64_t hard_limit_, uint { if (up) { - LOG_WARNING(log, "Exceeded sort memory limit ({})", ReadableSize(soft_limit_)); + LOG_WARNING(log, "Exceeded soft memory limit ({})", ReadableSize(soft_limit_)); -#if USE_JEMALLOC +# if USE_JEMALLOC LOG_INFO(log, "Purging jemalloc arenas"); mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0); -#endif +# endif /// Reset current usage in memory tracker. Expect zero for free_memory_in_allocator_arenas as we just purged them. - uint64_t memory_usage = memory_usage_file.readMemoryUsage(); + uint64_t memory_usage = cgroup_reader->readMemoryUsage(); + LOG_TRACE(log, "Read current memory usage {} bytes ({}) from cgroups", memory_usage, ReadableSize(memory_usage)); MemoryTracker::setRSS(memory_usage, 0); LOG_INFO(log, "Purged jemalloc arenas. Current memory usage is {}", ReadableSize(memory_usage)); @@ -104,153 +253,6 @@ void CgroupsMemoryUsageObserver::setOnMemoryAmountAvailableChangedFn(OnMemoryAmo on_memory_amount_available_changed = on_memory_amount_available_changed_; } -namespace -{ - -/// Caveats: -/// - All of the logic in this file assumes that the current process is the only process in the -/// containing cgroup (or more precisely: the only process with significant memory consumption). -/// If this is not the case, then other processe's memory consumption may affect the internal -/// memory tracker ... -/// - Cgroups v1 and v2 allow nested cgroup hierarchies. As v1 is deprecated for over half a -/// decade and will go away at some point, hierarchical detection is only implemented for v2. -/// - I did not test what happens if a host has v1 and v2 simultaneously enabled. I believe such -/// systems existed only for a short transition period. - -std::optional getCgroupsV2FileName() -{ - if (!cgroupsV2Enabled()) - return {}; - - if (!cgroupsV2MemoryControllerEnabled()) - return {}; - - std::filesystem::path current_cgroup = cgroupV2PathOfProcess(); - if (current_cgroup.empty()) - return {}; - - /// Return the bottom-most nested current memory file. If there is no such file at the current - /// level, try again at the parent level as memory settings are inherited. - while (current_cgroup != default_cgroups_mount.parent_path()) - { - auto path = current_cgroup / "memory.current"; - if (std::filesystem::exists(path)) - return {path}; - current_cgroup = current_cgroup.parent_path(); - } - return {}; -} - -std::optional getCgroupsV1FileName() -{ - auto path = default_cgroups_mount / "memory/memory.stat"; - if (!std::filesystem::exists(path)) - return {}; - return {path}; -} - -std::pair getCgroupsFileName() -{ - auto v2_file_name = getCgroupsV2FileName(); - if (v2_file_name.has_value()) - return {*v2_file_name, CgroupsMemoryUsageObserver::CgroupsVersion::V2}; - - auto v1_file_name = getCgroupsV1FileName(); - if (v1_file_name.has_value()) - return {*v1_file_name, CgroupsMemoryUsageObserver::CgroupsVersion::V1}; - - throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Cannot find cgroups v1 or v2 current memory file"); -} - -} - -CgroupsMemoryUsageObserver::MemoryUsageFile::MemoryUsageFile(LoggerPtr log_) - : log(log_) -{ - std::tie(file_name, version) = getCgroupsFileName(); - - LOG_INFO(log, "Will read the current memory usage from '{}' (cgroups version: {})", file_name, (version == CgroupsVersion::V1) ? "v1" : "v2"); - - fd = ::open(file_name.data(), O_RDONLY); - if (fd == -1) - ErrnoException::throwFromPath( - (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE, - file_name, "Cannot open file '{}'", file_name); -} - -CgroupsMemoryUsageObserver::MemoryUsageFile::~MemoryUsageFile() -{ - assert(fd != -1); - if (::close(fd) != 0) - { - try - { - ErrnoException::throwFromPath( - ErrorCodes::CANNOT_CLOSE_FILE, - file_name, "Cannot close file '{}'", file_name); - } - catch (const ErrnoException &) - { - tryLogCurrentException(log, __PRETTY_FUNCTION__); - } - } -} - -uint64_t CgroupsMemoryUsageObserver::MemoryUsageFile::readMemoryUsage() const -{ - /// File read is probably not read is thread-safe, just to be sure - std::lock_guard lock(mutex); - - ReadBufferFromFileDescriptor buf(fd); - buf.rewind(); - - uint64_t mem_usage = 0; - - switch (version) - { - case CgroupsVersion::V1: - { - /// Format is - /// kernel 5 - /// rss 15 - /// [...] - std::string key; - bool found_rss = false; - - while (!buf.eof()) - { - readStringUntilWhitespace(key, buf); - if (key != "rss") - { - std::string dummy; - readStringUntilNewlineInto(dummy, buf); - buf.ignore(); - continue; - } - - assertChar(' ', buf); - readIntText(mem_usage, buf); - found_rss = true; - break; - } - - if (!found_rss) - throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot find 'rss' in '{}'", file_name); - - break; - } - case CgroupsVersion::V2: - { - readIntText(mem_usage, buf); - break; - } - } - - LOG_TRACE(log, "Read current memory usage {} from cgroups", ReadableSize(mem_usage)); - - return mem_usage; -} - void CgroupsMemoryUsageObserver::startThread() { if (!thread.joinable()) @@ -302,7 +304,8 @@ void CgroupsMemoryUsageObserver::runThread() std::lock_guard limit_lock(limit_mutex); if (soft_limit > 0 && hard_limit > 0) { - uint64_t memory_usage = memory_usage_file.readMemoryUsage(); + uint64_t memory_usage = cgroup_reader->readMemoryUsage(); + LOG_TRACE(log, "Read current memory usage {} bytes ({}) from cgroups", memory_usage, ReadableSize(memory_usage)); if (memory_usage > hard_limit) { if (last_memory_usage <= hard_limit) diff --git a/src/Common/CgroupsMemoryUsageObserver.h b/src/Common/CgroupsMemoryUsageObserver.h index 51c5cd08124..26b7dbf036e 100644 --- a/src/Common/CgroupsMemoryUsageObserver.h +++ b/src/Common/CgroupsMemoryUsageObserver.h @@ -3,11 +3,19 @@ #include #include +#include #include namespace DB { +struct ICgroupsReader +{ + virtual ~ICgroupsReader() = default; + + virtual uint64_t readMemoryUsage() = 0; +}; + /// Does two things: /// 1. Periodically reads the memory usage of the process from Linux cgroups. /// You can specify soft or hard memory limits: @@ -61,27 +69,12 @@ private: uint64_t last_memory_usage = 0; /// how much memory does the process use uint64_t last_available_memory_amount; /// how much memory can the process use - /// Represents the cgroup virtual file that shows the memory consumption of the process's cgroup. - struct MemoryUsageFile - { - public: - explicit MemoryUsageFile(LoggerPtr log_); - ~MemoryUsageFile(); - uint64_t readMemoryUsage() const; - private: - LoggerPtr log; - mutable std::mutex mutex; - int fd TSA_GUARDED_BY(mutex) = -1; - CgroupsVersion version; - std::string file_name; - }; - - MemoryUsageFile memory_usage_file; - void stopThread(); void runThread(); + std::unique_ptr cgroup_reader; + std::mutex thread_mutex; std::condition_variable cond; ThreadFromGlobalPool thread; diff --git a/tests/integration/test_memory_limit_observer/test.py b/tests/integration/test_memory_limit_observer/test.py index fe3acd9a0cf..0eda165b1d2 100644 --- a/tests/integration/test_memory_limit_observer/test.py +++ b/tests/integration/test_memory_limit_observer/test.py @@ -35,7 +35,7 @@ def get_latest_mem_limit(): ).strip() ) return mem_limit - except Exception as e: + except Exception: time.sleep(1) raise Exception("Cannot get memory limit") @@ -51,3 +51,29 @@ def test_observe_memory_limit(started_cluster): if new_max_mem > original_max_mem: return raise Exception("the memory limit does not increase as expected") + + +def test_memory_usage_doesnt_include_page_cache_size(started_cluster): + try: + # populate page cache with 4GB of data; it might be killed by OOM killer but it is fine + node1.exec_in_container( + ["dd", "if=/dev/zero", "of=outputfile", "bs=1M", "count=4K"] + ) + except Exception: + pass + + observer_refresh_period = int( + node1.query( + "select value from system.server_settings where name = 'cgroups_memory_usage_observer_wait_time'" + ).strip() + ) + time.sleep(observer_refresh_period + 1) + + max_mem_usage_from_cgroup = node1.query( + """ + SELECT max(toUInt64(replaceRegexpAll(message, 'Read current memory usage (\\d+) bytes.*', '\\1'))) AS max_mem + FROM system.text_log + WHERE logger_name = 'CgroupsMemoryUsageObserver' AND message LIKE 'Read current memory usage%bytes%' + """ + ).strip() + assert int(max_mem_usage_from_cgroup) < 2 * 2**30