diff --git a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp index 608bc06b713..4cdc2655933 100644 --- a/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.cpp @@ -179,8 +179,9 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start() auto thread_group = CurrentThread::getGroup(); reading_pool->schedule([&child, thread_group] { - CurrentThread::attachToIfDetached(thread_group); setThreadName("MergeAggReadThr"); + if (thread_group) + CurrentThread::attachToIfDetached(thread_group); CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread}; child->readPrefix(); }); @@ -485,8 +486,9 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate auto thread_group = CurrentThread::getGroup(); reading_pool->schedule([&input, &read_from_input, thread_group] { - CurrentThread::attachToIfDetached(thread_group); setThreadName("MergeAggReadThr"); + if (thread_group) + CurrentThread::attachToIfDetached(thread_group); CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread}; read_from_input(input); }); diff --git a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp index 096a7b1fe2a..f54c62b3579 100644 --- a/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ b/dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp @@ -91,7 +91,8 @@ void PushingToViewsBlockOutputStream::write(const Block & block) pool.schedule([=] { setThreadName("PushingToViewsBlockOutputStream"); - CurrentThread::attachToIfDetached(thread_group); + if (thread_group) + CurrentThread::attachToIfDetached(thread_group); process(block, view_num); }); } diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 2019ad9f04e..bfa8d51c9c3 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -1352,7 +1352,8 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl( { auto converter = [&](size_t bucket, ThreadGroupStatusPtr thread_group) { - CurrentThread::attachToIfDetached(thread_group); + if (thread_group) + CurrentThread::attachToIfDetached(thread_group); return convertOneBucketToBlock(data_variants, method, final, bucket); }; @@ -1805,7 +1806,8 @@ private: try { setThreadName("MergingAggregtd"); - CurrentThread::attachToIfDetached(thread_group); + if (thread_group) + CurrentThread::attachToIfDetached(thread_group); CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread}; /// TODO: add no_more_keys support maybe @@ -2127,7 +2129,8 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, ThreadGroupStatusPtr thread_group) { - CurrentThread::attachToIfDetached(thread_group); + if (thread_group) + CurrentThread::attachToIfDetached(thread_group); for (Block & block : bucket_to_blocks[bucket]) { diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp index 39e78ffd363..2464d15e624 100644 --- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -196,7 +196,8 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp auto thread_group = CurrentThread::getGroup(); return [this, thread_group, &job, ¤t_block]() { - CurrentThread::attachToIfDetached(thread_group); + if (thread_group) + CurrentThread::attachToIfDetached(thread_group); setThreadName("DistrOutStrProc"); ++job.blocks_started;