Fix possible "Can't attach query to the thread, it is already attached"

After detachQueryIfNotDetached() had been removed it is not enough to
use attachTo() for ThreadPool (scheduleOrThrowOnError()) since the query
may be already attached, if the thread doing multiple jobs, so
CurrentThread::attachToIfDetached() should be used instead.

This should fix all the places from the failures on CI [1]:

    $ fgrep DB::CurrentThread::attachTo -A1 ~/Downloads/47.txt  | fgrep -v attachTo | cut -d' ' -f5,6 | sort | uniq -c
         92 --
          2 /fasttest-workspace/build/../../ClickHouse/contrib/libcxx/include/deque:1393: DB::ParallelParsingInputFormat::parserThreadFunction(std::__1::shared_ptr<DB::ThreadGroupStatus>,
          4 /fasttest-workspace/build/../../ClickHouse/src/Storages/MergeTree/MergeTreeData.cpp:1595: void
         87 /fasttest-workspace/build/../../ClickHouse/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp:993: void

  [1]: https://github.com/ClickHouse/ClickHouse/runs/4954466034?check_suite_focus=true

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2022-01-27 11:55:38 +03:00
parent b0c862c297
commit 1519985c98
3 changed files with 3 additions and 3 deletions

View File

@ -55,7 +55,7 @@ void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupStatusPtr
void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number) void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupStatusPtr thread_group, size_t current_ticket_number)
{ {
if (thread_group) if (thread_group)
CurrentThread::attachTo(thread_group); CurrentThread::attachToIfDetached(thread_group);
const auto parser_unit_number = current_ticket_number % processing_units.size(); const auto parser_unit_number = current_ticket_number % processing_units.size();
auto & unit = processing_units[parser_unit_number]; auto & unit = processing_units[parser_unit_number];

View File

@ -1590,7 +1590,7 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
pool.scheduleOrThrowOnError([&, thread_group = CurrentThread::getGroup()] pool.scheduleOrThrowOnError([&, thread_group = CurrentThread::getGroup()]
{ {
if (thread_group) if (thread_group)
CurrentThread::attachTo(thread_group); CurrentThread::attachToIfDetached(thread_group);
LOG_DEBUG(log, "Removing part from filesystem {}", part->name); LOG_DEBUG(log, "Removing part from filesystem {}", part->name);
part->remove(); part->remove();

View File

@ -988,7 +988,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()] pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()]
{ {
if (thread_group) if (thread_group)
CurrentThread::attachTo(thread_group); CurrentThread::attachToIfDetached(thread_group);
process_part(part_index); process_part(part_index);
}); });