add max_concurrent_select_queries and max_concurrent_insert_queries

This commit is contained in:
cmsxbc 2021-12-12 16:07:06 +08:00 committed by mergify-bot
parent bd3a5bd4e2
commit b30e250eed
8 changed files with 209 additions and 4 deletions

View File

@ -640,7 +640,8 @@ On hosts with low RAM and swap, you possibly need setting `max_server_memory_usa
## max_concurrent_queries {#max-concurrent-queries}
The maximum number of simultaneously processed queries related to MergeTree table. Queries may be limited by other settings: [max_concurrent_queries_for_user](#max-concurrent-queries-for-user), [max_concurrent_queries_for_all_users](#max-concurrent-queries-for-all-users), [min_marks_to_honor_max_concurrent_queries](#min-marks-to-honor-max-concurrent-queries).
The maximum number of simultaneously processed queries related to MergeTree table.
Queries may be limited by other settings: [max_concurrent_insert_queries](#max-concurrent-insert-queries), [max_concurrent_select_queries](#max-concurrent-select-queries), [max_concurrent_queries_for_user](#max-concurrent-queries-for-user), [max_concurrent_queries_for_all_users](#max-concurrent-queries-for-all-users), [min_marks_to_honor_max_concurrent_queries](#min-marks-to-honor-max-concurrent-queries).
!!! info "Note"
These settings can be modified at runtime and will take effect immediately. Queries that are already running will remain unchanged.
@ -656,6 +657,42 @@ Possible values:
<max_concurrent_queries>100</max_concurrent_queries>
```
## max_concurrent_insert_queries {#max-concurrent-insert-queries}
The maximum number of simultaneously processed insert queries.
!!! info "Note"
This setting can be modified at runtime and takes effect immediately. Queries that are already running are not affected.
Possible values:
- Positive integer.
- 0 — Disabled.
**Example**
``` xml
<max_concurrent_insert_queries>100</max_concurrent_insert_queries>
```
## max_concurrent_select_queries {#max-concurrent-select-queries}
The maximum number of simultaneously processed select queries.
!!! info "Note"
This setting can be modified at runtime and takes effect immediately. Queries that are already running are not affected.
Possible values:
- Positive integer.
- 0 — Disabled.
**Example**
``` xml
<max_concurrent_select_queries>100</max_concurrent_select_queries>
```
## max_concurrent_queries_for_user {#max-concurrent-queries-for-user}
The maximum number of simultaneously processed queries related to MergeTree table per user.

View File

@ -859,6 +859,12 @@ if (ThreadFuzzer::instance().isEffective())
if (config->has("max_concurrent_queries"))
global_context->getProcessList().setMaxSize(config->getInt("max_concurrent_queries", 0));
if (config->has("max_concurrent_insert_queries"))
global_context->getProcessList().setMaxInsertQueriesAmount(config->getInt("max_concurrent_insert_queries", 0));
if (config->has("max_concurrent_select_queries"))
global_context->getProcessList().setMaxSelectQueriesAmount(config->getInt("max_concurrent_select_queries", 0));
if (config->has("keeper_server"))
global_context->updateKeeperConfiguration(*config);

View File

@ -86,6 +86,20 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
throw Exception("Too many simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
}
String query_kind{ast->getQueryKindString()};
if (!is_unlimited_query)
{
auto amount = getQueryKindAmount(query_kind);
if (max_insert_queries_amount && query_kind == "Insert" && amount >= max_insert_queries_amount)
throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES,
"Too many simultaneous insert queries. Maximum: {}, current: {}",
max_insert_queries_amount, amount);
if (max_select_queries_amount && query_kind == "Select" && amount >= max_select_queries_amount)
throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES,
"Too many simultaneous select queries. Maximum: {}, current: {}",
max_select_queries_amount, amount);
}
{
/**
* `max_size` check above is controlled by `max_concurrent_queries` server setting and is a "hard" limit for how many
@ -176,7 +190,9 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
}
auto process_it = processes.emplace(processes.end(),
query_context, query_, client_info, priorities.insert(settings.priority));
query_context, query_, client_info, priorities.insert(settings.priority), query_kind);
increaseQueryKindAmount(query_kind);
res = std::make_shared<Entry>(*this, process_it);
@ -242,6 +258,7 @@ ProcessListEntry::~ProcessListEntry()
String user = it->getClientInfo().current_user;
String query_id = it->getClientInfo().current_query_id;
String query_kind = it->query_kind;
const QueryStatus * process_list_element_ptr = &*it;
@ -273,6 +290,9 @@ ProcessListEntry::~ProcessListEntry()
LOG_ERROR(&Poco::Logger::get("ProcessList"), "Logical error: cannot find query by query_id and pointer to ProcessListElement in ProcessListForUser");
std::terminate();
}
parent.decreaseQueryKindAmount(query_kind);
parent.have_space.notify_all();
/// If there are no more queries for the user, then we will reset memory tracker and network throttler.
@ -286,11 +306,12 @@ ProcessListEntry::~ProcessListEntry()
QueryStatus::QueryStatus(
ContextPtr context_, const String & query_, const ClientInfo & client_info_, QueryPriorities::Handle && priority_handle_)
ContextPtr context_, const String & query_, const ClientInfo & client_info_, QueryPriorities::Handle && priority_handle_, const String & query_kind_)
: WithContext(context_)
, query(query_)
, client_info(client_info_)
, priority_handle(std::move(priority_handle_))
, query_kind(query_kind_)
{
auto settings = getContext()->getSettings();
limits.max_execution_time = settings.max_execution_time;
@ -485,4 +506,33 @@ ProcessList::UserInfo ProcessList::getUserInfo(bool get_profile_events) const
return per_user_infos;
}
/// Accounts for one more query of kind `query_kind` in `query_kind_amounts`.
/// A kind that has not been seen before starts counting from zero.
/// NOTE(review): presumably expected to be called with the ProcessList mutex held — confirm against callers.
void ProcessList::increaseQueryKindAmount(const String & query_kind)
{
    /// operator[] value-initializes a missing counter to 0, so a single increment covers both
    /// the "first query of this kind" and the "already tracked" cases.
    ++query_kind_amounts[query_kind];
}
/// Accounts for one finished query of kind `query_kind` in `query_kind_amounts`.
///
/// Throws LOGICAL_ERROR if the kind was never registered via increaseQueryKindAmount()
/// or if its counter is already zero (the counter is unsigned and must not wrap around).
///
/// NOTE(review): this is invoked from ~ProcessListEntry; an exception thrown here would
/// propagate out of that destructor unless it is caught there — confirm this is intended.
void ProcessList::decreaseQueryKindAmount(const String & query_kind)
{
    auto found = query_kind_amounts.find(query_kind);

    /// TODO: we could just rebuild the map, as we have saved all query_kind.
    if (found == query_kind_amounts.end())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong query kind amount: decrease before increase on '{}'", query_kind);

    /// The message has exactly one placeholder, so only the query kind is passed
    /// (the current amount is known to be 0 on this path and carries no information).
    if (found->second == 0)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong query kind amount: decrease to negative on '{}'", query_kind);

    found->second -= 1;
}
/// Returns how many queries of kind `query_kind` are currently accounted in `query_kind_amounts`,
/// or 0 when that kind has no counter yet. Never inserts into the map.
ProcessList::QueryAmount ProcessList::getQueryKindAmount(const String & query_kind)
{
    const auto counter_it = query_kind_amounts.find(query_kind);
    return counter_it != query_kind_amounts.end() ? counter_it->second : 0;
}
}

View File

@ -118,13 +118,17 @@ protected:
ProcessListForUser * user_process_list = nullptr;
String query_kind;
public:
QueryStatus(
ContextPtr context_,
const String & query_,
const ClientInfo & client_info_,
QueryPriorities::Handle && priority_handle_);
QueryPriorities::Handle && priority_handle_,
const String & query_kind_
);
~QueryStatus();
@ -256,6 +260,7 @@ class ProcessList
public:
using Element = QueryStatus;
using Entry = ProcessListEntry;
using QueryAmount = UInt64;
/// list, for iterators not to invalidate. NOTE: could replace with cyclic buffer, but not worth.
using Container = std::list<Element>;
@ -265,6 +270,8 @@ public:
/// User -> queries
using UserToQueries = std::unordered_map<String, ProcessListForUser>;
using QueryKindToAmount = std::unordered_map<String, QueryAmount>;
protected:
friend class ProcessListEntry;
@ -287,6 +294,19 @@ protected:
/// Call under lock. Finds process with specified current_user and current_query_id.
QueryStatus * tryGetProcessListElement(const String & current_query_id, const String & current_user);
/// Limit on the number of simultaneously running Insert queries. 0 means no limit.
/// Otherwise ProcessList::insert() throws TOO_MANY_SIMULTANEOUS_QUERIES once the current
/// amount of Insert queries has reached this value (queries treated as unlimited are exempt).
size_t max_insert_queries_amount = 0;
/// Limit on the number of simultaneously running Select queries. 0 means no limit.
/// Otherwise ProcessList::insert() throws TOO_MANY_SIMULTANEOUS_QUERIES once the current
/// amount of Select queries has reached this value (queries treated as unlimited are exempt).
size_t max_select_queries_amount = 0;
/// Amount of queries currently in the process list, keyed by the query kind string
/// obtained from IAST::getQueryKindString() (e.g. "Insert", "Select").
QueryKindToAmount query_kind_amounts;
/// Adds one query of the given kind to `query_kind_amounts`.
void increaseQueryKindAmount(const String & query_kind);
/// Removes one query of the given kind from `query_kind_amounts`;
/// throws LOGICAL_ERROR if the kind is unknown or its counter is already zero.
void decreaseQueryKindAmount(const String & query_kind);
/// Returns the current amount for the given kind, 0 if the kind has not been seen.
QueryAmount getQueryKindAmount(const String & query_kind);
public:
using EntryPtr = std::shared_ptr<ProcessListEntry>;
@ -312,6 +332,18 @@ public:
max_size = max_size_;
}
/// Updates the limit on simultaneously running Insert queries (0 disables the limit).
/// The new value is stored under the process list mutex.
void setMaxInsertQueriesAmount(size_t max_insert_queries_amount_)
{
    std::lock_guard guard(mutex);
    max_insert_queries_amount = max_insert_queries_amount_;
}
/// Updates the limit on simultaneously running Select queries (0 disables the limit).
/// The new value is stored under the process list mutex.
void setMaxSelectQueriesAmount(size_t max_select_queries_amount_)
{
    std::lock_guard guard(mutex);
    max_select_queries_amount = max_select_queries_amount_;
}
/// Try call cancel() for input and output streams of query with specified id and user
CancellationCode sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill = false);

View File

@ -0,0 +1,3 @@
<!-- Integration-test config: allow at most 2 simultaneously processed insert queries on this server. -->
<clickhouse>
<max_concurrent_insert_queries>2</max_concurrent_insert_queries>
</clickhouse>

View File

@ -0,0 +1,3 @@
<!-- Integration-test config: allow at most 2 simultaneously processed select queries on this server. -->
<clickhouse>
<max_concurrent_select_queries>2</max_concurrent_select_queries>
</clickhouse>

View File

@ -0,0 +1,74 @@
import time
from multiprocessing.dummy import Pool
import pytest
from helpers.cluster import ClickHouseCluster
# One cluster with two independent nodes, each started with a single per-kind restriction:
# node_insert only limits concurrent insert queries, node_select only limits concurrent select queries.
cluster = ClickHouseCluster(__file__)
node_insert = cluster.add_instance('node_insert', main_configs=['configs/concurrent_insert_restriction.xml'])
node_select = cluster.add_instance('node_select', main_configs=['configs/concurrent_select_restriction.xml'])
@pytest.fixture(scope="module")
def started_cluster():
    """Start the cluster once per module and create the test table on both nodes.

    Yields the running cluster; the cluster is shut down afterwards even if
    start-up or table creation fails.
    """
    create_table_sql = "create table test_concurrent_insert (x UInt64) ENGINE = MergeTree() order by tuple()"
    try:
        cluster.start()
        # Same table on both nodes, created in the same order as before: select node first.
        for node in (node_select, node_insert):
            node.query(create_table_sql)
        yield cluster
    finally:
        cluster.shutdown()
def execute_with_background(node, sql, background_sql, background_times, wait_times=3):
    """Run `sql` on `node` while `background_times` copies of `background_sql` run in the background.

    First polls `show processlist` up to `wait_times` times (one second apart) until it
    is empty, and fails with an AssertionError if it never becomes empty. Then launches
    the background queries without waiting for them, gives them half a second to start,
    and returns the result of the foreground query.
    """
    last_processlist = None
    node_is_idle = False
    for _attempt in range(wait_times):
        last_processlist = node.query('show processlist', stdin='')
        if last_processlist.strip():
            # Something is still running from a previous step; give it time to finish.
            time.sleep(1)
            continue
        node_is_idle = True
        break
    assert node_is_idle, "there are unknown background queries: {}".format(last_processlist)

    launched = 0
    while launched < background_times:
        node.get_query_request(background_sql, stdin='')
        launched += 1

    # Wait for the background queries to start.
    time.sleep(0.5)
    foreground_result = node.query(sql, stdin='')
    return foreground_result
def common_pattern(node, query_kind, restricted_sql, normal_sql, limit):
    """Shared scenario for a node that restricts one query kind to `limit` concurrent queries.

    Checks, in order, that:
      1. with `limit` restricted queries in the background, one more is rejected;
      2. a query of a different kind still runs with `limit` restricted queries in the background;
      3. a restricted query runs when nothing is in the background.
    """
    expected_error = r".*Too many simultaneous {} queries.*".format(query_kind)

    # 1. The restriction is enforced.
    with pytest.raises(Exception, match=expected_error):
        execute_with_background(node, restricted_sql, restricted_sql, limit)

    # 2. A different query kind is accounted independently.
    execute_with_background(node, normal_sql, restricted_sql, limit)

    # 3. Without background load the restricted query works normally.
    execute_with_background(node, restricted_sql, '', 0)
def test_select(started_cluster):
    """The select limit on node_select rejects extra selects but does not affect inserts."""
    blocking_select = 'select sleep(3)'

    common_pattern(
        node=node_select,
        query_kind='select',
        restricted_sql=blocking_select,
        normal_sql='insert into test_concurrent_insert values (0)',
        limit=2,
    )

    # The select nested inside an insert is not counted against the select limit,
    # so a plain select must still succeed with two such inserts in the background.
    execute_with_background(
        node=node_select,
        sql=blocking_select,
        background_sql='insert into test_concurrent_insert select sleep(3)',
        background_times=2,
    )
def test_insert(started_cluster):
    """The insert limit on node_insert rejects extra inserts but does not affect selects."""
    blocking_insert = 'insert into test_concurrent_insert select sleep(3)'

    common_pattern(
        node=node_insert,
        query_kind='insert',
        restricted_sql=blocking_insert,
        normal_sql='select 1',
        limit=2,
    )