From a616ae88618eb050f2e8df3435fceddfd773a250 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 24 Jun 2021 10:07:31 +0300 Subject: [PATCH 1/2] Improve startup time of Distributed engine. - create directory monitors in parallel (this also includes rmdir in case of directory is empty, since even if the directory is empty it may take some time to remove it, due to waiting for journal or if the directory is large, i.e. it had lots of files before, since remember ext4 does not truncate the directory size on each unlink [1]) - initialize increment in parallel too (since it does readdir()) [1]: https://lore.kernel.org/linux-ext4/930A5754-5CE6-4567-8CF0-62447C97825C@dilger.ca/ --- .../DistributedBlockOutputStream.cpp | 2 +- src/Storages/StorageDistributed.cpp | 65 +++++++++++++++---- src/Storages/StorageDistributed.h | 2 +- 3 files changed, 54 insertions(+), 15 deletions(-) diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp index 9b13198812b..9a50cec5986 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -752,7 +752,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std:: auto sleep_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms; for (const auto & dir_name : dir_names) { - auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name); + auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name, /* startup= */ false); directory_monitor.addAndSchedule(file_size, sleep_ms.totalMilliseconds()); } } diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 8507198a7f6..d43fd1532a1 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -800,12 +800,31 @@ void StorageDistributed::startup() if (!storage_policy) return; - for (const DiskPtr & disk : data_volume->getDisks()) - createDirectoryMonitors(disk); + const auto & disks = data_volume->getDisks(); + ThreadPool pool(disks.size()); - for (const String & path : getDataPaths()) + for (const DiskPtr & disk : disks) + { + pool.scheduleOrThrowOnError([&]() + { + createDirectoryMonitors(disk); + }); + } + pool.wait(); + + const auto & paths = getDataPaths(); + std::vector last_increment(paths.size()); + for (size_t i = 0; i < paths.size(); ++i) + { + pool.scheduleOrThrowOnError([&, i]() + { + last_increment[i] = getMaximumFileNumber(paths[i]); + }); + } + pool.wait(); + + for (const auto inc : last_increment) { - UInt64 inc = getMaximumFileNumber(path); if (inc > file_names_increment.value) file_names_increment.value.store(inc); } @@ -907,30 +926,50 @@ void StorageDistributed::createDirectoryMonitors(const DiskPtr & disk) } else { - requireDirectoryMonitor(disk, dir_path.filename().string()); + requireDirectoryMonitor(disk, dir_path.filename().string(), /* startup= */ true); } } } } -StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const DiskPtr & disk, const std::string & name) +StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const DiskPtr & disk, const std::string & name, bool startup) { const std::string & disk_path = disk->getPath(); const std::string key(disk_path + name); - std::lock_guard lock(cluster_nodes_mutex); - auto & node_data = cluster_nodes_data[key]; - if (!node_data.directory_monitor) + auto create_node_data = [&]() { - node_data.connection_pool = StorageDistributedDirectoryMonitor::createPool(name, *this); - node_data.directory_monitor = std::make_unique( + ClusterNodeData data; + data.connection_pool = StorageDistributedDirectoryMonitor::createPool(name, *this); + data.directory_monitor = std::make_unique( *this, disk, relative_data_path + name, - node_data.connection_pool, + data.connection_pool, monitors_blocker, getContext()->getDistributedSchedulePool()); + return data; + }; + + /// In case of startup the lock can be acquired later. + if (startup) + { + auto tmp_node_data = create_node_data(); + std::lock_guard lock(cluster_nodes_mutex); + auto & node_data = cluster_nodes_data[key]; + assert(!node_data.directory_monitor); + node_data = std::move(tmp_node_data); + return *node_data.directory_monitor; + } + else + { + std::lock_guard lock(cluster_nodes_mutex); + auto & node_data = cluster_nodes_data[key]; + if (!node_data.directory_monitor) + { + node_data = create_node_data(); + } + return *node_data.directory_monitor; } - return *node_data.directory_monitor; } std::vector StorageDistributed::getDirectoryMonitorsStatuses() const diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index c734b0f777e..c63abbc6aa4 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -160,7 +160,7 @@ private: /// create directory monitors for each existing subdirectory void createDirectoryMonitors(const DiskPtr & disk); /// ensure directory monitor thread and connectoin pool creation by disk and subdirectory name - StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const DiskPtr & disk, const std::string & name); + StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const DiskPtr & disk, const std::string & name, bool startup); /// Return list of metrics for all created monitors /// (note that monitors are created lazily, i.e. until at least one INSERT executed) From f6e67d3dc1ed06df521dbd5ff1838d5ee836dbe0 Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Sun, 27 Jun 2021 18:22:34 +0300 Subject: [PATCH 2/2] Update StorageDistributed.cpp --- src/Storages/StorageDistributed.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index d43fd1532a1..15b1421c635 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -801,6 +801,8 @@ void StorageDistributed::startup() return; const auto & disks = data_volume->getDisks(); + + /// Make initialization for large number of disks parallel. ThreadPool pool(disks.size()); for (const DiskPtr & disk : disks)