Add settings for soft limit raising.

This commit is contained in:
Ivan Lezhankin 2020-01-21 16:53:30 +03:00
parent 1934706ca9
commit 008faaa760
8 changed files with 48 additions and 32 deletions

View File

@ -9,6 +9,7 @@
#include <common/logger_useful.h>
#include <common/singleton.h>
#include <atomic>
#include <cstdlib>
@ -84,7 +85,8 @@ void MemoryTracker::alloc(Int64 size)
if (metric != CurrentMetrics::end())
CurrentMetrics::add(metric, size);
Int64 current_limit = limit.load(std::memory_order_relaxed);
Int64 current_hard_limit = hard_limit.load(std::memory_order_relaxed);
Int64 current_soft_limit = soft_limit.load(std::memory_order_relaxed);
/// Using non-thread-safe random number generator. Joint distribution in different threads would not be uniform.
/// In this case, it doesn't matter.
@ -101,12 +103,19 @@ void MemoryTracker::alloc(Int64 size)
message << " " << description;
message << ": fault injected. Would use " << formatReadableSizeWithBinarySuffix(will_be)
<< " (attempt to allocate chunk of " << size << " bytes)"
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_limit);
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_hard_limit);
throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
}
if (unlikely(current_limit && will_be > current_limit))
if (unlikely(current_soft_limit && will_be > current_soft_limit))
{
auto no_track = blocker.cancel();
Singleton<DB::TraceCollector>()->collect(size);
setOrRaiseSoftLimit(current_soft_limit + Int64(ceil((will_be - current_soft_limit) / soft_limit_step)) * soft_limit_step);
}
if (unlikely(current_hard_limit && will_be > current_hard_limit))
{
free(size);
@ -119,7 +128,7 @@ void MemoryTracker::alloc(Int64 size)
message << " " << description;
message << " exceeded: would use " << formatReadableSizeWithBinarySuffix(will_be)
<< " (attempt to allocate chunk of " << size << " bytes)"
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_limit);
<< ", maximum: " << formatReadableSizeWithBinarySuffix(current_hard_limit);
throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
}
@ -177,7 +186,8 @@ void MemoryTracker::resetCounters()
{
amount.store(0, std::memory_order_relaxed);
peak.store(0, std::memory_order_relaxed);
limit.store(0, std::memory_order_relaxed);
hard_limit.store(0, std::memory_order_relaxed);
soft_limit.store(0, std::memory_order_relaxed);
}
@ -190,11 +200,20 @@ void MemoryTracker::reset()
}
void MemoryTracker::setOrRaiseLimit(Int64 value)
void MemoryTracker::setOrRaiseHardLimit(Int64 value)
{
/// This is just atomic set to maximum.
Int64 old_value = limit.load(std::memory_order_relaxed);
while (old_value < value && !limit.compare_exchange_weak(old_value, value))
Int64 old_value = hard_limit.load(std::memory_order_relaxed);
while (old_value < value && !hard_limit.compare_exchange_weak(old_value, value))
;
}
void MemoryTracker::setOrRaiseSoftLimit(Int64 value)
{
/// This is just atomic set to maximum.
Int64 old_value = soft_limit.load(std::memory_order_relaxed);
while (old_value < value && !soft_limit.compare_exchange_weak(old_value, value))
;
}
@ -214,9 +233,6 @@ namespace CurrentMemoryTracker
Int64 tmp = untracked;
untracked = 0;
memory_tracker->alloc(tmp);
auto no_track = memory_tracker->blocker.cancel();
Singleton<DB::TraceCollector>()->collect(tmp);
}
}
}

View File

@ -15,7 +15,10 @@ class MemoryTracker
{
std::atomic<Int64> amount {0};
std::atomic<Int64> peak {0};
std::atomic<Int64> limit {0};
std::atomic<Int64> hard_limit {0};
std::atomic<Int64> soft_limit {0};
Int64 soft_limit_step = 0;
/// To test exception safety of calling code, memory tracker throws an exception on each memory allocation with specified probability.
double fault_probability = 0;
@ -32,7 +35,6 @@ class MemoryTracker
public:
MemoryTracker(VariableContext level_ = VariableContext::Thread) : level(level_) {}
MemoryTracker(Int64 limit_, VariableContext level_ = VariableContext::Thread) : limit(limit_), level(level_) {}
MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread) : parent(parent_), level(level_) {}
~MemoryTracker();
@ -66,21 +68,22 @@ public:
return peak.load(std::memory_order_relaxed);
}
void setLimit(Int64 limit_)
{
limit.store(limit_, std::memory_order_relaxed);
}
/** Set limit if it was not set.
* Otherwise, set limit to new value, if new value is greater than previous limit.
*/
void setOrRaiseLimit(Int64 value);
void setOrRaiseHardLimit(Int64 value);
void setOrRaiseSoftLimit(Int64 value);
void setFaultProbability(double value)
{
fault_probability = value;
}
void setSoftLimitStep(Int64 value)
{
soft_limit_step = value;
}
/// next should be changed only once: from nullptr to some value.
/// NOTE: It is not true in MergeListElement
void setParent(MemoryTracker * elem)

View File

@ -212,10 +212,7 @@ void TraceCollector::run()
Int64 size;
readPODBinary(size, in);
UInt64 pointer;
readPODBinary(pointer, in);
TraceLogElement element{std::time(nullptr), trace_type, thread_number, query_id, trace, size, pointer};
TraceLogElement element{std::time(nullptr), trace_type, thread_number, query_id, trace, size};
trace_log->add(element);
}
}

View File

@ -325,6 +325,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, max_memory_usage, 0, "Maximum memory usage for processing of single query. Zero means unlimited.", 0) \
M(SettingUInt64, max_memory_usage_for_user, 0, "Maximum memory usage for processing all concurrently running queries for the user. Zero means unlimited.", 0) \
M(SettingUInt64, max_memory_usage_for_all_queries, 0, "Maximum memory usage for processing all concurrently running queries on the server. Zero means unlimited.", 0) \
M(SettingUInt64, total_memory_profiler_step, 0, "Every number of bytes the memory profiler will dump the allocating stacktrace", 0) \
\
M(SettingUInt64, max_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for a query. Zero means unlimited.", 0) \
M(SettingUInt64, max_network_bytes, 0, "The maximum number of bytes (compressed) to receive or transmit over the network for execution of the query.", 0) \

View File

@ -1,3 +1,4 @@
#include "Common/quoteString.h"
#include <Common/typeid_cast.h>
#include <Common/PODArray.h>
@ -286,7 +287,7 @@ void ActionsMatcher::visit(const ASTIdentifier & identifier, const ASTPtr & ast,
found = true;
if (found)
throw Exception("Column " + column_name.get(ast) + " is not under aggregate function and not in GROUP BY.",
throw Exception("Column " + backQuote(column_name.get(ast)) + " is not under aggregate function and not in GROUP BY",
ErrorCodes::NOT_AN_AGGREGATE);
/// Special check for WITH statement alias. Add alias action to be able to use this alias.

View File

@ -181,12 +181,14 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
/// You should specify this value in configuration for default profile,
/// not for specific users, sessions or queries,
/// because this setting is effectively global.
total_memory_tracker.setOrRaiseLimit(settings.max_memory_usage_for_all_queries);
total_memory_tracker.setOrRaiseHardLimit(settings.max_memory_usage_for_all_queries);
total_memory_tracker.setOrRaiseSoftLimit(settings.total_memory_profiler_step);
total_memory_tracker.setSoftLimitStep(settings.total_memory_profiler_step);
total_memory_tracker.setDescription("(total)");
/// Track memory usage for all simultaneously running queries from single user.
user_process_list.user_memory_tracker.setParent(&total_memory_tracker);
user_process_list.user_memory_tracker.setOrRaiseLimit(settings.max_memory_usage_for_user);
user_process_list.user_memory_tracker.setOrRaiseHardLimit(settings.max_memory_usage_for_user);
user_process_list.user_memory_tracker.setDescription("(for user)");
/// Actualize thread group info
@ -198,7 +200,7 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
thread_group->query = process_it->query;
/// Set query-level memory trackers
thread_group->memory_tracker.setOrRaiseLimit(process_it->max_memory_usage);
thread_group->memory_tracker.setOrRaiseHardLimit(process_it->max_memory_usage);
thread_group->memory_tracker.setDescription("(for query)");
if (process_it->memory_tracker_fault_probability)
thread_group->memory_tracker.setFaultProbability(process_it->memory_tracker_fault_probability);

View File

@ -29,7 +29,6 @@ Block TraceLogElement::createBlock()
{std::make_shared<DataTypeString>(), "query_id"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()), "trace"},
{std::make_shared<DataTypeInt64>(), "size"},
{std::make_shared<DataTypeUInt64>(), "pointer"},
};
}
@ -47,7 +46,6 @@ void TraceLogElement::appendToBlock(Block & block) const
columns[i++]->insertData(query_id.data(), query_id.size());
columns[i++]->insert(trace);
columns[i++]->insert(size);
columns[i++]->insert(pointer);
block.setColumns(std::move(columns));
}

View File

@ -20,9 +20,7 @@ struct TraceLogElement
String query_id;
Array trace;
/// for |TraceType::MEMORY|
Int64 size; /// Allocation size in bytes. In case of deallocation should match the allocation size.
UInt64 pointer; /// Address of allocated region - to track the deallocations.
Int64 size; /// Allocation size in bytes for |TraceType::MEMORY|
static std::string name() { return "TraceLog"; }
static Block createBlock();