Use SettingMaxThreads only in Settings, call getNumberOfPhysicalCPUCores() instead of SettingMaxThreads::getAuto().

This commit is contained in:
Vitaly Baranov 2020-07-17 02:12:47 +03:00
parent cd372de417
commit 668653600c
7 changed files with 42 additions and 28 deletions

View File

@ -1,5 +1,6 @@
#include <Common/ThreadPool.h>
#include <Common/Exception.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <cassert>
#include <type_traits>
@ -24,6 +25,13 @@ namespace CurrentMetrics
}
template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl()
: ThreadPoolImpl(getNumberOfPhysicalCPUCores())
{
}
template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads_)
: ThreadPoolImpl(max_threads_, max_threads_, max_threads_)

View File

@ -29,6 +29,9 @@ class ThreadPoolImpl
public:
using Job = std::function<void()>;
/// Maximum number of threads is based on the number of physical cores.
ThreadPoolImpl();
/// Size is constant. Up to num_threads are created on demand and then run until shutdown.
explicit ThreadPoolImpl(size_t max_threads_);

View File

@ -9,29 +9,33 @@
unsigned getNumberOfPhysicalCPUCores()
{
#if USE_CPUID
cpu_raw_data_t raw_data;
cpu_id_t data;
static const unsigned number = []
{
# if USE_CPUID
cpu_raw_data_t raw_data;
cpu_id_t data;
/// On Xen VMs, libcpuid returns wrong info (zero number of cores). Fallback to alternative method.
/// Also, libcpuid does not support some CPUs like AMD Hygon C86 7151.
if (0 != cpuid_get_raw_data(&raw_data) || 0 != cpu_identify(&raw_data, &data) || data.num_logical_cpus == 0)
/// On Xen VMs, libcpuid returns wrong info (zero number of cores). Fallback to alternative method.
/// Also, libcpuid does not support some CPUs like AMD Hygon C86 7151.
if (0 != cpuid_get_raw_data(&raw_data) || 0 != cpu_identify(&raw_data, &data) || data.num_logical_cpus == 0)
return std::thread::hardware_concurrency();
unsigned res = data.num_cores * data.total_logical_cpus / data.num_logical_cpus;
/// Also, libcpuid gives strange result on Google Compute Engine VMs.
/// Example:
/// num_cores = 12, /// number of physical cores on current CPU socket
/// total_logical_cpus = 1, /// total number of logical cores on all sockets
/// num_logical_cpus = 24. /// number of logical cores on current CPU socket
/// It means two-way hyper-threading (24 / 12), but contradictory, 'total_logical_cpus' == 1.
if (res != 0)
return res;
# endif
/// As a fallback (also for non-x86 architectures) assume there are no hyper-threading on the system.
/// (Actually, only Aarch64 is supported).
return std::thread::hardware_concurrency();
unsigned res = data.num_cores * data.total_logical_cpus / data.num_logical_cpus;
/// Also, libcpuid gives strange result on Google Compute Engine VMs.
/// Example:
/// num_cores = 12, /// number of physical cores on current CPU socket
/// total_logical_cpus = 1, /// total number of logical cores on all sockets
/// num_logical_cpus = 24. /// number of logical cores on current CPU socket
/// It means two-way hyper-threading (24 / 12), but contradictory, 'total_logical_cpus' == 1.
if (res != 0)
return res;
#endif
/// As a fallback (also for non-x86 architectures) assume there are no hyper-threading on the system.
/// (Actually, only Aarch64 is supported).
return std::thread::hardware_concurrency();
}();
return number;
}

View File

@ -210,8 +210,7 @@ void SettingMaxThreads::setAuto()
UInt64 SettingMaxThreads::getAutoValue()
{
static auto res = getNumberOfPhysicalCPUCores();
return res;
return getNumberOfPhysicalCPUCores();
}

View File

@ -426,7 +426,7 @@ void DatabaseOnDisk::iterateMetadataFiles(const Context & context, const Iterati
}
/// Read and parse metadata in parallel
ThreadPool pool(SettingMaxThreads().getAutoValue());
ThreadPool pool;
for (const auto & file : metadata_files)
{
pool.scheduleOrThrowOnError([&]()

View File

@ -152,7 +152,7 @@ void DatabaseOrdinary::loadStoredObjects(Context & context, bool has_force_resto
std::atomic<size_t> tables_processed{0};
std::atomic<size_t> dictionaries_processed{0};
ThreadPool pool(SettingMaxThreads().getAutoValue());
ThreadPool pool;
/// Attach tables.
for (const auto & name_with_query : file_names)

View File

@ -560,7 +560,7 @@ void DatabaseCatalog::loadMarkedAsDroppedTables()
dropped_metadata.emplace(std::move(full_path), std::move(dropped_id));
}
ThreadPool pool(SettingMaxThreads().getAutoValue());
ThreadPool pool;
for (const auto & elem : dropped_metadata)
{
pool.scheduleOrThrowOnError([&]()