mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Allow SELECT FROM system.processes while max_queries limit is exceeded. [#CLICKHOUSE-3670]
This commit is contained in:
parent
604c7071c3
commit
5536bf202c
@ -1,6 +1,9 @@
|
||||
#include <Interpreters/ProcessList.h>
|
||||
#include <Interpreters/Settings.h>
|
||||
#include <Parsers/ASTSelectWithUnionQuery.h>
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/ASTKillQueryQuery.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <DataStreams/IProfilingBlockInputStream.h>
|
||||
@ -19,21 +22,70 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
/// Should we execute the query even if max_concurrent_queries limit is exhausted
|
||||
static bool isUnlimitedQuery(const IAST * ast)
|
||||
{
|
||||
if (!ast)
|
||||
return false;
|
||||
|
||||
/// It is KILL QUERY
|
||||
if (typeid_cast<const ASTKillQueryQuery *>(ast))
|
||||
return true;
|
||||
|
||||
/// It is SELECT FROM system.processes
|
||||
if (auto ast_selects = typeid_cast<const ASTSelectWithUnionQuery *>(ast))
|
||||
{
|
||||
if (!ast_selects->list_of_selects || ast_selects->list_of_selects->children.empty())
|
||||
return false;
|
||||
|
||||
auto ast_select = typeid_cast<ASTSelectQuery *>(ast_selects->list_of_selects->children[0].get());
|
||||
|
||||
if (!ast_select)
|
||||
return false;
|
||||
|
||||
auto ast_database = ast_select->database();
|
||||
if (!ast_database)
|
||||
return false;
|
||||
|
||||
auto ast_table = ast_select->table();
|
||||
if (!ast_table)
|
||||
return false;
|
||||
|
||||
auto ast_database_id = typeid_cast<const ASTIdentifier *>(ast_database.get());
|
||||
if (!ast_database_id)
|
||||
return false;
|
||||
|
||||
auto ast_table_id = typeid_cast<const ASTIdentifier *>(ast_table.get());
|
||||
if (!ast_table_id)
|
||||
return false;
|
||||
|
||||
return ast_database_id->name == "system" && ast_table_id->name == "processes";
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
ProcessList::EntryPtr ProcessList::insert(
|
||||
const String & query_, const IAST * ast, const ClientInfo & client_info, const Settings & settings)
|
||||
{
|
||||
EntryPtr res;
|
||||
bool is_kill_query = ast && typeid_cast<const ASTKillQueryQuery *>(ast);
|
||||
|
||||
if (client_info.current_query_id.empty())
|
||||
throw Exception("Query id cannot be empty", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
bool is_unlimited_query = isUnlimitedQuery(ast);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
|
||||
if (!is_kill_query && max_size && cur_size >= max_size
|
||||
&& (!settings.queue_max_wait_ms.totalMilliseconds() || !have_space.tryWait(mutex, settings.queue_max_wait_ms.totalMilliseconds())))
|
||||
throw Exception("Too many simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
|
||||
if (!is_unlimited_query && max_size && cur_size >= max_size)
|
||||
{
|
||||
if (!settings.queue_max_wait_ms.totalMilliseconds() || !have_space.tryWait(mutex, settings.queue_max_wait_ms.totalMilliseconds()))
|
||||
{
|
||||
throw Exception("Too many simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
|
||||
}
|
||||
}
|
||||
|
||||
/** Why we use current user?
|
||||
* Because initial one is passed by client and credentials for it is not verified,
|
||||
@ -50,7 +102,7 @@ ProcessList::EntryPtr ProcessList::insert(
|
||||
|
||||
if (user_process_list != user_to_queries.end())
|
||||
{
|
||||
if (!is_kill_query && settings.max_concurrent_queries_for_user
|
||||
if (!is_unlimited_query && settings.max_concurrent_queries_for_user
|
||||
&& user_process_list->second.queries.size() >= settings.max_concurrent_queries_for_user)
|
||||
throw Exception("Too many simultaneous queries for user " + client_info.current_user
|
||||
+ ". Current: " + toString(user_process_list->second.queries.size())
|
||||
|
Loading…
Reference in New Issue
Block a user