mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Merge pull request #61049 from hanfei1991/hanfei/check-limit-periodically
Check cgroups memory limit update periodically
This commit is contained in:
commit
8a54c85f3c
@ -50,9 +50,6 @@ std::optional<uint64_t> getCgroupsV2MemoryLimit()
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Returns the size of physical memory (RAM) in bytes.
|
|
||||||
* Returns 0 on unsupported platform
|
|
||||||
*/
|
|
||||||
uint64_t getMemoryAmountOrZero()
|
uint64_t getMemoryAmountOrZero()
|
||||||
{
|
{
|
||||||
int64_t num_pages = sysconf(_SC_PHYS_PAGES);
|
int64_t num_pages = sysconf(_SC_PHYS_PAGES);
|
||||||
|
@ -2,11 +2,10 @@
|
|||||||
|
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
|
|
||||||
/** Returns the size of physical memory (RAM) in bytes.
|
/// Returns the size in bytes of physical memory (RAM) available to the process. The value can
|
||||||
* Returns 0 on unsupported platform or if it cannot determine the size of physical memory.
|
/// be smaller than the total available RAM available to the system due to cgroups settings.
|
||||||
*/
|
/// Returns 0 on unsupported platform or if it cannot determine the size of physical memory.
|
||||||
uint64_t getMemoryAmountOrZero();
|
uint64_t getMemoryAmountOrZero();
|
||||||
|
|
||||||
/** Throws exception if it cannot determine the size of physical memory.
|
/// Throws exception if it cannot determine the size of physical memory.
|
||||||
*/
|
|
||||||
uint64_t getMemoryAmount();
|
uint64_t getMemoryAmount();
|
||||||
|
@ -10,6 +10,7 @@
|
|||||||
#include <IO/UseSSL.h>
|
#include <IO/UseSSL.h>
|
||||||
#include <Core/ServerUUID.h>
|
#include <Core/ServerUUID.h>
|
||||||
#include <Common/logger_useful.h>
|
#include <Common/logger_useful.h>
|
||||||
|
#include <Common/CgroupsMemoryUsageObserver.h>
|
||||||
#include <Common/ErrorHandlers.h>
|
#include <Common/ErrorHandlers.h>
|
||||||
#include <Common/assertProcessUserMatchesDataOwner.h>
|
#include <Common/assertProcessUserMatchesDataOwner.h>
|
||||||
#include <Common/makeSocketAddress.h>
|
#include <Common/makeSocketAddress.h>
|
||||||
@ -623,6 +624,25 @@ try
|
|||||||
buildLoggers(config(), logger());
|
buildLoggers(config(), logger());
|
||||||
main_config_reloader->start();
|
main_config_reloader->start();
|
||||||
|
|
||||||
|
std::optional<CgroupsMemoryUsageObserver> cgroups_memory_usage_observer;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
auto wait_time = config().getUInt64("keeper_server.cgroups_memory_observer_wait_time", 15);
|
||||||
|
if (wait_time != 0)
|
||||||
|
{
|
||||||
|
cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time));
|
||||||
|
/// Not calling cgroups_memory_usage_observer->setLimits() here (as for the normal ClickHouse server) because Keeper controls
|
||||||
|
/// its memory usage by other means (via setting 'max_memory_usage_soft_limit').
|
||||||
|
cgroups_memory_usage_observer->setOnMemoryAmountAvailableChangedFn([&]() { main_config_reloader->reload(); });
|
||||||
|
cgroups_memory_usage_observer->startThread();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception &)
|
||||||
|
{
|
||||||
|
tryLogCurrentException(log, "Disabling cgroup memory observer because of an error during initialization");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
LOG_INFO(log, "Ready for connections.");
|
LOG_INFO(log, "Ready for connections.");
|
||||||
|
|
||||||
waitForTerminationRequest();
|
waitForTerminationRequest();
|
||||||
|
@ -1296,7 +1296,7 @@ try
|
|||||||
std::optional<CgroupsMemoryUsageObserver> cgroups_memory_usage_observer;
|
std::optional<CgroupsMemoryUsageObserver> cgroups_memory_usage_observer;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
UInt64 wait_time = server_settings.cgroups_memory_usage_observer_wait_time;
|
auto wait_time = server_settings.cgroups_memory_usage_observer_wait_time;
|
||||||
if (wait_time != 0)
|
if (wait_time != 0)
|
||||||
cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time));
|
cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time));
|
||||||
}
|
}
|
||||||
@ -1362,7 +1362,7 @@ try
|
|||||||
{
|
{
|
||||||
double hard_limit_ratio = new_server_settings.cgroup_memory_watcher_hard_limit_ratio;
|
double hard_limit_ratio = new_server_settings.cgroup_memory_watcher_hard_limit_ratio;
|
||||||
double soft_limit_ratio = new_server_settings.cgroup_memory_watcher_soft_limit_ratio;
|
double soft_limit_ratio = new_server_settings.cgroup_memory_watcher_soft_limit_ratio;
|
||||||
cgroups_memory_usage_observer->setLimits(
|
cgroups_memory_usage_observer->setMemoryUsageLimits(
|
||||||
static_cast<uint64_t>(max_server_memory_usage * hard_limit_ratio),
|
static_cast<uint64_t>(max_server_memory_usage * hard_limit_ratio),
|
||||||
static_cast<uint64_t>(max_server_memory_usage * soft_limit_ratio));
|
static_cast<uint64_t>(max_server_memory_usage * soft_limit_ratio));
|
||||||
}
|
}
|
||||||
@ -1720,6 +1720,12 @@ try
|
|||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (cgroups_memory_usage_observer)
|
||||||
|
{
|
||||||
|
cgroups_memory_usage_observer->setOnMemoryAmountAvailableChangedFn([&]() { main_config_reloader->reload(); });
|
||||||
|
cgroups_memory_usage_observer->startThread();
|
||||||
|
}
|
||||||
|
|
||||||
/// Reload config in SYSTEM RELOAD CONFIG query.
|
/// Reload config in SYSTEM RELOAD CONFIG query.
|
||||||
global_context->setConfigReloadCallback([&]()
|
global_context->setConfigReloadCallback([&]()
|
||||||
{
|
{
|
||||||
|
@ -9,6 +9,7 @@
|
|||||||
#include <IO/ReadBufferFromFileDescriptor.h>
|
#include <IO/ReadBufferFromFileDescriptor.h>
|
||||||
#include <IO/ReadHelpers.h>
|
#include <IO/ReadHelpers.h>
|
||||||
#include <base/cgroupsv2.h>
|
#include <base/cgroupsv2.h>
|
||||||
|
#include <base/getMemoryAmount.h>
|
||||||
#include <base/sleep.h>
|
#include <base/sleep.h>
|
||||||
|
|
||||||
#include <filesystem>
|
#include <filesystem>
|
||||||
@ -36,7 +37,7 @@ namespace ErrorCodes
|
|||||||
CgroupsMemoryUsageObserver::CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_)
|
CgroupsMemoryUsageObserver::CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_)
|
||||||
: log(getLogger("CgroupsMemoryUsageObserver"))
|
: log(getLogger("CgroupsMemoryUsageObserver"))
|
||||||
, wait_time(wait_time_)
|
, wait_time(wait_time_)
|
||||||
, file(log)
|
, memory_usage_file(log)
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Initialized cgroups memory limit observer, wait time is {} sec", wait_time.count());
|
LOG_INFO(log, "Initialized cgroups memory limit observer, wait time is {} sec", wait_time.count());
|
||||||
}
|
}
|
||||||
@ -46,13 +47,13 @@ CgroupsMemoryUsageObserver::~CgroupsMemoryUsageObserver()
|
|||||||
stopThread();
|
stopThread();
|
||||||
}
|
}
|
||||||
|
|
||||||
void CgroupsMemoryUsageObserver::setLimits(uint64_t hard_limit_, uint64_t soft_limit_)
|
void CgroupsMemoryUsageObserver::setMemoryUsageLimits(uint64_t hard_limit_, uint64_t soft_limit_)
|
||||||
{
|
{
|
||||||
|
std::lock_guard<std::mutex> limit_lock(limit_mutex);
|
||||||
|
|
||||||
if (hard_limit_ == hard_limit && soft_limit_ == soft_limit)
|
if (hard_limit_ == hard_limit && soft_limit_ == soft_limit)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
stopThread();
|
|
||||||
|
|
||||||
hard_limit = hard_limit_;
|
hard_limit = hard_limit_;
|
||||||
soft_limit = soft_limit_;
|
soft_limit = soft_limit_;
|
||||||
|
|
||||||
@ -83,10 +84,10 @@ void CgroupsMemoryUsageObserver::setLimits(uint64_t hard_limit_, uint64_t soft_l
|
|||||||
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
|
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
|
||||||
#endif
|
#endif
|
||||||
/// Reset current usage in memory tracker. Expect zero for free_memory_in_allocator_arenas as we just purged them.
|
/// Reset current usage in memory tracker. Expect zero for free_memory_in_allocator_arenas as we just purged them.
|
||||||
uint64_t current_usage = readMemoryUsage();
|
uint64_t memory_usage = memory_usage_file.readMemoryUsage();
|
||||||
MemoryTracker::setRSS(current_usage, 0);
|
MemoryTracker::setRSS(memory_usage, 0);
|
||||||
|
|
||||||
LOG_INFO(log, "Purged jemalloc arenas. Current memory usage is {}", ReadableSize(current_usage));
|
LOG_INFO(log, "Purged jemalloc arenas. Current memory usage is {}", ReadableSize(memory_usage));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -94,14 +95,13 @@ void CgroupsMemoryUsageObserver::setLimits(uint64_t hard_limit_, uint64_t soft_l
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
startThread();
|
|
||||||
|
|
||||||
LOG_INFO(log, "Set new limits, soft limit: {}, hard limit: {}", ReadableSize(soft_limit_), ReadableSize(hard_limit_));
|
LOG_INFO(log, "Set new limits, soft limit: {}, hard limit: {}", ReadableSize(soft_limit_), ReadableSize(hard_limit_));
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t CgroupsMemoryUsageObserver::readMemoryUsage() const
|
void CgroupsMemoryUsageObserver::setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed_)
|
||||||
{
|
{
|
||||||
return file.readMemoryUsage();
|
std::lock_guard<std::mutex> memory_amount_available_changed_lock(memory_amount_available_changed_mutex);
|
||||||
|
on_memory_amount_available_changed = on_memory_amount_available_changed_;
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
@ -163,7 +163,7 @@ std::pair<std::string, CgroupsMemoryUsageObserver::CgroupsVersion> getCgroupsFil
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
CgroupsMemoryUsageObserver::File::File(LoggerPtr log_)
|
CgroupsMemoryUsageObserver::MemoryUsageFile::MemoryUsageFile(LoggerPtr log_)
|
||||||
: log(log_)
|
: log(log_)
|
||||||
{
|
{
|
||||||
std::tie(file_name, version) = getCgroupsFileName();
|
std::tie(file_name, version) = getCgroupsFileName();
|
||||||
@ -177,7 +177,7 @@ CgroupsMemoryUsageObserver::File::File(LoggerPtr log_)
|
|||||||
file_name, "Cannot open file '{}'", file_name);
|
file_name, "Cannot open file '{}'", file_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
CgroupsMemoryUsageObserver::File::~File()
|
CgroupsMemoryUsageObserver::MemoryUsageFile::~MemoryUsageFile()
|
||||||
{
|
{
|
||||||
assert(fd != -1);
|
assert(fd != -1);
|
||||||
if (::close(fd) != 0)
|
if (::close(fd) != 0)
|
||||||
@ -195,7 +195,7 @@ CgroupsMemoryUsageObserver::File::~File()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t CgroupsMemoryUsageObserver::File::readMemoryUsage() const
|
uint64_t CgroupsMemoryUsageObserver::MemoryUsageFile::readMemoryUsage() const
|
||||||
{
|
{
|
||||||
/// File read is probably not read is thread-safe, just to be sure
|
/// File read is probably not read is thread-safe, just to be sure
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
@ -278,6 +278,9 @@ void CgroupsMemoryUsageObserver::runThread()
|
|||||||
{
|
{
|
||||||
setThreadName("CgrpMemUsgObsr");
|
setThreadName("CgrpMemUsgObsr");
|
||||||
|
|
||||||
|
last_available_memory_amount = getMemoryAmount();
|
||||||
|
LOG_INFO(log, "Memory amount initially available to the process is {}", ReadableSize(last_available_memory_amount));
|
||||||
|
|
||||||
std::unique_lock lock(thread_mutex);
|
std::unique_lock lock(thread_mutex);
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
@ -286,8 +289,42 @@ void CgroupsMemoryUsageObserver::runThread()
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
uint64_t memory_usage = file.readMemoryUsage();
|
uint64_t available_memory_amount = getMemoryAmount();
|
||||||
processMemoryUsage(memory_usage);
|
if (available_memory_amount != last_available_memory_amount)
|
||||||
|
{
|
||||||
|
LOG_INFO(log, "Memory amount available to the process changed from {} to {}", ReadableSize(last_available_memory_amount), ReadableSize(available_memory_amount));
|
||||||
|
last_available_memory_amount = available_memory_amount;
|
||||||
|
std::lock_guard<std::mutex> memory_amount_available_changed_lock(memory_amount_available_changed_mutex);
|
||||||
|
on_memory_amount_available_changed();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::lock_guard<std::mutex> limit_lock(limit_mutex);
|
||||||
|
if (soft_limit > 0 && hard_limit > 0)
|
||||||
|
{
|
||||||
|
uint64_t memory_usage = memory_usage_file.readMemoryUsage();
|
||||||
|
if (memory_usage > hard_limit)
|
||||||
|
{
|
||||||
|
if (last_memory_usage <= hard_limit)
|
||||||
|
on_hard_limit(true);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (last_memory_usage > hard_limit)
|
||||||
|
on_hard_limit(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (memory_usage > soft_limit)
|
||||||
|
{
|
||||||
|
if (last_memory_usage <= soft_limit)
|
||||||
|
on_soft_limit(true);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (last_memory_usage > soft_limit)
|
||||||
|
on_soft_limit(false);
|
||||||
|
}
|
||||||
|
last_memory_usage = memory_usage;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
@ -296,33 +333,6 @@ void CgroupsMemoryUsageObserver::runThread()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void CgroupsMemoryUsageObserver::processMemoryUsage(uint64_t current_usage)
|
|
||||||
{
|
|
||||||
if (current_usage > hard_limit)
|
|
||||||
{
|
|
||||||
if (last_usage <= hard_limit)
|
|
||||||
on_hard_limit(true);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (last_usage > hard_limit)
|
|
||||||
on_hard_limit(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (current_usage > soft_limit)
|
|
||||||
{
|
|
||||||
if (last_usage <= soft_limit)
|
|
||||||
on_soft_limit(true);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (last_usage > soft_limit)
|
|
||||||
on_soft_limit(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
last_usage = current_usage;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -2,57 +2,71 @@
|
|||||||
|
|
||||||
#include <Common/ThreadPool.h>
|
#include <Common/ThreadPool.h>
|
||||||
|
|
||||||
#include <atomic>
|
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
/// Periodically reads the current memory usage from Linux cgroups.
|
/// Does two things:
|
||||||
|
/// 1. Periodically reads the memory usage of the process from Linux cgroups.
|
||||||
/// You can specify soft or hard memory limits:
|
/// You can specify soft or hard memory limits:
|
||||||
/// - When the soft memory limit is hit, drop jemalloc cache.
|
/// - When the soft memory limit is hit, drop jemalloc cache.
|
||||||
/// - When the hard memory limit is hit, update MemoryTracking metric to throw memory exceptions faster.
|
/// - When the hard memory limit is hit, update MemoryTracking metric to throw memory exceptions faster.
|
||||||
|
/// The goal of this is to avoid that the process hits the maximum allowed memory limit at which there is a good
|
||||||
|
/// chance that the Limux OOM killer terminates it. All of this is done is because internal memory tracking in
|
||||||
|
/// ClickHouse can unfortunately under-estimate the actually used memory.
|
||||||
|
/// 2. Periodically reads the the maximum memory available to the process (which can change due to cgroups settings).
|
||||||
|
/// You can specify a callback to react on changes. The callback typically reloads the configuration, i.e. Server
|
||||||
|
/// or Keeper configuration file. This reloads settings 'max_server_memory_usage' (Server) and 'max_memory_usage_soft_limit'
|
||||||
|
/// (Keeper) from which various other internal limits are calculated, including the soft and hard limits for (1.).
|
||||||
|
/// The goal of this is to provide elasticity when the container is scaled-up/scaled-down. The mechanism (polling
|
||||||
|
/// cgroups) is quite implicit, unfortunately there is currently no better way to communicate memory threshold changes
|
||||||
|
/// to the database.
|
||||||
#if defined(OS_LINUX)
|
#if defined(OS_LINUX)
|
||||||
class CgroupsMemoryUsageObserver
|
class CgroupsMemoryUsageObserver
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
using OnMemoryLimitFn = std::function<void(bool)>;
|
||||||
|
using OnMemoryAmountAvailableChangedFn = std::function<void()>;
|
||||||
|
|
||||||
enum class CgroupsVersion
|
enum class CgroupsVersion
|
||||||
{
|
{
|
||||||
V1,
|
V1,
|
||||||
V2
|
V2
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
explicit CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_);
|
explicit CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_);
|
||||||
~CgroupsMemoryUsageObserver();
|
~CgroupsMemoryUsageObserver();
|
||||||
|
|
||||||
void setLimits(uint64_t hard_limit_, uint64_t soft_limit_);
|
void setMemoryUsageLimits(uint64_t hard_limit_, uint64_t soft_limit_);
|
||||||
|
void setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed_);
|
||||||
|
|
||||||
size_t getHardLimit() const { return hard_limit; }
|
void startThread();
|
||||||
size_t getSoftLimit() const { return soft_limit; }
|
|
||||||
|
|
||||||
uint64_t readMemoryUsage() const;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
LoggerPtr log;
|
LoggerPtr log;
|
||||||
|
|
||||||
std::atomic<size_t> hard_limit = 0;
|
|
||||||
std::atomic<size_t> soft_limit = 0;
|
|
||||||
|
|
||||||
const std::chrono::seconds wait_time;
|
const std::chrono::seconds wait_time;
|
||||||
|
|
||||||
using CallbackFn = std::function<void(bool)>;
|
std::mutex limit_mutex;
|
||||||
CallbackFn on_hard_limit;
|
size_t hard_limit TSA_GUARDED_BY(limit_mutex) = 0;
|
||||||
CallbackFn on_soft_limit;
|
size_t soft_limit TSA_GUARDED_BY(limit_mutex) = 0;
|
||||||
|
OnMemoryLimitFn on_hard_limit TSA_GUARDED_BY(limit_mutex);
|
||||||
|
OnMemoryLimitFn on_soft_limit TSA_GUARDED_BY(limit_mutex);
|
||||||
|
|
||||||
uint64_t last_usage = 0;
|
std::mutex memory_amount_available_changed_mutex;
|
||||||
|
OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed TSA_GUARDED_BY(memory_amount_available_changed_mutex);
|
||||||
|
|
||||||
|
uint64_t last_memory_usage = 0; /// how much memory does the process use
|
||||||
|
uint64_t last_available_memory_amount; /// how much memory can the process use
|
||||||
|
|
||||||
/// Represents the cgroup virtual file that shows the memory consumption of the process's cgroup.
|
/// Represents the cgroup virtual file that shows the memory consumption of the process's cgroup.
|
||||||
struct File
|
struct MemoryUsageFile
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
explicit File(LoggerPtr log_);
|
explicit MemoryUsageFile(LoggerPtr log_);
|
||||||
~File();
|
~MemoryUsageFile();
|
||||||
uint64_t readMemoryUsage() const;
|
uint64_t readMemoryUsage() const;
|
||||||
private:
|
private:
|
||||||
LoggerPtr log;
|
LoggerPtr log;
|
||||||
@ -62,13 +76,11 @@ private:
|
|||||||
std::string file_name;
|
std::string file_name;
|
||||||
};
|
};
|
||||||
|
|
||||||
File file;
|
MemoryUsageFile memory_usage_file;
|
||||||
|
|
||||||
void startThread();
|
|
||||||
void stopThread();
|
void stopThread();
|
||||||
|
|
||||||
void runThread();
|
void runThread();
|
||||||
void processMemoryUsage(uint64_t usage);
|
|
||||||
|
|
||||||
std::mutex thread_mutex;
|
std::mutex thread_mutex;
|
||||||
std::condition_variable cond;
|
std::condition_variable cond;
|
||||||
@ -79,13 +91,13 @@ private:
|
|||||||
#else
|
#else
|
||||||
class CgroupsMemoryUsageObserver
|
class CgroupsMemoryUsageObserver
|
||||||
{
|
{
|
||||||
|
using OnMemoryAmountAvailableChangedFn = std::function<void()>;
|
||||||
public:
|
public:
|
||||||
explicit CgroupsMemoryUsageObserver(std::chrono::seconds) {}
|
explicit CgroupsMemoryUsageObserver(std::chrono::seconds) {}
|
||||||
|
|
||||||
void setLimits(uint64_t, uint64_t) {}
|
void setMemoryUsageLimits(uint64_t, uint64_t) {}
|
||||||
size_t readMemoryUsage() { return 0; }
|
void setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn) {}
|
||||||
size_t getHardLimit() { return 0; }
|
void startThread() {}
|
||||||
size_t getSoftLimit() { return 0; }
|
|
||||||
};
|
};
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
@ -1618,6 +1618,7 @@ class ClickHouseCluster:
|
|||||||
with_installed_binary=False,
|
with_installed_binary=False,
|
||||||
external_dirs=None,
|
external_dirs=None,
|
||||||
tmpfs=None,
|
tmpfs=None,
|
||||||
|
mem_limit=None,
|
||||||
zookeeper_docker_compose_path=None,
|
zookeeper_docker_compose_path=None,
|
||||||
minio_certs_dir=None,
|
minio_certs_dir=None,
|
||||||
minio_data_dir=None,
|
minio_data_dir=None,
|
||||||
@ -1728,6 +1729,7 @@ class ClickHouseCluster:
|
|||||||
with_installed_binary=with_installed_binary,
|
with_installed_binary=with_installed_binary,
|
||||||
external_dirs=external_dirs,
|
external_dirs=external_dirs,
|
||||||
tmpfs=tmpfs or [],
|
tmpfs=tmpfs or [],
|
||||||
|
mem_limit=mem_limit,
|
||||||
config_root_name=config_root_name,
|
config_root_name=config_root_name,
|
||||||
extra_configs=extra_configs,
|
extra_configs=extra_configs,
|
||||||
)
|
)
|
||||||
@ -3203,6 +3205,7 @@ services:
|
|||||||
{krb5_conf}
|
{krb5_conf}
|
||||||
entrypoint: {entrypoint_cmd}
|
entrypoint: {entrypoint_cmd}
|
||||||
tmpfs: {tmpfs}
|
tmpfs: {tmpfs}
|
||||||
|
{mem_limit}
|
||||||
cap_add:
|
cap_add:
|
||||||
- SYS_PTRACE
|
- SYS_PTRACE
|
||||||
- NET_ADMIN
|
- NET_ADMIN
|
||||||
@ -3288,6 +3291,7 @@ class ClickHouseInstance:
|
|||||||
with_installed_binary=False,
|
with_installed_binary=False,
|
||||||
external_dirs=None,
|
external_dirs=None,
|
||||||
tmpfs=None,
|
tmpfs=None,
|
||||||
|
mem_limit=None,
|
||||||
config_root_name="clickhouse",
|
config_root_name="clickhouse",
|
||||||
extra_configs=[],
|
extra_configs=[],
|
||||||
):
|
):
|
||||||
@ -3299,6 +3303,10 @@ class ClickHouseInstance:
|
|||||||
|
|
||||||
self.external_dirs = external_dirs
|
self.external_dirs = external_dirs
|
||||||
self.tmpfs = tmpfs or []
|
self.tmpfs = tmpfs or []
|
||||||
|
if mem_limit is not None:
|
||||||
|
self.mem_limit = "mem_limit : " + mem_limit
|
||||||
|
else:
|
||||||
|
self.mem_limit = ""
|
||||||
self.base_config_dir = (
|
self.base_config_dir = (
|
||||||
p.abspath(p.join(base_path, base_config_dir)) if base_config_dir else None
|
p.abspath(p.join(base_path, base_config_dir)) if base_config_dir else None
|
||||||
)
|
)
|
||||||
@ -4644,6 +4652,7 @@ class ClickHouseInstance:
|
|||||||
db_dir=db_dir,
|
db_dir=db_dir,
|
||||||
external_dirs_volumes=external_dirs_volumes,
|
external_dirs_volumes=external_dirs_volumes,
|
||||||
tmpfs=str(self.tmpfs),
|
tmpfs=str(self.tmpfs),
|
||||||
|
mem_limit=self.mem_limit,
|
||||||
logs_dir=logs_dir,
|
logs_dir=logs_dir,
|
||||||
depends_on=str(depends_on),
|
depends_on=str(depends_on),
|
||||||
user=os.getuid(),
|
user=os.getuid(),
|
||||||
|
@ -6,7 +6,7 @@ from helpers.cluster import ClickHouseCluster
|
|||||||
cluster = ClickHouseCluster(__file__)
|
cluster = ClickHouseCluster(__file__)
|
||||||
|
|
||||||
node1 = cluster.add_instance(
|
node1 = cluster.add_instance(
|
||||||
"node1", user_configs=["config/config.xml"], with_zookeeper=True
|
"node1", user_configs=["config/config.xml"], with_zookeeper=False
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -0,0 +1,7 @@
|
|||||||
|
<clickhouse>
|
||||||
|
<text_log>
|
||||||
|
<database>system</database>
|
||||||
|
<table>text_log</table>
|
||||||
|
<flush_interval_milliseconds>500</flush_interval_milliseconds>
|
||||||
|
</text_log>
|
||||||
|
</clickhouse>
|
53
tests/integration/test_memory_limit_observer/test.py
Normal file
53
tests/integration/test_memory_limit_observer/test.py
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
import pytest
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
|
||||||
|
from helpers.cluster import ClickHouseCluster, run_and_check
|
||||||
|
|
||||||
|
cluster = ClickHouseCluster(__file__)
|
||||||
|
|
||||||
|
node1 = cluster.add_instance(
|
||||||
|
"node1", main_configs=["config/text_log.xml"], mem_limit="5g"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def started_cluster():
|
||||||
|
try:
|
||||||
|
cluster.start()
|
||||||
|
|
||||||
|
yield cluster
|
||||||
|
|
||||||
|
finally:
|
||||||
|
cluster.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
def get_latest_mem_limit():
|
||||||
|
for _ in range(10):
|
||||||
|
try:
|
||||||
|
mem_limit = float(
|
||||||
|
node1.query(
|
||||||
|
"""
|
||||||
|
select extract(message, '\\d+\\.\\d+') from system.text_log
|
||||||
|
where message like '%Setting max_server_memory_usage was set to%' and
|
||||||
|
message not like '%like%' order by event_time desc limit 1
|
||||||
|
"""
|
||||||
|
).strip()
|
||||||
|
)
|
||||||
|
return mem_limit
|
||||||
|
except Exception as e:
|
||||||
|
time.sleep(1)
|
||||||
|
raise Exception("Cannot get memory limit")
|
||||||
|
|
||||||
|
|
||||||
|
def test_observe_memory_limit(started_cluster):
|
||||||
|
original_max_mem = get_latest_mem_limit()
|
||||||
|
logging.debug(f"get original memory limit {original_max_mem}")
|
||||||
|
run_and_check(["docker", "update", "--memory=10g", node1.docker_id])
|
||||||
|
for _ in range(30):
|
||||||
|
time.sleep(10)
|
||||||
|
new_max_mem = get_latest_mem_limit()
|
||||||
|
logging.debug(f"get new memory limit {new_max_mem}")
|
||||||
|
if new_max_mem > original_max_mem:
|
||||||
|
return
|
||||||
|
raise Exception("the memory limit does not increase as expected")
|
Loading…
Reference in New Issue
Block a user