mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Finish background memory thread
This commit is contained in:
parent
c413e7a494
commit
9a43183eb3
@ -11,6 +11,7 @@
|
|||||||
#include <Core/ServerUUID.h>
|
#include <Core/ServerUUID.h>
|
||||||
#include <Common/logger_useful.h>
|
#include <Common/logger_useful.h>
|
||||||
#include <Common/CgroupsMemoryUsageObserver.h>
|
#include <Common/CgroupsMemoryUsageObserver.h>
|
||||||
|
#include <Common/MemoryWorker.h>
|
||||||
#include <Common/ErrorHandlers.h>
|
#include <Common/ErrorHandlers.h>
|
||||||
#include <Common/assertProcessUserMatchesDataOwner.h>
|
#include <Common/assertProcessUserMatchesDataOwner.h>
|
||||||
#include <Common/makeSocketAddress.h>
|
#include <Common/makeSocketAddress.h>
|
||||||
@ -371,6 +372,8 @@ try
|
|||||||
LOG_INFO(log, "Background threads finished in {} ms", watch.elapsedMilliseconds());
|
LOG_INFO(log, "Background threads finished in {} ms", watch.elapsedMilliseconds());
|
||||||
});
|
});
|
||||||
|
|
||||||
|
MemoryWorker memory_worker(config().getUInt64("memory_worker_period_ms", 100));
|
||||||
|
|
||||||
static ServerErrorHandler error_handler;
|
static ServerErrorHandler error_handler;
|
||||||
Poco::ErrorHandler::set(&error_handler);
|
Poco::ErrorHandler::set(&error_handler);
|
||||||
|
|
||||||
@ -399,18 +402,6 @@ try
|
|||||||
|
|
||||||
registerDisks(/*global_skip_access_check=*/false);
|
registerDisks(/*global_skip_access_check=*/false);
|
||||||
|
|
||||||
auto cgroups_memory_observer_wait_time = config().getUInt64("keeper_server.cgroups_memory_observer_wait_time", 15);
|
|
||||||
try
|
|
||||||
{
|
|
||||||
auto cgroups_reader = createCgroupsReader();
|
|
||||||
global_context->setCgroupsReader(createCgroupsReader());
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
if (cgroups_memory_observer_wait_time != 0)
|
|
||||||
tryLogCurrentException(log, "Failed to create cgroups reader");
|
|
||||||
}
|
|
||||||
|
|
||||||
/// This object will periodically calculate some metrics.
|
/// This object will periodically calculate some metrics.
|
||||||
KeeperAsynchronousMetrics async_metrics(
|
KeeperAsynchronousMetrics async_metrics(
|
||||||
global_context,
|
global_context,
|
||||||
@ -634,21 +625,22 @@ try
|
|||||||
main_config_reloader->start();
|
main_config_reloader->start();
|
||||||
|
|
||||||
std::optional<CgroupsMemoryUsageObserver> cgroups_memory_usage_observer;
|
std::optional<CgroupsMemoryUsageObserver> cgroups_memory_usage_observer;
|
||||||
if (cgroups_memory_observer_wait_time != 0)
|
try
|
||||||
{
|
{
|
||||||
auto cgroups_reader = global_context->getCgroupsReader();
|
auto wait_time = config().getUInt64("keeper_server.cgroups_memory_observer_wait_time", 15);
|
||||||
if (cgroups_reader)
|
if (wait_time != 0)
|
||||||
{
|
{
|
||||||
cgroups_memory_usage_observer.emplace(std::chrono::seconds(cgroups_memory_observer_wait_time), global_context->getCgroupsReader());
|
cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time));
|
||||||
/// Not calling cgroups_memory_usage_observer->setLimits() here (as for the normal ClickHouse server) because Keeper controls
|
/// Not calling cgroups_memory_usage_observer->setLimits() here (as for the normal ClickHouse server) because Keeper controls
|
||||||
/// its memory usage by other means (via setting 'max_memory_usage_soft_limit').
|
/// its memory usage by other means (via setting 'max_memory_usage_soft_limit').
|
||||||
cgroups_memory_usage_observer->setOnMemoryAmountAvailableChangedFn([&]() { main_config_reloader->reload(); });
|
cgroups_memory_usage_observer->setOnMemoryAmountAvailableChangedFn([&]() { main_config_reloader->reload(); });
|
||||||
cgroups_memory_usage_observer->startThread();
|
cgroups_memory_usage_observer->startThread();
|
||||||
}
|
}
|
||||||
else
|
|
||||||
LOG_ERROR(log, "Disabling cgroup memory observer because of an error during initialization of cgroups reader");
|
|
||||||
}
|
}
|
||||||
|
catch (Exception &)
|
||||||
|
{
|
||||||
|
tryLogCurrentException(log, "Disabling cgroup memory observer because of an error during initialization");
|
||||||
|
}
|
||||||
|
|
||||||
LOG_INFO(log, "Ready for connections.");
|
LOG_INFO(log, "Ready for connections.");
|
||||||
|
|
||||||
|
@ -11,7 +11,6 @@
|
|||||||
#include <Poco/Util/HelpFormatter.h>
|
#include <Poco/Util/HelpFormatter.h>
|
||||||
#include <Poco/Environment.h>
|
#include <Poco/Environment.h>
|
||||||
#include <Poco/Config.h>
|
#include <Poco/Config.h>
|
||||||
#include <Common/Jemalloc.h>
|
|
||||||
#include <Common/scope_guard_safe.h>
|
#include <Common/scope_guard_safe.h>
|
||||||
#include <Common/logger_useful.h>
|
#include <Common/logger_useful.h>
|
||||||
#include <base/phdr_cache.h>
|
#include <base/phdr_cache.h>
|
||||||
@ -24,6 +23,7 @@
|
|||||||
#include <base/safeExit.h>
|
#include <base/safeExit.h>
|
||||||
#include <Common/PoolId.h>
|
#include <Common/PoolId.h>
|
||||||
#include <Common/MemoryTracker.h>
|
#include <Common/MemoryTracker.h>
|
||||||
|
#include <Common/MemoryWorker.h>
|
||||||
#include <Common/ClickHouseRevision.h>
|
#include <Common/ClickHouseRevision.h>
|
||||||
#include <Common/DNSResolver.h>
|
#include <Common/DNSResolver.h>
|
||||||
#include <Common/CgroupsMemoryUsageObserver.h>
|
#include <Common/CgroupsMemoryUsageObserver.h>
|
||||||
@ -110,6 +110,8 @@
|
|||||||
#include <filesystem>
|
#include <filesystem>
|
||||||
#include <unordered_set>
|
#include <unordered_set>
|
||||||
|
|
||||||
|
#include <Common/Jemalloc.h>
|
||||||
|
|
||||||
#include "config.h"
|
#include "config.h"
|
||||||
#include <Common/config_version.h>
|
#include <Common/config_version.h>
|
||||||
|
|
||||||
@ -591,29 +593,6 @@ void sanityChecks(Server & server)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
[[noreturn]] void backgroundMemoryThread()
|
|
||||||
{
|
|
||||||
std::mutex mutex;
|
|
||||||
std::condition_variable cv;
|
|
||||||
|
|
||||||
std::unique_lock lock(mutex);
|
|
||||||
while (true)
|
|
||||||
{
|
|
||||||
cv.wait_for(lock, std::chrono::microseconds(200));
|
|
||||||
uint64_t epoch = 0;
|
|
||||||
mallctl("epoch", nullptr, nullptr, &epoch, sizeof(epoch));
|
|
||||||
auto maybe_resident = getJemallocValue<size_t>("stats.resident");
|
|
||||||
if (!maybe_resident.has_value())
|
|
||||||
continue;
|
|
||||||
|
|
||||||
Int64 resident = *maybe_resident;
|
|
||||||
//LOG_INFO(getLogger("JEmalloc"), "Resident {}", ReadableSize(resident));
|
|
||||||
MemoryTracker::setRSS(resident, false);
|
|
||||||
if (resident > total_memory_tracker.getHardLimit())
|
|
||||||
purgeJemallocArenas();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void loadStartupScripts(const Poco::Util::AbstractConfiguration & config, ContextMutablePtr context, Poco::Logger * log)
|
void loadStartupScripts(const Poco::Util::AbstractConfiguration & config, ContextMutablePtr context, Poco::Logger * log)
|
||||||
@ -905,11 +884,6 @@ try
|
|||||||
total_memory_tracker.setSampleMaxAllocationSize(server_settings.total_memory_profiler_sample_max_allocation_size);
|
total_memory_tracker.setSampleMaxAllocationSize(server_settings.total_memory_profiler_sample_max_allocation_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
ThreadFromGlobalPool background_memory_thread([]
|
|
||||||
{
|
|
||||||
backgroundMemoryThread();
|
|
||||||
});
|
|
||||||
|
|
||||||
Poco::ThreadPool server_pool(
|
Poco::ThreadPool server_pool(
|
||||||
/* minCapacity */3,
|
/* minCapacity */3,
|
||||||
/* maxCapacity */server_settings.max_connections,
|
/* maxCapacity */server_settings.max_connections,
|
||||||
@ -930,16 +904,7 @@ try
|
|||||||
LOG_INFO(log, "Background threads finished in {} ms", watch.elapsedMilliseconds());
|
LOG_INFO(log, "Background threads finished in {} ms", watch.elapsedMilliseconds());
|
||||||
});
|
});
|
||||||
|
|
||||||
try
|
MemoryWorker memory_worker(global_context->getServerSettings().memory_worker_period_ms);
|
||||||
{
|
|
||||||
auto cgroups_reader = createCgroupsReader();
|
|
||||||
global_context->setCgroupsReader(createCgroupsReader());
|
|
||||||
}
|
|
||||||
catch (...)
|
|
||||||
{
|
|
||||||
if (server_settings.cgroups_memory_usage_observer_wait_time != 0)
|
|
||||||
tryLogCurrentException(log, "Failed to create cgroups reader");
|
|
||||||
}
|
|
||||||
|
|
||||||
/// This object will periodically calculate some metrics.
|
/// This object will periodically calculate some metrics.
|
||||||
ServerAsynchronousMetrics async_metrics(
|
ServerAsynchronousMetrics async_metrics(
|
||||||
@ -1500,13 +1465,15 @@ try
|
|||||||
}
|
}
|
||||||
|
|
||||||
std::optional<CgroupsMemoryUsageObserver> cgroups_memory_usage_observer;
|
std::optional<CgroupsMemoryUsageObserver> cgroups_memory_usage_observer;
|
||||||
if (auto wait_time = server_settings.cgroups_memory_usage_observer_wait_time; wait_time != 0)
|
try
|
||||||
{
|
{
|
||||||
auto cgroups_reader = global_context->getCgroupsReader();
|
auto wait_time = server_settings.cgroups_memory_usage_observer_wait_time;
|
||||||
if (cgroups_reader)
|
if (wait_time != 0)
|
||||||
cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time), std::move(cgroups_reader));
|
cgroups_memory_usage_observer.emplace(std::chrono::seconds(wait_time));
|
||||||
else
|
}
|
||||||
LOG_ERROR(log, "Disabling cgroup memory observer because of an error during initialization of cgroups reader");
|
catch (Exception &)
|
||||||
|
{
|
||||||
|
tryLogCurrentException(log, "Disabling cgroup memory observer because of an error during initialization");
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string cert_path = config().getString("openSSL.server.certificateFile", "");
|
std::string cert_path = config().getString("openSSL.server.certificateFile", "");
|
||||||
@ -1602,8 +1569,6 @@ try
|
|||||||
background_memory_tracker.setDescription("(background)");
|
background_memory_tracker.setDescription("(background)");
|
||||||
background_memory_tracker.setMetric(CurrentMetrics::MergesMutationsMemoryTracking);
|
background_memory_tracker.setMetric(CurrentMetrics::MergesMutationsMemoryTracking);
|
||||||
|
|
||||||
total_memory_tracker.setAllowUseJemallocMemory(new_server_settings.allow_use_jemalloc_memory);
|
|
||||||
|
|
||||||
auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker();
|
auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker();
|
||||||
total_memory_tracker.setOvercommitTracker(global_overcommit_tracker);
|
total_memory_tracker.setOvercommitTracker(global_overcommit_tracker);
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
#include <Common/setThreadName.h>
|
#include <Common/setThreadName.h>
|
||||||
#include <Common/CurrentMetrics.h>
|
#include <Common/CurrentMetrics.h>
|
||||||
#include <Common/filesystemHelpers.h>
|
#include <Common/filesystemHelpers.h>
|
||||||
|
#include <Common/Jemalloc.h>
|
||||||
#include <Common/logger_useful.h>
|
#include <Common/logger_useful.h>
|
||||||
#include <IO/UncompressedCache.h>
|
#include <IO/UncompressedCache.h>
|
||||||
#include <IO/MMappedFileCache.h>
|
#include <IO/MMappedFileCache.h>
|
||||||
@ -57,12 +58,10 @@ static std::unique_ptr<ReadBufferFromFilePRead> openFileIfExists(const std::stri
|
|||||||
|
|
||||||
AsynchronousMetrics::AsynchronousMetrics(
|
AsynchronousMetrics::AsynchronousMetrics(
|
||||||
unsigned update_period_seconds,
|
unsigned update_period_seconds,
|
||||||
const ProtocolServerMetricsFunc & protocol_server_metrics_func_,
|
const ProtocolServerMetricsFunc & protocol_server_metrics_func_)
|
||||||
std::shared_ptr<ICgroupsReader> cgroups_reader_)
|
|
||||||
: update_period(update_period_seconds)
|
: update_period(update_period_seconds)
|
||||||
, log(getLogger("AsynchronousMetrics"))
|
, log(getLogger("AsynchronousMetrics"))
|
||||||
, protocol_server_metrics_func(protocol_server_metrics_func_)
|
, protocol_server_metrics_func(protocol_server_metrics_func_)
|
||||||
, cgroups_reader(std::move(cgroups_reader_))
|
|
||||||
{
|
{
|
||||||
#if defined(OS_LINUX)
|
#if defined(OS_LINUX)
|
||||||
openFileIfExists("/proc/meminfo", meminfo);
|
openFileIfExists("/proc/meminfo", meminfo);
|
||||||
@ -378,23 +377,13 @@ void AsynchronousMetrics::run()
|
|||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
|
|
||||||
uint64_t updateJemallocEpoch()
|
|
||||||
{
|
|
||||||
uint64_t value = 0;
|
|
||||||
size_t size = sizeof(value);
|
|
||||||
mallctl("epoch", &value, &size, &value, size);
|
|
||||||
return value;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename Value>
|
template <typename Value>
|
||||||
Value saveJemallocMetricImpl(
|
Value saveJemallocMetricImpl(
|
||||||
AsynchronousMetricValues & values,
|
AsynchronousMetricValues & values,
|
||||||
const std::string & jemalloc_full_name,
|
const std::string & jemalloc_full_name,
|
||||||
const std::string & clickhouse_full_name)
|
const std::string & clickhouse_full_name)
|
||||||
{
|
{
|
||||||
Value value{};
|
auto value = getJemallocValue<Value>(jemalloc_full_name.c_str());
|
||||||
size_t size = sizeof(value);
|
|
||||||
mallctl(jemalloc_full_name.c_str(), &value, &size, nullptr, 0);
|
|
||||||
values[clickhouse_full_name] = AsynchronousMetricValue(value, "An internal metric of the low-level memory allocator (jemalloc). See https://jemalloc.net/jemalloc.3.html");
|
values[clickhouse_full_name] = AsynchronousMetricValue(value, "An internal metric of the low-level memory allocator (jemalloc). See https://jemalloc.net/jemalloc.3.html");
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
@ -604,7 +593,7 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
|
|||||||
// 'epoch' is a special mallctl -- it updates the statistics. Without it, all
|
// 'epoch' is a special mallctl -- it updates the statistics. Without it, all
|
||||||
// the following calls will return stale values. It increments and returns
|
// the following calls will return stale values. It increments and returns
|
||||||
// the current epoch number, which might be useful to log as a sanity check.
|
// the current epoch number, which might be useful to log as a sanity check.
|
||||||
auto epoch = updateJemallocEpoch();
|
auto epoch = getJemallocValue<uint64_t>("epoch");
|
||||||
new_values["jemalloc.epoch"] = { epoch, "An internal incremental update number of the statistics of jemalloc (Jason Evans' memory allocator), used in all other `jemalloc` metrics." };
|
new_values["jemalloc.epoch"] = { epoch, "An internal incremental update number of the statistics of jemalloc (Jason Evans' memory allocator), used in all other `jemalloc` metrics." };
|
||||||
|
|
||||||
// Collect the statistics themselves.
|
// Collect the statistics themselves.
|
||||||
|
@ -69,8 +69,7 @@ public:
|
|||||||
|
|
||||||
AsynchronousMetrics(
|
AsynchronousMetrics(
|
||||||
unsigned update_period_seconds,
|
unsigned update_period_seconds,
|
||||||
const ProtocolServerMetricsFunc & protocol_server_metrics_func_,
|
const ProtocolServerMetricsFunc & protocol_server_metrics_func_);
|
||||||
std::shared_ptr<ICgroupsReader> cgroups_reader_);
|
|
||||||
|
|
||||||
virtual ~AsynchronousMetrics();
|
virtual ~AsynchronousMetrics();
|
||||||
|
|
||||||
|
@ -191,11 +191,9 @@ std::shared_ptr<ICgroupsReader> createCgroupsReader()
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
CgroupsMemoryUsageObserver::CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_, std::shared_ptr<ICgroupsReader> cgroups_reader_)
|
CgroupsMemoryUsageObserver::CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_)
|
||||||
: log(getLogger("CgroupsMemoryUsageObserver")), wait_time(wait_time_), cgroups_reader(std::move(cgroups_reader_))
|
: log(getLogger("CgroupsMemoryUsageObserver")), wait_time(wait_time_), cgroups_reader(createCgroupsReader())
|
||||||
{
|
{}
|
||||||
cgroups_reader = createCgroupsReader();
|
|
||||||
}
|
|
||||||
|
|
||||||
CgroupsMemoryUsageObserver::~CgroupsMemoryUsageObserver()
|
CgroupsMemoryUsageObserver::~CgroupsMemoryUsageObserver()
|
||||||
{
|
{
|
||||||
|
@ -18,15 +18,7 @@ struct ICgroupsReader
|
|||||||
|
|
||||||
std::shared_ptr<ICgroupsReader> createCgroupsReader();
|
std::shared_ptr<ICgroupsReader> createCgroupsReader();
|
||||||
|
|
||||||
/// Does two things:
|
/// Periodically reads the the maximum memory available to the process (which can change due to cgroups settings).
|
||||||
/// 1. Periodically reads the memory usage of the process from Linux cgroups.
|
|
||||||
/// You can specify soft or hard memory limits:
|
|
||||||
/// - When the soft memory limit is hit, drop jemalloc cache.
|
|
||||||
/// - When the hard memory limit is hit, update MemoryTracking metric to throw memory exceptions faster.
|
|
||||||
/// The goal of this is to avoid that the process hits the maximum allowed memory limit at which there is a good
|
|
||||||
/// chance that the Limux OOM killer terminates it. All of this is done is because internal memory tracking in
|
|
||||||
/// ClickHouse can unfortunately under-estimate the actually used memory.
|
|
||||||
/// 2. Periodically reads the the maximum memory available to the process (which can change due to cgroups settings).
|
|
||||||
/// You can specify a callback to react on changes. The callback typically reloads the configuration, i.e. Server
|
/// You can specify a callback to react on changes. The callback typically reloads the configuration, i.e. Server
|
||||||
/// or Keeper configuration file. This reloads settings 'max_server_memory_usage' (Server) and 'max_memory_usage_soft_limit'
|
/// or Keeper configuration file. This reloads settings 'max_server_memory_usage' (Server) and 'max_memory_usage_soft_limit'
|
||||||
/// (Keeper) from which various other internal limits are calculated, including the soft and hard limits for (1.).
|
/// (Keeper) from which various other internal limits are calculated, including the soft and hard limits for (1.).
|
||||||
@ -39,7 +31,7 @@ class CgroupsMemoryUsageObserver
|
|||||||
public:
|
public:
|
||||||
using OnMemoryAmountAvailableChangedFn = std::function<void()>;
|
using OnMemoryAmountAvailableChangedFn = std::function<void()>;
|
||||||
|
|
||||||
explicit CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_, std::shared_ptr<ICgroupsReader> cgroups_reader_);
|
explicit CgroupsMemoryUsageObserver(std::chrono::seconds wait_time_);
|
||||||
~CgroupsMemoryUsageObserver();
|
~CgroupsMemoryUsageObserver();
|
||||||
|
|
||||||
void setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed_);
|
void setOnMemoryAmountAvailableChangedFn(OnMemoryAmountAvailableChangedFn on_memory_amount_available_changed_);
|
||||||
|
@ -28,28 +28,46 @@ void setJemallocValue(const char * name, T value)
|
|||||||
{
|
{
|
||||||
T old_value;
|
T old_value;
|
||||||
size_t old_value_size = sizeof(T);
|
size_t old_value_size = sizeof(T);
|
||||||
if (mallctl(name, &old_value, &old_value_size, reinterpret_cast<void*>(&value), sizeof(T)))
|
mallctl(name, &old_value, &old_value_size, reinterpret_cast<void*>(&value), sizeof(T));
|
||||||
{
|
|
||||||
LOG_WARNING(getLogger("Jemalloc"), "mallctl for {} failed", name);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG_INFO(getLogger("Jemalloc"), "Value for {} set to {} (from {})", name, value, old_value);
|
LOG_INFO(getLogger("Jemalloc"), "Value for {} set to {} (from {})", name, value, old_value);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename T>
|
template <typename T>
|
||||||
std::optional<T> getJemallocValue(const char * name)
|
T getJemallocValue(const char * name)
|
||||||
{
|
{
|
||||||
T value;
|
T value;
|
||||||
size_t value_size = sizeof(T);
|
size_t value_size = sizeof(T);
|
||||||
if (mallctl(name, &value, &value_size, nullptr, 0))
|
mallctl(name, &value, &value_size, nullptr, 0);
|
||||||
{
|
|
||||||
LOG_WARNING(getLogger("Jemalloc"), "mallctl for {} failed", name);
|
|
||||||
return std::nullopt;
|
|
||||||
}
|
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
struct JemallocMibCache
|
||||||
|
{
|
||||||
|
explicit JemallocMibCache(const char * name)
|
||||||
|
{
|
||||||
|
mallctlnametomib(name, mib, &mib_length);
|
||||||
|
}
|
||||||
|
|
||||||
|
void setValue(T value)
|
||||||
|
{
|
||||||
|
mallctlbymib(mib, mib_length, nullptr, nullptr, reinterpret_cast<void*>(&value), sizeof(T));
|
||||||
|
}
|
||||||
|
|
||||||
|
T getValue()
|
||||||
|
{
|
||||||
|
T value;
|
||||||
|
size_t value_size = sizeof(T);
|
||||||
|
mallctlbymib(mib, mib_length, &value, &value_size, nullptr, 0);
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
static constexpr size_t max_mib_length = 4;
|
||||||
|
size_t mib[max_mib_length];
|
||||||
|
size_t mib_length = max_mib_length;
|
||||||
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -108,8 +108,6 @@ void AllocationTrace::onFreeImpl(void * ptr, size_t size) const
|
|||||||
namespace ProfileEvents
|
namespace ProfileEvents
|
||||||
{
|
{
|
||||||
extern const Event QueryMemoryLimitExceeded;
|
extern const Event QueryMemoryLimitExceeded;
|
||||||
extern const Event MemoryAllocatorPurge;
|
|
||||||
extern const Event MemoryAllocatorPurgeTimeMicroseconds;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
using namespace std::chrono_literals;
|
using namespace std::chrono_literals;
|
||||||
@ -119,8 +117,6 @@ static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
|
|||||||
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
|
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
|
||||||
MemoryTracker background_memory_tracker(&total_memory_tracker, VariableContext::User, false);
|
MemoryTracker background_memory_tracker(&total_memory_tracker, VariableContext::User, false);
|
||||||
|
|
||||||
std::atomic<bool> MemoryTracker::has_free_memory_in_allocator_arenas;
|
|
||||||
|
|
||||||
MemoryTracker::MemoryTracker(VariableContext level_) : parent(&total_memory_tracker), level(level_) {}
|
MemoryTracker::MemoryTracker(VariableContext level_) : parent(&total_memory_tracker), level(level_) {}
|
||||||
MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_) : parent(parent_), level(level_) {}
|
MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_) : parent(parent_), level(level_) {}
|
||||||
|
|
||||||
@ -500,12 +496,10 @@ void MemoryTracker::reset()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void MemoryTracker::setRSS(Int64 rss_, bool has_free_memory_in_allocator_arenas_)
|
void MemoryTracker::setRSS(Int64 rss_)
|
||||||
{
|
{
|
||||||
Int64 new_amount = rss_;
|
Int64 new_amount = rss_;
|
||||||
if (rss_)
|
|
||||||
total_memory_tracker.amount.store(new_amount, std::memory_order_relaxed);
|
total_memory_tracker.amount.store(new_amount, std::memory_order_relaxed);
|
||||||
has_free_memory_in_allocator_arenas.store(has_free_memory_in_allocator_arenas_, std::memory_order_relaxed);
|
|
||||||
|
|
||||||
auto metric_loaded = total_memory_tracker.metric.load(std::memory_order_relaxed);
|
auto metric_loaded = total_memory_tracker.metric.load(std::memory_order_relaxed);
|
||||||
if (metric_loaded != CurrentMetrics::end())
|
if (metric_loaded != CurrentMetrics::end())
|
||||||
|
@ -56,9 +56,6 @@ private:
|
|||||||
std::atomic<Int64> soft_limit {0};
|
std::atomic<Int64> soft_limit {0};
|
||||||
std::atomic<Int64> hard_limit {0};
|
std::atomic<Int64> hard_limit {0};
|
||||||
std::atomic<Int64> profiler_limit {0};
|
std::atomic<Int64> profiler_limit {0};
|
||||||
std::atomic_bool allow_use_jemalloc_memory {true};
|
|
||||||
|
|
||||||
static std::atomic<bool> has_free_memory_in_allocator_arenas;
|
|
||||||
|
|
||||||
Int64 profiler_step = 0;
|
Int64 profiler_step = 0;
|
||||||
|
|
||||||
@ -153,14 +150,6 @@ public:
|
|||||||
{
|
{
|
||||||
return soft_limit.load(std::memory_order_relaxed);
|
return soft_limit.load(std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
void setAllowUseJemallocMemory(bool value)
|
|
||||||
{
|
|
||||||
allow_use_jemalloc_memory.store(value, std::memory_order_relaxed);
|
|
||||||
}
|
|
||||||
bool getAllowUseJemallocMmemory() const
|
|
||||||
{
|
|
||||||
return allow_use_jemalloc_memory.load(std::memory_order_relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Set limit if it was not set.
|
/** Set limit if it was not set.
|
||||||
* Otherwise, set limit to new value, if new value is greater than previous limit.
|
* Otherwise, set limit to new value, if new value is greater than previous limit.
|
||||||
@ -251,7 +240,7 @@ public:
|
|||||||
/// Reset current counter to an RSS value.
|
/// Reset current counter to an RSS value.
|
||||||
/// Jemalloc may have pre-allocated arenas, they are accounted in RSS.
|
/// Jemalloc may have pre-allocated arenas, they are accounted in RSS.
|
||||||
/// We can free this arenas in case of exception to avoid OOM.
|
/// We can free this arenas in case of exception to avoid OOM.
|
||||||
static void setRSS(Int64 rss_, bool has_free_memory_in_allocator_arenas_);
|
static void setRSS(Int64 rss_);
|
||||||
|
|
||||||
/// Prints info about peak memory consumption into log.
|
/// Prints info about peak memory consumption into log.
|
||||||
void logPeakMemoryUsage();
|
void logPeakMemoryUsage();
|
||||||
|
49
src/Common/MemoryWorker.cpp
Normal file
49
src/Common/MemoryWorker.cpp
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
#include "Common/ThreadPool.h"
|
||||||
|
#include <Common/MemoryWorker.h>
|
||||||
|
|
||||||
|
#include <Common/Jemalloc.h>
|
||||||
|
#include <Common/MemoryTracker.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
#if USE_JEMALLOC
|
||||||
|
MemoryWorker::MemoryWorker(uint64_t period_ms_)
|
||||||
|
: period_ms(period_ms_)
|
||||||
|
{
|
||||||
|
background_thread = ThreadFromGlobalPool([this] { backgroundThread(); });
|
||||||
|
}
|
||||||
|
|
||||||
|
MemoryWorker::~MemoryWorker()
|
||||||
|
{
|
||||||
|
{
|
||||||
|
std::unique_lock lock(mutex);
|
||||||
|
shutdown = true;
|
||||||
|
}
|
||||||
|
cv.notify_all();
|
||||||
|
|
||||||
|
if (background_thread.joinable())
|
||||||
|
background_thread.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
void MemoryWorker::backgroundThread()
|
||||||
|
{
|
||||||
|
JemallocMibCache<uint64_t> epoch_mib("epoch");
|
||||||
|
JemallocMibCache<size_t> resident_mib("stats.resident");
|
||||||
|
std::unique_lock lock(mutex);
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
cv.wait_for(lock, period_ms, [this] { return shutdown; });
|
||||||
|
if (shutdown)
|
||||||
|
return;
|
||||||
|
|
||||||
|
epoch_mib.setValue(0);
|
||||||
|
Int64 resident = resident_mib.getValue();
|
||||||
|
MemoryTracker::setRSS(resident);
|
||||||
|
if (resident > total_memory_tracker.getHardLimit())
|
||||||
|
purgeJemallocArenas();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
}
|
34
src/Common/MemoryWorker.h
Normal file
34
src/Common/MemoryWorker.h
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <Common/ThreadPool.h>
|
||||||
|
|
||||||
|
#include "config.h"
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
#if USE_JEMALLOC
|
||||||
|
class MemoryWorker
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit MemoryWorker(uint64_t period_ms_);
|
||||||
|
|
||||||
|
~MemoryWorker();
|
||||||
|
private:
|
||||||
|
void backgroundThread();
|
||||||
|
|
||||||
|
ThreadFromGlobalPool background_thread;
|
||||||
|
|
||||||
|
std::mutex mutex;
|
||||||
|
std::condition_variable cv;
|
||||||
|
bool shutdown = false;
|
||||||
|
|
||||||
|
std::chrono::milliseconds period_ms;
|
||||||
|
};
|
||||||
|
#else
|
||||||
|
class MemoryWorker
|
||||||
|
{
|
||||||
|
};
|
||||||
|
#endif
|
||||||
|
|
||||||
|
}
|
@ -115,7 +115,7 @@ void updateKeeperInformation(KeeperDispatcher & keeper_dispatcher, AsynchronousM
|
|||||||
|
|
||||||
KeeperAsynchronousMetrics::KeeperAsynchronousMetrics(
|
KeeperAsynchronousMetrics::KeeperAsynchronousMetrics(
|
||||||
ContextPtr context_, unsigned update_period_seconds, const ProtocolServerMetricsFunc & protocol_server_metrics_func_)
|
ContextPtr context_, unsigned update_period_seconds, const ProtocolServerMetricsFunc & protocol_server_metrics_func_)
|
||||||
: AsynchronousMetrics(update_period_seconds, protocol_server_metrics_func_, context_->getCgroupsReader()), context(std::move(context_))
|
: AsynchronousMetrics(update_period_seconds, protocol_server_metrics_func_), context(std::move(context_))
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -157,6 +157,7 @@ namespace DB
|
|||||||
M(Bool, prepare_system_log_tables_on_startup, false, "If true, ClickHouse creates all configured `system.*_log` tables before the startup. It can be helpful if some startup scripts depend on these tables.", 0) \
|
M(Bool, prepare_system_log_tables_on_startup, false, "If true, ClickHouse creates all configured `system.*_log` tables before the startup. It can be helpful if some startup scripts depend on these tables.", 0) \
|
||||||
M(Double, gwp_asan_force_sample_probability, 0.0003, "Probability that an allocation from specific places will be sampled by GWP Asan (i.e. PODArray allocations)", 0) \
|
M(Double, gwp_asan_force_sample_probability, 0.0003, "Probability that an allocation from specific places will be sampled by GWP Asan (i.e. PODArray allocations)", 0) \
|
||||||
M(UInt64, config_reload_interval_ms, 2000, "How often clickhouse will reload config and check for new changes", 0) \
|
M(UInt64, config_reload_interval_ms, 2000, "How often clickhouse will reload config and check for new changes", 0) \
|
||||||
|
M(UInt64, memory_worker_period_ms, 100, "Period of background memory worker which corrects memory tracker memory usages and cleans up unused pages during higher memory usage.", 0) \
|
||||||
|
|
||||||
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in StorageSystemServerSettings.cpp
|
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in StorageSystemServerSettings.cpp
|
||||||
|
|
||||||
|
@ -406,8 +406,6 @@ struct ContextSharedPart : boost::noncopyable
|
|||||||
std::unique_ptr<ClusterDiscovery> cluster_discovery TSA_GUARDED_BY(clusters_mutex);
|
std::unique_ptr<ClusterDiscovery> cluster_discovery TSA_GUARDED_BY(clusters_mutex);
|
||||||
size_t clusters_version TSA_GUARDED_BY(clusters_mutex) = 0;
|
size_t clusters_version TSA_GUARDED_BY(clusters_mutex) = 0;
|
||||||
|
|
||||||
std::shared_ptr<ICgroupsReader> cgroups_reader;
|
|
||||||
|
|
||||||
/// No lock required for async_insert_queue modified only during initialization
|
/// No lock required for async_insert_queue modified only during initialization
|
||||||
std::shared_ptr<AsynchronousInsertQueue> async_insert_queue;
|
std::shared_ptr<AsynchronousInsertQueue> async_insert_queue;
|
||||||
|
|
||||||
@ -5631,16 +5629,6 @@ const ServerSettings & Context::getServerSettings() const
|
|||||||
return shared->server_settings;
|
return shared->server_settings;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Context::setCgroupsReader(std::shared_ptr<ICgroupsReader> cgroups_reader_)
|
|
||||||
{
|
|
||||||
shared->cgroups_reader = std::move(cgroups_reader_);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::shared_ptr<ICgroupsReader> Context::getCgroupsReader() const
|
|
||||||
{
|
|
||||||
return shared->cgroups_reader;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint64_t HTTPContext::getMaxHstsAge() const
|
uint64_t HTTPContext::getMaxHstsAge() const
|
||||||
{
|
{
|
||||||
return context->getSettingsRef().hsts_max_age;
|
return context->getSettingsRef().hsts_max_age;
|
||||||
|
@ -1344,9 +1344,6 @@ public:
|
|||||||
|
|
||||||
const ServerSettings & getServerSettings() const;
|
const ServerSettings & getServerSettings() const;
|
||||||
|
|
||||||
void setCgroupsReader(std::shared_ptr<ICgroupsReader> cgroups_reader_);
|
|
||||||
std::shared_ptr<ICgroupsReader> getCgroupsReader() const;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::shared_ptr<const SettingsConstraintsAndProfileIDs> getSettingsConstraintsAndCurrentProfilesWithLock() const;
|
std::shared_ptr<const SettingsConstraintsAndProfileIDs> getSettingsConstraintsAndCurrentProfilesWithLock() const;
|
||||||
|
|
||||||
|
@ -57,7 +57,7 @@ ServerAsynchronousMetrics::ServerAsynchronousMetrics(
|
|||||||
unsigned heavy_metrics_update_period_seconds,
|
unsigned heavy_metrics_update_period_seconds,
|
||||||
const ProtocolServerMetricsFunc & protocol_server_metrics_func_)
|
const ProtocolServerMetricsFunc & protocol_server_metrics_func_)
|
||||||
: WithContext(global_context_)
|
: WithContext(global_context_)
|
||||||
, AsynchronousMetrics(update_period_seconds, protocol_server_metrics_func_, getContext()->getCgroupsReader())
|
, AsynchronousMetrics(update_period_seconds, protocol_server_metrics_func_)
|
||||||
, heavy_metric_update_period(heavy_metrics_update_period_seconds)
|
, heavy_metric_update_period(heavy_metrics_update_period_seconds)
|
||||||
{
|
{
|
||||||
/// sanity check
|
/// sanity check
|
||||||
|
@ -63,7 +63,6 @@ void StorageSystemServerSettings::fillData(MutableColumns & res_columns, Context
|
|||||||
/// current setting values, one needs to ask the components directly.
|
/// current setting values, one needs to ask the components directly.
|
||||||
std::unordered_map<String, std::pair<String, ChangeableWithoutRestart>> changeable_settings = {
|
std::unordered_map<String, std::pair<String, ChangeableWithoutRestart>> changeable_settings = {
|
||||||
{"max_server_memory_usage", {std::to_string(total_memory_tracker.getHardLimit()), ChangeableWithoutRestart::Yes}},
|
{"max_server_memory_usage", {std::to_string(total_memory_tracker.getHardLimit()), ChangeableWithoutRestart::Yes}},
|
||||||
{"allow_use_jemalloc_memory", {std::to_string(total_memory_tracker.getAllowUseJemallocMmemory()), ChangeableWithoutRestart::Yes}},
|
|
||||||
|
|
||||||
{"max_table_size_to_drop", {std::to_string(context->getMaxTableSizeToDrop()), ChangeableWithoutRestart::Yes}},
|
{"max_table_size_to_drop", {std::to_string(context->getMaxTableSizeToDrop()), ChangeableWithoutRestart::Yes}},
|
||||||
{"max_partition_size_to_drop", {std::to_string(context->getMaxPartitionSizeToDrop()), ChangeableWithoutRestart::Yes}},
|
{"max_partition_size_to_drop", {std::to_string(context->getMaxPartitionSizeToDrop()), ChangeableWithoutRestart::Yes}},
|
||||||
|
@ -13,7 +13,6 @@ node = cluster.add_instance(
|
|||||||
"configs/async_metrics_no.xml",
|
"configs/async_metrics_no.xml",
|
||||||
],
|
],
|
||||||
mem_limit="4g",
|
mem_limit="4g",
|
||||||
env_variables={"MALLOC_CONF": "dirty_decay_ms:0"},
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user