This commit is contained in:
Nikita Taranov 2024-03-20 16:36:21 +00:00
parent d4895c2e52
commit a3718451b5

View File

@ -1,12 +1,17 @@
#include <Databases/DatabaseOnDisk.h> #include <Databases/DatabaseOnDisk.h>
#include <filesystem>
#include <iterator>
#include <span>
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseOrdinary.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Interpreters/ApplyWithSubqueryVisitor.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h> #include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/ApplyWithSubqueryVisitor.h>
#include <Parsers/ASTCreateQuery.h> #include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ParserCreateQuery.h> #include <Parsers/ParserCreateQuery.h>
@ -15,14 +20,11 @@
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Storages/StorageFactory.h> #include <Storages/StorageFactory.h>
#include <TableFunctions/TableFunctionFactory.h> #include <TableFunctions/TableFunctionFactory.h>
#include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>
#include <Common/filesystemHelpers.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/assert_cast.h> #include <Common/assert_cast.h>
#include <Databases/DatabaseOrdinary.h> #include <Common/escapeForFileName.h>
#include <Databases/DatabaseAtomic.h> #include <Common/filesystemHelpers.h>
#include <filesystem> #include <Common/logger_useful.h>
namespace fs = std::filesystem; namespace fs = std::filesystem;
@ -612,7 +614,7 @@ void DatabaseOnDisk::iterateMetadataFiles(ContextPtr local_context, const Iterat
}; };
/// Metadata files to load: name and flag for .tmp_drop files /// Metadata files to load: name and flag for .tmp_drop files
std::set<std::pair<String, bool>> metadata_files; std::vector<std::pair<String, bool>> metadata_files;
fs::directory_iterator dir_end; fs::directory_iterator dir_end;
for (fs::directory_iterator dir_it(getMetadataPath()); dir_it != dir_end; ++dir_it) for (fs::directory_iterator dir_it(getMetadataPath()); dir_it != dir_end; ++dir_it)
@ -633,7 +635,7 @@ void DatabaseOnDisk::iterateMetadataFiles(ContextPtr local_context, const Iterat
if (endsWith(file_name, ".sql.tmp_drop")) if (endsWith(file_name, ".sql.tmp_drop"))
{ {
/// There are files that we tried to delete previously /// There are files that we tried to delete previously
metadata_files.emplace(file_name, false); metadata_files.emplace_back(file_name, false);
} }
else if (endsWith(file_name, ".sql.tmp")) else if (endsWith(file_name, ".sql.tmp"))
{ {
@ -644,23 +646,30 @@ void DatabaseOnDisk::iterateMetadataFiles(ContextPtr local_context, const Iterat
else if (endsWith(file_name, ".sql")) else if (endsWith(file_name, ".sql"))
{ {
/// The required files have names like `table_name.sql` /// The required files have names like `table_name.sql`
metadata_files.emplace(file_name, true); metadata_files.emplace_back(file_name, true);
} }
else else
throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Incorrect file extension: {} in metadata directory {}", file_name, getMetadataPath()); throw Exception(ErrorCodes::INCORRECT_FILE_NAME, "Incorrect file extension: {} in metadata directory {}", file_name, getMetadataPath());
} }
std::sort(metadata_files.begin(), metadata_files.end());
metadata_files.erase(std::unique(metadata_files.begin(), metadata_files.end()), metadata_files.end());
/// Read and parse metadata in parallel /// Read and parse metadata in parallel
ThreadPool pool(CurrentMetrics::DatabaseOnDiskThreads, CurrentMetrics::DatabaseOnDiskThreadsActive, CurrentMetrics::DatabaseOnDiskThreadsScheduled); ThreadPool pool(CurrentMetrics::DatabaseOnDiskThreads, CurrentMetrics::DatabaseOnDiskThreadsActive, CurrentMetrics::DatabaseOnDiskThreadsScheduled);
for (const auto & file : metadata_files) const auto batch_size = metadata_files.size() / pool.getMaxThreads() + 1;
for (auto it = metadata_files.begin(); it < metadata_files.end(); std::advance(it, batch_size))
{ {
pool.scheduleOrThrowOnError([&]() std::span batch{it, std::min(std::next(it, batch_size), metadata_files.end())};
{ pool.scheduleOrThrowOnError(
if (file.second) [batch, &process_metadata_file, &process_tmp_drop_metadata_file]() mutable
process_metadata_file(file.first); {
else for (const auto & file : batch)
process_tmp_drop_metadata_file(file.first); if (file.second)
}); process_metadata_file(file.first);
else
process_tmp_drop_metadata_file(file.first);
});
} }
pool.wait(); pool.wait();
} }