Fix memory leak while pushing to MVs w/o query context (from Kafka/...)

While pushign to MVs, there is a low-level code that create
ThreadGroupStatus/ThreadStatus, it is required to gather some metrics
for system.query_views_log.

But, one should not use ThreadGroupStatus of the MainThreadStatus, since
this structure can hold some state, that may not be cleaned, plus this
may be racy, instead it is better to create new ThreadGroupStatus and
attach it instead.

Also this place misses detachQuery(), and because of this it leaks
ThreadGroupStatus::finished_threads_counters_memory. But it is only the
problem pushing to MVs is done w/o query context (i.e. from Kafka/...),
since when it has query context detachQuery() will be called eventually.

Before this patch series, when I've tried the reproducer with
500 MVs attached to Kafka engine (that @den-crane suggested), jemalloc
report looks like this:

    $ ../jeprof --text ~/ch/tmp/upstream/clickhouse-binary --base jeprof.44384.0.i0.heap jeprof.44384.167.i167.heap
    Using local file /home/azat/ch/tmp/upstream/clickhouse-binary.
    Using local file jeprof.44384.167.i167.heap.
    Total: 915.6 MB
       910.7  99.5%  99.5%    910.7  99.5% Snapshot (inline)
         9.5   1.0% 100.5%      9.5   1.0% std::__1::__libcpp_operator_new (inline)
         0.5   0.1% 100.6%      0.5   0.1% DB::TasksStatsCounters::create

And with focus to this place:

    $ ../jeprof --focus Snapshot --text ~/ch/tmp/upstream/clickhouse-binary --base jeprof.44384.0.i0.heap jeprof.44384.167.i167.heap
    Using local file /home/azat/ch/tmp/upstream/clickhouse-binary.
    Using local file jeprof.44384.167.i167.heap.
    Total: 915.6 MB
       910.7 100.0% 100.0%    910.7 100.0% Snapshot (inline)
         0.0   0.0% 100.0%    910.7 100.0% DB::QueryPipeline::reset
         0.0   0.0% 100.0%    910.7 100.0% DB::StorageKafka::streamToViews
         0.0   0.0% 100.0%    910.7 100.0% DB::StorageKafka::threadFunc
         0.0   0.0% 100.0%    910.7 100.0% ProfileEvents::Counters::getPartiallyAtomicSnapshot
         0.0   0.0% 100.0%    910.7 100.0% ~ThreadStatus
         0.0   0.0% 100.0%    910.7 100.0% ~ViewRuntimeData
         0.0   0.0% 100.0%    910.7 100.0% ~ViewRuntimeStats (inline)

Actually this report does not looks great (you understand it because I
stripped it), because --text does not that smart, but if you will use
--pdf for the report you will see the stacktrace (will attach pdf to the
pull request).

But after this patch series the process RSS does not goes beyond
~700MiB.

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2022-08-28 22:27:36 +02:00
parent 6da5707f8f
commit f9812d9917

View File

@ -238,29 +238,30 @@ Chain buildPushingToViewsChain(
ASTPtr query;
Chain out;
/// If the materialized view is executed outside of a query, for example as a result of SYSTEM FLUSH LOGS or
/// SYSTEM FLUSH DISTRIBUTED ..., we can't attach to any thread group and we won't log, so there is no point on collecting metrics
std::unique_ptr<ThreadStatus> view_thread_status_ptr = nullptr;
ThreadGroupStatusPtr running_group;
if (current_thread && current_thread->getThreadGroup())
running_group = current_thread->getThreadGroup();
else
running_group = std::make_shared<ThreadGroupStatus>();
ThreadGroupStatusPtr running_group = current_thread && current_thread->getThreadGroup()
? current_thread->getThreadGroup()
: MainThreadStatus::getInstance().getThreadGroup();
if (running_group)
/// We are creating a ThreadStatus per view to store its metrics individually
/// Since calling ThreadStatus() changes current_thread we save it and restore it after the calls
/// Later on, before doing any task related to a view, we'll switch to its ThreadStatus, do the work,
/// and switch back to the original thread_status.
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
std::unique_ptr<ThreadStatus> view_thread_status_ptr = std::make_unique<ThreadStatus>();
/// Disable query profiler for this ThreadStatus since the running (main query) thread should already have one
/// If we didn't disable it, then we could end up with N + 1 (N = number of dependencies) profilers which means
/// N times more interruptions
view_thread_status_ptr->disableProfiling();
/// view_thread_status_ptr will be moved later (on and on), so need to capture raw pointer.
view_thread_status_ptr->deleter = [thread_status = view_thread_status_ptr.get(), running_group]
{
/// We are creating a ThreadStatus per view to store its metrics individually
/// Since calling ThreadStatus() changes current_thread we save it and restore it after the calls
/// Later on, before doing any task related to a view, we'll switch to its ThreadStatus, do the work,
/// and switch back to the original thread_status.
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
view_thread_status_ptr = std::make_unique<ThreadStatus>();
/// Disable query profiler for this ThreadStatus since the running (main query) thread should already have one
/// If we didn't disable it, then we could end up with N + 1 (N = number of dependencies) profilers which means
/// N times more interruptions
view_thread_status_ptr->disableProfiling();
view_thread_status_ptr->attachQuery(running_group);
}
thread_status->detachQuery();
};
view_thread_status_ptr->attachQuery(running_group);
auto runtime_stats = std::make_unique<QueryViewsLogElement::ViewRuntimeStats>();
runtime_stats->target_name = database_table.getFullTableName();