From 7ebaa4d1ca1a7a1c51c5b7a0cb54a0175ba09cf2 Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Sun, 14 Apr 2024 11:21:57 +0300 Subject: [PATCH] HashedDictionaryParallelLoader exception safe constructor --- .../HashedDictionaryParallelLoader.h | 45 ++++++++++++------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/src/Dictionaries/HashedDictionaryParallelLoader.h b/src/Dictionaries/HashedDictionaryParallelLoader.h index d88ee88f9a9..ef5e6976c17 100644 --- a/src/Dictionaries/HashedDictionaryParallelLoader.h +++ b/src/Dictionaries/HashedDictionaryParallelLoader.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -62,28 +63,40 @@ public: for (size_t shard = 0; shard < shards; ++shard) { shards_queues[shard].emplace(backlog); - pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()] + + try { - WorkerStatistic statistic; - SCOPE_EXIT_SAFE( - LOG_TRACE(dictionary.log, "Finished worker for dictionary {} shard {}, processed {} blocks, {} rows, total time {}ms", - dictionary_name, shard, statistic.total_blocks, statistic.total_rows, statistic.total_elapsed_ms); + pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()] + { + WorkerStatistic statistic; + SCOPE_EXIT_SAFE( + LOG_TRACE(dictionary.log, "Finished worker for dictionary {} shard {}, processed {} blocks, {} rows, total time {}ms", + dictionary_name, shard, statistic.total_blocks, statistic.total_rows, statistic.total_elapsed_ms); + + if (thread_group) + CurrentThread::detachFromGroupIfNotDetached(); + ); + + /// Do not account memory that was occupied by the dictionaries for the query/user context. + MemoryTrackerBlockerInThread memory_blocker; if (thread_group) - CurrentThread::detachFromGroupIfNotDetached(); - ); + CurrentThread::attachToGroupIfDetached(thread_group); + setThreadName("HashedDictLoad"); - /// Do not account memory that was occupied by the dictionaries for the query/user context. - MemoryTrackerBlockerInThread memory_blocker; + LOG_TRACE(dictionary.log, "Starting worker for dictionary {}, shard {}", dictionary_name, shard); - if (thread_group) - CurrentThread::attachToGroupIfDetached(thread_group); - setThreadName("HashedDictLoad"); + threadWorker(shard, statistic); + }); + } + catch (...) + { + for (size_t shard_to_finish = 0; shard_to_finish < shard; ++shard_to_finish) + shards_queues[shard_to_finish]->clearAndFinish(); - LOG_TRACE(dictionary.log, "Starting worker for dictionary {}, shard {}", dictionary_name, shard); - - threadWorker(shard, statistic); - }); + pool.wait(); + throw; + } } }