Remove total_max_threads logic from QueryPipelineBuilder; Add setMaxConcurrency(total_max_threads) in Server.cpp

This commit is contained in:
Roman Vasin 2022-05-31 19:43:15 +03:00
parent 441bd3e81f
commit b7cde2c5ad
5 changed files with 13 additions and 58 deletions

View File

@ -29,6 +29,7 @@
#include <Common/ClickHouseRevision.h>
#include <Common/DNSResolver.h>
#include <Common/CurrentMetrics.h>
#include <Common/ConcurrencyControl.h>
#include <Common/Macros.h>
#include <Common/ShellCommand.h>
#include <Common/StringUtils/StringUtils.h>
@ -1129,7 +1130,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
constexpr size_t thread_factor = 3;
total_max_threads = std::thread::hardware_concurrency() * thread_factor;
}
global_context->getProcessList().setTotalMaxThreads(total_max_threads);
if (total_max_threads)
ConcurrencyControl::instance().setMaxConcurrency(total_max_threads);
}
if (config->has("max_concurrent_queries"))

View File

@ -16,6 +16,7 @@
#include <Common/logger_useful.h>
#include <chrono>
namespace CurrentMetrics
{
extern const Metric Query;
@ -500,19 +501,6 @@ ProcessList::Info ProcessList::getInfo(bool get_thread_list, bool get_profile_ev
return per_query_infos;
}
size_t ProcessList::getTotalNumThreads() const
{
size_t total_num_threads = 0;
std::lock_guard lock(mutex);
for (const auto & process : processes)
{
auto qsi = process.getInfo(true);
total_num_threads += qsi.thread_ids.size();
}
return total_num_threads;
}
ProcessListForUser::ProcessListForUser(ProcessList * global_process_list)
: user_overcommit_tracker(global_process_list, this)

View File

@ -314,10 +314,6 @@ protected:
Container processes;
size_t max_size = 0; /// 0 means no limit. Otherwise, when limit exceeded, an exception is thrown.
/// The total maximum number of threads for all queries.
/// Sometimes, real total number of threads may exceed total_max_threads parameter.
size_t total_max_threads = 0; /// 0 means no limit. Otherwise, concurrency of a query is determined based on this parameter.
/// Stores per-user info: queries, statistics and limits
UserToQueries user_to_queries;
@ -359,17 +355,6 @@ public:
/// Get current state of process list.
Info getInfo(bool get_thread_list = false, bool get_profile_events = false, bool get_settings = false) const;
/// Get total number of threads for all queries in process list.
size_t getTotalNumThreads() const;
size_t getTotalMaxThreads() const { return total_max_threads; }
void setTotalMaxThreads(size_t total_max_threads_)
{
std::lock_guard lock(mutex);
total_max_threads = total_max_threads_;
}
/// Get current state of process list per user.
UserInfo getUserInfo(bool get_profile_events = false) const;

View File

@ -16,15 +16,12 @@
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/ProcessList.h>
#include <Common/typeid_cast.h>
#include <Common/CurrentThread.h>
#include <Processors/DelayedPortsProcessor.h>
#include <Processors/RowsBeforeLimitCounter.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Core/Settings.h>
#include <Core/SettingsQuirks.h>
namespace DB
{
@ -499,30 +496,6 @@ void QueryPipelineBuilder::setProcessListElement(QueryStatus * elem)
}
}
size_t QueryPipelineBuilder::getNumThreads() const
{
auto num_threads = pipe.maxParallelStreams();
if (max_threads) //-V1051
num_threads = std::min(num_threads, max_threads);
if (process_list_element)
{
auto total_max_threads = process_list_element->getContext()->getProcessList().getTotalMaxThreads();
if (total_max_threads)
{
size_t current_total_num_threads = process_list_element->getContext()->getProcessList().getTotalNumThreads();
size_t total_available_threads = 0;
if (total_max_threads > current_total_num_threads)
total_available_threads = total_max_threads - current_total_num_threads;
num_threads = std::min(num_threads, total_available_threads);
}
}
num_threads = std::max<size_t>(1, num_threads);
return num_threads;
}
PipelineExecutorPtr QueryPipelineBuilder::execute()
{
if (!isCompleted())

View File

@ -7,7 +7,6 @@
#include <Storages/TableLockHolder.h>
#include <Interpreters/Context_fwd.h>
namespace DB
{
@ -133,7 +132,15 @@ public:
void setProcessListElement(QueryStatus * elem);
/// Recommend number of threads for pipeline execution.
size_t getNumThreads() const;
size_t getNumThreads() const
{
auto num_threads = pipe.maxParallelStreams();
if (max_threads) //-V1051
num_threads = std::min(num_threads, max_threads);
return std::max<size_t>(1, num_threads);
}
/// Set upper limit for the recommend number of threads
void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }