Make system.replicas parallel

This commit is contained in:
Nikolay Degterinsky 2022-12-07 11:04:15 +00:00
parent e2a9b226e0
commit 540f890291
2 changed files with 27 additions and 7 deletions

View File

@ -155,6 +155,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
\
M(Bool, allow_experimental_parallel_reading_from_replicas, false, "If true, ClickHouse will send a SELECT query to all replicas of a table. It will work for any kind on MergeTree table.", 0) \
\
M(UInt64, system_replicas_fetch_threads, 16, "The maximum number of threads to fetch data for system.replicas table.", 0) \
\
M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \
\
M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard; if set to 1 - SELECT is executed on each shard; if set to 2 - SELECT and INSERT are executed on each shard", 0) \

View File

@ -151,14 +151,32 @@ Pipe StorageSystemReplicas::read(
MutableColumns res_columns = storage_snapshot->metadata->getSampleBlock().cloneEmptyColumns();
for (size_t i = 0, size = col_database->size(); i < size; ++i)
auto settings = context->getSettingsRef();
size_t thread_pool_size = settings.system_replicas_fetch_threads;
if (settings.max_threads != 0)
thread_pool_size = std::min(thread_pool_size, static_cast<size_t>(settings.max_threads));
ThreadPool thread_pool(thread_pool_size);
size_t tables_size = col_database->size();
std::vector<StorageReplicatedMergeTree::Status> statuses(tables_size);
for (size_t i = 0; i < tables_size; ++i)
{
thread_pool.scheduleOrThrowOnError([i, &statuses, &replicated_tables, &col_database, &col_table, &with_zk_fields]
{
StorageReplicatedMergeTree::Status status;
dynamic_cast<StorageReplicatedMergeTree &>(
*replicated_tables
[(*col_database)[i].safeGet<const String &>()]
[(*col_table)[i].safeGet<const String &>()]).getStatus(status, with_zk_fields);
[(*col_table)[i].safeGet<const String &>()]).getStatus(statuses[i], with_zk_fields);
});
}
thread_pool.wait();
for (const auto & status: statuses)
{
size_t col_num = 3;
res_columns[col_num++]->insert(status.is_leader);
res_columns[col_num++]->insert(status.can_become_leader);