Merge pull request #3230 from yandex/fix-memory-tracker

Attempt to fix inconsistent values in total memory tracker
This commit is contained in:
alexey-milovidov 2018-10-08 10:10:21 +03:00 committed by GitHub
commit 9ff30aea55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 47 additions and 20 deletions

View File

@ -199,8 +199,10 @@ void TCPHandler::runImpl()
else
processOrdinaryQuery();
sendLogs();
/// Do it before sending end of stream, to have a chance to show log message in client.
query_scope->logPeakMemoryUsage();
sendLogs();
sendEndOfStream();
query_scope.reset();

View File

@ -75,11 +75,14 @@ public:
static void detachQuery();
static void detachQueryIfNotDetached();
/// Initializes query with current thread as master thread in constructor, and detaches it in desstructor
/// Initializes query with current thread as master thread in constructor, and detaches it in destructor
struct QueryScope
{
explicit QueryScope(Context & query_context);
~QueryScope();
void logPeakMemoryUsage();
bool log_peak_memory_usage_in_destructor = true;
};
/// Implicitly finalizes current thread in the destructor

View File

@ -43,8 +43,7 @@ MemoryTracker::~MemoryTracker()
* then memory usage of 'next' memory trackers will be underestimated,
* because amount will be decreased twice (first - here, second - when real 'free' happens).
*/
if (auto value = amount.load(std::memory_order_relaxed))
free(value);
reset();
}
@ -74,7 +73,7 @@ void MemoryTracker::alloc(Int64 size)
*/
Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);
if (!parent.load(std::memory_order_relaxed))
if (metric != CurrentMetrics::end())
CurrentMetrics::add(metric, size);
Int64 current_limit = limit.load(std::memory_order_relaxed);
@ -154,7 +153,8 @@ void MemoryTracker::free(Int64 size)
if (auto loaded_next = parent.load(std::memory_order_relaxed))
loaded_next->free(size);
else
if (metric != CurrentMetrics::end())
CurrentMetrics::sub(metric, size);
}
@ -170,7 +170,8 @@ void MemoryTracker::resetCounters()
void MemoryTracker::reset()
{
if (!parent.load(std::memory_order_relaxed))
CurrentMetrics::sub(metric, amount.load(std::memory_order_relaxed));
if (auto value = amount.load(std::memory_order_relaxed))
free(value);
resetCounters();
}

View File

@ -7,12 +7,6 @@
#include <Common/VariableContext.h>
namespace CurrentMetrics
{
extern const Metric MemoryTracking;
}
/** Tracks memory consumption.
* It throws an exception if amount of consumed memory become greater than certain limit.
* The same memory tracker could be simultaneously used in different threads.
@ -31,7 +25,7 @@ class MemoryTracker
std::atomic<MemoryTracker *> parent {};
/// You could specify custom metric to track memory usage.
CurrentMetrics::Metric metric = CurrentMetrics::MemoryTracking;
CurrentMetrics::Metric metric = CurrentMetrics::end();
/// This description will be used as prefix into log messages (if isn't nullptr)
const char * description = nullptr;

View File

@ -14,6 +14,11 @@
#include <chrono>
namespace CurrentMetrics
{
extern const Metric MemoryTracking;
}
namespace DB
{
@ -74,6 +79,13 @@ static bool isUnlimitedQuery(const IAST * ast)
}
ProcessList::ProcessList(size_t max_size_)
: max_size(max_size_)
{
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
}
ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * ast, Context & query_context)
{
EntryPtr res;
@ -207,7 +219,7 @@ ProcessListEntry::~ProcessListEntry()
/// Destroy all streams to avoid long lock of ProcessList
it->releaseQueryStreams();
std::lock_guard<std::mutex> lock(parent.mutex);
std::lock_guard lock(parent.mutex);
String user = it->getClientInfo().current_user;
String query_id = it->getClientInfo().current_query_id;

View File

@ -287,7 +287,7 @@ protected:
QueryStatus * tryGetProcessListElement(const String & current_query_id, const String & current_user);
public:
ProcessList(size_t max_size_ = 0) : max_size(max_size_) {}
ProcessList(size_t max_size_ = 0);
using EntryPtr = std::shared_ptr<ProcessListEntry>;

View File

@ -130,13 +130,13 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
assertState({ThreadState::AttachedToQuery}, __PRETTY_FUNCTION__);
finalizePerformanceCounters();
/// For better logging ({query_id} will be shown here)
if (thread_group && thread_group.use_count() == 1)
thread_group->memory_tracker.logPeakMemoryUsage();
/// Detach from thread group
performance_counters.setParent(&ProfileEvents::global_counters);
memory_tracker.reset();
/// Must reset pointer to thread_group's memory_tracker, because it will be destroyed two lines below.
memory_tracker.setParent(nullptr);
query_context = nullptr;
thread_group.reset();
@ -244,10 +244,19 @@ CurrentThread::QueryScope::QueryScope(Context & query_context)
CurrentThread::attachQueryContext(query_context);
}
void CurrentThread::QueryScope::logPeakMemoryUsage()
{
log_peak_memory_usage_in_destructor = false;
CurrentThread::getGroup()->memory_tracker.logPeakMemoryUsage();
}
CurrentThread::QueryScope::~QueryScope()
{
try
{
if (log_peak_memory_usage_in_destructor)
logPeakMemoryUsage();
CurrentThread::detachQueryIfNotDetached();
}
catch (...)

View File

@ -0,0 +1,3 @@
0
100000000
0

View File

@ -0,0 +1,3 @@
SELECT least(value, 0) FROM system.metrics WHERE metric = 'MemoryTracking';
SELECT length(range(100000000));
SELECT least(value, 0) FROM system.metrics WHERE metric = 'MemoryTracking';