diff --git a/docs/en/operations/system-tables/metrics.md b/docs/en/operations/system-tables/metrics.md index f253b164e2a..d9aa1f58326 100644 --- a/docs/en/operations/system-tables/metrics.md +++ b/docs/en/operations/system-tables/metrics.md @@ -737,6 +737,14 @@ Number of sessions (connections) to ZooKeeper. Should be no more than one, becau Number of watches (event subscriptions) in ZooKeeper. +### ConcurrencyControlAcquired + +Total number of acquired CPU slots. + +### ConcurrencyControlSoftLimit + +Value of soft limit on number of CPU slots. + **See Also** - [system.asynchronous_metrics](../../operations/system-tables/asynchronous_metrics.md#system_tables-asynchronous_metrics) — Contains periodically calculated metrics. diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 09359040106..47a87a6a193 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -1772,6 +1772,7 @@ try concurrent_threads_soft_limit = value; } ConcurrencyControl::instance().setMaxConcurrency(concurrent_threads_soft_limit); + LOG_INFO(log, "ConcurrencyControl limit is set to {}", concurrent_threads_soft_limit); global_context->getProcessList().setMaxSize(new_server_settings[ServerSetting::max_concurrent_queries]); global_context->getProcessList().setMaxInsertQueriesAmount(new_server_settings[ServerSetting::max_concurrent_insert_queries]); diff --git a/src/Analyzer/Resolve/QueryAnalyzer.cpp b/src/Analyzer/Resolve/QueryAnalyzer.cpp index 23b6f6a83b9..381edee607d 100644 --- a/src/Analyzer/Resolve/QueryAnalyzer.cpp +++ b/src/Analyzer/Resolve/QueryAnalyzer.cpp @@ -103,6 +103,7 @@ namespace Setting extern const SettingsBool single_join_prefer_left_table; extern const SettingsBool transform_null_in; extern const SettingsUInt64 use_structure_from_insertion_table_in_table_functions; + extern const SettingsBool use_concurrency_control; } @@ -588,6 +589,7 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden PullingAsyncPipelineExecutor executor(io.pipeline); io.pipeline.setProgressCallback(context->getProgressCallback()); io.pipeline.setProcessListElement(context->getProcessListElement()); + io.pipeline.setConcurrencyControl(context->getSettingsRef()[Setting::use_concurrency_control]); Block block; diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index 4541df2bd61..8f7cced73ef 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -1785,6 +1785,9 @@ try QueryPipeline pipeline(std::move(pipe)); PullingAsyncPipelineExecutor executor(pipeline); + /// Concurrency control in client is not required + pipeline.setConcurrencyControl(false); + if (need_render_progress) { pipeline.setProgressCallback([this](const Progress & progress){ onProgress(progress); }); diff --git a/src/Client/LocalConnection.cpp b/src/Client/LocalConnection.cpp index 9c0afa815e2..e4915a77c83 100644 --- a/src/Client/LocalConnection.cpp +++ b/src/Client/LocalConnection.cpp @@ -269,6 +269,7 @@ void LocalConnection::sendQuery( { state->block = state->io.pipeline.getHeader(); state->executor = std::make_unique(state->io.pipeline); + state->io.pipeline.setConcurrencyControl(false); } else if (state->io.pipeline.completed()) { diff --git a/src/Common/ConcurrencyControl.cpp b/src/Common/ConcurrencyControl.cpp index 8bc38af7aa5..97cdf0e6951 100644 --- a/src/Common/ConcurrencyControl.cpp +++ b/src/Common/ConcurrencyControl.cpp @@ -1,7 +1,23 @@ +#include #include #include +#include +namespace ProfileEvents +{ + extern const Event ConcurrencyControlSlotsGranted; + extern const Event ConcurrencyControlSlotsDelayed; + extern const Event ConcurrencyControlSlotsAcquired; + extern const Event ConcurrencyControlQueriesDelayed; +} + +namespace CurrentMetrics +{ + extern const Metric ConcurrencyControlAcquired; + extern const Metric ConcurrencyControlSoftLimit; +} + namespace DB { @@ -17,6 +33,7 @@ ConcurrencyControl::Slot::~Slot() ConcurrencyControl::Slot::Slot(SlotAllocationPtr && allocation_) : allocation(std::move(allocation_)) + , acquired_slot_increment(CurrentMetrics::ConcurrencyControlAcquired) { } @@ -34,6 +51,7 @@ ConcurrencyControl::Allocation::~Allocation() { if (granted.compare_exchange_strong(value, value - 1)) { + ProfileEvents::increment(ProfileEvents::ConcurrencyControlSlotsAcquired, 1); std::unique_lock lock{mutex}; return AcquiredSlotPtr(new Slot(shared_from_this())); // can't use std::make_shared due to private ctor } @@ -84,6 +102,7 @@ void ConcurrencyControl::Allocation::release() ConcurrencyControl::ConcurrencyControl() : cur_waiter(waiters.end()) + , max_concurrency_metric(CurrentMetrics::ConcurrencyControlSoftLimit, 0) { } @@ -103,18 +122,25 @@ ConcurrencyControl::~ConcurrencyControl() // Acquire as many slots as we can, but not lower than `min` SlotCount granted = std::max(min, std::min(max, available(lock))); cur_concurrency += granted; + ProfileEvents::increment(ProfileEvents::ConcurrencyControlSlotsGranted, min); // Create allocation and start waiting if more slots are required if (granted < max) + { + ProfileEvents::increment(ProfileEvents::ConcurrencyControlSlotsDelayed, max - granted); + ProfileEvents::increment(ProfileEvents::ConcurrencyControlQueriesDelayed); return SlotAllocationPtr(new Allocation(*this, max, granted, waiters.insert(cur_waiter, nullptr /* pointer is set by Allocation ctor */))); - return SlotAllocationPtr(new Allocation(*this, max, granted)); + } + else + return SlotAllocationPtr(new Allocation(*this, max, granted)); } void ConcurrencyControl::setMaxConcurrency(SlotCount value) { std::unique_lock lock{mutex}; max_concurrency = std::max(1, value); // never allow max_concurrency to be zero + max_concurrency_metric.changeTo(max_concurrency == UnlimitedSlots ? 0 : max_concurrency); schedule(lock); } diff --git a/src/Common/ConcurrencyControl.h b/src/Common/ConcurrencyControl.h index 9d35d7cb8b0..161e6ff2024 100644 --- a/src/Common/ConcurrencyControl.h +++ b/src/Common/ConcurrencyControl.h @@ -8,6 +8,7 @@ #include #include +#include #include namespace DB @@ -53,6 +54,7 @@ public: explicit Slot(SlotAllocationPtr && allocation_); SlotAllocationPtr allocation; + CurrentMetrics::Increment acquired_slot_increment; }; // Manages group of slots for a single query, see ConcurrencyControl::allocate(min, max) @@ -131,6 +133,7 @@ private: Waiters::iterator cur_waiter; // round-robin pointer SlotCount max_concurrency = UnlimitedSlots; SlotCount cur_concurrency = 0; + CurrentMetrics::Increment max_concurrency_metric; }; } diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index bd62e7e8aae..e9d5e07c914 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -321,6 +321,9 @@ M(FilteringMarksWithPrimaryKey, "Number of threads currently doing filtering of mark ranges by the primary key") \ M(FilteringMarksWithSecondaryKeys, "Number of threads currently doing filtering of mark ranges by secondary keys") \ \ + M(ConcurrencyControlAcquired, "Total number of acquired CPU slots") \ + M(ConcurrencyControlSoftLimit, "Value of soft limit on number of CPU slots") \ + \ M(DiskS3NoSuchKeyErrors, "The number of `NoSuchKey` errors that occur when reading data from S3 cloud storage through ClickHouse disks.") \ \ M(SharedCatalogStateApplicationThreads, "Number of threads in the threadpool for state application in Shared Catalog.") \ diff --git a/src/Common/ISlotControl.h b/src/Common/ISlotControl.h index daeb956f5a8..c2be08f8406 100644 --- a/src/Common/ISlotControl.h +++ b/src/Common/ISlotControl.h @@ -73,4 +73,44 @@ public: [[nodiscard]] virtual SlotAllocationPtr allocate(SlotCount min, SlotCount max) = 0; }; +/// Allocation that grants all the slots immediately on creation +class GrantedAllocation : public ISlotAllocation +{ +public: + explicit GrantedAllocation(SlotCount granted_) + : granted(granted_) + , allocated(granted_) + {} + + [[nodiscard]] AcquiredSlotPtr tryAcquire() override + { + SlotCount value = granted.load(); + while (value) + { + if (granted.compare_exchange_strong(value, value - 1)) + return std::make_shared(); + } + return {}; + } + + SlotCount grantedCount() const override + { + return granted.load(); + } + + SlotCount allocatedCount() const override + { + return allocated; + } + +private: + std::atomic granted; // allocated, but not yet acquired + const SlotCount allocated; +}; + +[[nodiscard]] inline SlotAllocationPtr grantSlots(SlotCount count) +{ + return SlotAllocationPtr(new GrantedAllocation(count)); +} + } diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 117397f5c0b..414e3bef592 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -879,6 +879,11 @@ The server successfully detected this situation and will download merged part fr M(ReadWriteBufferFromHTTPRequestsSent, "Number of HTTP requests sent by ReadWriteBufferFromHTTP", ValueType::Number) \ M(ReadWriteBufferFromHTTPBytes, "Total size of payload bytes received and sent by ReadWriteBufferFromHTTP. Doesn't include HTTP headers.", ValueType::Bytes) \ \ + M(ConcurrencyControlSlotsGranted, "Number of CPU slot granted according to guarantee of 1 thread per query and for queries with setting 'use_concurrency_control' = 0", ValueType::Number) \ + M(ConcurrencyControlSlotsDelayed, "Number of CPU slot not granted initially and required to wait for a free CPU slot", ValueType::Number) \ + M(ConcurrencyControlSlotsAcquired, "Total number of CPU slot acquired", ValueType::Number) \ + M(ConcurrencyControlQueriesDelayed, "Total number of CPU slot allocations (queries) that were required to wait for slots to upscale", ValueType::Number) \ + \ M(SharedDatabaseCatalogFailedToApplyState, "Number of failures to apply new state in SharedDatabaseCatalog", ValueType::Number) \ M(SharedDatabaseCatalogStateApplicationMicroseconds, "Total time spend on application of new state in SharedDatabaseCatalog", ValueType::Microseconds) \ \ diff --git a/src/Dictionaries/CacheDictionary.cpp b/src/Dictionaries/CacheDictionary.cpp index 93526078dab..51fcbf5288c 100644 --- a/src/Dictionaries/CacheDictionary.cpp +++ b/src/Dictionaries/CacheDictionary.cpp @@ -648,6 +648,7 @@ void CacheDictionary::update(CacheDictionaryUpdateUnitPtr::getSourcePipe( ids.emplace_back(key); auto pipeline = source_ptr->loadIds(ids); - if (use_async_executor) pipe = Pipe(std::make_shared>(std::move(pipeline))); else diff --git a/src/Dictionaries/FlatDictionary.cpp b/src/Dictionaries/FlatDictionary.cpp index b0233766741..9b0db8dde95 100644 --- a/src/Dictionaries/FlatDictionary.cpp +++ b/src/Dictionaries/FlatDictionary.cpp @@ -454,6 +454,7 @@ void FlatDictionary::updateData() { QueryPipeline pipeline(source_ptr->loadUpdatedAll()); DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor); + pipeline.setConcurrencyControl(false); update_field_loaded_block.reset(); Block block; @@ -495,6 +496,7 @@ void FlatDictionary::loadData() { QueryPipeline pipeline(source_ptr->loadAll()); DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor); + pipeline.setConcurrencyControl(false); Block block; while (executor.pull(block)) diff --git a/src/Dictionaries/HashedArrayDictionary.cpp b/src/Dictionaries/HashedArrayDictionary.cpp index d12cffb50fa..9da825ebd69 100644 --- a/src/Dictionaries/HashedArrayDictionary.cpp +++ b/src/Dictionaries/HashedArrayDictionary.cpp @@ -480,6 +480,7 @@ void HashedArrayDictionary::updateData() { QueryPipeline pipeline(source_ptr->loadUpdatedAll()); DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor); + pipeline.setConcurrencyControl(false); update_field_loaded_block.reset(); Block block; @@ -978,6 +979,7 @@ void HashedArrayDictionary::loadData() QueryPipeline pipeline(source_ptr->loadAll()); DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor); + pipeline.setConcurrencyControl(false); UInt64 pull_time_microseconds = 0; UInt64 process_time_microseconds = 0; diff --git a/src/Dictionaries/HashedDictionary.h b/src/Dictionaries/HashedDictionary.h index 7e935fe4855..ef0b5b0ef2f 100644 --- a/src/Dictionaries/HashedDictionary.h +++ b/src/Dictionaries/HashedDictionary.h @@ -884,6 +884,7 @@ void HashedDictionary::updateData() { QueryPipeline pipeline(source_ptr->loadUpdatedAll()); DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor); + pipeline.setConcurrencyControl(false); update_field_loaded_block.reset(); Block block; @@ -1163,6 +1164,7 @@ void HashedDictionary::loadData() QueryPipeline pipeline(source_ptr->loadAll()); DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor); + pipeline.setConcurrencyControl(false); Block block; DictionaryKeysArenaHolder arena_holder; diff --git a/src/Dictionaries/IPAddressDictionary.cpp b/src/Dictionaries/IPAddressDictionary.cpp index e42ffa09e34..dd3f6583cda 100644 --- a/src/Dictionaries/IPAddressDictionary.cpp +++ b/src/Dictionaries/IPAddressDictionary.cpp @@ -412,6 +412,7 @@ void IPAddressDictionary::loadData() bool has_ipv6 = false; DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor); + pipeline.setConcurrencyControl(false); Block block; while (executor.pull(block)) { diff --git a/src/Dictionaries/PolygonDictionary.cpp b/src/Dictionaries/PolygonDictionary.cpp index ff29ca1f6b8..0f99d0b7f7a 100644 --- a/src/Dictionaries/PolygonDictionary.cpp +++ b/src/Dictionaries/PolygonDictionary.cpp @@ -291,6 +291,7 @@ void IPolygonDictionary::loadData() QueryPipeline pipeline(source_ptr->loadAll()); DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor); + pipeline.setConcurrencyControl(false); Block block; while (executor.pull(block)) blockToAttributes(block); diff --git a/src/Dictionaries/RangeHashedDictionary.h b/src/Dictionaries/RangeHashedDictionary.h index c264b480bcb..bc335b890fc 100644 --- a/src/Dictionaries/RangeHashedDictionary.h +++ b/src/Dictionaries/RangeHashedDictionary.h @@ -541,6 +541,7 @@ void RangeHashedDictionary::loadData() { QueryPipeline pipeline(source_ptr->loadAll()); DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor); + pipeline.setConcurrencyControl(false); Block block; while (executor.pull(block)) @@ -692,6 +693,7 @@ void RangeHashedDictionary::updateData() { QueryPipeline pipeline(source_ptr->loadUpdatedAll()); DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor); + pipeline.setConcurrencyControl(false); update_field_loaded_block.reset(); Block block; diff --git a/src/Dictionaries/RegExpTreeDictionary.cpp b/src/Dictionaries/RegExpTreeDictionary.cpp index 214158c40d3..ef164939459 100644 --- a/src/Dictionaries/RegExpTreeDictionary.cpp +++ b/src/Dictionaries/RegExpTreeDictionary.cpp @@ -321,6 +321,7 @@ void RegExpTreeDictionary::loadData() { QueryPipeline pipeline(source_ptr->loadAll()); DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor); + pipeline.setConcurrencyControl(false); Block block; while (executor.pull(block)) diff --git a/src/Interpreters/ExecuteScalarSubqueriesVisitor.cpp b/src/Interpreters/ExecuteScalarSubqueriesVisitor.cpp index 2597fe3592f..d4da038c089 100644 --- a/src/Interpreters/ExecuteScalarSubqueriesVisitor.cpp +++ b/src/Interpreters/ExecuteScalarSubqueriesVisitor.cpp @@ -37,6 +37,7 @@ namespace Setting extern const SettingsBool enable_scalar_subquery_optimization; extern const SettingsBool extremes; extern const SettingsUInt64 max_result_rows; + extern const SettingsBool use_concurrency_control; } namespace ErrorCodes @@ -199,6 +200,7 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr PullingAsyncPipelineExecutor executor(io.pipeline); io.pipeline.setProgressCallback(data.getContext()->getProgressCallback()); + io.pipeline.setConcurrencyControl(data.getContext()->getSettingsRef()[Setting::use_concurrency_control]); while (block.rows() == 0 && executor.pull(block)) { } diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 6bf5e1d5845..4e5cf7d2549 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -77,7 +77,6 @@ #include #include -#include #include #include #include diff --git a/src/Interpreters/InterpreterSelectQueryAnalyzer.cpp b/src/Interpreters/InterpreterSelectQueryAnalyzer.cpp index c6170f6e7e2..99693c3e743 100644 --- a/src/Interpreters/InterpreterSelectQueryAnalyzer.cpp +++ b/src/Interpreters/InterpreterSelectQueryAnalyzer.cpp @@ -26,6 +26,8 @@ #include #include +#include + #include #include @@ -37,6 +39,11 @@ namespace ErrorCodes extern const int UNSUPPORTED_METHOD; } +namespace Setting +{ + extern const SettingsBool use_concurrency_control; +} + namespace { @@ -249,6 +256,8 @@ QueryPipelineBuilder InterpreterSelectQueryAnalyzer::buildQueryPipeline() auto optimization_settings = QueryPlanOptimizationSettings::fromContext(context); auto build_pipeline_settings = BuildQueryPipelineSettings::fromContext(context); + query_plan.setConcurrencyControl(context->getSettingsRef()[Setting::use_concurrency_control]); + return std::move(*query_plan.buildQueryPipeline(optimization_settings, build_pipeline_settings)); } diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index a040ebd8a97..0f25d5ac21c 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -52,6 +52,7 @@ namespace Setting extern const SettingsBool allow_experimental_analyzer; extern const SettingsBool allow_nondeterministic_mutations; extern const SettingsUInt64 max_block_size; + extern const SettingsBool use_concurrency_control; } namespace MergeTreeSetting @@ -221,6 +222,7 @@ bool isStorageTouchedByMutations( } PullingAsyncPipelineExecutor executor(io.pipeline); + io.pipeline.setConcurrencyControl(context->getSettingsRef()[Setting::use_concurrency_control]); Block block; while (block.rows() == 0 && executor.pull(block)); @@ -1295,6 +1297,10 @@ void MutationsInterpreter::Source::read( void MutationsInterpreter::initQueryPlan(Stage & first_stage, QueryPlan & plan) { + // Mutations are not using concurrency control now. Queries, merges and mutations running together could lead to CPU overcommit. + // TODO(serxa): Enable concurrency control for mutation queries and mutations. This should be done after CPU scheduler introduction. + plan.setConcurrencyControl(false); + source.read(first_stage, plan, metadata_snapshot, context, settings.apply_deleted_mask, settings.can_execute); addCreatingSetsStep(plan, first_stage.analyzer->getPreparedSets(), context); } diff --git a/src/Processors/Executors/PipelineExecutor.cpp b/src/Processors/Executors/PipelineExecutor.cpp index e8e1d5a941b..6e0498e5dcf 100644 --- a/src/Processors/Executors/PipelineExecutor.cpp +++ b/src/Processors/Executors/PipelineExecutor.cpp @@ -1,4 +1,5 @@ #include +#include "Common/ISlotControl.h" #include #include #include @@ -347,12 +348,22 @@ void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_ is_execution_initialized = true; tryUpdateExecutionStatus(ExecutionStatus::NotStarted, ExecutionStatus::Executing); - size_t use_threads = num_threads; - - /// Allocate CPU slots from concurrency control - size_t min_threads = concurrency_control ? 1uz : num_threads; - cpu_slots = ConcurrencyControl::instance().allocate(min_threads, num_threads); - use_threads = cpu_slots->grantedCount(); + if (concurrency_control) + { + /// Allocate CPU slots from concurrency control + constexpr size_t min_threads = 1uz; // Number of threads that should be granted to every query no matter how many threads are already running in other queries + cpu_slots = ConcurrencyControl::instance().allocate(min_threads, num_threads); +#ifndef NDEBUG + LOG_TEST(log, "Allocate CPU slots. min: {}, max: {}, granted: {}", min_threads, num_threads, cpu_slots->grantedCount()); +#endif + } + else + { + /// If concurrency control is not used we should not even count threads as competing. + /// To avoid counting them in ConcurrencyControl, we create dummy slot allocation. + cpu_slots = grantSlots(num_threads); + } + size_t use_threads = cpu_slots->grantedCount(); Queue queue; Queue async_queue; diff --git a/src/Processors/QueryPlan/QueryPlan.cpp b/src/Processors/QueryPlan/QueryPlan.cpp index fd523b184e4..98fd209c12a 100644 --- a/src/Processors/QueryPlan/QueryPlan.cpp +++ b/src/Processors/QueryPlan/QueryPlan.cpp @@ -199,6 +199,7 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline( last_pipeline->setProgressCallback(build_pipeline_settings.progress_callback); last_pipeline->setProcessListElement(build_pipeline_settings.process_list_element); last_pipeline->addResources(std::move(resources)); + last_pipeline->setConcurrencyControl(getConcurrencyControl()); return last_pipeline; } diff --git a/src/QueryPipeline/QueryPipelineBuilder.cpp b/src/QueryPipeline/QueryPipelineBuilder.cpp index 526f02466d1..be0e17db2a2 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.cpp +++ b/src/QueryPipeline/QueryPipelineBuilder.cpp @@ -300,7 +300,8 @@ QueryPipelineBuilder QueryPipelineBuilder::unitePipelines( /// It may happen if max_distributed_connections > max_threads max_threads_limit = std::max(pipeline.max_threads, max_threads_limit); - concurrency_control = pipeline.getConcurrencyControl(); + // Use concurrency control if at least one of pipelines is using it + concurrency_control = concurrency_control || pipeline.getConcurrencyControl(); } QueryPipelineBuilder pipeline; @@ -311,8 +312,8 @@ QueryPipelineBuilder QueryPipelineBuilder::unitePipelines( { pipeline.setMaxThreads(max_threads); pipeline.limitMaxThreads(max_threads_limit); - pipeline.setConcurrencyControl(concurrency_control); } + pipeline.setConcurrencyControl(concurrency_control); pipeline.setCollectedProcessors(nullptr); return pipeline; diff --git a/src/Server/GRPCServer.cpp b/src/Server/GRPCServer.cpp index 48caa9236ef..376baee52f9 100644 --- a/src/Server/GRPCServer.cpp +++ b/src/Server/GRPCServer.cpp @@ -71,6 +71,7 @@ namespace Setting extern const SettingsUInt64 max_parser_depth; extern const SettingsUInt64 max_query_size; extern const SettingsBool throw_if_no_data_to_insert; + extern const SettingsBool use_concurrency_control; } namespace ErrorCodes @@ -1244,6 +1245,7 @@ namespace if (io.pipeline.pulling()) { auto executor = std::make_shared(io.pipeline); + io.pipeline.setConcurrencyControl(query_context->getSettingsRef()[Setting::use_concurrency_control]); auto check_for_cancel = [&] { if (isQueryCancelled()) diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index cd9de1e94ea..f18c9f1cb95 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -100,6 +100,7 @@ namespace Setting extern const SettingsUInt64 unknown_packet_in_send_data; extern const SettingsBool wait_for_async_insert; extern const SettingsSeconds wait_for_async_insert_timeout; + extern const SettingsBool use_concurrency_control; } namespace ServerSetting @@ -1115,6 +1116,7 @@ void TCPHandler::processOrdinaryQuery() { PullingAsyncPipelineExecutor executor(pipeline); + pipeline.setConcurrencyControl(query_context->getSettingsRef()[Setting::use_concurrency_control]); CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread}; /// The following may happen: diff --git a/src/Storages/LiveView/StorageLiveView.cpp b/src/Storages/LiveView/StorageLiveView.cpp index 1f0bdf1a2fc..795cd544e65 100644 --- a/src/Storages/LiveView/StorageLiveView.cpp +++ b/src/Storages/LiveView/StorageLiveView.cpp @@ -452,6 +452,7 @@ void StorageLiveView::writeBlock(StorageLiveView & live_view, Block && block, Ch auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder)); PullingAsyncPipelineExecutor executor(pipeline); + pipeline.setConcurrencyControl(local_context->getSettingsRef()[Setting::use_concurrency_control]); Block this_block; while (executor.pull(this_block)) @@ -588,6 +589,7 @@ MergeableBlocksPtr StorageLiveView::collectMergeableBlocks(ContextPtr local_cont auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder)); PullingAsyncPipelineExecutor executor(pipeline); + pipeline.setConcurrencyControl(local_context->getSettingsRef()[Setting::use_concurrency_control]); Block this_block; while (executor.pull(this_block)) @@ -694,6 +696,7 @@ bool StorageLiveView::getNewBlocks(const std::lock_guard & lock) auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder)); PullingAsyncPipelineExecutor executor(pipeline); + pipeline.setConcurrencyControl(false); Block block; while (executor.pull(block)) { diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 74d6d60ba1b..c171acb8089 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -1766,6 +1766,10 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context); auto builder = merge_parts_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings); + // Merges are not using concurrency control now. Queries and merges running together could lead to CPU overcommit. + // TODO(serxa): Enable concurrency control for merges. This should be done after CPU scheduler introduction. + builder->setConcurrencyControl(false); + global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); } diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 2e7847fc99f..ee87051371c 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -2164,6 +2164,7 @@ bool MutateTask::prepare() context_for_reading->setSetting("apply_mutations_on_fly", false); /// Skip using large sets in KeyCondition context_for_reading->setSetting("use_index_for_in_with_subqueries_max_values", 100000); + context_for_reading->setSetting("use_concurrency_control", false); for (const auto & command : *ctx->commands) if (!canSkipMutationCommandForPart(ctx->source_part, command, context_for_reading)) diff --git a/src/Storages/WindowView/StorageWindowView.cpp b/src/Storages/WindowView/StorageWindowView.cpp index e75d6185064..0739c03b963 100644 --- a/src/Storages/WindowView/StorageWindowView.cpp +++ b/src/Storages/WindowView/StorageWindowView.cpp @@ -657,6 +657,7 @@ std::pair StorageWindowView::getNewBlocks(UInt32 watermark) auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder)); PullingAsyncPipelineExecutor executor(pipeline); + pipeline.setConcurrencyControl(getContext()->getSettingsRef()[Setting::use_concurrency_control]); Block block; BlocksPtr new_blocks = std::make_shared(); diff --git a/src/TableFunctions/TableFunctionFormat.cpp b/src/TableFunctions/TableFunctionFormat.cpp index fde9e07e8d4..3e13d37b99c 100644 --- a/src/TableFunctions/TableFunctionFormat.cpp +++ b/src/TableFunctions/TableFunctionFormat.cpp @@ -28,6 +28,7 @@ namespace DB namespace Setting { extern const SettingsUInt64 max_block_size; + extern const SettingsBool use_concurrency_control; } namespace ErrorCodes @@ -116,6 +117,7 @@ Block TableFunctionFormat::parseData(const ColumnsDescription & columns, const S }); } + builder.setConcurrencyControl(context->getSettingsRef()[Setting::use_concurrency_control]); auto pipeline = std::make_unique(QueryPipelineBuilder::getPipeline(std::move(builder))); auto reader = std::make_unique(*pipeline); diff --git a/tests/integration/test_concurrent_threads_soft_limit/configs/config_defined_50.xml b/tests/integration/test_concurrent_threads_soft_limit/configs/config_defined_50.xml index 33ed0030acf..56bc112f3ac 100644 --- a/tests/integration/test_concurrent_threads_soft_limit/configs/config_defined_50.xml +++ b/tests/integration/test_concurrent_threads_soft_limit/configs/config_defined_50.xml @@ -1,4 +1,5 @@ + 0 50 system diff --git a/tests/integration/test_concurrent_threads_soft_limit/test.py b/tests/integration/test_concurrent_threads_soft_limit/test.py index 02655be61f3..5fdd0fb80c9 100644 --- a/tests/integration/test_concurrent_threads_soft_limit/test.py +++ b/tests/integration/test_concurrent_threads_soft_limit/test.py @@ -29,6 +29,16 @@ node4 = cluster.add_instance( ) +def assert_profile_event(node, query_id, profile_event, check): + assert check( + int( + node.query( + f"select ProfileEvents['{profile_event}'] from system.query_log where current_database = currentDatabase() and query_id = '{query_id}' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1" + ) + ) + ) + + @pytest.fixture(scope="module") def started_cluster(): try: @@ -44,39 +54,176 @@ def test_concurrent_threads_soft_limit_default(started_cluster): query_id="test_concurrent_threads_soft_limit_1", ) node1.query("SYSTEM FLUSH LOGS") + assert_profile_event( + node1, + "test_concurrent_threads_soft_limit_1", + "ConcurrencyControlSlotsGranted", + lambda x: x == 1, + ) + assert_profile_event( + node1, + "test_concurrent_threads_soft_limit_1", + "ConcurrencyControlSlotsDelayed", + lambda x: x == 0, + ) + assert_profile_event( + node1, + "test_concurrent_threads_soft_limit_1", + "ConcurrencyControlSlotsAcquired", + lambda x: x == 100, + ) + assert_profile_event( + node1, + "test_concurrent_threads_soft_limit_1", + "ConcurrencyControlQueriesDelayed", + lambda x: x == 0, + ) assert ( node1.query( - "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_1'" + "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_1' order by query_start_time_microseconds desc limit 1" ) == "102\n" ) -@pytest.mark.skip(reason="broken test") +def test_use_concurrency_control_default(started_cluster): + node1.query( + "SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0", + query_id="test_use_concurrency_control", + ) + + # Concurrency control is not used, all metrics should be zeros + node1.query("SYSTEM FLUSH LOGS") + assert_profile_event( + node1, + "test_use_concurrency_control", + "ConcurrencyControlSlotsGranted", + lambda x: x == 0, + ) + assert_profile_event( + node1, + "test_use_concurrency_control", + "ConcurrencyControlSlotsDelayed", + lambda x: x == 0, + ) + assert_profile_event( + node1, + "test_use_concurrency_control", + "ConcurrencyControlSlotsAcquired", + lambda x: x == 0, + ) + assert_profile_event( + node1, + "test_use_concurrency_control", + "ConcurrencyControlQueriesDelayed", + lambda x: x == 0, + ) + + def test_concurrent_threads_soft_limit_defined_50(started_cluster): node2.query( "SELECT count(*) FROM numbers_mt(10000000)", query_id="test_concurrent_threads_soft_limit_2", ) node2.query("SYSTEM FLUSH LOGS") + assert_profile_event( + node2, + "test_concurrent_threads_soft_limit_2", + "ConcurrencyControlSlotsGranted", + lambda x: x == 1, + ) + assert_profile_event( + node2, + "test_concurrent_threads_soft_limit_2", + "ConcurrencyControlSlotsDelayed", + lambda x: x == 50, + ) + assert_profile_event( + node2, + "test_concurrent_threads_soft_limit_2", + "ConcurrencyControlSlotsAcquired", + lambda x: x == 50, + ) + assert_profile_event( + node2, + "test_concurrent_threads_soft_limit_2", + "ConcurrencyControlQueriesDelayed", + lambda x: x == 1, + ) assert ( node2.query( - "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_2'" + "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_2' order by query_start_time_microseconds desc limit 1" ) == "52\n" ) -@pytest.mark.skip(reason="broken test") +def test_use_concurrency_control_soft_limit_defined_50(started_cluster): + node2.query( + "SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0", + query_id="test_use_concurrency_control_2", + ) + # Concurrency control is not used, all metrics should be zeros + node2.query("SYSTEM FLUSH LOGS") + assert_profile_event( + node2, + "test_use_concurrency_control_2", + "ConcurrencyControlSlotsGranted", + lambda x: x == 0, + ) + assert_profile_event( + node2, + "test_use_concurrency_control_2", + "ConcurrencyControlSlotsDelayed", + lambda x: x == 0, + ) + assert_profile_event( + node2, + "test_use_concurrency_control_2", + "ConcurrencyControlSlotsAcquired", + lambda x: x == 0, + ) + assert_profile_event( + node2, + "test_use_concurrency_control_2", + "ConcurrencyControlQueriesDelayed", + lambda x: x == 0, + ) + + def test_concurrent_threads_soft_limit_defined_1(started_cluster): node3.query( "SELECT count(*) FROM numbers_mt(10000000)", query_id="test_concurrent_threads_soft_limit_3", ) node3.query("SYSTEM FLUSH LOGS") + assert_profile_event( + node3, + "test_concurrent_threads_soft_limit_3", + "ConcurrencyControlSlotsGranted", + lambda x: x == 1, + ) + assert_profile_event( + node3, + "test_concurrent_threads_soft_limit_3", + "ConcurrencyControlSlotsDelayed", + lambda x: x == 99, + ) + assert_profile_event( + node3, + "test_concurrent_threads_soft_limit_3", + "ConcurrencyControlSlotsAcquired", + lambda x: x == 1, + ) + assert_profile_event( + node3, + "test_concurrent_threads_soft_limit_3", + "ConcurrencyControlQueriesDelayed", + lambda x: x == 1, + ) assert ( node3.query( - "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_3'" + "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_3' order by query_start_time_microseconds desc limit 1" ) == "3\n" ) @@ -85,7 +232,6 @@ def test_concurrent_threads_soft_limit_defined_1(started_cluster): # In config_limit_reached.xml there is concurrent_threads_soft_limit=10 # Background query starts in a separate thread to reach this limit. # When this limit is reached the foreground query gets less than 5 queries despite the fact that it has settings max_threads=5 -@pytest.mark.skip(reason="broken test") def test_concurrent_threads_soft_limit_limit_reached(started_cluster): def background_query(): try: @@ -118,8 +264,32 @@ def test_concurrent_threads_soft_limit_limit_reached(started_cluster): ) node4.query("SYSTEM FLUSH LOGS") + assert_profile_event( + node4, + "test_concurrent_threads_soft_limit_4", + "ConcurrencyControlSlotsGranted", + lambda x: x == 1, + ) + assert_profile_event( + node4, + "test_concurrent_threads_soft_limit_4", + "ConcurrencyControlSlotsDelayed", + lambda x: x > 0, + ) + assert_profile_event( + node4, + "test_concurrent_threads_soft_limit_4", + "ConcurrencyControlSlotsAcquired", + lambda x: x < 5, + ) + assert_profile_event( + node4, + "test_concurrent_threads_soft_limit_4", + "ConcurrencyControlQueriesDelayed", + lambda x: x == 1, + ) s_count = node4.query( - "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_4'" + "select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_4' order by query_start_time_microseconds desc limit 1" ).strip() if s_count: count = int(s_count) diff --git a/tests/queries/0_stateless/01091_num_threads.sql b/tests/queries/0_stateless/01091_num_threads.sql index 9fc82b470c9..baec13a722d 100644 --- a/tests/queries/0_stateless/01091_num_threads.sql +++ b/tests/queries/0_stateless/01091_num_threads.sql @@ -3,6 +3,7 @@ set log_queries=1; set log_query_threads=1; set max_threads=0; +set use_concurrency_control=0; WITH 01091 AS id SELECT 1; SYSTEM FLUSH LOGS; diff --git a/tests/queries/0_stateless/02015_global_in_threads.sh b/tests/queries/0_stateless/02015_global_in_threads.sh index 3845c55511c..174be1af1ca 100755 --- a/tests/queries/0_stateless/02015_global_in_threads.sh +++ b/tests/queries/0_stateless/02015_global_in_threads.sh @@ -6,6 +6,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh . "$CURDIR"/../shell_config.sh -${CLICKHOUSE_CLIENT} --log_queries=1 --max_threads=32 --query_id "2015_${CLICKHOUSE_DATABASE}_query" -q "select count() from remote('127.0.0.{2,3}', numbers(10)) where number global in (select number % 5 from numbers_mt(1000000))" +${CLICKHOUSE_CLIENT} --log_queries=1 --max_threads=32 --use_concurrency_control=0 --query_id "2015_${CLICKHOUSE_DATABASE}_query" -q "select count() from remote('127.0.0.{2,3}', numbers(10)) where number global in (select number % 5 from numbers_mt(1000000))" ${CLICKHOUSE_CLIENT} -q "system flush logs" ${CLICKHOUSE_CLIENT} -q "select length(thread_ids) >= 32 from system.query_log where event_date >= yesterday() and query_id = '2015_${CLICKHOUSE_DATABASE}_query' and type = 'QueryFinish' and current_database = currentDatabase()" diff --git a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 index 26b858d8c00..0b9cf496940 100644 --- a/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 +++ b/tests/queries/0_stateless/02845_threads_count_in_distributed_queries.sql.j2 @@ -5,6 +5,9 @@ -- enforce some defaults to be sure that the env settings will not affect the test SET max_threads=5, async_socket_for_remote=1, prefer_localhost_replica=1, optimize_read_in_order=1, load_marks_asynchronously=0, local_filesystem_read_method='pread', remote_filesystem_read_method='read', trace_profile_events=0; +-- we do not want concurrency control to limit the number of threads +SET use_concurrency_control=0; + -- we use query_thread_log to check peak thread usage -- after https://github.com/ClickHouse/ClickHouse/issues/53417 there is a simpler way to check it -- but that will not allow to backport the test to older versions diff --git a/utils/check-style/aspell-ignore/en/aspell-dict.txt b/utils/check-style/aspell-ignore/en/aspell-dict.txt index a5e06af213e..cccce791858 100644 --- a/utils/check-style/aspell-ignore/en/aspell-dict.txt +++ b/utils/check-style/aspell-ignore/en/aspell-dict.txt @@ -182,6 +182,9 @@ ComplexKeyCache ComplexKeyDirect ComplexKeyHashed Composable +composable +ConcurrencyControlAcquired +ConcurrencyControlSoftLimit Config ConnectionDetails Const