From f9812d9917becc4f77df48d9e35a49b1fd8a39c8 Mon Sep 17 00:00:00 2001
From: Azat Khuzhin
Date: Sun, 28 Aug 2022 22:27:36 +0200
Subject: [PATCH] Fix memory leak while pushing to MVs w/o query context
 (from Kafka/...)

While pushing to MVs, there is low-level code that creates
ThreadGroupStatus/ThreadStatus; it is required to gather some metrics
for system.query_views_log.

But one should not use the ThreadGroupStatus of MainThreadStatus, since
this structure can hold some state that may never be cleaned up, plus
this may be racy; it is better to create a new ThreadGroupStatus and
attach to that instead.

Also this place misses detachQuery(), and because of this it leaks
ThreadGroupStatus::finished_threads_counters_memory. But it is only a
problem when pushing to MVs is done w/o a query context (i.e. from
Kafka/...), since when there is a query context, detachQuery() will be
called eventually.

Before this patch series, when I tried the reproducer with 500 MVs
attached to a Kafka engine table (that @den-crane suggested), the
jemalloc report looked like this:

    $ ../jeprof --text ~/ch/tmp/upstream/clickhouse-binary --base jeprof.44384.0.i0.heap jeprof.44384.167.i167.heap
    Using local file /home/azat/ch/tmp/upstream/clickhouse-binary.
    Using local file jeprof.44384.167.i167.heap.
    Total: 915.6 MB
       910.7  99.5%  99.5%    910.7  99.5% Snapshot (inline)
         9.5   1.0% 100.5%      9.5   1.0% std::__1::__libcpp_operator_new (inline)
         0.5   0.1% 100.6%      0.5   0.1% DB::TasksStatsCounters::create

And with focus on this place:

    $ ../jeprof --focus Snapshot --text ~/ch/tmp/upstream/clickhouse-binary --base jeprof.44384.0.i0.heap jeprof.44384.167.i167.heap
    Using local file /home/azat/ch/tmp/upstream/clickhouse-binary.
    Using local file jeprof.44384.167.i167.heap.
    Total: 915.6 MB
       910.7 100.0% 100.0%    910.7 100.0% Snapshot (inline)
         0.0   0.0% 100.0%    910.7 100.0% DB::QueryPipeline::reset
         0.0   0.0% 100.0%    910.7 100.0% DB::StorageKafka::streamToViews
         0.0   0.0% 100.0%    910.7 100.0% DB::StorageKafka::threadFunc
         0.0   0.0% 100.0%    910.7 100.0% ProfileEvents::Counters::getPartiallyAtomicSnapshot
         0.0   0.0% 100.0%    910.7 100.0% ~ThreadStatus
         0.0   0.0% 100.0%    910.7 100.0% ~ViewRuntimeData
         0.0   0.0% 100.0%    910.7 100.0% ~ViewRuntimeStats (inline)

Admittedly this report does not look great (it is readable only because
I stripped it), since --text is not that smart, but if you use --pdf
for the report you will see the full stacktrace (I will attach the pdf
to the pull request).

After this patch series the process RSS does not go beyond ~700MiB.
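To make the mechanics concrete, here is a minimal self-contained model
of the leak. ThreadGroupStatus/ThreadStatus below are tiny stand-ins
for the real classes from src/Common/ThreadStatus.h (the snapshot is
just a blob of memory), so treat it as a sketch of the pattern, not the
actual code:

    #include <cstdio>
    #include <memory>
    #include <vector>

    /// Stand-in for a per-thread performance counters snapshot.
    struct CountersSnapshot
    {
        std::vector<long> values = std::vector<long>(1024);
    };

    struct ThreadGroupStatus
    {
        /// Grows by one snapshot per finished thread; drained only
        /// together with the group itself.
        std::vector<CountersSnapshot> finished_threads_counters_memory;
    };

    struct ThreadStatus
    {
        std::shared_ptr<ThreadGroupStatus> group;

        ~ThreadStatus()
        {
            /// Like ProfileEvents::Counters::getPartiallyAtomicSnapshot
            /// in the jeprof report: the snapshot outlives the thread.
            if (group)
                group->finished_threads_counters_memory.emplace_back();
        }
    };

    int main()
    {
        /// Before the patch: every push reused one long-lived group
        /// (MainThreadStatus's), so one snapshot per finished view
        /// thread piled up forever.
        auto main_thread_group = std::make_shared<ThreadGroupStatus>();
        for (int i = 0; i < 500; ++i)
        {
            ThreadStatus view_thread{main_thread_group};
        }
        std::printf("retained snapshots: %zu\n",
                    main_thread_group->finished_threads_counters_memory.size());

        /// After the patch: each push gets a fresh group, and the
        /// snapshots are freed together with it once the push is done.
        for (int i = 0; i < 500; ++i)
        {
            auto running_group = std::make_shared<ThreadGroupStatus>();
            ThreadStatus view_thread{running_group};
        }
    }

The diff below follows the second loop: allocate a fresh
ThreadGroupStatus per push, and make sure detachQuery() runs via the
deleter.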
Signed-off-by: Azat Khuzhin
---
 .../Transforms/buildPushingToViewsChain.cpp | 43 ++++++++++---------
 1 file changed, 22 insertions(+), 21 deletions(-)

diff --git a/src/Processors/Transforms/buildPushingToViewsChain.cpp b/src/Processors/Transforms/buildPushingToViewsChain.cpp
index d71d6901cee..e81457e379a 100644
--- a/src/Processors/Transforms/buildPushingToViewsChain.cpp
+++ b/src/Processors/Transforms/buildPushingToViewsChain.cpp
@@ -238,29 +238,30 @@ Chain buildPushingToViewsChain(
     ASTPtr query;
     Chain out;
 
-    /// If the materialized view is executed outside of a query, for example as a result of SYSTEM FLUSH LOGS or
-    /// SYSTEM FLUSH DISTRIBUTED ..., we can't attach to any thread group and we won't log, so there is no point on collecting metrics
-    std::unique_ptr<ThreadStatus> view_thread_status_ptr = nullptr;
+    ThreadGroupStatusPtr running_group;
+    if (current_thread && current_thread->getThreadGroup())
+        running_group = current_thread->getThreadGroup();
+    else
+        running_group = std::make_shared<ThreadGroupStatus>();
 
-    ThreadGroupStatusPtr running_group = current_thread && current_thread->getThreadGroup()
-        ? current_thread->getThreadGroup()
-        : MainThreadStatus::getInstance().getThreadGroup();
-    if (running_group)
+    /// We are creating a ThreadStatus per view to store its metrics individually
+    /// Since calling ThreadStatus() changes current_thread we save it and restore it after the calls
+    /// Later on, before doing any task related to a view, we'll switch to its ThreadStatus, do the work,
+    /// and switch back to the original thread_status.
+    auto * original_thread = current_thread;
+    SCOPE_EXIT({ current_thread = original_thread; });
+
+    std::unique_ptr<ThreadStatus> view_thread_status_ptr = std::make_unique<ThreadStatus>();
+    /// Disable query profiler for this ThreadStatus since the running (main query) thread should already have one
+    /// If we didn't disable it, then we could end up with N + 1 (N = number of dependencies) profilers which means
+    /// N times more interruptions
+    view_thread_status_ptr->disableProfiling();
+    /// view_thread_status_ptr will be moved later (on and on), so need to capture raw pointer.
+    view_thread_status_ptr->deleter = [thread_status = view_thread_status_ptr.get(), running_group]
     {
-        /// We are creating a ThreadStatus per view to store its metrics individually
-        /// Since calling ThreadStatus() changes current_thread we save it and restore it after the calls
-        /// Later on, before doing any task related to a view, we'll switch to its ThreadStatus, do the work,
-        /// and switch back to the original thread_status.
-        auto * original_thread = current_thread;
-        SCOPE_EXIT({ current_thread = original_thread; });
-
-        view_thread_status_ptr = std::make_unique<ThreadStatus>();
-        /// Disable query profiler for this ThreadStatus since the running (main query) thread should already have one
-        /// If we didn't disable it, then we could end up with N + 1 (N = number of dependencies) profilers which means
-        /// N times more interruptions
-        view_thread_status_ptr->disableProfiling();
-        view_thread_status_ptr->attachQuery(running_group);
-    }
+        thread_status->detachQuery();
+    };
+    view_thread_status_ptr->attachQuery(running_group);
 
     auto runtime_stats = std::make_unique<ViewRuntimeStats>();
     runtime_stats->target_name = database_table.getFullTableName();
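A note on the raw-pointer capture in the deleter above:
view_thread_status_ptr is moved later (on and on), but moving a
unique_ptr never changes the address of the object it owns, so a raw
pointer captured up front stays valid for the object's whole lifetime.
A minimal sketch of the pattern, with a stand-in ThreadStatus that,
like the real one as far as this patch relies on it, runs its deleter
from the destructor:

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <utility>

    struct ThreadStatus
    {
        std::function<void()> deleter;

        void detachQuery() { std::cout << "detachQuery()\n"; }

        ~ThreadStatus()
        {
            if (deleter)
                deleter();
        }
    };

    int main()
    {
        auto status = std::make_unique<ThreadStatus>();

        /// Capture the raw pointer, not the unique_ptr: ownership will
        /// move around, the pointee's address will not.
        status->deleter = [raw = status.get()] { raw->detachQuery(); };

        auto moved_elsewhere = std::move(status); /// e.g. into ViewRuntimeData
    }   /// the last owner destroys the object and the deleter fires

Whichever owner ends up destroying the ThreadStatus, the deleter runs
exactly once, which is what guarantees detachQuery() and prevents
finished_threads_counters_memory from accumulating.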