diff --git a/base/base/getMemoryAmount.cpp b/base/base/getMemoryAmount.cpp index 0311238caed..3d01e301f45 100644 --- a/base/base/getMemoryAmount.cpp +++ b/base/base/getMemoryAmount.cpp @@ -50,9 +50,6 @@ std::optional getCgroupsV2MemoryLimit() } -/** Returns the size of physical memory (RAM) in bytes. - * Returns 0 on unsupported platform - */ uint64_t getMemoryAmountOrZero() { int64_t num_pages = sysconf(_SC_PHYS_PAGES); diff --git a/base/base/getMemoryAmount.h b/base/base/getMemoryAmount.h index 7ebd92a8bcf..37ee0ebe7c6 100644 --- a/base/base/getMemoryAmount.h +++ b/base/base/getMemoryAmount.h @@ -2,11 +2,10 @@ #include -/** Returns the size of physical memory (RAM) in bytes. - * Returns 0 on unsupported platform or if it cannot determine the size of physical memory. - */ +/// Returns the size in bytes of physical memory (RAM) available to the process. The value can +/// be smaller than the total RAM available to the system due to cgroups settings. +/// Returns 0 on unsupported platform or if it cannot determine the size of physical memory. uint64_t getMemoryAmountOrZero(); -/** Throws exception if it cannot determine the size of physical memory. - */ +/// Throws exception if it cannot determine the size of physical memory. 
uint64_t getMemoryAmount(); diff --git a/programs/keeper/Keeper.cpp b/programs/keeper/Keeper.cpp index 8972c82eab8..a558ed64bf9 100644 --- a/programs/keeper/Keeper.cpp +++ b/programs/keeper/Keeper.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -623,6 +624,25 @@ try buildLoggers(config(), logger()); main_config_reloader->start(); + std::optional cgroups_memory_usage_observer; + try + { + auto wait_time = config().getUInt64("keeper_server.cgroups_memory_observer_wait_time", 15); + if (wait_time != 0) + { + cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time)); + /// Not calling cgroups_memory_usage_observer->setLimits() here (as for the normal ClickHouse server) because Keeper controls + /// its memory usage by other means (via setting 'max_memory_usage_soft_limit'). + cgroups_memory_usage_observer->setOnMemoryAmountAvailableChangedFn([&]() { main_config_reloader->reload(); }); + cgroups_memory_usage_observer->startThread(); + } + } + catch (Exception &) + { + tryLogCurrentException(log, "Disabling cgroup memory observer because of an error during initialization"); + } + + LOG_INFO(log, "Ready for connections."); waitForTerminationRequest(); diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index d7030e3b0aa..b67a4eccd15 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -1296,7 +1296,7 @@ try std::optional cgroups_memory_usage_observer; try { - UInt64 wait_time = server_settings.cgroups_memory_usage_observer_wait_time; + auto wait_time = server_settings.cgroups_memory_usage_observer_wait_time; if (wait_time != 0) cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time)); } @@ -1362,7 +1362,7 @@ try { double hard_limit_ratio = new_server_settings.cgroup_memory_watcher_hard_limit_ratio; double soft_limit_ratio = new_server_settings.cgroup_memory_watcher_soft_limit_ratio; - cgroups_memory_usage_observer->setLimits( + 
cgroups_memory_usage_observer->setMemoryUsageLimits( static_cast(max_server_memory_usage * hard_limit_ratio), static_cast(max_server_memory_usage * soft_limit_ratio)); } @@ -1720,6 +1720,12 @@ try throw; } + if (cgroups_memory_usage_observer) + { + cgroups_memory_usage_observer->setOnMemoryAmountAvailableChangedFn([&]() { main_config_reloader->reload(); }); + cgroups_memory_usage_observer->startThread(); + } + /// Reload config in SYSTEM RELOAD CONFIG query. global_context->setConfigReloadCallback([&]() { diff --git a/src/Common/CgroupsMemoryUsageObserver.cpp b/src/Common/CgroupsMemoryUsageObserver.cpp index 9bed6b191e4..dd68bd0da64 100644 --- a/src/Common/CgroupsMemoryUsageObserver.cpp +++ b/src/Common/CgroupsMemoryUsageObserver.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -36,7 +37,7 @@ namespace ErrorCodes CgroupsMemoryUsageObserver::CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_) : log(getLogger("CgroupsMemoryUsageObserver")) , wait_time(wait_time_) - , file(log) + , memory_usage_file(log) { LOG_INFO(log, "Initialized cgroups memory limit observer, wait time is {} sec", wait_time.count()); } @@ -46,13 +47,13 @@ CgroupsMemoryUsageObserver::~CgroupsMemoryUsageObserver() stopThread(); } -void CgroupsMemoryUsageObserver::setLimits(uint64_t hard_limit_, uint64_t soft_limit_) +void CgroupsMemoryUsageObserver::setMemoryUsageLimits(uint64_t hard_limit_, uint64_t soft_limit_) { + std::lock_guard limit_lock(limit_mutex); + if (hard_limit_ == hard_limit && soft_limit_ == soft_limit) return; - stopThread(); - hard_limit = hard_limit_; soft_limit = soft_limit_; @@ -83,10 +84,10 @@ void CgroupsMemoryUsageObserver::setLimits(uint64_t hard_limit_, uint64_t soft_l mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0); #endif /// Reset current usage in memory tracker. Expect zero for free_memory_in_allocator_arenas as we just purged them. 
- uint64_t current_usage = readMemoryUsage(); - MemoryTracker::setRSS(current_usage, 0); + uint64_t memory_usage = memory_usage_file.readMemoryUsage(); + MemoryTracker::setRSS(memory_usage, 0); - LOG_INFO(log, "Purged jemalloc arenas. Current memory usage is {}", ReadableSize(current_usage)); + LOG_INFO(log, "Purged jemalloc arenas. Current memory usage is {}", ReadableSize(memory_usage)); } else { @@ -94,14 +95,13 @@ void CgroupsMemoryUsageObserver::setLimits(uint64_t hard_limit_, uint64_t soft_l } }; - startThread(); - LOG_INFO(log, "Set new limits, soft limit: {}, hard limit: {}", ReadableSize(soft_limit_), ReadableSize(hard_limit_)); } -uint64_t CgroupsMemoryUsageObserver::readMemoryUsage() const +void CgroupsMemoryUsageObserver::setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed_) { - return file.readMemoryUsage(); + std::lock_guard memory_amount_available_changed_lock(memory_amount_available_changed_mutex); + on_memory_amount_available_changed = on_memory_amount_available_changed_; } namespace @@ -163,7 +163,7 @@ std::pair getCgroupsFil } -CgroupsMemoryUsageObserver::File::File(LoggerPtr log_) +CgroupsMemoryUsageObserver::MemoryUsageFile::MemoryUsageFile(LoggerPtr log_) : log(log_) { std::tie(file_name, version) = getCgroupsFileName(); @@ -177,7 +177,7 @@ CgroupsMemoryUsageObserver::File::File(LoggerPtr log_) file_name, "Cannot open file '{}'", file_name); } -CgroupsMemoryUsageObserver::File::~File() +CgroupsMemoryUsageObserver::MemoryUsageFile::~MemoryUsageFile() { assert(fd != -1); if (::close(fd) != 0) @@ -195,7 +195,7 @@ CgroupsMemoryUsageObserver::File::~File() } } -uint64_t CgroupsMemoryUsageObserver::File::readMemoryUsage() const +uint64_t CgroupsMemoryUsageObserver::MemoryUsageFile::readMemoryUsage() const { /// File read is probably not read is thread-safe, just to be sure std::lock_guard lock(mutex); @@ -278,6 +278,9 @@ void CgroupsMemoryUsageObserver::runThread() { 
setThreadName("CgrpMemUsgObsr"); + last_available_memory_amount = getMemoryAmount(); + LOG_INFO(log, "Memory amount initially available to the process is {}", ReadableSize(last_available_memory_amount)); + std::unique_lock lock(thread_mutex); while (true) { @@ -286,8 +289,42 @@ void CgroupsMemoryUsageObserver::runThread() try { - uint64_t memory_usage = file.readMemoryUsage(); - processMemoryUsage(memory_usage); + uint64_t available_memory_amount = getMemoryAmount(); + if (available_memory_amount != last_available_memory_amount) + { + LOG_INFO(log, "Memory amount available to the process changed from {} to {}", ReadableSize(last_available_memory_amount), ReadableSize(available_memory_amount)); + last_available_memory_amount = available_memory_amount; + std::lock_guard memory_amount_available_changed_lock(memory_amount_available_changed_mutex); + on_memory_amount_available_changed(); + } + + std::lock_guard limit_lock(limit_mutex); + if (soft_limit > 0 && hard_limit > 0) + { + uint64_t memory_usage = memory_usage_file.readMemoryUsage(); + if (memory_usage > hard_limit) + { + if (last_memory_usage <= hard_limit) + on_hard_limit(true); + } + else + { + if (last_memory_usage > hard_limit) + on_hard_limit(false); + } + + if (memory_usage > soft_limit) + { + if (last_memory_usage <= soft_limit) + on_soft_limit(true); + } + else + { + if (last_memory_usage > soft_limit) + on_soft_limit(false); + } + last_memory_usage = memory_usage; + } } catch (...) 
{ @@ -296,33 +333,6 @@ void CgroupsMemoryUsageObserver::runThread() { } } -void CgroupsMemoryUsageObserver::processMemoryUsage(uint64_t current_usage) -{ - if (current_usage > hard_limit) - { - if (last_usage <= hard_limit) - on_hard_limit(true); - } - else - { - if (last_usage > hard_limit) - on_hard_limit(false); - } - - if (current_usage > soft_limit) - { - if (last_usage <= soft_limit) - on_soft_limit(true); - } - else - { - if (last_usage > soft_limit) - on_soft_limit(false); - } - - last_usage = current_usage; -} - } #endif diff --git a/src/Common/CgroupsMemoryUsageObserver.h b/src/Common/CgroupsMemoryUsageObserver.h index 28bf08c82b5..51c5cd08124 100644 --- a/src/Common/CgroupsMemoryUsageObserver.h +++ b/src/Common/CgroupsMemoryUsageObserver.h @@ -2,57 +2,71 @@ #include -#include #include #include namespace DB { -/// Periodically reads the current memory usage from Linux cgroups. -/// You can specify soft or hard memory limits: -/// - When the soft memory limit is hit, drop jemalloc cache. -/// - When the hard memory limit is hit, update MemoryTracking metric to throw memory exceptions faster. +/// Does two things: +/// 1. Periodically reads the memory usage of the process from Linux cgroups. +/// You can specify soft or hard memory limits: +/// - When the soft memory limit is hit, drop jemalloc cache. +/// - When the hard memory limit is hit, update MemoryTracking metric to throw memory exceptions faster. +/// The goal of this is to avoid that the process hits the maximum allowed memory limit at which there is a good +/// chance that the Linux OOM killer terminates it. All of this is done because internal memory tracking in +/// ClickHouse can unfortunately under-estimate the actually used memory. +/// 2. Periodically reads the maximum memory available to the process (which can change due to cgroups settings). +/// You can specify a callback to react to changes. The callback typically reloads the configuration, i.e. 
Server +/// or Keeper configuration file. This reloads settings 'max_server_memory_usage' (Server) and 'max_memory_usage_soft_limit' +/// (Keeper) from which various other internal limits are calculated, including the soft and hard limits for (1.). +/// The goal of this is to provide elasticity when the container is scaled-up/scaled-down. The mechanism (polling +/// cgroups) is quite implicit, unfortunately there is currently no better way to communicate memory threshold changes +/// to the database. #if defined(OS_LINUX) class CgroupsMemoryUsageObserver { public: + using OnMemoryLimitFn = std::function; + using OnMemoryAmountAvailableChangedFn = std::function; + enum class CgroupsVersion { V1, V2 - }; + explicit CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_); ~CgroupsMemoryUsageObserver(); - void setLimits(uint64_t hard_limit_, uint64_t soft_limit_); + void setMemoryUsageLimits(uint64_t hard_limit_, uint64_t soft_limit_); + void setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed_); - size_t getHardLimit() const { return hard_limit; } - size_t getSoftLimit() const { return soft_limit; } - - uint64_t readMemoryUsage() const; + void startThread(); private: LoggerPtr log; - std::atomic hard_limit = 0; - std::atomic soft_limit = 0; - const std::chrono::seconds wait_time; - using CallbackFn = std::function; - CallbackFn on_hard_limit; - CallbackFn on_soft_limit; + std::mutex limit_mutex; + size_t hard_limit TSA_GUARDED_BY(limit_mutex) = 0; + size_t soft_limit TSA_GUARDED_BY(limit_mutex) = 0; + OnMemoryLimitFn on_hard_limit TSA_GUARDED_BY(limit_mutex); + OnMemoryLimitFn on_soft_limit TSA_GUARDED_BY(limit_mutex); - uint64_t last_usage = 0; + std::mutex memory_amount_available_changed_mutex; + OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed TSA_GUARDED_BY(memory_amount_available_changed_mutex); + + uint64_t last_memory_usage = 0; /// how much memory does the process use + uint64_t 
last_available_memory_amount; /// how much memory can the process use /// Represents the cgroup virtual file that shows the memory consumption of the process's cgroup. - struct File + struct MemoryUsageFile { public: - explicit File(LoggerPtr log_); - ~File(); + explicit MemoryUsageFile(LoggerPtr log_); + ~MemoryUsageFile(); uint64_t readMemoryUsage() const; private: LoggerPtr log; @@ -62,13 +76,11 @@ private: std::string file_name; }; - File file; + MemoryUsageFile memory_usage_file; - void startThread(); void stopThread(); void runThread(); - void processMemoryUsage(uint64_t usage); std::mutex thread_mutex; std::condition_variable cond; @@ -79,13 +91,13 @@ private: #else class CgroupsMemoryUsageObserver { + using OnMemoryAmountAvailableChangedFn = std::function; public: explicit CgroupsMemoryUsageObserver(std::chrono::seconds) {} - void setLimits(uint64_t, uint64_t) {} - size_t readMemoryUsage() { return 0; } - size_t getHardLimit() { return 0; } - size_t getSoftLimit() { return 0; } + void setMemoryUsageLimits(uint64_t, uint64_t) {} + void setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn) {} + void startThread() {} }; #endif diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py index 52c0d8a8ee5..b695b493db7 100644 --- a/tests/integration/helpers/cluster.py +++ b/tests/integration/helpers/cluster.py @@ -1618,6 +1618,7 @@ class ClickHouseCluster: with_installed_binary=False, external_dirs=None, tmpfs=None, + mem_limit=None, zookeeper_docker_compose_path=None, minio_certs_dir=None, minio_data_dir=None, @@ -1728,6 +1729,7 @@ class ClickHouseCluster: with_installed_binary=with_installed_binary, external_dirs=external_dirs, tmpfs=tmpfs or [], + mem_limit=mem_limit, config_root_name=config_root_name, extra_configs=extra_configs, ) @@ -3203,6 +3205,7 @@ services: {krb5_conf} entrypoint: {entrypoint_cmd} tmpfs: {tmpfs} + {mem_limit} cap_add: - SYS_PTRACE - NET_ADMIN @@ -3288,6 +3291,7 @@ class ClickHouseInstance: 
with_installed_binary=False, external_dirs=None, tmpfs=None, + mem_limit=None, config_root_name="clickhouse", extra_configs=[], ): @@ -3299,6 +3303,10 @@ class ClickHouseInstance: self.external_dirs = external_dirs self.tmpfs = tmpfs or [] + if mem_limit is not None: + self.mem_limit = "mem_limit : " + mem_limit + else: + self.mem_limit = "" self.base_config_dir = ( p.abspath(p.join(base_path, base_config_dir)) if base_config_dir else None ) @@ -4644,6 +4652,7 @@ class ClickHouseInstance: db_dir=db_dir, external_dirs_volumes=external_dirs_volumes, tmpfs=str(self.tmpfs), + mem_limit=self.mem_limit, logs_dir=logs_dir, depends_on=str(depends_on), user=os.getuid(), diff --git a/tests/integration/test_manipulate_statistic/test.py b/tests/integration/test_manipulate_statistic/test.py index f1c00a61b07..7b96b392da8 100644 --- a/tests/integration/test_manipulate_statistic/test.py +++ b/tests/integration/test_manipulate_statistic/test.py @@ -6,7 +6,7 @@ from helpers.cluster import ClickHouseCluster cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance( - "node1", user_configs=["config/config.xml"], with_zookeeper=True + "node1", user_configs=["config/config.xml"], with_zookeeper=False ) diff --git a/tests/integration/test_memory_limit_observer/__init__.py b/tests/integration/test_memory_limit_observer/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_memory_limit_observer/config/text_log.xml b/tests/integration/test_memory_limit_observer/config/text_log.xml new file mode 100644 index 00000000000..5c1cc663d0f --- /dev/null +++ b/tests/integration/test_memory_limit_observer/config/text_log.xml @@ -0,0 +1,7 @@ + + + system + text_log
+ 500 +
+
diff --git a/tests/integration/test_memory_limit_observer/test.py b/tests/integration/test_memory_limit_observer/test.py new file mode 100644 index 00000000000..fe3acd9a0cf --- /dev/null +++ b/tests/integration/test_memory_limit_observer/test.py @@ -0,0 +1,53 @@ +import pytest +import logging +import time + +from helpers.cluster import ClickHouseCluster, run_and_check + +cluster = ClickHouseCluster(__file__) + +node1 = cluster.add_instance( + "node1", main_configs=["config/text_log.xml"], mem_limit="5g" +) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + + yield cluster + + finally: + cluster.shutdown() + + +def get_latest_mem_limit(): + for _ in range(10): + try: + mem_limit = float( + node1.query( + """ + select extract(message, '\\d+\\.\\d+') from system.text_log + where message like '%Setting max_server_memory_usage was set to%' and + message not like '%like%' order by event_time desc limit 1 + """ + ).strip() + ) + return mem_limit + except Exception as e: + time.sleep(1) + raise Exception("Cannot get memory limit") + + +def test_observe_memory_limit(started_cluster): + original_max_mem = get_latest_mem_limit() + logging.debug(f"get original memory limit {original_max_mem}") + run_and_check(["docker", "update", "--memory=10g", node1.docker_id]) + for _ in range(30): + time.sleep(10) + new_max_mem = get_latest_mem_limit() + logging.debug(f"get new memory limit {new_max_mem}") + if new_max_mem > original_max_mem: + return + raise Exception("the memory limit does not increase as expected")