From fef57b4e3b615c8ec80ab4c116a2afd9c8b70f91 Mon Sep 17 00:00:00 2001 From: Nikolay Degterinsky Date: Wed, 14 Dec 2022 02:12:08 +0000 Subject: [PATCH] Better thread pool --- src/Core/Settings.h | 2 -- src/Storages/System/StorageSystemReplicas.cpp | 27 +++++++++++++++---- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 4be91ce311f..91647a5f165 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -155,8 +155,6 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) \ M(Bool, allow_experimental_parallel_reading_from_replicas, false, "If true, ClickHouse will send a SELECT query to all replicas of a table. It will work for any kind on MergeTree table.", 0) \ \ - M(UInt64, system_replicas_fetch_threads, 16, "The maximum number of threads to fetch data for system.replicas table.", 0) \ - \ M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \ \ M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard; if set to 1 - SELECT is executed on each shard; if set to 2 - SELECT and INSERT are executed on each shard", 0) \ diff --git a/src/Storages/System/StorageSystemReplicas.cpp b/src/Storages/System/StorageSystemReplicas.cpp index a6afb6eff0d..d36de9afe12 100644 --- a/src/Storages/System/StorageSystemReplicas.cpp +++ b/src/Storages/System/StorageSystemReplicas.cpp @@ -10,6 +10,7 @@ #include #include #include +#include namespace DB @@ -151,30 +152,46 @@ Pipe StorageSystemReplicas::read( MutableColumns res_columns = storage_snapshot->metadata->getSampleBlock().cloneEmptyColumns(); - auto settings = context->getSettingsRef(); - size_t thread_pool_size = settings.system_replicas_fetch_threads; + size_t tables_size = col_database->size(); + size_t thread_pool_size = std::min(tables_size, static_cast(getNumberOfPhysicalCPUCores())); + auto settings = context->getSettingsRef(); if (settings.max_threads != 0) thread_pool_size = std::min(thread_pool_size, static_cast(settings.max_threads)); ThreadPool thread_pool(thread_pool_size); + std::atomic error_flag = false; + Exception exception; - size_t tables_size = col_database->size(); std::vector statuses(tables_size); for (size_t i = 0; i < tables_size; ++i) { - thread_pool.scheduleOrThrowOnError([i, &statuses, &replicated_tables, &col_database, &col_table, &with_zk_fields] + thread_pool.scheduleOrThrowOnError([&, i=i] { - dynamic_cast( + try + { + dynamic_cast( *replicated_tables [(*col_database)[i].safeGet()] [(*col_table)[i].safeGet()]).getStatus(statuses[i], with_zk_fields); + } + catch (...) + { + tryLogCurrentException("system.replicas", "Failed to fetch system.replicas data"); + + /// We capture one of the exceptions to be thrown later + if (!error_flag.exchange(true)) + exception = Exception(getCurrentExceptionCode(), getCurrentExceptionMessage(false)); + } }); } thread_pool.wait(); + if (error_flag) + throw exception; + for (const auto & status: statuses) { size_t col_num = 3;