renamed global_max_threads parameter to total_max_threads

This commit is contained in:
Roman Vasin 2022-05-13 16:56:03 +03:00
parent 87f25c6864
commit b06d84b3e5
9 changed files with 36 additions and 36 deletions

View File

@ -1107,18 +1107,18 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (config->has("max_partition_size_to_drop"))
global_context->setMaxPartitionSizeToDrop(config->getUInt64("max_partition_size_to_drop"));
if (config->has("global_max_threads")) {
if (config->has("total_max_threads")) {
auto adqm_log = &Poco::Logger::get("ADQM");
auto global_max_threads = config->getInt("global_max_threads", 0);
LOG_DEBUG(adqm_log,"From config.xml global_max_threads: {}", global_max_threads);
if (global_max_threads == -1) {
// Based on tests global_max_threads has an optimal value when it's about two times of logical CPU cores
auto total_max_threads = config->getInt("total_max_threads", 0);
LOG_DEBUG(adqm_log,"From config.xml total_max_threads: {}", total_max_threads);
if (total_max_threads == -1) {
// Based on tests total_max_threads has an optimal value when it's about two times of logical CPU cores
constexpr size_t thread_factor = 2;
LOG_DEBUG(adqm_log,"number of logical cores: {}", std::thread::hardware_concurrency());
global_max_threads = std::thread::hardware_concurrency()*thread_factor;
total_max_threads = std::thread::hardware_concurrency()*thread_factor;
}
LOG_DEBUG(adqm_log,"Finally global_max_threads: {}", global_max_threads);
global_context->getProcessList().setGlobalMaxThreads(global_max_threads);
LOG_DEBUG(adqm_log,"Finally total_max_threads: {}", total_max_threads);
global_context->getProcessList().setGlobalMaxThreads(total_max_threads);
}
if (config->has("max_concurrent_queries"))

View File

@ -75,7 +75,7 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
auto adqm_log = &Poco::Logger::get("ADQM");
LOG_DEBUG(adqm_log,"Inserting query into process list: {}", query_);
LOG_DEBUG(adqm_log,"Num of concurrent queries: {}", processes.size());
LOG_DEBUG(adqm_log,"Global Num Threads: {}", getGlobalNumThreads());
LOG_DEBUG(adqm_log,"Global Num Threads: {}", getTotalNumThreads());
const ClientInfo & client_info = query_context->getClientInfo();
const Settings & settings = query_context->getSettingsRef();
@ -505,18 +505,18 @@ ProcessList::Info ProcessList::getInfo(bool get_thread_list, bool get_profile_ev
return per_query_infos;
}
size_t ProcessList::getGlobalNumThreads() const
size_t ProcessList::getTotalNumThreads() const
{
size_t global_num_threads = 0;
size_t total_num_threads = 0;
std::lock_guard lock(mutex);
for (const auto & process : processes)
{
auto qsi = process.getInfo(true);
global_num_threads += qsi.thread_ids.size();
total_num_threads += qsi.thread_ids.size();
}
return global_num_threads;
return total_num_threads;
}
ProcessListForUser::ProcessListForUser(ProcessList * global_process_list)

View File

@ -302,7 +302,7 @@ protected:
size_t max_size = 0; /// 0 means no limit. Otherwise, when limit exceeded, an exception is thrown.
/// The total maximum number of threads for all requests.
size_t global_max_threads = 0; /// 0 means no limit.
size_t total_max_threads = 0; /// 0 means no limit.
/// Stores per-user info: queries, statistics and limits
UserToQueries user_to_queries;
@ -346,14 +346,14 @@ public:
Info getInfo(bool get_thread_list = false, bool get_profile_events = false, bool get_settings = false) const;
/// Get total number of threads for all queries in process list.
size_t getGlobalNumThreads() const;
size_t getTotalNumThreads() const;
size_t getGlobalMaxThreads() const { return global_max_threads; }
size_t getTotalMaxThreads() const { return total_max_threads; }
void setGlobalMaxThreads(size_t global_max_threads_)
{
std::lock_guard lock(mutex);
global_max_threads = global_max_threads_;
total_max_threads = global_max_threads_;
}
/// Get current state of process list per user.

View File

@ -473,22 +473,22 @@ size_t QueryPipelineBuilder::getNumThreads() const
LOG_DEBUG(adqm_log,"Recommended num threads: {}", num_threads);
auto context = process_list_element->getContext();
auto global_max_threads = context->getProcessList().getGlobalMaxThreads();
if (process_list_element && global_max_threads) {
LOG_DEBUG(adqm_log,"Global number of threads from config: {}", global_max_threads);
LOG_DEBUG(adqm_log,"Current global num threads: {}",
context->getProcessList().getGlobalNumThreads());
size_t current_global_num_threads = context->getProcessList().getGlobalNumThreads();
size_t globally_available_threads = 0;
if (global_max_threads > current_global_num_threads)
globally_available_threads = global_max_threads - current_global_num_threads;
LOG_DEBUG(adqm_log,"Globally available threads: {}", globally_available_threads);
num_threads = std::min(num_threads, globally_available_threads);
auto total_max_threads = context->getProcessList().getTotalMaxThreads();
if (process_list_element && total_max_threads) {
LOG_DEBUG(adqm_log,"Total number of threads from config: {}", total_max_threads);
LOG_DEBUG(adqm_log,"Current total num threads: {}",
context->getProcessList().getTotalNumThreads());
size_t current_total_num_threads = context->getProcessList().getTotalNumThreads();
size_t total_available_threads = 0;
if (total_max_threads > current_total_num_threads)
total_available_threads = total_max_threads - current_total_num_threads;
LOG_DEBUG(adqm_log,"Total available threads: {}", total_available_threads);
num_threads = std::min(num_threads, total_available_threads);
LOG_DEBUG(adqm_log,"Recommended num threads: {}", num_threads);
}
num_threads = std::max<size_t>(1, num_threads);
LOG_DEBUG(adqm_log,"Final num threads: {}", num_threads);
LOG_DEBUG(adqm_log,"Finally num threads: {}", num_threads);
return num_threads;
}

View File

@ -1,6 +1,6 @@
<?xml version="1.0"?>
<clickhouse>
<global_max_threads>50</global_max_threads>
<total_max_threads>50</total_max_threads>
<query_log>
<database>system</database>
<table>query_log</table>

View File

@ -15,13 +15,13 @@ def started_cluster():
cluster.shutdown()
def test_global_max_threads_default(started_cluster):
node1.query("SELECT count(*) FROM numbers_mt(10000000)", query_id="test_global_max_threads_1");
def test_total_max_threads_default(started_cluster):
node1.query("SELECT count(*) FROM numbers_mt(10000000)", query_id="test_total_max_threads_1");
node1.query("SYSTEM FLUSH LOGS");
assert node1.query("select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_global_max_threads_1'") == "102\n"
assert node1.query("select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_total_max_threads_1'") == "102\n"
def test_global_max_threads_defined(started_cluster):
node2.query("SELECT count(*) FROM numbers_mt(10000000)", query_id="test_global_max_threads_2");
def test_total_max_threads_defined(started_cluster):
node2.query("SELECT count(*) FROM numbers_mt(10000000)", query_id="test_total_max_threads_2");
node2.query("SYSTEM FLUSH LOGS");
assert node2.query("select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_global_max_threads_2'") == "51\n"
assert node2.query("select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_total_max_threads_2'") == "51\n"