Merge pull request #34001 from azat/memory-tracker-fix

Fix memory accounting for queries that use < max_untracked_memory
alexey-milovidov 2022-01-29 00:59:53 +03:00 committed by GitHub
commit 6535b75322
10 changed files with 44 additions and 57 deletions
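For context on the setting named in the commit message: memory accounting is amortized by letting each thread collect small allocations in a per-thread "untracked" counter and only charging the query-level tracker once that counter exceeds max_untracked_memory, so whatever stays below the threshold must be flushed when the thread detaches from the query or the query's usage is under-reported. The following is a minimal, self-contained sketch of that scheme; the type and function names are invented for illustration and do not mirror ClickHouse's MemoryTracker implementation.

    #include <atomic>
    #include <cstdint>
    #include <cstdio>

    // Illustrative only: a query-wide tracker plus a per-thread "untracked" buffer,
    // in the spirit of max_untracked_memory (names are invented for this sketch).
    struct QueryMemoryTracker
    {
        std::atomic<int64_t> tracked{0};
        void add(int64_t bytes) { tracked.fetch_add(bytes, std::memory_order_relaxed); }
    };

    struct ThreadMemoryCounter
    {
        QueryMemoryTracker * query = nullptr;
        int64_t untracked = 0;
        int64_t max_untracked_memory = 4 * 1024 * 1024;

        void alloc(int64_t bytes)
        {
            untracked += bytes;
            // Small allocations stay thread-local until the threshold is crossed.
            if (untracked > max_untracked_memory)
            {
                query->add(untracked);
                untracked = 0;
            }
        }

        // Anything still below the threshold has to be pushed to the query tracker
        // on detach, otherwise a query allocating < max_untracked_memory per thread
        // never shows up in the accounting.
        void detach()
        {
            if (query && untracked != 0)
                query->add(untracked);
            untracked = 0;
            query = nullptr;
        }
    };

    int main()
    {
        QueryMemoryTracker query;
        ThreadMemoryCounter thread;
        thread.query = &query;

        thread.alloc(1024);  // stays untracked: well below the threshold
        std::printf("before detach: %lld\n", static_cast<long long>(query.tracked.load()));
        thread.detach();     // the flush on detach makes the 1 KiB visible
        std::printf("after detach:  %lld\n", static_cast<long long>(query.tracked.load()));
    }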


@@ -5,7 +5,6 @@
#include <Common/CurrentThread.h>
#include <base/logger_useful.h>
#include <chrono>
#include <base/scope_guard.h>
namespace DB
@@ -246,7 +245,6 @@ void BackgroundSchedulePool::threadFunction()
setThreadName(thread_name.c_str());
attachToThreadGroup();
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
while (!shutdown)
{
@@ -273,7 +271,6 @@ void BackgroundSchedulePool::delayExecutionThreadFunction()
setThreadName((thread_name + "/D").c_str());
attachToThreadGroup();
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
while (!shutdown)
{


@@ -966,14 +966,14 @@ private:
/// Does the loading, possibly in the separate thread.
void doLoading(const String & name, size_t loading_id, bool forced_to_reload, size_t min_id_to_finish_loading_dependencies_, bool async, ThreadGroupStatusPtr thread_group = {})
{
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachTo(thread_group);
LOG_TRACE(log, "Start loading object '{}'", name);
try
{


@@ -4,7 +4,6 @@
#include <Poco/Event.h>
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <base/scope_guard_safe.h>
#include <iostream>
namespace DB
@@ -40,11 +39,6 @@ static void threadFunction(CompletedPipelineExecutor::Data & data, ThreadGroupSt
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
data.executor->execute(num_threads);
}
catch (...)


@@ -301,11 +301,6 @@ void PipelineExecutor::executeImpl(size_t num_threads)
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
try
{
executeSingleThread(thread_num);


@@ -4,9 +4,7 @@
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Sources/NullSource.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Common/setThreadName.h>
#include <base/scope_guard_safe.h>
namespace DB
{
@@ -77,11 +75,6 @@ static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGrou
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
data.executor->execute(num_threads);
}
catch (...)


@@ -2,11 +2,8 @@
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/ISource.h>
#include <QueryPipeline/QueryPipeline.h>
#include <iostream>
#include <Common/ThreadPool.h>
#include <Common/setThreadName.h>
#include <base/scope_guard_safe.h>
#include <Poco/Event.h>
namespace DB
@@ -107,11 +104,6 @@ static void threadFunction(PushingAsyncPipelineExecutor::Data & data, ThreadGrou
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
data.executor->execute(num_threads);
}
catch (...)


@@ -2,17 +2,12 @@
#include <IO/ReadHelpers.h>
#include <Common/CurrentThread.h>
#include <Common/setThreadName.h>
#include <base/scope_guard_safe.h>
namespace DB
{
void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr thread_group)
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachTo(thread_group);
@@ -59,12 +54,8 @@ void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr
void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number)
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachTo(thread_group);
CurrentThread::attachToIfDetached(thread_group);
const auto parser_unit_number = current_ticket_number % processing_units.size();
auto & unit = processing_units[parser_unit_number];


@@ -67,7 +67,6 @@
#include <boost/algorithm/string/replace.hpp>
#include <base/insertAtEnd.h>
#include <base/scope_guard_safe.h>
#include <algorithm>
#include <iomanip>
@@ -1590,12 +1589,8 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
{
pool.scheduleOrThrowOnError([&, thread_group = CurrentThread::getGroup()]
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachTo(thread_group);
CurrentThread::attachToIfDetached(thread_group);
LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
part->remove();


@@ -1,5 +1,4 @@
#include <boost/rational.hpp> /// For calculations related to sampling coefficients.
#include <base/scope_guard_safe.h>
#include <optional>
#include <unordered_set>
@@ -988,9 +987,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()]
{
SCOPE_EXIT_SAFE(if (thread_group) CurrentThread::detachQueryIfNotDetached(););
if (thread_group)
CurrentThread::attachTo(thread_group);
CurrentThread::attachToIfDetached(thread_group);
process_part(part_index);
});
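The MergeTree hunks above converge on the same shape for pool workers: the lambda passed to scheduleOrThrowOnError attaches to the caller's thread group with CurrentThread::attachToIfDetached and no longer pairs it with a SCOPE_EXIT_SAFE(CurrentThread::detachQueryIfNotDetached()). Condensed from the clearPartsFromFilesystem hunk (an excerpt-style sketch using the helpers as they appear in this diff, not a standalone compilable unit), the resulting pattern is roughly:

    pool.scheduleOrThrowOnError([&, thread_group = CurrentThread::getGroup()]
    {
        /// Attach to the query's thread group only if this pooled thread is not
        /// already attached; there is no explicit detach in the worker body.
        if (thread_group)
            CurrentThread::attachToIfDetached(thread_group);

        LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
        part->remove();
    });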


@@ -16,22 +16,54 @@ $CLICKHOUSE_CLIENT -nm -q "create database ordinary_$CLICKHOUSE_DATABASE engine=
$CLICKHOUSE_CLIENT -nm -q """
use ordinary_$CLICKHOUSE_DATABASE;
drop table if exists data_01810;
create table data_01810 (key Int) Engine=MergeTree() order by key partition by key settings max_part_removal_threads=10, concurrent_part_removal_threshold=49;
insert into data_01810 select * from numbers(50);
create table data_01810 (key Int)
Engine=MergeTree()
order by key
partition by key%100
settings max_part_removal_threads=10, concurrent_part_removal_threshold=99, min_bytes_for_wide_part=0;
insert into data_01810 select * from numbers(100);
drop table data_01810 settings log_queries=1;
system flush logs;
select throwIf(length(thread_ids)<50) from system.query_log where event_date >= yesterday() and current_database = currentDatabase() and query = 'drop table data_01810 settings log_queries=1;' and type = 'QueryFinish' format Null;
-- sometimes ThreadPool reuses the same thread to remove more than one part,
-- hence we cannot compare strictly.
select throwIf(not(length(thread_ids) between 6 and 11))
from system.query_log
where
event_date >= yesterday() and
current_database = currentDatabase() and
query = 'drop table data_01810 settings log_queries=1;' and
type = 'QueryFinish'
format Null;
"""
# ReplicatedMergeTree
$CLICKHOUSE_CLIENT -nm -q """
use ordinary_$CLICKHOUSE_DATABASE;
drop table if exists rep_data_01810;
create table rep_data_01810 (key Int) Engine=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/rep_data_01810', '1') order by key partition by key settings max_part_removal_threads=10, concurrent_part_removal_threshold=49;
insert into rep_data_01810 select * from numbers(50);
create table rep_data_01810 (key Int)
Engine=ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/rep_data_01810', '1')
order by key
partition by key%100
settings max_part_removal_threads=10, concurrent_part_removal_threshold=99, min_bytes_for_wide_part=0;
insert into rep_data_01810 select * from numbers(100);
drop table rep_data_01810 settings log_queries=1;
system flush logs;
select throwIf(length(thread_ids)<50) from system.query_log where event_date >= yesterday() and current_database = currentDatabase() and query = 'drop table rep_data_01810 settings log_queries=1;' and type = 'QueryFinish' format Null;
-- sometimes ThreadPool reuses the same thread to remove more than one part,
-- hence we cannot compare strictly.
select throwIf(not(length(thread_ids) between 6 and 11))
from system.query_log
where
event_date >= yesterday() and
current_database = currentDatabase() and
query = 'drop table rep_data_01810 settings log_queries=1;' and
type = 'QueryFinish'
format Null;
"""
$CLICKHOUSE_CLIENT -nm -q "drop database ordinary_$CLICKHOUSE_DATABASE"