detach threads from thread group

This commit is contained in:
Alexander Tokmakov 2022-11-28 21:31:55 +01:00
parent f6c129043e
commit e45105bf44
12 changed files with 73 additions and 0 deletions

View File

@ -4,6 +4,7 @@
#include <Common/HashTable/HashSet.h>
#include <Common/ThreadPool.h>
#include <Common/setThreadName.h>
#include <Common/scope_guard_safe.h>
namespace DB
@ -51,6 +52,10 @@ public:
auto thread_func = [&lhs, &rhs, next_bucket_to_merge, thread_group = CurrentThread::getGroup()]()
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
setThreadName("UniqExactMerger");

View File

@ -36,6 +36,7 @@
#include <Core/ProtocolDefines.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Common/scope_guard_safe.h>
#include <Parsers/ASTSelectQuery.h>
@ -2234,6 +2235,10 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
auto converter = [&](size_t thread_id, ThreadGroupStatusPtr thread_group)
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
@ -2951,6 +2956,10 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, ThreadGroupStatusPtr thread_group)
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);

View File

@ -6,6 +6,7 @@
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <iostream>
#include <Common/scope_guard_safe.h>
namespace DB
{
@ -33,6 +34,10 @@ struct CompletedPipelineExecutor::Data
static void threadFunction(CompletedPipelineExecutor::Data & data, ThreadGroupStatusPtr thread_group, size_t num_threads)
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
setThreadName("QueryCompPipeEx");
try

View File

@ -306,6 +306,10 @@ void PipelineExecutor::spawnThreads()
{
/// ThreadStatus thread_status;
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
setThreadName("QueryPipelineEx");
if (thread_group)

View File

@ -69,6 +69,10 @@ const Block & PullingAsyncPipelineExecutor::getHeader() const
static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGroupStatusPtr thread_group, size_t num_threads)
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
setThreadName("QueryPullPipeEx");
try

View File

@ -6,6 +6,7 @@
#include <Common/ThreadPool.h>
#include <Common/setThreadName.h>
#include <Poco/Event.h>
#include <Common/scope_guard_safe.h>
namespace DB
{
@ -98,6 +99,10 @@ struct PushingAsyncPipelineExecutor::Data
static void threadFunction(PushingAsyncPipelineExecutor::Data & data, ThreadGroupStatusPtr thread_group, size_t num_threads)
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
setThreadName("QueryPushPipeEx");
try

View File

@ -1,6 +1,7 @@
#include <Processors/Formats/Impl/ParallelFormattingOutputFormat.h>
#include <Common/setThreadName.h>
#include <Common/scope_guard_safe.h>
namespace DB
{
@ -97,6 +98,10 @@ namespace DB
void ParallelFormattingOutputFormat::collectorThreadFunction(const ThreadGroupStatusPtr & thread_group)
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
setThreadName("Collector");
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
@ -154,6 +159,10 @@ namespace DB
void ParallelFormattingOutputFormat::formatterThreadFunction(size_t current_unit_number, size_t first_row_num, const ThreadGroupStatusPtr & thread_group)
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
setThreadName("Formatter");
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);

View File

@ -3,12 +3,17 @@
#include <IO/WithFileName.h>
#include <Common/CurrentThread.h>
#include <Common/setThreadName.h>
#include <Common/scope_guard_safe.h>
namespace DB
{
void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr thread_group)
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachTo(thread_group);
@ -55,6 +60,10 @@ void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr
void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number)
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);

View File

@ -5,6 +5,7 @@
#include <Processors/IAccumulatingTransform.h>
#include <Common/Stopwatch.h>
#include <Common/setThreadName.h>
#include <Common/scope_guard_safe.h>
namespace DB
{
@ -97,6 +98,10 @@ struct ManyAggregatedData
pool->trySchedule(
[variant = std::move(variant), thread_group = CurrentThread::getGroup()]()
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);

View File

@ -33,6 +33,7 @@
#include <Common/logger_useful.h>
#include <base/range.h>
#include <base/scope_guard.h>
#include <Common/scope_guard_safe.h>
#include <filesystem>
@ -290,6 +291,10 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
auto thread_group = CurrentThread::getGroup();
return [this, thread_group, &job, &current_block, num_shards]()
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
OpenTelemetry::SpanHolder span(__PRETTY_FUNCTION__);
if (thread_group)

View File

@ -73,6 +73,7 @@
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/Formats/IInputFormat.h>
#include <AggregateFunctions/AggregateFunctionCount.h>
#include <Common/scope_guard_safe.h>
#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/algorithm_ext/erase.hpp>
@ -1137,6 +1138,10 @@ void MergeTreeData::loadDataPartsFromDisk(
{
pool.scheduleOrThrowOnError([&, thread, thread_group = CurrentThread::getGroup()]
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
@ -1964,6 +1969,10 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
{
pool.scheduleOrThrowOnError([&, thread_group = CurrentThread::getGroup()]
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);

View File

@ -1114,6 +1114,10 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()]
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);