Merge pull request #59277 from ClickHouse/create-tables-in-threadpool

Concurrent tables creation in `recoverLostReplica`
Konstantin Bogdanov authored on 2024-02-05 16:04:46 +03:00, committed by GitHub
commit f663f13a35
10 changed files with 86 additions and 19 deletions


@@ -826,6 +826,11 @@ try
0, // We don't need any threads once all the parts will be deleted
server_settings.max_parts_cleaning_thread_pool_size);
getDatabaseReplicatedCreateTablesThreadPool().initialize(
server_settings.max_database_replicated_create_table_thread_pool_size,
0, // We don't need any threads once all the tables will be created
server_settings.max_database_replicated_create_table_thread_pool_size);
/// Initialize global local cache for remote filesystem.
if (config().has("local_cache_for_remote_fs"))
{
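The three arguments passed to initialize() above are, assuming the same (maximum threads, idle threads kept alive, queue size) order used for the neighbouring shared pools (the signature itself is not shown in this diff): the thread count cap, zero idle threads since the pool is only needed while a replica is being recovered, and a queue bound, with both limits taken from max_database_replicated_create_table_thread_pool_size.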


@@ -178,6 +178,9 @@
M(MergeTreePartsCleanerThreads, "Number of threads in the MergeTree parts cleaner thread pool.") \
M(MergeTreePartsCleanerThreadsActive, "Number of threads in the MergeTree parts cleaner thread pool running a task.") \
M(MergeTreePartsCleanerThreadsScheduled, "Number of queued or active jobs in the MergeTree parts cleaner thread pool.") \
M(DatabaseReplicatedCreateTablesThreads, "Number of threads in the threadpool for table creation in DatabaseReplicated.") \
M(DatabaseReplicatedCreateTablesThreadsActive, "Number of active threads in the threadpool for table creation in DatabaseReplicated.") \
M(DatabaseReplicatedCreateTablesThreadsScheduled, "Number of queued or active jobs in the threadpool for table creation in DatabaseReplicated.") \
M(IDiskCopierThreads, "Number of threads for copying data between disks of different types.") \
M(IDiskCopierThreadsActive, "Number of threads for copying data between disks of different types running a task.") \
M(IDiskCopierThreadsScheduled, "Number of queued or active jobs for copying data between disks of different types.") \


@@ -114,6 +114,7 @@ namespace DB
M(Bool, validate_tcp_client_information, false, "Validate client_information in the query packet over the native TCP protocol.", 0) \
M(Bool, storage_metadata_write_full_object_key, false, "Write disk metadata files with VERSION_FULL_OBJECT_KEY format", 0) \
M(UInt64, max_materialized_views_count_for_table, 0, "A limit on the number of materialized views attached to a table.", 0) \
M(UInt64, max_database_replicated_create_table_thread_pool_size, 0, "The number of threads to create tables during replica recovery in DatabaseReplicated. Value less than two means tables will be created sequentially.", 0) \
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in StorageSystemServerSettings.cpp


@@ -29,6 +29,7 @@
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/SharedThreadPools.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTFunction.h>
@@ -1091,31 +1092,57 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
     }
     tables_dependencies.checkNoCyclicDependencies();
-    auto tables_to_create = tables_dependencies.getTablesSortedByDependency();
-    for (const auto & table_id : tables_to_create)
+    auto allow_concurrent_table_creation = getContext()->getServerSettings().max_database_replicated_create_table_thread_pool_size > 1;
+    auto tables_to_create_by_level = tables_dependencies.getTablesSortedByDependencyWithLevels();
+    auto create_tables_runner = threadPoolCallbackRunner<void>(getDatabaseReplicatedCreateTablesThreadPool().get(), "CreateTables");
+    std::vector<std::future<void>> create_table_futures;
+    for (const auto & [_, tables_to_create] : tables_to_create_by_level)
     {
-        auto table_name = table_id.getTableName();
-        auto metadata_it = table_name_to_metadata.find(table_name);
-        if (metadata_it == table_name_to_metadata.end())
+        for (const auto & table_id : tables_to_create)
         {
-            /// getTablesSortedByDependency() may return some not existing tables or tables from other databases
-            LOG_WARNING(log, "Got table name {} when resolving table dependencies, "
-                "but database {} does not have metadata for that table. Ignoring it", table_id.getNameForLogs(), getDatabaseName());
-            continue;
+            auto task = [&]()
+            {
+                auto table_name = table_id.getTableName();
+                auto metadata_it = table_name_to_metadata.find(table_name);
+                if (metadata_it == table_name_to_metadata.end())
+                {
+                    /// getTablesSortedByDependency() may return some not existing tables or tables from other databases
+                    LOG_WARNING(log, "Got table name {} when resolving table dependencies, "
+                        "but database {} does not have metadata for that table. Ignoring it", table_id.getNameForLogs(), getDatabaseName());
+                    return;
+                }
+                const auto & create_query_string = metadata_it->second;
+                if (isTableExist(table_name, getContext()))
+                {
+                    assert(create_query_string == readMetadataFile(table_name) || getTableUUIDIfReplicated(create_query_string, getContext()) != UUIDHelpers::Nil);
+                    return;
+                }
+                auto query_ast = parseQueryFromMetadataInZooKeeper(table_name, create_query_string);
+                LOG_INFO(log, "Executing {}", serializeAST(*query_ast));
+                auto create_query_context = make_query_context();
+                InterpreterCreateQuery(query_ast, create_query_context).execute();
+            };
+            if (allow_concurrent_table_creation)
+                create_table_futures.push_back(create_tables_runner(task, Priority{0}));
+            else
+                task();
         }
-        const auto & create_query_string = metadata_it->second;
-        if (isTableExist(table_name, getContext()))
-        {
-            assert(create_query_string == readMetadataFile(table_name) || getTableUUIDIfReplicated(create_query_string, getContext()) != UUIDHelpers::Nil);
-            continue;
-        }
+        /// First wait for all tasks to finish.
+        for (auto & future : create_table_futures)
+            future.wait();

-        auto query_ast = parseQueryFromMetadataInZooKeeper(table_name, create_query_string);
-        LOG_INFO(log, "Executing {}", serializeAST(*query_ast));
-        auto create_query_context = make_query_context();
-        InterpreterCreateQuery(query_ast, create_query_context).execute();
+        /// Now rethrow the first exception if any.
+        for (auto & future : create_table_futures)
+            future.get();
+        create_table_futures.clear();
     }
     LOG_INFO(log, "All tables are created successfully");


@@ -699,6 +699,17 @@ std::vector<StorageID> TablesDependencyGraph::getTablesSortedByDependency() cons
}
std::map<size_t, std::vector<StorageID>> TablesDependencyGraph::getTablesSortedByDependencyWithLevels() const
{
std::map<size_t, std::vector<StorageID>> tables_by_level;
for (const auto * node : getNodesSortedByLevel())
{
tables_by_level[node->level].emplace_back(node->storage_id);
}
return tables_by_level;
}
void TablesDependencyGraph::log() const
{
if (nodes.empty())


@@ -107,6 +107,12 @@ public:
/// tables which depend on the tables which depend on the tables without dependencies, and so on.
std::vector<StorageID> getTablesSortedByDependency() const;
/// Returns a map of lists of tables by the number of dependencies they have:
/// tables without dependencies first with level 0, then
/// tables which depend on the tables without dependencies with level 1, then
/// tables which depend on the tables which depend on the tables without dependencies with level 2, and so on.
std::map<size_t, std::vector<StorageID>> getTablesSortedByDependencyWithLevels() const;
/// Outputs information about this graph as a bunch of logging messages.
void log() const;
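As a worked example of getTablesSortedByDependencyWithLevels() (the table names are hypothetical): for a graph in which events has no dependencies, events_mv depends on events, and daily_report depends on events_mv, the returned map is {0: [events], 1: [events_mv], 2: [daily_report]}, so recoverLostReplica can create all tables of one level in parallel and only then move on to the next level.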


@@ -20,6 +20,9 @@ namespace CurrentMetrics
extern const Metric MergeTreeOutdatedPartsLoaderThreads;
extern const Metric MergeTreeOutdatedPartsLoaderThreadsActive;
extern const Metric MergeTreeOutdatedPartsLoaderThreadsScheduled;
extern const Metric DatabaseReplicatedCreateTablesThreads;
extern const Metric DatabaseReplicatedCreateTablesThreadsActive;
extern const Metric DatabaseReplicatedCreateTablesThreadsScheduled;
}
namespace DB
@@ -148,4 +151,10 @@ StaticThreadPool & getOutdatedPartsLoadingThreadPool()
return instance;
}
StaticThreadPool & getDatabaseReplicatedCreateTablesThreadPool()
{
static StaticThreadPool instance("CreateTablesThreadPool", CurrentMetrics::DatabaseReplicatedCreateTablesThreads, CurrentMetrics::DatabaseReplicatedCreateTablesThreadsActive, CurrentMetrics::DatabaseReplicatedCreateTablesThreadsScheduled);
return instance;
}
}
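The new accessor mirrors the other shared pools in this translation unit: a function-local static constructed on first use and handed out by reference, with its sizing deferred to the initialize() call made in Server.cpp above. A generic sketch of that idiom follows; it is not ClickHouse's StaticThreadPool, only the shape of the pattern:

#include <cstddef>
#include <stdexcept>

// Bare-bones stand-in for a pool that must be initialize()d before it is used.
class Pool
{
public:
    void initialize(std::size_t max_threads) { max_threads_ = max_threads; }

    std::size_t maxThreads() const
    {
        if (max_threads_ == 0)
            throw std::logic_error("pool used before initialize()");
        return max_threads_;
    }

private:
    std::size_t max_threads_ = 0;
};

Pool & getCreateTablesPool()
{
    // Constructed lazily and thread-safely on first call (C++11 function-local static);
    // every caller shares the same instance, in the same way as the accessor above.
    static Pool instance;
    return instance;
}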


@@ -64,4 +64,7 @@ StaticThreadPool & getPartsCleaningThreadPool();
/// the number of threads by calling enableTurboMode() :-)
StaticThreadPool & getOutdatedPartsLoadingThreadPool();
/// ThreadPool used for creating tables in DatabaseReplicated.
StaticThreadPool & getDatabaseReplicatedCreateTablesThreadPool();
}


@@ -97,4 +97,5 @@
</remote_servers>
<_functional_tests_helper_database_replicated_replace_args_macros>1</_functional_tests_helper_database_replicated_replace_args_macros>
<max_database_replicated_create_table_thread_pool_size>50</max_database_replicated_create_table_thread_pool_size>
</clickhouse>


@@ -4,4 +4,5 @@
<merge_tree>
<initialization_retry_period>10</initialization_retry_period>
</merge_tree>
<max_database_replicated_create_table_thread_pool_size>50</max_database_replicated_create_table_thread_pool_size>
</clickhouse>