Merge pull request #60225 from azat/dist/parallel-flush

Parallel flush of pending INSERT blocks of Distributed engine
This commit is contained in:
Antonio Andelic 2024-03-21 12:25:59 +01:00 committed by GitHub
commit c4856e326b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -23,6 +23,7 @@
#include <Columns/ColumnConst.h>
#include <Common/threadPoolCallbackRunner.h>
#include <Common/Macros.h>
#include <Common/ProfileEvents.h>
#include <Common/escapeForFileName.h>
@ -282,6 +283,17 @@ size_t getClusterQueriedNodes(const Settings & settings, const ClusterPtr & clus
return (num_remote_shards + num_local_shards) * settings.max_parallel_replicas;
}
template <class F>
void waitFutures(F & futures)
{
for (auto & future : futures)
future.wait();
/// Make sure there is no exception.
for (auto & future : futures)
future.get();
futures.clear();
}
}
/// For destruction of std::unique_ptr of type that is incomplete in class definition.
@ -1286,25 +1298,30 @@ void StorageDistributed::initializeFromDisk()
/// Make initialization for large number of disks parallel.
ThreadPool pool(CurrentMetrics::StorageDistributedThreads, CurrentMetrics::StorageDistributedThreadsActive, CurrentMetrics::StorageDistributedThreadsScheduled, disks.size());
std::vector<std::future<void>> futures;
for (const DiskPtr & disk : disks)
{
pool.scheduleOrThrowOnError([&]()
auto future = scheduleFromThreadPool<void>([this, disk_to_init = disk]
{
initializeDirectoryQueuesForDisk(disk);
});
initializeDirectoryQueuesForDisk(disk_to_init);
}, pool, "DistInit");
futures.push_back(std::move(future));
}
waitFutures(futures);
pool.wait();
const auto & paths = getDataPaths();
std::vector<UInt64> last_increment(paths.size());
for (size_t i = 0; i < paths.size(); ++i)
{
pool.scheduleOrThrowOnError([&, i]()
auto future = scheduleFromThreadPool<void>([&paths, &last_increment, i]
{
last_increment[i] = getMaximumFileNumber(paths[i]);
});
}, pool, "DistInit");
futures.push_back(std::move(future));
}
waitFutures(futures);
pool.wait();
for (const auto inc : last_increment)
@ -1734,16 +1751,33 @@ void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context)
directory_queues.push_back(node.second.directory_queue);
}
bool need_flush = getDistributedSettingsRef().flush_on_detach;
if (!need_flush)
if (getDistributedSettingsRef().flush_on_detach)
{
LOG_INFO(log, "Flushing pending INSERT blocks");
Stopwatch watch;
ThreadPool pool(CurrentMetrics::StorageDistributedThreads, CurrentMetrics::StorageDistributedThreadsActive, CurrentMetrics::StorageDistributedThreadsScheduled, directory_queues.size());
std::vector<std::future<void>> futures;
for (const auto & node : directory_queues)
{
auto future = scheduleFromThreadPool<void>([node_to_flush = node]
{
node_to_flush->flushAllData();
}, pool, "DistFlush");
futures.push_back(std::move(future));
}
waitFutures(futures);
pool.wait();
LOG_INFO(log, "Pending INSERT blocks flushed, took {} ms.", watch.elapsedMilliseconds());
}
else
{
LOG_INFO(log, "Skip flushing data (due to flush_on_detach=0)");
/// TODO: Maybe it should be executed in parallel
for (auto & node : directory_queues)
{
if (need_flush)
node->flushAllData();
else
for (auto & node : directory_queues)
node->shutdownWithoutFlush();
}
}