HashedDictionaryParallelLoader exception safe constructor

This commit is contained in:
Maksim Kita 2024-04-14 11:21:57 +03:00
parent b6cfba33f1
commit 7ebaa4d1ca

View File

@ -1,6 +1,7 @@
#pragma once
#include <Dictionaries/IDictionary.h>
#include <Dictionaries/DictionaryHelpers.h>
#include <Common/CurrentThread.h>
#include <Common/iota.h>
#include <Common/scope_guard_safe.h>
@ -62,28 +63,40 @@ public:
for (size_t shard = 0; shard < shards; ++shard)
{
shards_queues[shard].emplace(backlog);
pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()]
try
{
WorkerStatistic statistic;
SCOPE_EXIT_SAFE(
LOG_TRACE(dictionary.log, "Finished worker for dictionary {} shard {}, processed {} blocks, {} rows, total time {}ms",
dictionary_name, shard, statistic.total_blocks, statistic.total_rows, statistic.total_elapsed_ms);
pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()]
{
WorkerStatistic statistic;
SCOPE_EXIT_SAFE(
LOG_TRACE(dictionary.log, "Finished worker for dictionary {} shard {}, processed {} blocks, {} rows, total time {}ms",
dictionary_name, shard, statistic.total_blocks, statistic.total_rows, statistic.total_elapsed_ms);
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);
/// Do not account memory that was occupied by the dictionaries for the query/user context.
MemoryTrackerBlockerInThread memory_blocker;
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("HashedDictLoad");
/// Do not account memory that was occupied by the dictionaries for the query/user context.
MemoryTrackerBlockerInThread memory_blocker;
LOG_TRACE(dictionary.log, "Starting worker for dictionary {}, shard {}", dictionary_name, shard);
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("HashedDictLoad");
threadWorker(shard, statistic);
});
}
catch (...)
{
for (size_t shard_to_finish = 0; shard_to_finish < shard; ++shard_to_finish)
shards_queues[shard_to_finish]->clearAndFinish();
LOG_TRACE(dictionary.log, "Starting worker for dictionary {}, shard {}", dictionary_name, shard);
threadWorker(shard, statistic);
});
pool.wait();
throw;
}
}
}