Merge pull request #40732 from azat/thread-status-fix-leak

Fix memory leak while pushing to MVs w/o query context (from Kafka/...)
This commit is contained in:
Maksim Kita 2022-08-29 19:36:25 +02:00 committed by GitHub
commit 88141cae98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 46 additions and 49 deletions

View File

@ -200,9 +200,9 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMiss);
ThreadGroupStatusPtr running_group = CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup()
? CurrentThread::get().getThreadGroup()
: MainThreadStatus::getInstance().getThreadGroup();
ThreadGroupStatusPtr running_group;
if (CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup())
running_group = CurrentThread::get().getThreadGroup();
ContextPtr query_context;
if (CurrentThread::isInitialized())
@ -212,12 +212,17 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
{
ThreadStatus thread_status;
if (query_context)
thread_status.attachQueryContext(query_context);
SCOPE_EXIT({
if (running_group)
thread_status.detachQuery();
});
if (running_group)
thread_status.attachQuery(running_group);
if (query_context)
thread_status.attachQueryContext(query_context);
setThreadName("ThreadPoolRead");
Stopwatch watch(CLOCK_MONOTONIC);
@ -252,9 +257,6 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
ProfileEvents::increment(ProfileEvents::ThreadPoolReaderPageCacheMissElapsedMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::DiskReadElapsedMicroseconds, watch.elapsedMicroseconds());
if (running_group)
thread_status.detachQuery();
return Result{ .size = bytes_read, .offset = request.ignore };
});

View File

@ -40,9 +40,9 @@ ThreadPoolRemoteFSReader::ThreadPoolRemoteFSReader(size_t pool_size, size_t queu
std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Request request)
{
ThreadGroupStatusPtr running_group = CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup()
? CurrentThread::get().getThreadGroup()
: MainThreadStatus::getInstance().getThreadGroup();
ThreadGroupStatusPtr running_group;
if (CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup())
running_group = CurrentThread::get().getThreadGroup();
ContextPtr query_context;
if (CurrentThread::isInitialized())
@ -52,6 +52,11 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
{
ThreadStatus thread_status;
SCOPE_EXIT({
if (running_group)
thread_status.detachQuery();
});
/// To be able to pass ProfileEvents.
if (running_group)
thread_status.attachQuery(running_group);
@ -67,25 +72,13 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
Stopwatch watch(CLOCK_MONOTONIC);
Result result;
try
{
result = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore);
}
catch (...)
{
if (running_group)
CurrentThread::detachQuery();
throw;
}
Result result = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore);
watch.stop();
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderTaskMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, result.offset ? result.size - result.offset : result.size);
thread_status.detachQuery(/* if_not_detached */true);
return Result{ .size = result.size, .offset = result.offset };
});

View File

@ -86,10 +86,6 @@ void WriteBufferFromS3::nextImpl()
size_t size = offset();
temporary_buffer->write(working_buffer.begin(), size);
ThreadGroupStatusPtr running_group = CurrentThread::isInitialized() && CurrentThread::get().getThreadGroup()
? CurrentThread::get().getThreadGroup()
: MainThreadStatus::getInstance().getThreadGroup();
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Bytes, offset());
last_part_size += offset();
if (write_settings.remote_throttler)

View File

@ -406,6 +406,11 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
query_context.reset();
thread_trace_context.trace_id = 0;
thread_trace_context.span_id = 0;
/// Avoid leaking of ThreadGroupStatus::finished_threads_counters_memory
/// (this is in case someone uses system thread but did not call getProfileEventsCountersAndMemoryForThreads())
thread_group->getProfileEventsCountersAndMemoryForThreads();
thread_group.reset();
thread_state = thread_exits ? ThreadState::Died : ThreadState::DetachedFromQuery;

View File

@ -238,29 +238,30 @@ Chain buildPushingToViewsChain(
ASTPtr query;
Chain out;
/// If the materialized view is executed outside of a query, for example as a result of SYSTEM FLUSH LOGS or
/// SYSTEM FLUSH DISTRIBUTED ..., we can't attach to any thread group and we won't log, so there is no point on collecting metrics
std::unique_ptr<ThreadStatus> view_thread_status_ptr = nullptr;
ThreadGroupStatusPtr running_group;
if (current_thread && current_thread->getThreadGroup())
running_group = current_thread->getThreadGroup();
else
running_group = std::make_shared<ThreadGroupStatus>();
ThreadGroupStatusPtr running_group = current_thread && current_thread->getThreadGroup()
? current_thread->getThreadGroup()
: MainThreadStatus::getInstance().getThreadGroup();
if (running_group)
/// We are creating a ThreadStatus per view to store its metrics individually
/// Since calling ThreadStatus() changes current_thread we save it and restore it after the calls
/// Later on, before doing any task related to a view, we'll switch to its ThreadStatus, do the work,
/// and switch back to the original thread_status.
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
std::unique_ptr<ThreadStatus> view_thread_status_ptr = std::make_unique<ThreadStatus>();
/// Disable query profiler for this ThreadStatus since the running (main query) thread should already have one
/// If we didn't disable it, then we could end up with N + 1 (N = number of dependencies) profilers which means
/// N times more interruptions
view_thread_status_ptr->disableProfiling();
/// view_thread_status_ptr will be moved later (on and on), so need to capture raw pointer.
view_thread_status_ptr->deleter = [thread_status = view_thread_status_ptr.get(), running_group]
{
/// We are creating a ThreadStatus per view to store its metrics individually
/// Since calling ThreadStatus() changes current_thread we save it and restore it after the calls
/// Later on, before doing any task related to a view, we'll switch to its ThreadStatus, do the work,
/// and switch back to the original thread_status.
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
view_thread_status_ptr = std::make_unique<ThreadStatus>();
/// Disable query profiler for this ThreadStatus since the running (main query) thread should already have one
/// If we didn't disable it, then we could end up with N + 1 (N = number of dependencies) profilers which means
/// N times more interruptions
view_thread_status_ptr->disableProfiling();
view_thread_status_ptr->attachQuery(running_group);
}
thread_status->detachQuery();
};
view_thread_status_ptr->attachQuery(running_group);
auto runtime_stats = std::make_unique<QueryViewsLogElement::ViewRuntimeStats>();
runtime_stats->target_name = database_table.getFullTableName();