Merge pull request #61473 from ClickHouse/fix-use-concurrency-control

Fixes for concurrency control
This commit is contained in:
Sergei Trifonov 2024-10-19 22:27:12 +00:00 committed by GitHub
commit fc31ec4404
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
39 changed files with 345 additions and 19 deletions

View File

@ -737,6 +737,14 @@ Number of sessions (connections) to ZooKeeper. Should be no more than one, becau
Number of watches (event subscriptions) in ZooKeeper.
### ConcurrencyControlAcquired
Total number of acquired CPU slots.
### ConcurrencyControlSoftLimit
Value of soft limit on number of CPU slots.
**See Also**
- [system.asynchronous_metrics](../../operations/system-tables/asynchronous_metrics.md#system_tables-asynchronous_metrics) — Contains periodically calculated metrics.

View File

@ -1772,6 +1772,7 @@ try
concurrent_threads_soft_limit = value;
}
ConcurrencyControl::instance().setMaxConcurrency(concurrent_threads_soft_limit);
LOG_INFO(log, "ConcurrencyControl limit is set to {}", concurrent_threads_soft_limit);
global_context->getProcessList().setMaxSize(new_server_settings[ServerSetting::max_concurrent_queries]);
global_context->getProcessList().setMaxInsertQueriesAmount(new_server_settings[ServerSetting::max_concurrent_insert_queries]);

View File

@ -103,6 +103,7 @@ namespace Setting
extern const SettingsBool single_join_prefer_left_table;
extern const SettingsBool transform_null_in;
extern const SettingsUInt64 use_structure_from_insertion_table_in_table_functions;
extern const SettingsBool use_concurrency_control;
}
@ -588,6 +589,7 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
PullingAsyncPipelineExecutor executor(io.pipeline);
io.pipeline.setProgressCallback(context->getProgressCallback());
io.pipeline.setProcessListElement(context->getProcessListElement());
io.pipeline.setConcurrencyControl(context->getSettingsRef()[Setting::use_concurrency_control]);
Block block;

View File

@ -1785,6 +1785,9 @@ try
QueryPipeline pipeline(std::move(pipe));
PullingAsyncPipelineExecutor executor(pipeline);
/// Concurrency control in client is not required
pipeline.setConcurrencyControl(false);
if (need_render_progress)
{
pipeline.setProgressCallback([this](const Progress & progress){ onProgress(progress); });

View File

@ -269,6 +269,7 @@ void LocalConnection::sendQuery(
{
state->block = state->io.pipeline.getHeader();
state->executor = std::make_unique<PullingAsyncPipelineExecutor>(state->io.pipeline);
state->io.pipeline.setConcurrencyControl(false);
}
else if (state->io.pipeline.completed())
{

View File

@ -1,7 +1,23 @@
#include <Common/ISlotControl.h>
#include <Common/ConcurrencyControl.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
namespace ProfileEvents
{
extern const Event ConcurrencyControlSlotsGranted;
extern const Event ConcurrencyControlSlotsDelayed;
extern const Event ConcurrencyControlSlotsAcquired;
extern const Event ConcurrencyControlQueriesDelayed;
}
namespace CurrentMetrics
{
extern const Metric ConcurrencyControlAcquired;
extern const Metric ConcurrencyControlSoftLimit;
}
namespace DB
{
@ -17,6 +33,7 @@ ConcurrencyControl::Slot::~Slot()
ConcurrencyControl::Slot::Slot(SlotAllocationPtr && allocation_)
: allocation(std::move(allocation_))
, acquired_slot_increment(CurrentMetrics::ConcurrencyControlAcquired)
{
}
@ -34,6 +51,7 @@ ConcurrencyControl::Allocation::~Allocation()
{
if (granted.compare_exchange_strong(value, value - 1))
{
ProfileEvents::increment(ProfileEvents::ConcurrencyControlSlotsAcquired, 1);
std::unique_lock lock{mutex};
return AcquiredSlotPtr(new Slot(shared_from_this())); // can't use std::make_shared due to private ctor
}
@ -84,6 +102,7 @@ void ConcurrencyControl::Allocation::release()
ConcurrencyControl::ConcurrencyControl()
: cur_waiter(waiters.end())
, max_concurrency_metric(CurrentMetrics::ConcurrencyControlSoftLimit, 0)
{
}
@ -103,18 +122,25 @@ ConcurrencyControl::~ConcurrencyControl()
// Acquire as many slots as we can, but not lower than `min`
SlotCount granted = std::max(min, std::min(max, available(lock)));
cur_concurrency += granted;
ProfileEvents::increment(ProfileEvents::ConcurrencyControlSlotsGranted, min);
// Create allocation and start waiting if more slots are required
if (granted < max)
{
ProfileEvents::increment(ProfileEvents::ConcurrencyControlSlotsDelayed, max - granted);
ProfileEvents::increment(ProfileEvents::ConcurrencyControlQueriesDelayed);
return SlotAllocationPtr(new Allocation(*this, max, granted,
waiters.insert(cur_waiter, nullptr /* pointer is set by Allocation ctor */)));
return SlotAllocationPtr(new Allocation(*this, max, granted));
}
else
return SlotAllocationPtr(new Allocation(*this, max, granted));
}
void ConcurrencyControl::setMaxConcurrency(SlotCount value)
{
std::unique_lock lock{mutex};
max_concurrency = std::max<SlotCount>(1, value); // never allow max_concurrency to be zero
max_concurrency_metric.changeTo(max_concurrency == UnlimitedSlots ? 0 : max_concurrency);
schedule(lock);
}

View File

@ -8,6 +8,7 @@
#include <base/types.h>
#include <boost/core/noncopyable.hpp>
#include <Common/CurrentMetrics.h>
#include <Common/ISlotControl.h>
namespace DB
@ -53,6 +54,7 @@ public:
explicit Slot(SlotAllocationPtr && allocation_);
SlotAllocationPtr allocation;
CurrentMetrics::Increment acquired_slot_increment;
};
// Manages group of slots for a single query, see ConcurrencyControl::allocate(min, max)
@ -131,6 +133,7 @@ private:
Waiters::iterator cur_waiter; // round-robin pointer
SlotCount max_concurrency = UnlimitedSlots;
SlotCount cur_concurrency = 0;
CurrentMetrics::Increment max_concurrency_metric;
};
}

View File

@ -321,6 +321,9 @@
M(FilteringMarksWithPrimaryKey, "Number of threads currently doing filtering of mark ranges by the primary key") \
M(FilteringMarksWithSecondaryKeys, "Number of threads currently doing filtering of mark ranges by secondary keys") \
\
M(ConcurrencyControlAcquired, "Total number of acquired CPU slots") \
M(ConcurrencyControlSoftLimit, "Value of soft limit on number of CPU slots") \
\
M(DiskS3NoSuchKeyErrors, "The number of `NoSuchKey` errors that occur when reading data from S3 cloud storage through ClickHouse disks.") \
\
M(SharedCatalogStateApplicationThreads, "Number of threads in the threadpool for state application in Shared Catalog.") \

View File

@ -73,4 +73,44 @@ public:
[[nodiscard]] virtual SlotAllocationPtr allocate(SlotCount min, SlotCount max) = 0;
};
/// Allocation that grants all the slots immediately on creation
class GrantedAllocation : public ISlotAllocation
{
public:
explicit GrantedAllocation(SlotCount granted_)
: granted(granted_)
, allocated(granted_)
{}
[[nodiscard]] AcquiredSlotPtr tryAcquire() override
{
SlotCount value = granted.load();
while (value)
{
if (granted.compare_exchange_strong(value, value - 1))
return std::make_shared<IAcquiredSlot>();
}
return {};
}
SlotCount grantedCount() const override
{
return granted.load();
}
SlotCount allocatedCount() const override
{
return allocated;
}
private:
std::atomic<SlotCount> granted; // allocated, but not yet acquired
const SlotCount allocated;
};
[[nodiscard]] inline SlotAllocationPtr grantSlots(SlotCount count)
{
return SlotAllocationPtr(new GrantedAllocation(count));
}
}

View File

@ -879,6 +879,11 @@ The server successfully detected this situation and will download merged part fr
M(ReadWriteBufferFromHTTPRequestsSent, "Number of HTTP requests sent by ReadWriteBufferFromHTTP", ValueType::Number) \
M(ReadWriteBufferFromHTTPBytes, "Total size of payload bytes received and sent by ReadWriteBufferFromHTTP. Doesn't include HTTP headers.", ValueType::Bytes) \
\
M(ConcurrencyControlSlotsGranted, "Number of CPU slot granted according to guarantee of 1 thread per query and for queries with setting 'use_concurrency_control' = 0", ValueType::Number) \
M(ConcurrencyControlSlotsDelayed, "Number of CPU slot not granted initially and required to wait for a free CPU slot", ValueType::Number) \
M(ConcurrencyControlSlotsAcquired, "Total number of CPU slot acquired", ValueType::Number) \
M(ConcurrencyControlQueriesDelayed, "Total number of CPU slot allocations (queries) that were required to wait for slots to upscale", ValueType::Number) \
\
M(SharedDatabaseCatalogFailedToApplyState, "Number of failures to apply new state in SharedDatabaseCatalog", ValueType::Number) \
M(SharedDatabaseCatalogStateApplicationMicroseconds, "Total time spend on application of new state in SharedDatabaseCatalog", ValueType::Microseconds) \
\

View File

@ -648,6 +648,7 @@ void CacheDictionary<dictionary_key_type>::update(CacheDictionaryUpdateUnitPtr<d
Columns fetched_columns_during_update = fetch_request.makeAttributesResultColumnsNonMutable();
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
{

View File

@ -334,6 +334,7 @@ public:
, pipeline(std::move(pipeline_))
, executor(pipeline)
{
pipeline.setConcurrencyControl(false);
}
std::string getName() const override
@ -378,7 +379,6 @@ Pipe DirectDictionary<dictionary_key_type>::getSourcePipe(
ids.emplace_back(key);
auto pipeline = source_ptr->loadIds(ids);
if (use_async_executor)
pipe = Pipe(std::make_shared<SourceFromQueryPipeline<PullingAsyncPipelineExecutor>>(std::move(pipeline)));
else

View File

@ -454,6 +454,7 @@ void FlatDictionary::updateData()
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
update_field_loaded_block.reset();
Block block;
@ -495,6 +496,7 @@ void FlatDictionary::loadData()
{
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))

View File

@ -480,6 +480,7 @@ void HashedArrayDictionary<dictionary_key_type, sharded>::updateData()
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
update_field_loaded_block.reset();
Block block;
@ -978,6 +979,7 @@ void HashedArrayDictionary<dictionary_key_type, sharded>::loadData()
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
UInt64 pull_time_microseconds = 0;
UInt64 process_time_microseconds = 0;

View File

@ -884,6 +884,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::updateData()
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
update_field_loaded_block.reset();
Block block;
@ -1163,6 +1164,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::loadData()
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;

View File

@ -412,6 +412,7 @@ void IPAddressDictionary::loadData()
bool has_ipv6 = false;
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
{

View File

@ -291,6 +291,7 @@ void IPolygonDictionary::loadData()
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
blockToAttributes(block);

View File

@ -541,6 +541,7 @@ void RangeHashedDictionary<dictionary_key_type>::loadData()
{
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
@ -692,6 +693,7 @@ void RangeHashedDictionary<dictionary_key_type>::updateData()
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
update_field_loaded_block.reset();
Block block;

View File

@ -321,6 +321,7 @@ void RegExpTreeDictionary::loadData()
{
QueryPipeline pipeline(source_ptr->loadAll());
DictionaryPipelineExecutor executor(pipeline, configuration.use_async_executor);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))

View File

@ -37,6 +37,7 @@ namespace Setting
extern const SettingsBool enable_scalar_subquery_optimization;
extern const SettingsBool extremes;
extern const SettingsUInt64 max_result_rows;
extern const SettingsBool use_concurrency_control;
}
namespace ErrorCodes
@ -199,6 +200,7 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
PullingAsyncPipelineExecutor executor(io.pipeline);
io.pipeline.setProgressCallback(data.getContext()->getProgressCallback());
io.pipeline.setConcurrencyControl(data.getContext()->getSettingsRef()[Setting::use_concurrency_control]);
while (block.rows() == 0 && executor.pull(block))
{
}

View File

@ -77,7 +77,6 @@
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Parsers/formatAST.h>
#include <Parsers/QueryParameterVisitor.h>

View File

@ -26,6 +26,8 @@
#include <Analyzer/TableFunctionNode.h>
#include <Analyzer/Utils.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <Interpreters/QueryLog.h>
@ -37,6 +39,11 @@ namespace ErrorCodes
extern const int UNSUPPORTED_METHOD;
}
namespace Setting
{
extern const SettingsBool use_concurrency_control;
}
namespace
{
@ -249,6 +256,8 @@ QueryPipelineBuilder InterpreterSelectQueryAnalyzer::buildQueryPipeline()
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(context);
auto build_pipeline_settings = BuildQueryPipelineSettings::fromContext(context);
query_plan.setConcurrencyControl(context->getSettingsRef()[Setting::use_concurrency_control]);
return std::move(*query_plan.buildQueryPipeline(optimization_settings, build_pipeline_settings));
}

View File

@ -52,6 +52,7 @@ namespace Setting
extern const SettingsBool allow_experimental_analyzer;
extern const SettingsBool allow_nondeterministic_mutations;
extern const SettingsUInt64 max_block_size;
extern const SettingsBool use_concurrency_control;
}
namespace MergeTreeSetting
@ -221,6 +222,7 @@ bool isStorageTouchedByMutations(
}
PullingAsyncPipelineExecutor executor(io.pipeline);
io.pipeline.setConcurrencyControl(context->getSettingsRef()[Setting::use_concurrency_control]);
Block block;
while (block.rows() == 0 && executor.pull(block));
@ -1295,6 +1297,10 @@ void MutationsInterpreter::Source::read(
void MutationsInterpreter::initQueryPlan(Stage & first_stage, QueryPlan & plan)
{
// Mutations are not using concurrency control now. Queries, merges and mutations running together could lead to CPU overcommit.
// TODO(serxa): Enable concurrency control for mutation queries and mutations. This should be done after CPU scheduler introduction.
plan.setConcurrencyControl(false);
source.read(first_stage, plan, metadata_snapshot, context, settings.apply_deleted_mask, settings.can_execute);
addCreatingSetsStep(plan, first_stage.analyzer->getPreparedSets(), context);
}

View File

@ -1,4 +1,5 @@
#include <IO/WriteBufferFromString.h>
#include "Common/ISlotControl.h"
#include <Common/ThreadPool.h>
#include <Common/CurrentThread.h>
#include <Common/CurrentMetrics.h>
@ -347,12 +348,22 @@ void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_
is_execution_initialized = true;
tryUpdateExecutionStatus(ExecutionStatus::NotStarted, ExecutionStatus::Executing);
size_t use_threads = num_threads;
/// Allocate CPU slots from concurrency control
size_t min_threads = concurrency_control ? 1uz : num_threads;
cpu_slots = ConcurrencyControl::instance().allocate(min_threads, num_threads);
use_threads = cpu_slots->grantedCount();
if (concurrency_control)
{
/// Allocate CPU slots from concurrency control
constexpr size_t min_threads = 1uz; // Number of threads that should be granted to every query no matter how many threads are already running in other queries
cpu_slots = ConcurrencyControl::instance().allocate(min_threads, num_threads);
#ifndef NDEBUG
LOG_TEST(log, "Allocate CPU slots. min: {}, max: {}, granted: {}", min_threads, num_threads, cpu_slots->grantedCount());
#endif
}
else
{
/// If concurrency control is not used we should not even count threads as competing.
/// To avoid counting them in ConcurrencyControl, we create dummy slot allocation.
cpu_slots = grantSlots(num_threads);
}
size_t use_threads = cpu_slots->grantedCount();
Queue queue;
Queue async_queue;

View File

@ -199,6 +199,7 @@ QueryPipelineBuilderPtr QueryPlan::buildQueryPipeline(
last_pipeline->setProgressCallback(build_pipeline_settings.progress_callback);
last_pipeline->setProcessListElement(build_pipeline_settings.process_list_element);
last_pipeline->addResources(std::move(resources));
last_pipeline->setConcurrencyControl(getConcurrencyControl());
return last_pipeline;
}

View File

@ -300,7 +300,8 @@ QueryPipelineBuilder QueryPipelineBuilder::unitePipelines(
/// It may happen if max_distributed_connections > max_threads
max_threads_limit = std::max(pipeline.max_threads, max_threads_limit);
concurrency_control = pipeline.getConcurrencyControl();
// Use concurrency control if at least one of pipelines is using it
concurrency_control = concurrency_control || pipeline.getConcurrencyControl();
}
QueryPipelineBuilder pipeline;
@ -311,8 +312,8 @@ QueryPipelineBuilder QueryPipelineBuilder::unitePipelines(
{
pipeline.setMaxThreads(max_threads);
pipeline.limitMaxThreads(max_threads_limit);
pipeline.setConcurrencyControl(concurrency_control);
}
pipeline.setConcurrencyControl(concurrency_control);
pipeline.setCollectedProcessors(nullptr);
return pipeline;

View File

@ -71,6 +71,7 @@ namespace Setting
extern const SettingsUInt64 max_parser_depth;
extern const SettingsUInt64 max_query_size;
extern const SettingsBool throw_if_no_data_to_insert;
extern const SettingsBool use_concurrency_control;
}
namespace ErrorCodes
@ -1244,6 +1245,7 @@ namespace
if (io.pipeline.pulling())
{
auto executor = std::make_shared<PullingAsyncPipelineExecutor>(io.pipeline);
io.pipeline.setConcurrencyControl(query_context->getSettingsRef()[Setting::use_concurrency_control]);
auto check_for_cancel = [&]
{
if (isQueryCancelled())

View File

@ -100,6 +100,7 @@ namespace Setting
extern const SettingsUInt64 unknown_packet_in_send_data;
extern const SettingsBool wait_for_async_insert;
extern const SettingsSeconds wait_for_async_insert_timeout;
extern const SettingsBool use_concurrency_control;
}
namespace ServerSetting
@ -1115,6 +1116,7 @@ void TCPHandler::processOrdinaryQuery()
{
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(query_context->getSettingsRef()[Setting::use_concurrency_control]);
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
/// The following may happen:

View File

@ -452,6 +452,7 @@ void StorageLiveView::writeBlock(StorageLiveView & live_view, Block && block, Ch
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(local_context->getSettingsRef()[Setting::use_concurrency_control]);
Block this_block;
while (executor.pull(this_block))
@ -588,6 +589,7 @@ MergeableBlocksPtr StorageLiveView::collectMergeableBlocks(ContextPtr local_cont
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(local_context->getSettingsRef()[Setting::use_concurrency_control]);
Block this_block;
while (executor.pull(this_block))
@ -694,6 +696,7 @@ bool StorageLiveView::getNewBlocks(const std::lock_guard<std::mutex> & lock)
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(false);
Block block;
while (executor.pull(block))
{

View File

@ -1766,6 +1766,10 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_parts_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
// Merges are not using concurrency control now. Queries and merges running together could lead to CPU overcommit.
// TODO(serxa): Enable concurrency control for merges. This should be done after CPU scheduler introduction.
builder->setConcurrencyControl(false);
global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
}

View File

@ -2164,6 +2164,7 @@ bool MutateTask::prepare()
context_for_reading->setSetting("apply_mutations_on_fly", false);
/// Skip using large sets in KeyCondition
context_for_reading->setSetting("use_index_for_in_with_subqueries_max_values", 100000);
context_for_reading->setSetting("use_concurrency_control", false);
for (const auto & command : *ctx->commands)
if (!canSkipMutationCommandForPart(ctx->source_part, command, context_for_reading))

View File

@ -657,6 +657,7 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingAsyncPipelineExecutor executor(pipeline);
pipeline.setConcurrencyControl(getContext()->getSettingsRef()[Setting::use_concurrency_control]);
Block block;
BlocksPtr new_blocks = std::make_shared<Blocks>();

View File

@ -28,6 +28,7 @@ namespace DB
namespace Setting
{
extern const SettingsUInt64 max_block_size;
extern const SettingsBool use_concurrency_control;
}
namespace ErrorCodes
@ -116,6 +117,7 @@ Block TableFunctionFormat::parseData(const ColumnsDescription & columns, const S
});
}
builder.setConcurrencyControl(context->getSettingsRef()[Setting::use_concurrency_control]);
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
auto reader = std::make_unique<PullingPipelineExecutor>(*pipeline);

View File

@ -1,4 +1,5 @@
<clickhouse>
<concurrent_threads_soft_limit_ratio_to_cores>0</concurrent_threads_soft_limit_ratio_to_cores>
<concurrent_threads_soft_limit_num>50</concurrent_threads_soft_limit_num>
<query_log>
<database>system</database>

View File

@ -29,6 +29,16 @@ node4 = cluster.add_instance(
)
def assert_profile_event(node, query_id, profile_event, check):
assert check(
int(
node.query(
f"select ProfileEvents['{profile_event}'] from system.query_log where current_database = currentDatabase() and query_id = '{query_id}' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
)
)
)
@pytest.fixture(scope="module")
def started_cluster():
try:
@ -44,39 +54,176 @@ def test_concurrent_threads_soft_limit_default(started_cluster):
query_id="test_concurrent_threads_soft_limit_1",
)
node1.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlSlotsGranted",
lambda x: x == 1,
)
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlSlotsDelayed",
lambda x: x == 0,
)
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlSlotsAcquired",
lambda x: x == 100,
)
assert_profile_event(
node1,
"test_concurrent_threads_soft_limit_1",
"ConcurrencyControlQueriesDelayed",
lambda x: x == 0,
)
assert (
node1.query(
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_1'"
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_1' order by query_start_time_microseconds desc limit 1"
)
== "102\n"
)
@pytest.mark.skip(reason="broken test")
def test_use_concurrency_control_default(started_cluster):
node1.query(
"SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0",
query_id="test_use_concurrency_control",
)
# Concurrency control is not used, all metrics should be zeros
node1.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlSlotsGranted",
lambda x: x == 0,
)
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlSlotsDelayed",
lambda x: x == 0,
)
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlSlotsAcquired",
lambda x: x == 0,
)
assert_profile_event(
node1,
"test_use_concurrency_control",
"ConcurrencyControlQueriesDelayed",
lambda x: x == 0,
)
def test_concurrent_threads_soft_limit_defined_50(started_cluster):
node2.query(
"SELECT count(*) FROM numbers_mt(10000000)",
query_id="test_concurrent_threads_soft_limit_2",
)
node2.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlSlotsGranted",
lambda x: x == 1,
)
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlSlotsDelayed",
lambda x: x == 50,
)
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlSlotsAcquired",
lambda x: x == 50,
)
assert_profile_event(
node2,
"test_concurrent_threads_soft_limit_2",
"ConcurrencyControlQueriesDelayed",
lambda x: x == 1,
)
assert (
node2.query(
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_2'"
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_2' order by query_start_time_microseconds desc limit 1"
)
== "52\n"
)
@pytest.mark.skip(reason="broken test")
def test_use_concurrency_control_soft_limit_defined_50(started_cluster):
node2.query(
"SELECT count(*) FROM numbers_mt(10000000) SETTINGS use_concurrency_control = 0",
query_id="test_use_concurrency_control_2",
)
# Concurrency control is not used, all metrics should be zeros
node2.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlSlotsGranted",
lambda x: x == 0,
)
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlSlotsDelayed",
lambda x: x == 0,
)
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlSlotsAcquired",
lambda x: x == 0,
)
assert_profile_event(
node2,
"test_use_concurrency_control_2",
"ConcurrencyControlQueriesDelayed",
lambda x: x == 0,
)
def test_concurrent_threads_soft_limit_defined_1(started_cluster):
node3.query(
"SELECT count(*) FROM numbers_mt(10000000)",
query_id="test_concurrent_threads_soft_limit_3",
)
node3.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlSlotsGranted",
lambda x: x == 1,
)
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlSlotsDelayed",
lambda x: x == 99,
)
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlSlotsAcquired",
lambda x: x == 1,
)
assert_profile_event(
node3,
"test_concurrent_threads_soft_limit_3",
"ConcurrencyControlQueriesDelayed",
lambda x: x == 1,
)
assert (
node3.query(
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_3'"
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_3' order by query_start_time_microseconds desc limit 1"
)
== "3\n"
)
@ -85,7 +232,6 @@ def test_concurrent_threads_soft_limit_defined_1(started_cluster):
# In config_limit_reached.xml there is concurrent_threads_soft_limit=10
# Background query starts in a separate thread to reach this limit.
# When this limit is reached the foreground query gets less than 5 queries despite the fact that it has settings max_threads=5
@pytest.mark.skip(reason="broken test")
def test_concurrent_threads_soft_limit_limit_reached(started_cluster):
def background_query():
try:
@ -118,8 +264,32 @@ def test_concurrent_threads_soft_limit_limit_reached(started_cluster):
)
node4.query("SYSTEM FLUSH LOGS")
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlSlotsGranted",
lambda x: x == 1,
)
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlSlotsDelayed",
lambda x: x > 0,
)
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlSlotsAcquired",
lambda x: x < 5,
)
assert_profile_event(
node4,
"test_concurrent_threads_soft_limit_4",
"ConcurrencyControlQueriesDelayed",
lambda x: x == 1,
)
s_count = node4.query(
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_4'"
"select length(thread_ids) from system.query_log where current_database = currentDatabase() and type = 'QueryFinish' and query_id = 'test_concurrent_threads_soft_limit_4' order by query_start_time_microseconds desc limit 1"
).strip()
if s_count:
count = int(s_count)

View File

@ -3,6 +3,7 @@
set log_queries=1;
set log_query_threads=1;
set max_threads=0;
set use_concurrency_control=0;
WITH 01091 AS id SELECT 1;
SYSTEM FLUSH LOGS;

View File

@ -6,6 +6,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} --log_queries=1 --max_threads=32 --query_id "2015_${CLICKHOUSE_DATABASE}_query" -q "select count() from remote('127.0.0.{2,3}', numbers(10)) where number global in (select number % 5 from numbers_mt(1000000))"
${CLICKHOUSE_CLIENT} --log_queries=1 --max_threads=32 --use_concurrency_control=0 --query_id "2015_${CLICKHOUSE_DATABASE}_query" -q "select count() from remote('127.0.0.{2,3}', numbers(10)) where number global in (select number % 5 from numbers_mt(1000000))"
${CLICKHOUSE_CLIENT} -q "system flush logs"
${CLICKHOUSE_CLIENT} -q "select length(thread_ids) >= 32 from system.query_log where event_date >= yesterday() and query_id = '2015_${CLICKHOUSE_DATABASE}_query' and type = 'QueryFinish' and current_database = currentDatabase()"

View File

@ -5,6 +5,9 @@
-- enforce some defaults to be sure that the env settings will not affect the test
SET max_threads=5, async_socket_for_remote=1, prefer_localhost_replica=1, optimize_read_in_order=1, load_marks_asynchronously=0, local_filesystem_read_method='pread', remote_filesystem_read_method='read', trace_profile_events=0;
-- we do not want concurrency control to limit the number of threads
SET use_concurrency_control=0;
-- we use query_thread_log to check peak thread usage
-- after https://github.com/ClickHouse/ClickHouse/issues/53417 there is a simpler way to check it
-- but that will not allow to backport the test to older versions

View File

@ -182,6 +182,9 @@ ComplexKeyCache
ComplexKeyDirect
ComplexKeyHashed
Composable
composable
ConcurrencyControlAcquired
ConcurrencyControlSoftLimit
Config
ConnectionDetails
Const