mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 10:52:30 +00:00
Better thread pool
This commit is contained in:
parent
540f890291
commit
fef57b4e3b
@ -155,8 +155,6 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
|
|||||||
\
|
\
|
||||||
M(Bool, allow_experimental_parallel_reading_from_replicas, false, "If true, ClickHouse will send a SELECT query to all replicas of a table. It will work for any kind on MergeTree table.", 0) \
|
M(Bool, allow_experimental_parallel_reading_from_replicas, false, "If true, ClickHouse will send a SELECT query to all replicas of a table. It will work for any kind on MergeTree table.", 0) \
|
||||||
\
|
\
|
||||||
M(UInt64, system_replicas_fetch_threads, 16, "The maximum number of threads to fetch data for system.replicas table.", 0) \
|
|
||||||
\
|
|
||||||
M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \
|
M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \
|
||||||
\
|
\
|
||||||
M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard; if set to 1 - SELECT is executed on each shard; if set to 2 - SELECT and INSERT are executed on each shard", 0) \
|
M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard; if set to 1 - SELECT is executed on each shard; if set to 2 - SELECT and INSERT are executed on each shard", 0) \
|
||||||
|
@ -10,6 +10,7 @@
|
|||||||
#include <Common/typeid_cast.h>
|
#include <Common/typeid_cast.h>
|
||||||
#include <Databases/IDatabase.h>
|
#include <Databases/IDatabase.h>
|
||||||
#include <Processors/Sources/SourceFromSingleChunk.h>
|
#include <Processors/Sources/SourceFromSingleChunk.h>
|
||||||
|
#include <Common/getNumberOfPhysicalCPUCores.h>
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -151,30 +152,46 @@ Pipe StorageSystemReplicas::read(
|
|||||||
|
|
||||||
MutableColumns res_columns = storage_snapshot->metadata->getSampleBlock().cloneEmptyColumns();
|
MutableColumns res_columns = storage_snapshot->metadata->getSampleBlock().cloneEmptyColumns();
|
||||||
|
|
||||||
auto settings = context->getSettingsRef();
|
size_t tables_size = col_database->size();
|
||||||
size_t thread_pool_size = settings.system_replicas_fetch_threads;
|
size_t thread_pool_size = std::min(tables_size, static_cast<size_t>(getNumberOfPhysicalCPUCores()));
|
||||||
|
|
||||||
|
auto settings = context->getSettingsRef();
|
||||||
if (settings.max_threads != 0)
|
if (settings.max_threads != 0)
|
||||||
thread_pool_size = std::min(thread_pool_size, static_cast<size_t>(settings.max_threads));
|
thread_pool_size = std::min(thread_pool_size, static_cast<size_t>(settings.max_threads));
|
||||||
|
|
||||||
ThreadPool thread_pool(thread_pool_size);
|
ThreadPool thread_pool(thread_pool_size);
|
||||||
|
std::atomic<bool> error_flag = false;
|
||||||
|
Exception exception;
|
||||||
|
|
||||||
size_t tables_size = col_database->size();
|
|
||||||
std::vector<StorageReplicatedMergeTree::Status> statuses(tables_size);
|
std::vector<StorageReplicatedMergeTree::Status> statuses(tables_size);
|
||||||
|
|
||||||
for (size_t i = 0; i < tables_size; ++i)
|
for (size_t i = 0; i < tables_size; ++i)
|
||||||
{
|
{
|
||||||
thread_pool.scheduleOrThrowOnError([i, &statuses, &replicated_tables, &col_database, &col_table, &with_zk_fields]
|
thread_pool.scheduleOrThrowOnError([&, i=i]
|
||||||
|
{
|
||||||
|
try
|
||||||
{
|
{
|
||||||
dynamic_cast<StorageReplicatedMergeTree &>(
|
dynamic_cast<StorageReplicatedMergeTree &>(
|
||||||
*replicated_tables
|
*replicated_tables
|
||||||
[(*col_database)[i].safeGet<const String &>()]
|
[(*col_database)[i].safeGet<const String &>()]
|
||||||
[(*col_table)[i].safeGet<const String &>()]).getStatus(statuses[i], with_zk_fields);
|
[(*col_table)[i].safeGet<const String &>()]).getStatus(statuses[i], with_zk_fields);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
tryLogCurrentException("system.replicas", "Failed to fetch system.replicas data");
|
||||||
|
|
||||||
|
/// We capture one of the exceptions to be thrown later
|
||||||
|
if (!error_flag.exchange(true))
|
||||||
|
exception = Exception(getCurrentExceptionCode(), getCurrentExceptionMessage(false));
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
thread_pool.wait();
|
thread_pool.wait();
|
||||||
|
|
||||||
|
if (error_flag)
|
||||||
|
throw exception;
|
||||||
|
|
||||||
for (const auto & status: statuses)
|
for (const auto & status: statuses)
|
||||||
{
|
{
|
||||||
size_t col_num = 3;
|
size_t col_num = 3;
|
||||||
|
Loading…
Reference in New Issue
Block a user