mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 00:22:29 +00:00
Merge pull request #62625 from kitaisreal/hashed-dictionary-parallel-loader-exception-safe-constructor
HashedDictionaryParallelLoader exception safe constructor
This commit is contained in:
commit
5576cb7964
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <Dictionaries/IDictionary.h>
|
||||
#include <Dictionaries/DictionaryHelpers.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/iota.h>
|
||||
#include <Common/scope_guard_safe.h>
|
||||
@ -62,28 +63,40 @@ public:
|
||||
for (size_t shard = 0; shard < shards; ++shard)
|
||||
{
|
||||
shards_queues[shard].emplace(backlog);
|
||||
pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()]
|
||||
|
||||
try
|
||||
{
|
||||
WorkerStatistic statistic;
|
||||
SCOPE_EXIT_SAFE(
|
||||
LOG_TRACE(dictionary.log, "Finished worker for dictionary {} shard {}, processed {} blocks, {} rows, total time {}ms",
|
||||
dictionary_name, shard, statistic.total_blocks, statistic.total_rows, statistic.total_elapsed_ms);
|
||||
pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()]
|
||||
{
|
||||
WorkerStatistic statistic;
|
||||
SCOPE_EXIT_SAFE(
|
||||
LOG_TRACE(dictionary.log, "Finished worker for dictionary {} shard {}, processed {} blocks, {} rows, total time {}ms",
|
||||
dictionary_name, shard, statistic.total_blocks, statistic.total_rows, statistic.total_elapsed_ms);
|
||||
|
||||
if (thread_group)
|
||||
CurrentThread::detachFromGroupIfNotDetached();
|
||||
);
|
||||
|
||||
/// Do not account memory that was occupied by the dictionaries for the query/user context.
|
||||
MemoryTrackerBlockerInThread memory_blocker;
|
||||
|
||||
if (thread_group)
|
||||
CurrentThread::detachFromGroupIfNotDetached();
|
||||
);
|
||||
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||
setThreadName("HashedDictLoad");
|
||||
|
||||
/// Do not account memory that was occupied by the dictionaries for the query/user context.
|
||||
MemoryTrackerBlockerInThread memory_blocker;
|
||||
LOG_TRACE(dictionary.log, "Starting worker for dictionary {}, shard {}", dictionary_name, shard);
|
||||
|
||||
if (thread_group)
|
||||
CurrentThread::attachToGroupIfDetached(thread_group);
|
||||
setThreadName("HashedDictLoad");
|
||||
threadWorker(shard, statistic);
|
||||
});
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
for (size_t shard_to_finish = 0; shard_to_finish < shard; ++shard_to_finish)
|
||||
shards_queues[shard_to_finish]->clearAndFinish();
|
||||
|
||||
LOG_TRACE(dictionary.log, "Starting worker for dictionary {}, shard {}", dictionary_name, shard);
|
||||
|
||||
threadWorker(shard, statistic);
|
||||
});
|
||||
pool.wait();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user