Read cgroup memory usage in async metrics thread

Mirror of https://github.com/ClickHouse/ClickHouse.git
Parent: f01a285f60
Commit: 97f4ec2adb
@@ -399,6 +399,18 @@ try

     registerDisks(/*global_skip_access_check=*/false);

+    auto cgroups_memory_observer_wait_time = config().getUInt64("keeper_server.cgroups_memory_observer_wait_time", 15);
+    try
+    {
+        auto cgroups_reader = createCgroupsReader();
+        global_context->setCgroupsReader(createCgroupsReader());
+    }
+    catch (...)
+    {
+        if (cgroups_memory_observer_wait_time != 0)
+            tryLogCurrentException(log, "Failed to create cgroups reader");
+    }
+
     /// This object will periodically calculate some metrics.
     KeeperAsynchronousMetrics async_metrics(
         global_context,

@@ -622,21 +634,19 @@ try
     main_config_reloader->start();

     std::optional<CgroupsMemoryUsageObserver> cgroups_memory_usage_observer;
-    try
+    if (cgroups_memory_observer_wait_time != 0)
     {
-        auto wait_time = config().getUInt64("keeper_server.cgroups_memory_observer_wait_time", 15);
-        if (wait_time != 0)
+        auto cgroups_reader = global_context->getCgroupsReader();
+        if (cgroups_reader)
         {
-            cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time));
+            cgroups_memory_usage_observer.emplace(std::chrono::seconds(cgroups_memory_observer_wait_time), global_context->getCgroupsReader());
             /// Not calling cgroups_memory_usage_observer->setLimits() here (as for the normal ClickHouse server) because Keeper controls
             /// its memory usage by other means (via setting 'max_memory_usage_soft_limit').
             cgroups_memory_usage_observer->setOnMemoryAmountAvailableChangedFn([&]() { main_config_reloader->reload(); });
             cgroups_memory_usage_observer->startThread();
         }
-    }
-    catch (Exception &)
-    {
-        tryLogCurrentException(log, "Disabling cgroup memory observer because of an error during initialization");
+        else
+            LOG_ERROR(log, "Disabling cgroup memory observer because of an error during initialization of cgroups reader");
     }

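Both Keeper hunks above follow one wiring pattern: the cgroups reader is created once at startup and stored in the global context, and the memory-usage observer is only constructed when the configured wait time is non-zero and a reader is actually available. The sketch below illustrates that pattern in isolation; `DummyReader`, `Observer`, and the hard-coded wait time are stand-ins invented for this example, not ClickHouse classes (only the `ICgroupsReader` interface shape comes from the diff).

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>
#include <memory>
#include <optional>

// Interface shape taken from the diff; the concrete reader below is a stand-in.
struct ICgroupsReader
{
    virtual ~ICgroupsReader() = default;
    virtual uint64_t readMemoryUsage() = 0;
};

struct DummyReader : ICgroupsReader
{
    uint64_t readMemoryUsage() override { return 42 << 20; }  // pretend 42 MiB
};

// Stand-in for CgroupsMemoryUsageObserver: it now receives the shared reader.
struct Observer
{
    Observer(std::chrono::seconds wait_time_, std::shared_ptr<ICgroupsReader> reader_)
        : wait_time(wait_time_), cgroups_reader(std::move(reader_)) {}

    std::chrono::seconds wait_time;
    std::shared_ptr<ICgroupsReader> cgroups_reader;
};

int main()
{
    // 1. Create the reader once at startup; on hosts without cgroups this step may
    //    fail, is merely logged, and leaves the pointer empty.
    std::shared_ptr<ICgroupsReader> shared_reader;
    try
    {
        shared_reader = std::make_shared<DummyReader>();
    }
    catch (...)
    {
        // tolerated: metrics and the observer simply run without cgroup data
    }

    // 2. Construct the observer only if it is enabled and the reader exists.
    const uint64_t wait_time = 15;  // stands in for keeper_server.cgroups_memory_observer_wait_time
    std::optional<Observer> observer;
    if (wait_time != 0 && shared_reader)
        observer.emplace(std::chrono::seconds(wait_time), shared_reader);

    std::cout << (observer ? "observer enabled" : "observer disabled") << '\n';
}
```
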
@@ -897,6 +897,17 @@ try
         LOG_INFO(log, "Background threads finished in {} ms", watch.elapsedMilliseconds());
     });

+    try
+    {
+        auto cgroups_reader = createCgroupsReader();
+        global_context->setCgroupsReader(createCgroupsReader());
+    }
+    catch (...)
+    {
+        if (server_settings.cgroups_memory_usage_observer_wait_time != 0)
+            tryLogCurrentException(log, "Failed to create cgroups reader");
+    }
+
     /// This object will periodically calculate some metrics.
     ServerAsynchronousMetrics async_metrics(
         global_context,

@@ -1456,15 +1467,13 @@ try
     }

     std::optional<CgroupsMemoryUsageObserver> cgroups_memory_usage_observer;
-    try
+    if (auto wait_time = server_settings.cgroups_memory_usage_observer_wait_time; wait_time != 0)
     {
-        auto wait_time = server_settings.cgroups_memory_usage_observer_wait_time;
-        if (wait_time != 0)
-            cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time));
-    }
-    catch (Exception &)
-    {
-        tryLogCurrentException(log, "Disabling cgroup memory observer because of an error during initialization");
+        auto cgroups_reader = global_context->getCgroupsReader();
+        if (cgroups_reader)
+            cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time), std::move(cgroups_reader));
+        else
+            LOG_ERROR(log, "Disabling cgroup memory observer because of an error during initialization of cgroups reader");
     }

     std::string cert_path = config().getString("openSSL.server.certificateFile", "");

@@ -1532,15 +1541,6 @@ try
         total_memory_tracker.setDescription("(total)");
         total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);

-        if (cgroups_memory_usage_observer)
-        {
-            double hard_limit_ratio = new_server_settings.cgroup_memory_watcher_hard_limit_ratio;
-            double soft_limit_ratio = new_server_settings.cgroup_memory_watcher_soft_limit_ratio;
-            cgroups_memory_usage_observer->setMemoryUsageLimits(
-                static_cast<uint64_t>(max_server_memory_usage * hard_limit_ratio),
-                static_cast<uint64_t>(max_server_memory_usage * soft_limit_ratio));
-        }
-
         size_t merges_mutations_memory_usage_soft_limit = new_server_settings.merges_mutations_memory_usage_soft_limit;

         size_t default_merges_mutations_server_memory_usage = static_cast<size_t>(current_physical_server_memory * new_server_settings.merges_mutations_memory_usage_to_ram_ratio);

@@ -57,10 +57,12 @@ static std::unique_ptr<ReadBufferFromFilePRead> openFileIfExists(const std::stri

 AsynchronousMetrics::AsynchronousMetrics(
     unsigned update_period_seconds,
-    const ProtocolServerMetricsFunc & protocol_server_metrics_func_)
+    const ProtocolServerMetricsFunc & protocol_server_metrics_func_,
+    std::shared_ptr<ICgroupsReader> cgroups_reader_)
     : update_period(update_period_seconds)
     , log(getLogger("AsynchronousMetrics"))
     , protocol_server_metrics_func(protocol_server_metrics_func_)
+    , cgroups_reader(std::move(cgroups_reader_))
 {
 #if defined(OS_LINUX)
     openFileIfExists("/proc/meminfo", meminfo);

@@ -669,6 +671,13 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
         free_memory_in_allocator_arenas = je_malloc_pdirty * getPageSize();
 #endif

+        if (cgroups_reader != nullptr)
+        {
+            rss = cgroups_reader->readMemoryUsage();
+            new_values["CgroupsMemoryUsage"] = { rss,
+                "The amount of physical memory used by the server process, reported by cgroups." };
+        }
+
         Int64 difference = rss - amount;

         /// Log only if difference is high. This is for convenience. The threshold is arbitrary.

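When a reader is present, update() replaces the /proc-derived RSS with the cgroup value and exports it as the `CgroupsMemoryUsage` metric before computing the drift against the internally tracked amount. The following is a rough, self-contained sketch of that re-anchoring idea; `tracked_amount`, `readExternalRss`, and the threshold are invented for the example and are not ClickHouse names.

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <thread>

// Invented stand-ins: 'tracked_amount' plays the role of the internal memory counter,
// 'readExternalRss' the role of ICgroupsReader::readMemoryUsage().
std::atomic<int64_t> tracked_amount{100 << 20};

int64_t readExternalRss()
{
    return 200 << 20;  // pretend the kernel/cgroup reports 200 MiB
}

int main()
{
    for (int i = 0; i < 3; ++i)
    {
        const int64_t rss = readExternalRss();
        const int64_t amount = tracked_amount.load(std::memory_order_relaxed);
        const int64_t difference = rss - amount;

        // Log only if the drift is large; the threshold is arbitrary, as in the original comment.
        if (std::abs(difference) > (64 << 20))
            std::cout << "drift detected: rss=" << rss << " tracked=" << amount
                      << " diff=" << difference << '\n';

        // Re-anchor the tracked value to the externally observed RSS.
        tracked_amount.store(rss, std::memory_order_relaxed);

        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}
```
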
@@ -681,7 +690,7 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
                 ReadableSize(rss),
                 ReadableSize(difference));

-        MemoryTracker::setRSS(rss, free_memory_in_allocator_arenas);
+        MemoryTracker::setRSS(rss, /*has_free_memory_in_allocator_arenas_=*/free_memory_in_allocator_arenas > 0);
     }
 }

@@ -1,5 +1,6 @@
 #pragma once

+#include <Common/CgroupsMemoryUsageObserver.h>
 #include <Common/MemoryStatisticsOS.h>
 #include <Common/ThreadPool.h>
 #include <Common/Stopwatch.h>

@@ -68,7 +69,8 @@ public:

     AsynchronousMetrics(
         unsigned update_period_seconds,
-        const ProtocolServerMetricsFunc & protocol_server_metrics_func_);
+        const ProtocolServerMetricsFunc & protocol_server_metrics_func_,
+        std::shared_ptr<ICgroupsReader> cgroups_reader_);

     virtual ~AsynchronousMetrics();

@@ -91,6 +93,7 @@ private:
     virtual void logImpl(AsynchronousMetricValues &) {}

     ProtocolServerMetricsFunc protocol_server_metrics_func;
+    std::shared_ptr<ICgroupsReader> cgroups_reader;

     std::unique_ptr<ThreadFromGlobalPool> thread;

@@ -17,13 +17,6 @@
 #include <memory>
 #include <optional>

-#include "config.h"
-#if USE_JEMALLOC
-# include <jemalloc/jemalloc.h>
-#define STRINGIFY_HELPER(x) #x
-#define STRINGIFY(x) STRINGIFY_HELPER(x)
-#endif
-
 using namespace DB;
 namespace fs = std::filesystem;

@@ -155,15 +148,21 @@ std::optional<std::string> getCgroupsV1Path()
     return {default_cgroups_mount / "memory"};
 }

-std::pair<std::string, CgroupsMemoryUsageObserver::CgroupsVersion> getCgroupsPath()
+enum class CgroupsVersion : uint8_t
+{
+    V1,
+    V2
+};
+
+std::pair<std::string, CgroupsVersion> getCgroupsPath()
 {
     auto v2_path = getCgroupsV2Path();
     if (v2_path.has_value())
-        return {*v2_path, CgroupsMemoryUsageObserver::CgroupsVersion::V2};
+        return {*v2_path, CgroupsVersion::V2};

     auto v1_path = getCgroupsV1Path();
     if (v1_path.has_value())
-        return {*v1_path, CgroupsMemoryUsageObserver::CgroupsVersion::V1};
+        return {*v1_path, CgroupsVersion::V1};

     throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Cannot find cgroups v1 or v2 current memory file");
 }

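The path probe tries cgroups v2 first and falls back to v1 only if no unified hierarchy is found, throwing when neither exists. A simplified, hedged sketch of such a probe under the usual `/sys/fs/cgroup` mount is shown below; ClickHouse's real `getCgroupsV2Path`/`getCgroupsV1Path` helpers also inspect the process's own cgroup, which this example skips.

```cpp
#include <filesystem>
#include <iostream>
#include <optional>
#include <utility>

namespace fs = std::filesystem;

enum class CgroupsVersion { V1, V2 };

// Assumed default mount point; real systems may mount cgroups elsewhere.
static const fs::path default_cgroups_mount = "/sys/fs/cgroup";

std::optional<std::pair<fs::path, CgroupsVersion>> probeCgroupsMemoryPath()
{
    // cgroups v2: a unified hierarchy exposes 'cgroup.controllers' at the mount root
    // and per-cgroup 'memory.current' files.
    if (fs::exists(default_cgroups_mount / "cgroup.controllers"))
        return std::pair{default_cgroups_mount, CgroupsVersion::V2};

    // cgroups v1: the memory controller lives in its own sub-hierarchy.
    if (fs::exists(default_cgroups_mount / "memory"))
        return std::pair{default_cgroups_mount / "memory", CgroupsVersion::V1};

    return std::nullopt;  // neither found; the real code throws FILE_DOESNT_EXIST here
}

int main()
{
    if (auto probed = probeCgroupsMemoryPath())
        std::cout << "cgroups " << (probed->second == CgroupsVersion::V2 ? "v2" : "v1")
                  << " at " << probed->first << '\n';
    else
        std::cout << "no cgroups memory hierarchy found\n";
}
```
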
@@ -173,22 +172,29 @@ std::pair<std::string, CgroupsMemoryUsageObserver::CgroupsVersion> getCgroupsPath()
 namespace DB
 {

-CgroupsMemoryUsageObserver::CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_)
-    : log(getLogger("CgroupsMemoryUsageObserver")), wait_time(wait_time_)
+std::shared_ptr<ICgroupsReader> createCgroupsReader()
 {
     const auto [cgroup_path, version] = getCgroupsPath();
+    LOG_INFO(
+        getLogger("CgroupsReader"),
+        "Will create cgroup reader from '{}' (cgroups version: {})",
+        cgroup_path,
+        (version == CgroupsVersion::V1) ? "v1" : "v2");
+
     if (version == CgroupsVersion::V2)
-        cgroup_reader = std::make_unique<CgroupsV2Reader>(cgroup_path);
+        return std::make_shared<CgroupsV2Reader>(cgroup_path);
     else
-        cgroup_reader = std::make_unique<CgroupsV1Reader>(cgroup_path);
-
-    LOG_INFO(
-        log,
-        "Will read the current memory usage from '{}' (cgroups version: {}), wait time is {} sec",
-        cgroup_path,
-        (version == CgroupsVersion::V1) ? "v1" : "v2",
-        wait_time.count());
+    {
+        chassert(version == CgroupsVersion::V1);
+        return std::make_shared<CgroupsV1Reader>(cgroup_path);
+    }
+}
+
+CgroupsMemoryUsageObserver::CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_, std::shared_ptr<ICgroupsReader> cgroups_reader_)
+    : log(getLogger("CgroupsMemoryUsageObserver")), wait_time(wait_time_), cgroups_reader(std::move(cgroups_reader_))
+{
+    cgroups_reader = createCgroupsReader();
 }

 CgroupsMemoryUsageObserver::~CgroupsMemoryUsageObserver()

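createCgroupsReader() returns a `std::shared_ptr<ICgroupsReader>` so that the observer and the async-metrics thread can share one instance of the version-specific reader. The bodies of `CgroupsV1Reader` and `CgroupsV2Reader` are not part of this diff; the sketch below shows what a minimal v2-style `readMemoryUsage()` plausibly looks like (cgroups v2 reports the cgroup's current usage in bytes in `memory.current`; v1 exposes the analogous `memory.usage_in_bytes`). The class name and path handling here are illustrative only, not the real implementation.

```cpp
#include <cstdint>
#include <fstream>
#include <iostream>
#include <memory>
#include <string>

// ICgroupsReader mirrors the interface from the diff; the concrete reader below is
// an illustrative sketch, not ClickHouse's CgroupsV2Reader.
struct ICgroupsReader
{
    virtual ~ICgroupsReader() = default;
    virtual uint64_t readMemoryUsage() = 0;
};

class SimpleCgroupsV2Reader : public ICgroupsReader
{
public:
    explicit SimpleCgroupsV2Reader(std::string cgroup_dir_) : cgroup_dir(std::move(cgroup_dir_)) {}

    uint64_t readMemoryUsage() override
    {
        // cgroups v2 exposes the cgroup's current memory usage, in bytes,
        // as a single integer in 'memory.current'.
        std::ifstream in(cgroup_dir + "/memory.current");
        uint64_t bytes = 0;
        in >> bytes;  // stays 0 if the file is missing or unreadable
        return bytes;
    }

private:
    std::string cgroup_dir;
};

int main()
{
    // Assumed location; on a real system the cgroup directory comes from the path probe.
    auto reader = std::make_shared<SimpleCgroupsV2Reader>("/sys/fs/cgroup");
    std::cout << "memory usage: " << reader->readMemoryUsage() << " bytes\n";
}
```
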
@@ -196,58 +202,6 @@ CgroupsMemoryUsageObserver::~CgroupsMemoryUsageObserver()
     stopThread();
 }

-void CgroupsMemoryUsageObserver::setMemoryUsageLimits(uint64_t hard_limit_, uint64_t soft_limit_)
-{
-    std::lock_guard<std::mutex> limit_lock(limit_mutex);
-
-    if (hard_limit_ == hard_limit && soft_limit_ == soft_limit)
-        return;
-
-    hard_limit = hard_limit_;
-    soft_limit = soft_limit_;
-
-    on_hard_limit = [this, hard_limit_](bool up)
-    {
-        if (up)
-        {
-            LOG_WARNING(log, "Exceeded hard memory limit ({})", ReadableSize(hard_limit_));
-
-            /// Update current usage in memory tracker. Also reset free_memory_in_allocator_arenas to zero though we don't know if they are
-            /// really zero. Trying to avoid OOM ...
-            MemoryTracker::setRSS(hard_limit_, 0);
-        }
-        else
-        {
-            LOG_INFO(log, "Dropped below hard memory limit ({})", ReadableSize(hard_limit_));
-        }
-    };
-
-    on_soft_limit = [this, soft_limit_](bool up)
-    {
-        if (up)
-        {
-            LOG_WARNING(log, "Exceeded soft memory limit ({})", ReadableSize(soft_limit_));
-
-# if USE_JEMALLOC
-            LOG_INFO(log, "Purging jemalloc arenas");
-            mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
-# endif
-            /// Reset current usage in memory tracker. Expect zero for free_memory_in_allocator_arenas as we just purged them.
-            uint64_t memory_usage = cgroup_reader->readMemoryUsage();
-            LOG_TRACE(log, "Read current memory usage {} bytes ({}) from cgroups", memory_usage, ReadableSize(memory_usage));
-            MemoryTracker::setRSS(memory_usage, 0);
-
-            LOG_INFO(log, "Purged jemalloc arenas. Current memory usage is {}", ReadableSize(memory_usage));
-        }
-        else
-        {
-            LOG_INFO(log, "Dropped below soft memory limit ({})", ReadableSize(soft_limit_));
-        }
-    };
-
-    LOG_INFO(log, "Set new limits, soft limit: {}, hard limit: {}", ReadableSize(soft_limit_), ReadableSize(hard_limit_));
-}
-
 void CgroupsMemoryUsageObserver::setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed_)
 {
     std::lock_guard<std::mutex> memory_amount_available_changed_lock(memory_amount_available_changed_mutex);

@@ -301,35 +255,6 @@ void CgroupsMemoryUsageObserver::runThread()
                 std::lock_guard<std::mutex> memory_amount_available_changed_lock(memory_amount_available_changed_mutex);
                 on_memory_amount_available_changed();
             }
-
-            std::lock_guard<std::mutex> limit_lock(limit_mutex);
-            if (soft_limit > 0 && hard_limit > 0)
-            {
-                uint64_t memory_usage = cgroup_reader->readMemoryUsage();
-                LOG_TRACE(log, "Read current memory usage {} bytes ({}) from cgroups", memory_usage, ReadableSize(memory_usage));
-                if (memory_usage > hard_limit)
-                {
-                    if (last_memory_usage <= hard_limit)
-                        on_hard_limit(true);
-                }
-                else
-                {
-                    if (last_memory_usage > hard_limit)
-                        on_hard_limit(false);
-                }
-
-                if (memory_usage > soft_limit)
-                {
-                    if (last_memory_usage <= soft_limit)
-                        on_soft_limit(true);
-                }
-                else
-                {
-                    if (last_memory_usage > soft_limit)
-                        on_soft_limit(false);
-                }
-                last_memory_usage = memory_usage;
-            }
         }
         catch (...)
         {

@@ -16,6 +16,8 @@ struct ICgroupsReader
     virtual uint64_t readMemoryUsage() = 0;
 };

+std::shared_ptr<ICgroupsReader> createCgroupsReader();
+
 /// Does two things:
 /// 1. Periodically reads the memory usage of the process from Linux cgroups.
 /// You can specify soft or hard memory limits:

@@ -35,19 +37,11 @@ struct ICgroupsReader
 class CgroupsMemoryUsageObserver
 {
 public:
-    using OnMemoryLimitFn = std::function<void(bool)>;
     using OnMemoryAmountAvailableChangedFn = std::function<void()>;

-    enum class CgroupsVersion : uint8_t
-    {
-        V1,
-        V2
-    };
-
-    explicit CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_);
+    explicit CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_, std::shared_ptr<ICgroupsReader> cgroups_reader_);
     ~CgroupsMemoryUsageObserver();

-    void setMemoryUsageLimits(uint64_t hard_limit_, uint64_t soft_limit_);
     void setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed_);

     void startThread();

|
|||||||
const std::chrono::seconds wait_time;
|
const std::chrono::seconds wait_time;
|
||||||
|
|
||||||
std::mutex limit_mutex;
|
std::mutex limit_mutex;
|
||||||
size_t hard_limit TSA_GUARDED_BY(limit_mutex) = 0;
|
|
||||||
size_t soft_limit TSA_GUARDED_BY(limit_mutex) = 0;
|
|
||||||
OnMemoryLimitFn on_hard_limit TSA_GUARDED_BY(limit_mutex);
|
|
||||||
OnMemoryLimitFn on_soft_limit TSA_GUARDED_BY(limit_mutex);
|
|
||||||
|
|
||||||
std::mutex memory_amount_available_changed_mutex;
|
std::mutex memory_amount_available_changed_mutex;
|
||||||
OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed TSA_GUARDED_BY(memory_amount_available_changed_mutex);
|
OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed TSA_GUARDED_BY(memory_amount_available_changed_mutex);
|
||||||
|
|
||||||
uint64_t last_memory_usage = 0; /// how much memory does the process use
|
|
||||||
uint64_t last_available_memory_amount; /// how much memory can the process use
|
uint64_t last_available_memory_amount; /// how much memory can the process use
|
||||||
|
|
||||||
void stopThread();
|
void stopThread();
|
||||||
|
|
||||||
void runThread();
|
void runThread();
|
||||||
|
|
||||||
std::unique_ptr<ICgroupsReader> cgroup_reader;
|
std::shared_ptr<ICgroupsReader> cgroups_reader;
|
||||||
|
|
||||||
std::mutex thread_mutex;
|
std::mutex thread_mutex;
|
||||||
std::condition_variable cond;
|
std::condition_variable cond;
|
||||||
|
@@ -26,7 +26,6 @@
 #endif

 #include <atomic>
-#include <cmath>
 #include <random>
 #include <cstdlib>
 #include <string>

@@ -123,7 +122,7 @@ static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
 MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
 MemoryTracker background_memory_tracker(&total_memory_tracker, VariableContext::User, false);

-std::atomic<Int64> MemoryTracker::free_memory_in_allocator_arenas;
+std::atomic<bool> MemoryTracker::has_free_memory_in_allocator_arenas;

 MemoryTracker::MemoryTracker(VariableContext level_) : parent(&total_memory_tracker), level(level_) {}
 MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_) : parent(parent_), level(level_) {}

@@ -204,7 +203,7 @@ void MemoryTracker::debugLogBigAllocationWithoutCheck(Int64 size [[maybe_unused]
     LOG_TEST(getLogger("MemoryTracker"), "Too big allocation ({} bytes) without checking memory limits, "
              "it may lead to OOM. Stack trace: {}", size, StackTrace().toString());
 #else
-    return; /// Avoid trash logging in release builds
+    /// Avoid trash logging in release builds
 #endif
 }

@@ -294,33 +293,18 @@ AllocationTrace MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceed
         }
     }

-    Int64 limit_to_check = current_hard_limit;
-
 #if USE_JEMALLOC
-    if (level == VariableContext::Global && allow_use_jemalloc_memory.load(std::memory_order_relaxed))
-    {
-        /// Jemalloc arenas may keep some extra memory.
-        /// This memory was substucted from RSS to decrease memory drift.
-        /// In case memory is close to limit, try to pugre the arenas.
-        /// This is needed to avoid OOM, because some allocations are directly done with mmap.
-        Int64 current_free_memory_in_allocator_arenas = free_memory_in_allocator_arenas.load(std::memory_order_relaxed);
-
-        if (current_free_memory_in_allocator_arenas > 0 && current_hard_limit && current_free_memory_in_allocator_arenas + will_be > current_hard_limit)
-        {
-            if (free_memory_in_allocator_arenas.exchange(-current_free_memory_in_allocator_arenas) > 0)
+    if (level == VariableContext::Global && will_be > soft_limit.load(std::memory_order_relaxed)
+        && has_free_memory_in_allocator_arenas.exchange(false))
     {
         Stopwatch watch;
         mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
         ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurge);
         ProfileEvents::increment(ProfileEvents::MemoryAllocatorPurgeTimeMicroseconds, watch.elapsedMicroseconds());
     }
-        }
-
-        limit_to_check += abs(current_free_memory_in_allocator_arenas);
-    }
 #endif

-    if (unlikely(current_hard_limit && will_be > limit_to_check))
+    if (unlikely(current_hard_limit && will_be > current_hard_limit))
     {
         if (memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded)
         {

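The rewritten allocImpl() gate is worth spelling out: a purge is attempted only when a global allocation would exceed the soft limit and `has_free_memory_in_allocator_arenas` is still set, and the `exchange(false)` guarantees that exactly one thread performs the purge until the flag is re-armed. Judging from the other hunks, the flag is set again from the async-metrics thread through `setRSS()` whenever jemalloc reports free (dirty) memory in its arenas. The sketch below demonstrates the one-shot gate with the jemalloc `mallctl` purge replaced by a counting stub, so the example runs without jemalloc.

```cpp
#include <atomic>
#include <cstddef>
#include <iostream>
#include <thread>
#include <vector>

// Re-armed periodically by a background thread in the real design (here: armed once up front).
std::atomic<bool> has_free_memory_in_arenas{true};
std::atomic<int> purge_count{0};

void purgeArenas()
{
    // Stand-in for mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", ...).
    purge_count.fetch_add(1, std::memory_order_relaxed);
}

void onAllocation(std::size_t will_be, std::size_t soft_limit)
{
    // Only purge when the soft limit is crossed, and let exactly one thread do it:
    // exchange(false) returns true for the first caller and false for everyone else.
    if (will_be > soft_limit && has_free_memory_in_arenas.exchange(false))
        purgeArenas();
}

int main()
{
    std::vector<std::thread> threads;
    for (int i = 0; i < 8; ++i)
        threads.emplace_back([] { onAllocation(/*will_be=*/2'000'000, /*soft_limit=*/1'000'000); });
    for (auto & t : threads)
        t.join();

    std::cout << "purges executed: " << purge_count.load() << '\n';  // prints 1
}
```
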
@@ -526,11 +510,11 @@ void MemoryTracker::reset()
 }


-void MemoryTracker::setRSS(Int64 rss_, Int64 free_memory_in_allocator_arenas_)
+void MemoryTracker::setRSS(Int64 rss_, bool has_free_memory_in_allocator_arenas_)
 {
     Int64 new_amount = rss_;
     total_memory_tracker.amount.store(new_amount, std::memory_order_relaxed);
-    free_memory_in_allocator_arenas.store(free_memory_in_allocator_arenas_, std::memory_order_relaxed);
+    has_free_memory_in_allocator_arenas.store(has_free_memory_in_allocator_arenas_, std::memory_order_relaxed);

     auto metric_loaded = total_memory_tracker.metric.load(std::memory_order_relaxed);
     if (metric_loaded != CurrentMetrics::end())

@@ -59,7 +59,7 @@ private:
     std::atomic<Int64> profiler_limit {0};
     std::atomic_bool allow_use_jemalloc_memory {true};

-    static std::atomic<Int64> free_memory_in_allocator_arenas;
+    static std::atomic<bool> has_free_memory_in_allocator_arenas;

     Int64 profiler_step = 0;

@@ -252,7 +252,7 @@ public:
     /// Reset current counter to an RSS value.
     /// Jemalloc may have pre-allocated arenas, they are accounted in RSS.
     /// We can free this arenas in case of exception to avoid OOM.
-    static void setRSS(Int64 rss_, Int64 free_memory_in_allocator_arenas_);
+    static void setRSS(Int64 rss_, bool has_free_memory_in_allocator_arenas_);

     /// Prints info about peak memory consumption into log.
     void logPeakMemoryUsage();

@@ -115,7 +115,7 @@ void updateKeeperInformation(KeeperDispatcher & keeper_dispatcher, AsynchronousM

 KeeperAsynchronousMetrics::KeeperAsynchronousMetrics(
     ContextPtr context_, unsigned update_period_seconds, const ProtocolServerMetricsFunc & protocol_server_metrics_func_)
-    : AsynchronousMetrics(update_period_seconds, protocol_server_metrics_func_), context(std::move(context_))
+    : AsynchronousMetrics(update_period_seconds, protocol_server_metrics_func_, context_->getCgroupsReader()), context(std::move(context_))
 {
 }

@@ -5,6 +5,7 @@
 #include <Poco/UUID.h>
 #include <Poco/Util/Application.h>
 #include <Common/AsyncLoader.h>
+#include <Common/CgroupsMemoryUsageObserver.h>
 #include <Common/PoolId.h>
 #include <Common/SensitiveDataMasker.h>
 #include <Common/Macros.h>

@@ -405,6 +406,8 @@ struct ContextSharedPart : boost::noncopyable
     std::unique_ptr<ClusterDiscovery> cluster_discovery TSA_GUARDED_BY(clusters_mutex);
     size_t clusters_version TSA_GUARDED_BY(clusters_mutex) = 0;

+    std::shared_ptr<ICgroupsReader> cgroups_reader;
+
     /// No lock required for async_insert_queue modified only during initialization
     std::shared_ptr<AsynchronousInsertQueue> async_insert_queue;

@@ -5635,6 +5638,16 @@ const ServerSettings & Context::getServerSettings() const
     return shared->server_settings;
 }

+void Context::setCgroupsReader(std::shared_ptr<ICgroupsReader> cgroups_reader_)
+{
+    shared->cgroups_reader = std::move(cgroups_reader_);
+}
+
+std::shared_ptr<ICgroupsReader> Context::getCgroupsReader() const
+{
+    return shared->cgroups_reader;
+}
+
 uint64_t HTTPContext::getMaxHstsAge() const
 {
     return context->getSettingsRef().hsts_max_age;

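The new Context accessors are plain pass-throughs to a field on ContextSharedPart; the reader appears to be set once during startup and only read afterwards, which is presumably why no locking is added around it. A tiny usage sketch with stand-in types follows; the null check mirrors how the Keeper and Server hunks guard against a reader that could not be created.

```cpp
#include <cstdint>
#include <iostream>
#include <memory>

// Illustrative stand-ins for the Context/ICgroupsReader pair touched by this commit.
struct ICgroupsReader
{
    virtual ~ICgroupsReader() = default;
    virtual uint64_t readMemoryUsage() = 0;
};

struct Context
{
    void setCgroupsReader(std::shared_ptr<ICgroupsReader> reader_) { reader = std::move(reader_); }
    std::shared_ptr<ICgroupsReader> getCgroupsReader() const { return reader; }

    std::shared_ptr<ICgroupsReader> reader;
};

int main()
{
    Context context;  // reader was never set (e.g. cgroups unavailable on this host)
    if (auto reader = context.getCgroupsReader())
        std::cout << "cgroup memory usage: " << reader->readMemoryUsage() << " bytes\n";
    else
        std::cout << "no cgroups reader; metrics fall back to /proc-based values\n";
}
```
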
@@ -150,6 +150,7 @@ class ServerType;
 template <class Queue>
 class MergeTreeBackgroundExecutor;
 class AsyncLoader;
+struct ICgroupsReader;

 struct TemporaryTableHolder;
 using TemporaryTablesMapping = std::map<String, std::shared_ptr<TemporaryTableHolder>>;

@@ -1344,6 +1345,9 @@ public:

     const ServerSettings & getServerSettings() const;

+    void setCgroupsReader(std::shared_ptr<ICgroupsReader> cgroups_reader_);
+    std::shared_ptr<ICgroupsReader> getCgroupsReader() const;
+
 private:
     std::shared_ptr<const SettingsConstraintsAndProfileIDs> getSettingsConstraintsAndCurrentProfilesWithLock() const;

@@ -57,7 +57,7 @@ ServerAsynchronousMetrics::ServerAsynchronousMetrics(
     unsigned heavy_metrics_update_period_seconds,
     const ProtocolServerMetricsFunc & protocol_server_metrics_func_)
     : WithContext(global_context_)
-    , AsynchronousMetrics(update_period_seconds, protocol_server_metrics_func_)
+    , AsynchronousMetrics(update_period_seconds, protocol_server_metrics_func_, getContext()->getCgroupsReader())
    , heavy_metric_update_period(heavy_metrics_update_period_seconds)
 {
     /// sanity check