Attach to query for threads in Distributed engine background ops

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2024-02-22 15:41:19 +01:00
parent eb75926e50
commit 929dc6fa12

View File

@ -1287,10 +1287,19 @@ void StorageDistributed::initializeFromDisk()
/// Make initialization for large number of disks parallel.
ThreadPool pool(CurrentMetrics::StorageDistributedThreads, CurrentMetrics::StorageDistributedThreadsActive, CurrentMetrics::StorageDistributedThreadsScheduled, disks.size());
ThreadGroupPtr thread_group = CurrentThread::getGroup();
for (const DiskPtr & disk : disks)
{
pool.scheduleOrThrowOnError([&]()
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);
if (thread_group)
CurrentThread::attachToGroup(thread_group);
setThreadName("DistInit");
initializeDirectoryQueuesForDisk(disk);
});
}
@ -1302,6 +1311,14 @@ void StorageDistributed::initializeFromDisk()
{
pool.scheduleOrThrowOnError([&, i]()
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);
if (thread_group)
CurrentThread::attachToGroup(thread_group);
setThreadName("DistInit");
last_increment[i] = getMaximumFileNumber(paths[i]);
});
}
@ -1739,11 +1756,20 @@ void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context)
LOG_INFO(log, "Flushing pending INSERT blocks");
Stopwatch watch;
ThreadGroupPtr thread_group = CurrentThread::getGroup();
ThreadPool pool(CurrentMetrics::StorageDistributedThreads, CurrentMetrics::StorageDistributedThreadsActive, CurrentMetrics::StorageDistributedThreadsScheduled, directory_queues.size());
for (const auto & node : directory_queues)
{
pool.scheduleOrThrowOnError([node_to_flush = node]()
pool.scheduleOrThrowOnError([node_to_flush = node, &thread_group]()
{
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
);
if (thread_group)
CurrentThread::attachToGroup(thread_group);
setThreadName("DistFlush");
node_to_flush->flushAllData();
});
}