Merge pull request #49618 from ClickHouse/concurrency-control-controllable

Make concurrency control controllable
This commit is contained in:
Sergei Trifonov 2023-08-29 19:44:51 +02:00 committed by GitHub
commit 802579f3f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 86 additions and 48 deletions

View File

@ -1471,8 +1471,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
sendDataFromPipe(
std::move(pipe),
parsed_query,
have_data_in_stdin
);
have_data_in_stdin);
}
catch (Exception & e)
{

View File

@ -886,7 +886,7 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
return sink;
});
executor = pipeline.execute();
executor->execute(/*num_threads = */ 1);
executor->execute(/*num_threads = */ 1, false);
auto read_rows = sink->getNumReadRows();
rows += read_rows;

View File

@ -5,7 +5,6 @@
#include <Storages/ConstraintsDescription.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/LimitReadBuffer.h>
@ -18,7 +17,6 @@
#include <Core/ExternalTable.h>
#include <Poco/Net/MessageHeader.h>
#include <base/find_symbols.h>
#include <base/scope_guard.h>

View File

@ -47,6 +47,7 @@ class IColumn;
M(MaxThreads, max_final_threads, 0, "The maximum number of threads to read from table with FINAL.", 0) \
M(UInt64, max_threads_for_indexes, 0, "The maximum number of threads process indices.", 0) \
M(MaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.", 0) \
M(Bool, use_concurrency_control, true, "Respect the server's concurrency control (see the `concurrent_threads_soft_limit_num` and `concurrent_threads_soft_limit_ratio_to_cores` global server settings). If disabled, it allows using a larger number of threads even if the server is overloaded (not recommended for normal usage, and needed mostly for tests).", 0) \
M(MaxThreads, max_download_threads, 4, "The maximum number of threads to download data (e.g. for URL engine).", 0) \
M(UInt64, max_download_buffer_size, 10*1024*1024, "The maximal size of buffer for parallel downloading (e.g. for URL engine) per each thread.", 0) \
M(UInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.", 0) \

View File

@ -288,7 +288,8 @@ public:
: ISource(pipeline_.getHeader())
, pipeline(std::move(pipeline_))
, executor(pipeline)
{}
{
}
std::string getName() const override
{

View File

@ -184,7 +184,9 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
PullingAsyncPipelineExecutor executor(io.pipeline);
io.pipeline.setProgressCallback(data.getContext()->getProgressCallback());
while (block.rows() == 0 && executor.pull(block));
while (block.rows() == 0 && executor.pull(block))
{
}
if (block.rows() == 0)
{
@ -216,7 +218,8 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
Block tmp_block;
while (tmp_block.rows() == 0 && executor.pull(tmp_block))
;
{
}
if (tmp_block.rows() != 0)
throw Exception(ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY, "Scalar subquery returned more than one row");

View File

@ -616,6 +616,7 @@ BlockIO InterpreterInsertQuery::execute()
presink_chains.at(0).appendChain(std::move(sink_chains.at(0)));
res.pipeline = QueryPipeline(std::move(presink_chains[0]));
res.pipeline.setNumThreads(std::min<size_t>(res.pipeline.getNumThreads(), settings.max_threads));
res.pipeline.setConcurrencyControl(settings.use_concurrency_control);
if (query.hasInlinedData() && !async_insert)
{

View File

@ -68,7 +68,6 @@
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@ -84,12 +83,9 @@
#include <Core/ProtocolDefines.h>
#include <Functions/IFunction.h>
#include <Interpreters/Aggregator.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/IJoin.h>
#include <QueryPipeline/SizeLimits.h>
#include <base/map.h>
#include <base/sort.h>
#include <base/types.h>
#include <Common/FieldVisitorToString.h>
#include <Common/FieldVisitorsAccurateComparison.h>
#include <Common/checkStackSize.h>
@ -97,7 +93,6 @@
#include <Common/typeid_cast.h>
#include <Common/ProfileEvents.h>
#include "config_version.h"
namespace ProfileEvents
{
@ -2527,6 +2522,8 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
if (!query_plan.getMaxThreads() || is_remote)
query_plan.setMaxThreads(max_threads_execute_query);
query_plan.setConcurrencyControl(settings.use_concurrency_control);
/// Aliases in table declaration.
if (processing_stage == QueryProcessingStage::FetchColumns && alias_actions)
{

View File

@ -790,6 +790,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
*/
if (!query_plan.getMaxThreads() || is_remote)
query_plan.setMaxThreads(max_threads_execute_query);
query_plan.setConcurrencyControl(settings.use_concurrency_control);
}
else
{

View File

@ -32,7 +32,8 @@ struct CompletedPipelineExecutor::Data
}
};
static void threadFunction(CompletedPipelineExecutor::Data & data, ThreadGroupPtr thread_group, size_t num_threads)
static void threadFunction(
CompletedPipelineExecutor::Data & data, ThreadGroupPtr thread_group, size_t num_threads, bool concurrency_control)
{
SCOPE_EXIT_SAFE(
if (thread_group)
@ -45,7 +46,7 @@ static void threadFunction(CompletedPipelineExecutor::Data & data, ThreadGroupPt
if (thread_group)
CurrentThread::attachToGroup(thread_group);
data.executor->execute(num_threads);
data.executor->execute(num_threads, concurrency_control);
}
catch (...)
{
@ -79,9 +80,13 @@ void CompletedPipelineExecutor::execute()
/// Avoid passing this to lambda, copy ptr to data instead.
/// Destructor of unique_ptr copy raw ptr into local variable first, only then calls object destructor.
auto func = [data_ptr = data.get(), num_threads = pipeline.getNumThreads(), thread_group = CurrentThread::getGroup()]
auto func = [
data_ptr = data.get(),
num_threads = pipeline.getNumThreads(),
thread_group = CurrentThread::getGroup(),
concurrency_control = pipeline.getConcurrencyControl()]
{
threadFunction(*data_ptr, thread_group, num_threads);
threadFunction(*data_ptr, thread_group, num_threads, concurrency_control);
};
data->thread = ThreadFromGlobalPool(std::move(func));
@ -102,7 +107,7 @@ void CompletedPipelineExecutor::execute()
{
PipelineExecutor executor(pipeline.processors, pipeline.process_list_element);
executor.setReadProgressCallback(pipeline.getReadProgressCallback());
executor.execute(pipeline.getNumThreads());
executor.execute(pipeline.getNumThreads(), pipeline.getConcurrencyControl());
}
}

View File

@ -3,16 +3,13 @@
#include <Common/CurrentThread.h>
#include <Common/CurrentMetrics.h>
#include <Common/setThreadName.h>
#include <Common/MemoryTracker.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Executors/ExecutingGraph.h>
#include <QueryPipeline/printPipeline.h>
#include <QueryPipeline/ReadProgressCallback.h>
#include <Processors/ISource.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/Context.h>
#include <Common/scope_guard_safe.h>
#include <Common/logger_useful.h>
#include <Common/Exception.h>
#include <Common/OpenTelemetryTraceContext.h>
@ -99,7 +96,7 @@ void PipelineExecutor::finish()
tasks.finish();
}
void PipelineExecutor::execute(size_t num_threads)
void PipelineExecutor::execute(size_t num_threads, bool concurrency_control)
{
checkTimeLimit();
if (num_threads < 1)
@ -110,7 +107,7 @@ void PipelineExecutor::execute(size_t num_threads)
try
{
executeImpl(num_threads);
executeImpl(num_threads, concurrency_control);
/// Execution can be stopped because of exception. Check and rethrow if any.
for (auto & node : graph->nodes)
@ -137,12 +134,11 @@ bool PipelineExecutor::executeStep(std::atomic_bool * yield_flag)
{
if (!is_execution_initialized)
{
initializeExecution(1);
initializeExecution(1, true);
// Acquire slot until we are done
single_thread_slot = slots->tryAcquire();
if (!single_thread_slot)
abort(); // Unable to allocate slot for the first thread, but we just allocated at least one slot
chassert(single_thread_slot && "Unable to allocate slot for the first thread, but we just allocated at least one slot");
if (yield_flag && *yield_flag)
return true;
@ -297,14 +293,16 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yie
#endif
}
void PipelineExecutor::initializeExecution(size_t num_threads)
void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_control)
{
is_execution_initialized = true;
size_t use_threads = num_threads;
/// Allocate CPU slots from concurrency control
constexpr size_t min_threads = 1;
size_t min_threads = concurrency_control ? 1uz : num_threads;
slots = ConcurrencyControl::instance().allocate(min_threads, num_threads);
size_t use_threads = slots->grantedCount();
use_threads = slots->grantedCount();
Queue queue;
graph->initializeExecution(queue);
@ -320,7 +318,7 @@ void PipelineExecutor::spawnThreads()
{
while (auto slot = slots->tryAcquire())
{
size_t thread_num = threads++;
size_t thread_num = threads.fetch_add(1);
/// Count of threads in use should be updated for proper finish() condition.
/// NOTE: this will not decrease `use_threads` below initially granted count
@ -352,9 +350,9 @@ void PipelineExecutor::spawnThreads()
}
}
void PipelineExecutor::executeImpl(size_t num_threads)
void PipelineExecutor::executeImpl(size_t num_threads, bool concurrency_control)
{
initializeExecution(num_threads);
initializeExecution(num_threads, concurrency_control);
bool finished_flag = false;

View File

@ -38,7 +38,7 @@ public:
/// Execute pipeline in multiple threads. Must be called once.
/// In case of exception during execution throws any occurred.
void execute(size_t num_threads);
void execute(size_t num_threads, bool concurrency_control);
/// Execute single step. Step will be stopped when yield_flag is true.
/// Execution is happened in a single thread.
@ -67,7 +67,7 @@ private:
ExecutorTasks tasks;
// Concurrency control related
/// Concurrency control related
ConcurrencyControl::AllocationPtr slots;
ConcurrencyControl::SlotPtr single_thread_slot; // slot for single-thread mode to work using executeStep()
std::unique_ptr<ThreadPool> pool;
@ -92,12 +92,12 @@ private:
using Queue = std::queue<ExecutingGraph::Node *>;
void initializeExecution(size_t num_threads); /// Initialize executor contexts and task_queue.
void initializeExecution(size_t num_threads, bool concurrency_control); /// Initialize executor contexts and task_queue.
void finalizeExecution(); /// Check all processors are finished.
void spawnThreads();
/// Methods connected to execution.
void executeImpl(size_t num_threads);
void executeImpl(size_t num_threads, bool concurrency_control);
void executeStepImpl(size_t thread_num, std::atomic_bool * yield_flag = nullptr);
void executeSingleThread(size_t thread_num);
void finish();

View File

@ -67,7 +67,8 @@ const Block & PullingAsyncPipelineExecutor::getHeader() const
return lazy_format->getPort(IOutputFormat::PortKind::Main).getHeader();
}
static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGroupPtr thread_group, size_t num_threads)
static void threadFunction(
PullingAsyncPipelineExecutor::Data & data, ThreadGroupPtr thread_group, size_t num_threads, bool concurrency_control)
{
SCOPE_EXIT_SAFE(
if (thread_group)
@ -80,7 +81,7 @@ static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGrou
if (thread_group)
CurrentThread::attachToGroup(thread_group);
data.executor->execute(num_threads);
data.executor->execute(num_threads, concurrency_control);
}
catch (...)
{
@ -108,7 +109,7 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
auto func = [&, thread_group = CurrentThread::getGroup()]()
{
threadFunction(*data, thread_group, pipeline.getNumThreads());
threadFunction(*data, thread_group, pipeline.getNumThreads(), pipeline.getConcurrencyControl());
};
data->thread = ThreadFromGlobalPool(std::move(func));

View File

@ -98,7 +98,8 @@ struct PushingAsyncPipelineExecutor::Data
}
};
static void threadFunction(PushingAsyncPipelineExecutor::Data & data, ThreadGroupPtr thread_group, size_t num_threads)
static void threadFunction(
PushingAsyncPipelineExecutor::Data & data, ThreadGroupPtr thread_group, size_t num_threads, bool concurrency_control)
{
SCOPE_EXIT_SAFE(
if (thread_group)
@ -111,7 +112,7 @@ static void threadFunction(PushingAsyncPipelineExecutor::Data & data, ThreadGrou
if (thread_group)
CurrentThread::attachToGroup(thread_group);
data.executor->execute(num_threads);
data.executor->execute(num_threads, concurrency_control);
}
catch (...)
{
@ -172,7 +173,7 @@ void PushingAsyncPipelineExecutor::start()
auto func = [&, thread_group = CurrentThread::getGroup()]()
{
threadFunction(*data, thread_group, pipeline.getNumThreads());
threadFunction(*data, thread_group, pipeline.getNumThreads(), pipeline.getConcurrencyControl());
};
data->thread = ThreadFromGlobalPool(std::move(func));

View File

@ -168,7 +168,6 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
QueryPipelineBuilderPtr last_pipeline;
std::stack<Frame> stack;
stack.push(Frame{.node = root});

View File

@ -98,6 +98,9 @@ public:
void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
size_t getMaxThreads() const { return max_threads; }
void setConcurrencyControl(bool concurrency_control_) { concurrency_control = concurrency_control_; }
bool getConcurrencyControl() const { return concurrency_control; }
/// Tree node. Step and it's children.
struct Node
{
@ -120,6 +123,7 @@ private:
/// Those fields are passed to QueryPipeline.
size_t max_threads = 0;
bool concurrency_control = false;
};
std::string debugExplainStep(const IQueryPlanStep & step);

View File

@ -432,6 +432,7 @@ Chain buildPushingToViewsChain(
processors.emplace_back(std::move(finalizing_views));
result_chain = Chain(std::move(processors));
result_chain.setNumThreads(std::min(views_data->max_threads, max_parallel_streams));
result_chain.setConcurrencyControl(settings.use_concurrency_control);
}
if (auto * live_view = dynamic_cast<StorageLiveView *>(storage.get()))

View File

@ -29,7 +29,7 @@ TEST(Processors, PortsConnected)
QueryStatusPtr element;
PipelineExecutor executor(processors, element);
executor.execute(1);
executor.execute(1, false);
}
TEST(Processors, PortsNotConnected)
@ -55,7 +55,7 @@ TEST(Processors, PortsNotConnected)
{
QueryStatusPtr element;
PipelineExecutor executor(processors, element);
executor.execute(1);
executor.execute(1, false);
ASSERT_TRUE(false) << "Should have thrown.";
}
catch (DB::Exception & e)

View File

@ -29,6 +29,9 @@ public:
size_t getNumThreads() const { return num_threads; }
void setNumThreads(size_t num_threads_) { num_threads = num_threads_; }
bool getConcurrencyControl() const { return concurrency_control; }
void setConcurrencyControl(bool concurrency_control_) { concurrency_control = concurrency_control_; }
void addSource(ProcessorPtr processor);
void addSink(ProcessorPtr processor);
void appendChain(Chain chain);
@ -66,6 +69,7 @@ private:
/// input port output port
std::list<ProcessorPtr> processors;
size_t num_threads = 0;
bool concurrency_control = false;
};
}

View File

@ -100,6 +100,9 @@ public:
size_t getNumThreads() const { return num_threads; }
void setNumThreads(size_t num_threads_) { num_threads = num_threads_; }
bool getConcurrencyControl() const { return concurrency_control; }
void setConcurrencyControl(bool concurrency_control_) { concurrency_control = concurrency_control_; }
void setProcessListElement(QueryStatusPtr elem);
void setProgressCallback(const ProgressCallback & callback);
void setLimitsAndQuota(const StreamLocalLimits & limits, std::shared_ptr<const EnabledQuota> quota_);
@ -157,6 +160,7 @@ private:
IOutputFormat * output_format = nullptr;
size_t num_threads = 0;
bool concurrency_control = false;
friend class PushingPipelineExecutor;
friend class PullingPipelineExecutor;

View File

@ -278,6 +278,7 @@ QueryPipelineBuilder QueryPipelineBuilder::unitePipelines(
/// Note: it may be > than settings.max_threads, so we should apply this limit again.
bool will_limit_max_threads = true;
size_t max_threads = 0;
bool concurrency_control = false;
Pipes pipes;
QueryPlanResourceHolder resources;
@ -297,6 +298,8 @@ QueryPipelineBuilder QueryPipelineBuilder::unitePipelines(
/// It may happen if max_distributed_connections > max_threads
if (pipeline.max_threads > max_threads_limit)
max_threads_limit = pipeline.max_threads;
concurrency_control = pipeline.getConcurrencyControl();
}
QueryPipelineBuilder pipeline;
@ -307,6 +310,7 @@ QueryPipelineBuilder QueryPipelineBuilder::unitePipelines(
{
pipeline.setMaxThreads(max_threads);
pipeline.limitMaxThreads(max_threads_limit);
pipeline.setConcurrencyControl(concurrency_control);
}
pipeline.setCollectedProcessors(nullptr);
@ -644,6 +648,7 @@ QueryPipeline QueryPipelineBuilder::getPipeline(QueryPipelineBuilder builder)
QueryPipeline res(std::move(builder.pipe));
res.addResources(std::move(builder.resources));
res.setNumThreads(builder.getNumThreads());
res.setConcurrencyControl(builder.getConcurrencyControl());
res.setProcessListElement(builder.process_list_element);
res.setProgressCallback(builder.progress_callback);
return res;

View File

@ -183,6 +183,16 @@ public:
max_threads = max_threads_;
}
void setConcurrencyControl(bool concurrency_control_)
{
concurrency_control = concurrency_control_;
}
bool getConcurrencyControl()
{
return concurrency_control;
}
void addResources(QueryPlanResourceHolder resources_) { resources = std::move(resources_); }
void setQueryIdHolder(std::shared_ptr<QueryIdHolder> query_id_holder) { resources.query_id_holders.emplace_back(std::move(query_id_holder)); }
void addContext(ContextPtr context) { resources.interpreter_context.emplace_back(std::move(context)); }
@ -201,6 +211,8 @@ private:
/// Sometimes, more streams are created then the number of threads for more optimal execution.
size_t max_threads = 0;
bool concurrency_control = false;
QueryStatusPtr process_list_element;
ProgressCallback progress_callback = nullptr;

View File

@ -1134,7 +1134,7 @@ namespace
});
auto executor = cur_pipeline.execute();
executor->execute(1);
executor->execute(1, false);
}
}

View File

@ -719,6 +719,7 @@ bool StorageFileLog::streamToViews()
{
block_io.pipeline.complete(std::move(input));
block_io.pipeline.setNumThreads(max_streams_number);
block_io.pipeline.setConcurrencyControl(new_context->getSettingsRef().use_concurrency_control);
block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });
CompletedPipelineExecutor executor(block_io.pipeline);
executor.execute();

View File

@ -865,6 +865,7 @@ bool StorageKafka::streamToViews()
// we need to read all consumers in parallel (sequential read may lead to situation
// when some of consumers are not used, and will break some Kafka consumer invariants)
block_io.pipeline.setNumThreads(stream_count);
block_io.pipeline.setConcurrencyControl(kafka_context->getSettingsRef().use_concurrency_control);
block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });
CompletedPipelineExecutor executor(block_io.pipeline);

View File

@ -478,7 +478,7 @@ void StorageLiveView::writeBlock(const Block & block, ContextPtr local_context)
});
auto executor = pipeline.execute();
executor->execute(pipeline.getNumThreads());
executor->execute(pipeline.getNumThreads(), local_context->getSettingsRef().use_concurrency_control);
}
void StorageLiveView::refresh()

View File

@ -1571,7 +1571,7 @@ void StorageWindowView::writeIntoWindowView(
});
auto executor = builder.execute();
executor->execute(builder.getNumThreads());
executor->execute(builder.getNumThreads(), local_context->getSettingsRef().use_concurrency_control);
}
void StorageWindowView::startup()