Merge pull request #25663 from azat/dist-startup

Improve startup time of Distributed engine.
This commit is contained in:
alexey-milovidov 2021-06-27 18:22:45 +03:00 committed by GitHub
commit 1b644b9a31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 56 additions and 15 deletions

View File

@ -752,7 +752,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
auto sleep_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms;
for (const auto & dir_name : dir_names)
{
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name);
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name, /* startup= */ false);
directory_monitor.addAndSchedule(file_size, sleep_ms.totalMilliseconds());
}
}

View File

@ -800,12 +800,33 @@ void StorageDistributed::startup()
if (!storage_policy)
return;
for (const DiskPtr & disk : data_volume->getDisks())
createDirectoryMonitors(disk);
const auto & disks = data_volume->getDisks();
for (const String & path : getDataPaths())
/// Make initialization for large number of disks parallel.
ThreadPool pool(disks.size());
for (const DiskPtr & disk : disks)
{
pool.scheduleOrThrowOnError([&]()
{
createDirectoryMonitors(disk);
});
}
pool.wait();
const auto & paths = getDataPaths();
std::vector<UInt64> last_increment(paths.size());
for (size_t i = 0; i < paths.size(); ++i)
{
pool.scheduleOrThrowOnError([&, i]()
{
last_increment[i] = getMaximumFileNumber(paths[i]);
});
}
pool.wait();
for (const auto inc : last_increment)
{
UInt64 inc = getMaximumFileNumber(path);
if (inc > file_names_increment.value)
file_names_increment.value.store(inc);
}
@ -907,30 +928,50 @@ void StorageDistributed::createDirectoryMonitors(const DiskPtr & disk)
}
else
{
requireDirectoryMonitor(disk, dir_path.filename().string());
requireDirectoryMonitor(disk, dir_path.filename().string(), /* startup= */ true);
}
}
}
}
StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const DiskPtr & disk, const std::string & name)
StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const DiskPtr & disk, const std::string & name, bool startup)
{
const std::string & disk_path = disk->getPath();
const std::string key(disk_path + name);
std::lock_guard lock(cluster_nodes_mutex);
auto & node_data = cluster_nodes_data[key];
if (!node_data.directory_monitor)
auto create_node_data = [&]()
{
node_data.connection_pool = StorageDistributedDirectoryMonitor::createPool(name, *this);
node_data.directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(
ClusterNodeData data;
data.connection_pool = StorageDistributedDirectoryMonitor::createPool(name, *this);
data.directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(
*this, disk, relative_data_path + name,
node_data.connection_pool,
data.connection_pool,
monitors_blocker,
getContext()->getDistributedSchedulePool());
return data;
};
/// In case of startup the lock can be acquired later.
if (startup)
{
auto tmp_node_data = create_node_data();
std::lock_guard lock(cluster_nodes_mutex);
auto & node_data = cluster_nodes_data[key];
assert(!node_data.directory_monitor);
node_data = std::move(tmp_node_data);
return *node_data.directory_monitor;
}
else
{
std::lock_guard lock(cluster_nodes_mutex);
auto & node_data = cluster_nodes_data[key];
if (!node_data.directory_monitor)
{
node_data = create_node_data();
}
return *node_data.directory_monitor;
}
return *node_data.directory_monitor;
}
std::vector<StorageDistributedDirectoryMonitor::Status> StorageDistributed::getDirectoryMonitorsStatuses() const

View File

@ -160,7 +160,7 @@ private:
/// create directory monitors for each existing subdirectory
void createDirectoryMonitors(const DiskPtr & disk);
/// ensure directory monitor thread and connectoin pool creation by disk and subdirectory name
StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const DiskPtr & disk, const std::string & name);
StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const DiskPtr & disk, const std::string & name, bool startup);
/// Return list of metrics for all created monitors
/// (note that monitors are created lazily, i.e. until at least one INSERT executed)