diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 91647a5f165..4be91ce311f 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -155,6 +155,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) \ M(Bool, allow_experimental_parallel_reading_from_replicas, false, "If true, ClickHouse will send a SELECT query to all replicas of a table. It will work for any kind on MergeTree table.", 0) \ \ + M(UInt64, system_replicas_fetch_threads, 16, "The maximum number of threads to fetch data for system.replicas table.", 0) \ + \ M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \ \ M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard; if set to 1 - SELECT is executed on each shard; if set to 2 - SELECT and INSERT are executed on each shard", 0) \ diff --git a/src/Storages/System/StorageSystemReplicas.cpp b/src/Storages/System/StorageSystemReplicas.cpp index 0f7877a6e41..a6afb6eff0d 100644 --- a/src/Storages/System/StorageSystemReplicas.cpp +++ b/src/Storages/System/StorageSystemReplicas.cpp @@ -151,14 +151,32 @@ Pipe StorageSystemReplicas::read( MutableColumns res_columns = storage_snapshot->metadata->getSampleBlock().cloneEmptyColumns(); - for (size_t i = 0, size = col_database->size(); i < size; ++i) - { - StorageReplicatedMergeTree::Status status; - dynamic_cast( - *replicated_tables - [(*col_database)[i].safeGet()] - [(*col_table)[i].safeGet()]).getStatus(status, with_zk_fields); + auto settings = context->getSettingsRef(); + size_t thread_pool_size = settings.system_replicas_fetch_threads; + if (settings.max_threads != 0) + thread_pool_size = std::min(thread_pool_size, static_cast(settings.max_threads)); + + ThreadPool thread_pool(thread_pool_size); + + size_t tables_size = col_database->size(); + std::vector statuses(tables_size); + + for (size_t i = 0; i < tables_size; ++i) + { + thread_pool.scheduleOrThrowOnError([i, &statuses, &replicated_tables, &col_database, &col_table, &with_zk_fields] + { + dynamic_cast( + *replicated_tables + [(*col_database)[i].safeGet()] + [(*col_table)[i].safeGet()]).getStatus(statuses[i], with_zk_fields); + }); + } + + thread_pool.wait(); + + for (const auto & status: statuses) + { size_t col_num = 3; res_columns[col_num++]->insert(status.is_leader); res_columns[col_num++]->insert(status.can_become_leader);