Merge pull request #3623 from vavrusa/master

Added null guards for all CurrentThread::attachToIfDetached()
This commit is contained in:
alexey-milovidov 2018-11-20 21:16:45 +03:00 committed by GitHub
commit 7939a61797
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 14 additions and 7 deletions

View File

@ -179,8 +179,9 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start()
auto thread_group = CurrentThread::getGroup();
reading_pool->schedule([&child, thread_group]
{
CurrentThread::attachToIfDetached(thread_group);
setThreadName("MergeAggReadThr");
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
child->readPrefix();
});
@ -485,8 +486,9 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate
auto thread_group = CurrentThread::getGroup();
reading_pool->schedule([&input, &read_from_input, thread_group]
{
CurrentThread::attachToIfDetached(thread_group);
setThreadName("MergeAggReadThr");
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
read_from_input(input);
});

View File

@ -91,7 +91,8 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
pool.schedule([=]
{
setThreadName("PushingToViewsBlockOutputStream");
CurrentThread::attachToIfDetached(thread_group);
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
process(block, view_num);
});
}

View File

@ -1352,7 +1352,8 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
{
auto converter = [&](size_t bucket, ThreadGroupStatusPtr thread_group)
{
CurrentThread::attachToIfDetached(thread_group);
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
return convertOneBucketToBlock(data_variants, method, final, bucket);
};
@ -1805,7 +1806,8 @@ private:
try
{
setThreadName("MergingAggregtd");
CurrentThread::attachToIfDetached(thread_group);
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
/// TODO: add no_more_keys support maybe
@ -2127,7 +2129,8 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, ThreadGroupStatusPtr thread_group)
{
CurrentThread::attachToIfDetached(thread_group);
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
for (Block & block : bucket_to_blocks[bucket])
{

View File

@ -196,7 +196,8 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
auto thread_group = CurrentThread::getGroup();
return [this, thread_group, &job, &current_block]()
{
CurrentThread::attachToIfDetached(thread_group);
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
setThreadName("DistrOutStrProc");
++job.blocks_started;