Merge pull request #10362 from ClickHouse/rework-total-memory-tracker

Rework total memory tracker
This commit is contained in:
alexey-milovidov 2020-04-20 06:16:49 +03:00 committed by GitHub
commit 7e5b044e67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 72 additions and 55 deletions

View File

@ -81,6 +81,7 @@ namespace CurrentMetrics
{
extern const Metric Revision;
extern const Metric VersionInteger;
extern const Metric MemoryTracking;
}
namespace
@ -555,6 +556,28 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setFormatSchemaPath(format_schema_path.path());
format_schema_path.createDirectories();
/// Limit on total memory usage
size_t max_server_memory_usage = settings.max_server_memory_usage;
double max_server_memory_usage_to_ram_ratio = config().getDouble("max_server_memory_usage_to_ram_ratio", 0.9);
size_t default_max_server_memory_usage = memory_amount * max_server_memory_usage_to_ram_ratio;
if (max_server_memory_usage == 0)
{
max_server_memory_usage = default_max_server_memory_usage;
LOG_INFO(log, "Setting max_server_memory_usage was set to " << formatReadableSizeWithBinarySuffix(max_server_memory_usage));
}
else if (max_server_memory_usage > default_max_server_memory_usage)
{
max_server_memory_usage = default_max_server_memory_usage;
LOG_INFO(log, "Setting max_server_memory_usage was lowered to " << formatReadableSizeWithBinarySuffix(max_server_memory_usage)
<< " because the system has low amount of memory");
}
total_memory_tracker.setOrRaiseHardLimit(max_server_memory_usage);
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
LOG_INFO(log, "Loading metadata from " + path);
try

View File

@ -30,7 +30,7 @@
M(QueryThread, "Number of query processing threads") \
M(ReadonlyReplica, "Number of Replicated tables that are currently in readonly state due to re-initialization after ZooKeeper session loss or due to startup without ZooKeeper configured.") \
M(LeaderReplica, "Number of Replicated tables that are leaders. Leader replica is responsible for assigning merges, cleaning old blocks for deduplications and a few more bookkeeping tasks. There may be no more than one leader across all replicas at one moment of time. If there is no leader it will be elected soon or it indicate an issue.") \
M(MemoryTracking, "Total amount of memory (bytes) allocated in currently executing queries. Note that some memory allocations may not be accounted.") \
M(MemoryTracking, "Total amount of memory (bytes) allocated by the server.") \
M(MemoryTrackingInBackgroundProcessingPool, "Total amount of memory (bytes) allocated in background processing pool (that is dedicated for backround merges, mutations and fetches). Note that this value may include a drift when the memory was allocated in a context of background processing pool and freed in other context or vice-versa. This happens naturally due to caches for tables indexes and doesn't indicate memory leaks.") \
M(MemoryTrackingInBackgroundMoveProcessingPool, "Total amount of memory (bytes) allocated in background processing pool (that is dedicated for backround moves). Note that this value may include a drift when the memory was allocated in a context of background processing pool and freed in other context or vice-versa. This happens naturally due to caches for tables indexes and doesn't indicate memory leaks.") \
M(MemoryTrackingInBackgroundSchedulePool, "Total amount of memory (bytes) allocated in background schedule pool (that is dedicated for bookkeeping tasks of Replicated tables).") \

View File

@ -25,10 +25,16 @@ static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
/// Each thread could new/delete memory in range of (-untracked_memory_limit, untracked_memory_limit) without access to common counters.
static constexpr Int64 untracked_memory_limit = 4 * 1024 * 1024;
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
MemoryTracker::MemoryTracker(VariableContext level_) : parent(&total_memory_tracker), level(level_) {}
MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_) : parent(parent_), level(level_) {}
MemoryTracker::~MemoryTracker()
{
if (static_cast<int>(level) < static_cast<int>(VariableContext::Process) && peak)
if ((level == VariableContext::Process || level == VariableContext::User) && peak)
{
try
{
@ -39,19 +45,6 @@ MemoryTracker::~MemoryTracker()
/// Exception in Logger, intentionally swallow.
}
}
/** This is needed for next memory tracker to be consistent with sum of all referring memory trackers.
*
* Sometimes, memory tracker could be destroyed before memory was freed, and on destruction, amount > 0.
* For example, a query could allocate some data and leave it in cache.
*
* If memory will be freed outside of context of this memory tracker,
* but in context of one of the 'next' memory trackers,
* then memory usage of 'next' memory trackers will be underestimated,
* because amount will be decreased twice (first - here, second - when real 'free' happens).
*/
if (auto value = amount.load(std::memory_order_relaxed))
free(value);
}
@ -62,10 +55,11 @@ void MemoryTracker::logPeakMemoryUsage() const
<< ": " << formatReadableSizeWithBinarySuffix(peak) << ".");
}
static void logMemoryUsage(Int64 amount)
void MemoryTracker::logMemoryUsage(Int64 current) const
{
LOG_DEBUG(&Logger::get("MemoryTracker"),
"Current memory usage: " << formatReadableSizeWithBinarySuffix(amount) << ".");
"Current memory usage" << (description ? " " + std::string(description) : "")
<< ": " << formatReadableSizeWithBinarySuffix(current) << ".");
}
@ -131,17 +125,24 @@ void MemoryTracker::alloc(Int64 size)
throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
}
updatePeak(will_be);
if (auto loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->alloc(size);
}
void MemoryTracker::updatePeak(Int64 will_be)
{
auto peak_old = peak.load(std::memory_order_relaxed);
if (will_be > peak_old) /// Races doesn't matter. Could rewrite with CAS, but not worth.
{
peak.store(will_be, std::memory_order_relaxed);
if (level == VariableContext::Process && will_be / log_peak_memory_usage_every > peak_old / log_peak_memory_usage_every)
if ((level == VariableContext::Process || level == VariableContext::Global)
&& will_be / log_peak_memory_usage_every > peak_old / log_peak_memory_usage_every)
logMemoryUsage(will_be);
}
if (auto loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->alloc(size);
}
@ -198,6 +199,13 @@ void MemoryTracker::reset()
}
void MemoryTracker::set(Int64 to)
{
amount.store(to, std::memory_order_relaxed);
updatePeak(to);
}
void MemoryTracker::setOrRaiseHardLimit(Int64 value)
{
/// This is just atomic set to maximum.

View File

@ -13,6 +13,7 @@
*/
class MemoryTracker
{
private:
std::atomic<Int64> amount {0};
std::atomic<Int64> peak {0};
std::atomic<Int64> hard_limit {0};
@ -33,9 +34,12 @@ class MemoryTracker
/// This description will be used as prefix into log messages (if isn't nullptr)
const char * description = nullptr;
void updatePeak(Int64 will_be);
void logMemoryUsage(Int64 current) const;
public:
MemoryTracker(VariableContext level_ = VariableContext::Thread) : level(level_) {}
MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread) : parent(parent_), level(level_) {}
MemoryTracker(VariableContext level_ = VariableContext::Thread);
MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread);
~MemoryTracker();
@ -113,6 +117,9 @@ public:
/// Reset the accumulated data and the parent.
void reset();
/// Reset current counter to a new value.
void set(Int64 to);
/// Prints info about peak memory consumption into log.
void logPeakMemoryUsage() const;
@ -120,6 +127,8 @@ public:
DB::SimpleActionBlocker blocker;
};
extern MemoryTracker total_memory_tracker;
/// Convenience methods, that use current thread's memory_tracker if it is available.
namespace CurrentMemoryTracker

View File

@ -339,7 +339,7 @@ struct Settings : public SettingsCollection<Settings>
\
M(SettingUInt64, max_memory_usage, 0, "Maximum memory usage for processing of single query. Zero means unlimited.", 0) \
M(SettingUInt64, max_memory_usage_for_user, 0, "Maximum memory usage for processing all concurrently running queries for the user. Zero means unlimited.", 0) \
M(SettingUInt64, max_memory_usage_for_all_queries, 0, "Maximum memory usage for processing all concurrently running queries on the server. Zero means unlimited.", 0) \
M(SettingUInt64, max_server_memory_usage, 0, "Maximum memory usage for server. Only has meaning at server startup. It can be specified only for default profile.", 0) \
M(SettingUInt64, memory_profiler_step, 0, "Every number of bytes the memory profiler will collect the allocating stack trace. The minimal effective step is 4 MiB (less values will work as clamped to 4 MiB). Zero means disabled memory profiler.", 0) \
\
M(SettingUInt64, max_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for a query. Zero means unlimited.", 0) \
@ -423,6 +423,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, merge_tree_uniform_read_distribution, true, "Obsolete setting, does nothing. Will be removed after 2020-05-20", 0) \
M(SettingUInt64, mark_cache_min_lifetime, 0, "Obsolete setting, does nothing. Will be removed after 2020-05-31", 0) \
M(SettingBool, partial_merge_join, false, "Obsolete. Use join_algorithm='prefer_partial_merge' instead.", 0) \
M(SettingUInt64, max_memory_usage_for_all_queries, 0, "Obsolete. Will be removed after 2020-10-20", 0) \
DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS)

View File

@ -141,6 +141,11 @@ void AsynchronousMetrics::update()
set("MemoryShared", data.shared);
set("MemoryCode", data.code);
set("MemoryDataAndStack", data.data_and_stack);
/// We must update the value of total_memory_tracker periodically.
/// Otherwise it might be calculated incorrectly - it can include a "drift" of memory amount.
/// See https://github.com/ClickHouse/ClickHouse/issues/10293
total_memory_tracker.set(data.resident);
}
#endif

View File

@ -23,13 +23,10 @@ Context removeUserRestrictionsFromSettings(const Context & context, const Settin
/// Does not matter on remote servers, because queries are sent under different user.
new_settings.max_concurrent_queries_for_user = 0;
new_settings.max_memory_usage_for_user = 0;
/// This setting is really not for user and should not be sent to remote server.
new_settings.max_memory_usage_for_all_queries = 0;
/// Set as unchanged to avoid sending to remote server.
new_settings.max_concurrent_queries_for_user.changed = false;
new_settings.max_memory_usage_for_user.changed = false;
new_settings.max_memory_usage_for_all_queries.changed = false;
if (settings.force_optimize_skip_unused_shards_no_nested)
{

View File

@ -14,12 +14,6 @@
#include <chrono>
namespace CurrentMetrics
{
extern const Metric MemoryTracking;
}
namespace DB
{
@ -68,7 +62,6 @@ static bool isUnlimitedQuery(const IAST * ast)
ProcessList::ProcessList(size_t max_size_)
: max_size(max_size_)
{
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
}
@ -171,19 +164,7 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
process_it->setUserProcessList(&user_process_list);
/// Limits are only raised (to be more relaxed) or set to something instead of zero,
/// because settings for different queries will interfere each other:
/// setting from one query effectively sets values for all other queries.
/// Track memory usage for all simultaneously running queries.
/// You should specify this value in configuration for default profile,
/// not for specific users, sessions or queries,
/// because this setting is effectively global.
total_memory_tracker.setOrRaiseHardLimit(settings.max_memory_usage_for_all_queries);
total_memory_tracker.setDescription("(total)");
/// Track memory usage for all simultaneously running queries from single user.
user_process_list.user_memory_tracker.setParent(&total_memory_tracker);
user_process_list.user_memory_tracker.setOrRaiseHardLimit(settings.max_memory_usage_for_user);
user_process_list.user_memory_tracker.setDescription("(for user)");
@ -280,14 +261,9 @@ ProcessListEntry::~ProcessListEntry()
if (user_process_list.queries.empty())
user_process_list.resetTrackers();
/// This removes memory_tracker for all requests. At this time, no other memory_trackers live.
/// Reset throttler, similarly (see above).
if (parent.processes.empty())
{
/// Reset MemoryTracker, similarly (see above).
parent.total_memory_tracker.logPeakMemoryUsage();
parent.total_memory_tracker.reset();
parent.total_network_throttler.reset();
}
}

View File

@ -295,9 +295,6 @@ protected:
/// Stores info about queries grouped by their priority
QueryPriorities priorities;
/// Limit and counter for memory of all simultaneously running queries.
MemoryTracker total_memory_tracker{VariableContext::Global};
/// Limit network bandwidth for all users
ThrottlerPtr total_network_throttler;