Use total_memory_tracker when there is no other MemoryTracker object.

This should significantly reduce the MemoryTracking drift, test shows
that there is 0 drift after query storm (100 queries, via http/tcp/tcp
in one session).

TL;DR;

To track memory, clickhouse creates memory tracker object for each
thread **explicitly**, but until it is not created the memory
allocations are not under account.
There should not be lot of allocations w/o memory tracker, since most of
the time it is created early enough, but even this maybe enough to
trigger some problems.

Plus sometimes it is not possible to create it, for example some 3d
party library does not allow to do this explicitly:
- for example before #15740 allocations from librdkafka threads,
- or even worse, poco threads, they don't have any routines to do this.
This won't be a problem for `MemoryTracking` metric if the deallocation
will be done from the same thread w/o memory tracker (or vise versa),
but this is not always true.

NOTE, that this will slow down per-thread allocations w/o memory
tracker, since before this patch there were no memory tracking for them
while now they will be accounted in total_memory_tracker, and for
total_memory_tracker max_untracked_memory is always reached.
But this should not be significant.
This commit is contained in:
Azat Khuzhin 2020-10-18 10:32:49 +03:00
parent 840d96255e
commit 72d7b6117e
5 changed files with 96 additions and 17 deletions

View File

@ -258,7 +258,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
Poco::Logger * log = &logger();
UseSSL use_ssl;
ThreadStatus thread_status;
MainThreadStatus::getInstance();
registerFunctions();
registerAggregateFunctions();

View File

@ -13,6 +13,24 @@
#include <random>
#include <cstdlib>
namespace
{
MemoryTracker * getMemoryTracker()
{
if (auto * thread_memory_tracker = DB::CurrentThread::getMemoryTracker())
return thread_memory_tracker;
/// Once the main thread is initialized,
/// total_memory_tracker is initialized too.
/// And can be used, since MainThreadStatus is required for profiling.
if (DB::MainThreadStatus::get())
return &total_memory_tracker;
return nullptr;
}
}
namespace DB
{
@ -270,16 +288,24 @@ namespace CurrentMemoryTracker
void alloc(Int64 size)
{
if (auto * memory_tracker = DB::CurrentThread::getMemoryTracker())
if (auto * memory_tracker = getMemoryTracker())
{
current_thread->untracked_memory += size;
if (current_thread->untracked_memory > current_thread->untracked_memory_limit)
if (current_thread)
{
/// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
/// more. It could be useful to enlarge Exception message in rethrow logic.
Int64 tmp = current_thread->untracked_memory;
current_thread->untracked_memory = 0;
memory_tracker->alloc(tmp);
current_thread->untracked_memory += size;
if (current_thread->untracked_memory > current_thread->untracked_memory_limit)
{
/// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
/// more. It could be useful to enlarge Exception message in rethrow logic.
Int64 tmp = current_thread->untracked_memory;
current_thread->untracked_memory = 0;
memory_tracker->alloc(tmp);
}
}
/// total_memory_tracker only, ignore untracked_memory
else
{
memory_tracker->alloc(size);
}
}
}
@ -292,13 +318,21 @@ namespace CurrentMemoryTracker
void free(Int64 size)
{
if (auto * memory_tracker = DB::CurrentThread::getMemoryTracker())
if (auto * memory_tracker = getMemoryTracker())
{
current_thread->untracked_memory -= size;
if (current_thread->untracked_memory < -current_thread->untracked_memory_limit)
if (current_thread)
{
memory_tracker->free(-current_thread->untracked_memory);
current_thread->untracked_memory = 0;
current_thread->untracked_memory -= size;
if (current_thread->untracked_memory < -current_thread->untracked_memory_limit)
{
memory_tracker->free(-current_thread->untracked_memory);
current_thread->untracked_memory = 0;
}
}
/// total_memory_tracker only, ignore untracked_memory
else
{
memory_tracker->free(size);
}
}
}

View File

@ -20,6 +20,7 @@ namespace ErrorCodes
thread_local ThreadStatus * current_thread = nullptr;
thread_local ThreadStatus * main_thread = nullptr;
ThreadStatus::ThreadStatus()
@ -115,4 +116,20 @@ void ThreadStatus::onFatalError()
fatal_error_callback();
}
ThreadStatus * MainThreadStatus::main_thread = nullptr;
MainThreadStatus & MainThreadStatus::getInstance()
{
static MainThreadStatus thread_status;
return thread_status;
}
MainThreadStatus::MainThreadStatus()
: ThreadStatus()
{
main_thread = current_thread;
}
MainThreadStatus::~MainThreadStatus()
{
main_thread = nullptr;
}
}

View File

@ -215,4 +215,22 @@ private:
void setupState(const ThreadGroupStatusPtr & thread_group_);
};
/**
* Creates ThreadStatus for the main thread.
*/
class MainThreadStatus : public ThreadStatus
{
public:
static MainThreadStatus & getInstance();
static ThreadStatus * get() { return main_thread; }
static bool isMainThread() { return main_thread == current_thread; }
~MainThreadStatus();
private:
MainThreadStatus();
static ThreadStatus * main_thread;
};
}

View File

@ -66,10 +66,20 @@ void TraceCollector::collect(TraceType trace_type, const StackTrace & stack_trac
char buffer[buf_size];
WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer);
StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
StringRef query_id;
UInt64 thread_id;
auto thread_id = CurrentThread::get().thread_id;
if (CurrentThread::isInitialized())
{
query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
thread_id = CurrentThread::get().thread_id;
}
else
{
thread_id = MainThreadStatus::get()->thread_id;
}
writeChar(false, out); /// true if requested to stop the collecting thread.
writeStringBinary(query_id, out);