2017-04-01 09:19:00 +00:00
|
|
|
#include <Interpreters/ProcessList.h>
|
2019-03-22 12:08:30 +00:00
|
|
|
#include <Core/Settings.h>
|
2018-05-17 16:01:41 +00:00
|
|
|
#include <Interpreters/Context.h>
|
2018-10-30 16:31:21 +00:00
|
|
|
#include <Interpreters/DatabaseAndTableWithAlias.h>
|
2018-04-17 17:08:15 +00:00
|
|
|
#include <Parsers/ASTSelectWithUnionQuery.h>
|
2021-11-01 13:19:31 +00:00
|
|
|
#include <Parsers/ASTSelectIntersectExceptQuery.h>
|
2018-04-17 17:08:15 +00:00
|
|
|
#include <Parsers/ASTSelectQuery.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Parsers/ASTKillQueryQuery.h>
|
2020-12-02 12:08:03 +00:00
|
|
|
#include <Parsers/queryNormalization.h>
|
2021-07-27 09:52:45 +00:00
|
|
|
#include <Processors/Executors/PipelineExecutor.h>
|
2018-05-17 16:01:41 +00:00
|
|
|
#include <Common/typeid_cast.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Common/Exception.h>
|
2018-05-29 18:14:31 +00:00
|
|
|
#include <Common/CurrentThread.h>
|
2021-12-29 14:25:56 +00:00
|
|
|
#include "Parsers/IAST.h"
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <IO/WriteHelpers.h>
|
2021-10-02 07:13:14 +00:00
|
|
|
#include <base/logger_useful.h>
|
2018-04-18 20:18:18 +00:00
|
|
|
#include <chrono>
|
|
|
|
|
2017-01-21 04:24:28 +00:00
|
|
|
|
2015-06-21 06:06:04 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2016-01-11 21:46:36 +00:00
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
2018-03-09 23:23:15 +00:00
|
|
|
extern const int TOO_MANY_SIMULTANEOUS_QUERIES;
|
2017-04-01 07:20:54 +00:00
|
|
|
extern const int QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING;
|
2018-03-09 22:11:42 +00:00
|
|
|
extern const int LOGICAL_ERROR;
|
2021-11-26 11:44:39 +00:00
|
|
|
extern const int QUERY_WAS_CANCELLED;
|
2016-01-11 21:46:36 +00:00
|
|
|
}
|
|
|
|
|
2015-06-21 06:06:04 +00:00
|
|
|
|
2018-04-17 17:08:15 +00:00
|
|
|
/// Should we execute the query even if max_concurrent_queries limit is exhausted
|
|
|
|
static bool isUnlimitedQuery(const IAST * ast)
|
|
|
|
{
|
|
|
|
if (!ast)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
/// It is KILL QUERY
|
2019-03-11 13:22:51 +00:00
|
|
|
if (ast->as<ASTKillQueryQuery>())
|
2018-04-17 17:08:15 +00:00
|
|
|
return true;
|
|
|
|
|
|
|
|
/// It is SELECT FROM system.processes
|
2018-04-18 21:14:47 +00:00
|
|
|
/// NOTE: This is very rough check.
|
|
|
|
/// False negative: USE system; SELECT * FROM processes;
|
|
|
|
/// False positive: SELECT * FROM system.processes CROSS JOIN (SELECT ...)
|
|
|
|
|
2019-03-11 13:22:51 +00:00
|
|
|
if (const auto * ast_selects = ast->as<ASTSelectWithUnionQuery>())
|
2018-04-17 17:08:15 +00:00
|
|
|
{
|
|
|
|
if (!ast_selects->list_of_selects || ast_selects->list_of_selects->children.empty())
|
|
|
|
return false;
|
|
|
|
|
2019-03-11 13:22:51 +00:00
|
|
|
const auto * ast_select = ast_selects->list_of_selects->children[0]->as<ASTSelectQuery>();
|
2018-04-17 17:08:15 +00:00
|
|
|
if (!ast_select)
|
|
|
|
return false;
|
|
|
|
|
2018-10-30 16:31:21 +00:00
|
|
|
if (auto database_and_table = getDatabaseAndTable(*ast_select, 0))
|
|
|
|
return database_and_table->database == "system" && database_and_table->table == "processes";
|
2018-04-17 17:08:15 +00:00
|
|
|
|
2018-10-30 16:31:21 +00:00
|
|
|
return false;
|
2018-04-17 17:08:15 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return false;
|
2016-01-11 21:46:36 +00:00
|
|
|
}
|
|
|
|
|
2015-06-21 06:06:04 +00:00
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * ast, ContextPtr query_context)
|
2015-06-21 06:06:04 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
EntryPtr res;
|
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
const ClientInfo & client_info = query_context->getClientInfo();
|
|
|
|
const Settings & settings = query_context->getSettingsRef();
|
2018-05-17 16:01:41 +00:00
|
|
|
|
2018-03-09 23:04:26 +00:00
|
|
|
if (client_info.current_query_id.empty())
|
|
|
|
throw Exception("Query id cannot be empty", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
2018-04-17 17:08:15 +00:00
|
|
|
bool is_unlimited_query = isUnlimitedQuery(ast);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
{
|
2018-04-18 20:18:18 +00:00
|
|
|
std::unique_lock lock(mutex);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-08-28 15:20:22 +00:00
|
|
|
const auto queue_max_wait_ms = settings.queue_max_wait_ms.totalMilliseconds();
|
2018-05-14 16:09:00 +00:00
|
|
|
if (!is_unlimited_query && max_size && processes.size() >= max_size)
|
2018-04-17 17:08:15 +00:00
|
|
|
{
|
2019-10-18 20:35:41 +00:00
|
|
|
if (queue_max_wait_ms)
|
2020-05-30 21:57:37 +00:00
|
|
|
LOG_WARNING(&Poco::Logger::get("ProcessList"), "Too many simultaneous queries, will wait {} ms.", queue_max_wait_ms);
|
2019-08-28 15:20:22 +00:00
|
|
|
if (!queue_max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(queue_max_wait_ms), [&]{ return processes.size() < max_size; }))
|
2018-04-17 17:08:15 +00:00
|
|
|
throw Exception("Too many simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
|
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2021-12-12 08:07:06 +00:00
|
|
|
if (!is_unlimited_query)
|
|
|
|
{
|
2021-12-29 14:25:56 +00:00
|
|
|
QueryAmount amount = getQueryKindAmount(ast->getQueryKind());
|
|
|
|
if (max_insert_queries_amount && ast->getQueryKind() == IAST::QueryKind::Insert && amount >= max_insert_queries_amount)
|
2021-12-12 08:07:06 +00:00
|
|
|
throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES,
|
|
|
|
"Too many simultaneous insert queries. Maximum: {}, current: {}",
|
|
|
|
max_insert_queries_amount, amount);
|
2021-12-29 14:25:56 +00:00
|
|
|
if (max_select_queries_amount && ast->getQueryKind() == IAST::QueryKind::Select && amount >= max_select_queries_amount)
|
2021-12-12 08:07:06 +00:00
|
|
|
throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES,
|
|
|
|
"Too many simultaneous select queries. Maximum: {}, current: {}",
|
|
|
|
max_select_queries_amount, amount);
|
|
|
|
}
|
|
|
|
|
2020-10-19 14:31:12 +00:00
|
|
|
{
|
2020-10-20 10:57:18 +00:00
|
|
|
/**
|
|
|
|
* `max_size` check above is controlled by `max_concurrent_queries` server setting and is a "hard" limit for how many
|
|
|
|
* queries the server can process concurrently. It is configured at startup. When the server is overloaded with queries and the
|
|
|
|
* hard limit is reached it is impossible to connect to the server to run queries for investigation.
|
|
|
|
*
|
|
|
|
* With `max_concurrent_queries_for_all_users` it is possible to configure an additional, runtime configurable, limit for query concurrency.
|
|
|
|
* Usually it should be configured just once for `default_profile` which is inherited by all users. DBAs can override
|
|
|
|
* this setting when connecting to ClickHouse, or it can be configured for a DBA profile to have a value greater than that of
|
|
|
|
* the default profile (or 0 for unlimited).
|
|
|
|
*
|
|
|
|
* One example is to set `max_size=X`, `max_concurrent_queries_for_all_users=X-10` for default profile,
|
|
|
|
* and `max_concurrent_queries_for_all_users=0` for DBAs or accounts that are vital for ClickHouse operations (like metrics
|
|
|
|
* exporters).
|
|
|
|
*
|
|
|
|
* Another creative example is to configure `max_concurrent_queries_for_all_users=50` for "analyst" profiles running adhoc queries
|
|
|
|
* and `max_concurrent_queries_for_all_users=100` for "customer facing" services. This way "analyst" queries will be rejected
|
|
|
|
* once is already processing 50+ concurrent queries (including analysts or any other users).
|
|
|
|
*/
|
|
|
|
|
2020-10-19 14:31:12 +00:00
|
|
|
if (!is_unlimited_query && settings.max_concurrent_queries_for_all_users
|
2020-10-20 10:57:18 +00:00
|
|
|
&& processes.size() >= settings.max_concurrent_queries_for_all_users)
|
2020-10-19 14:31:12 +00:00
|
|
|
throw Exception(
|
|
|
|
"Too many simultaneous queries for all users. Current: " + toString(processes.size())
|
|
|
|
+ ", maximum: " + settings.max_concurrent_queries_for_all_users.toString(),
|
|
|
|
ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
|
|
|
|
}
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
/** Why we use current user?
|
|
|
|
* Because initial one is passed by client and credentials for it is not verified,
|
|
|
|
* and using initial_user for limits will be insecure.
|
|
|
|
*
|
|
|
|
* Why we use current_query_id?
|
|
|
|
* Because we want to allow distributed queries that will run multiple secondary queries on same server,
|
|
|
|
* like SELECT count() FROM remote('127.0.0.{1,2}', system.numbers)
|
|
|
|
* so they must have different query_ids.
|
|
|
|
*/
|
|
|
|
|
|
|
|
{
|
|
|
|
auto user_process_list = user_to_queries.find(client_info.current_user);
|
|
|
|
|
|
|
|
if (user_process_list != user_to_queries.end())
|
|
|
|
{
|
2018-04-17 17:08:15 +00:00
|
|
|
if (!is_unlimited_query && settings.max_concurrent_queries_for_user
|
2017-04-01 07:20:54 +00:00
|
|
|
&& user_process_list->second.queries.size() >= settings.max_concurrent_queries_for_user)
|
2017-10-25 19:17:37 +00:00
|
|
|
throw Exception("Too many simultaneous queries for user " + client_info.current_user
|
2017-04-01 07:20:54 +00:00
|
|
|
+ ". Current: " + toString(user_process_list->second.queries.size())
|
|
|
|
+ ", maximum: " + settings.max_concurrent_queries_for_user.toString(),
|
2018-03-09 23:23:15 +00:00
|
|
|
ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-06-30 13:17:27 +00:00
|
|
|
auto running_query = user_process_list->second.queries.find(client_info.current_query_id);
|
|
|
|
|
|
|
|
if (running_query != user_process_list->second.queries.end())
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2018-03-09 23:04:26 +00:00
|
|
|
if (!settings.replace_running_query)
|
|
|
|
throw Exception("Query with id = " + client_info.current_query_id + " is already running.",
|
|
|
|
ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
|
|
|
|
|
|
|
|
/// Ask queries to cancel. They will check this flag.
|
2019-06-30 13:17:27 +00:00
|
|
|
running_query->second->is_killed.store(true, std::memory_order_relaxed);
|
|
|
|
|
2019-08-28 15:20:22 +00:00
|
|
|
const auto replace_running_query_max_wait_ms = settings.replace_running_query_max_wait_ms.totalMilliseconds();
|
|
|
|
if (!replace_running_query_max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(replace_running_query_max_wait_ms),
|
|
|
|
[&]
|
2019-06-30 13:17:27 +00:00
|
|
|
{
|
|
|
|
running_query = user_process_list->second.queries.find(client_info.current_query_id);
|
|
|
|
if (running_query == user_process_list->second.queries.end())
|
|
|
|
return true;
|
|
|
|
running_query->second->is_killed.store(true, std::memory_order_relaxed);
|
|
|
|
return false;
|
|
|
|
}))
|
2019-08-28 15:20:22 +00:00
|
|
|
{
|
2019-06-30 13:17:27 +00:00
|
|
|
throw Exception("Query with id = " + client_info.current_query_id + " is already running and can't be stopped",
|
|
|
|
ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
|
2019-08-28 15:20:22 +00:00
|
|
|
}
|
2019-06-30 13:17:27 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-06-30 13:17:27 +00:00
|
|
|
/// Check other users running query with our query_id
|
|
|
|
for (const auto & user_process_list : user_to_queries)
|
|
|
|
{
|
|
|
|
if (user_process_list.first == client_info.current_user)
|
|
|
|
continue;
|
|
|
|
if (auto running_query = user_process_list.second.queries.find(client_info.current_query_id); running_query != user_process_list.second.queries.end())
|
|
|
|
throw Exception("Query with id = " + client_info.current_query_id + " is already running by user " + user_process_list.first,
|
|
|
|
ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
|
|
|
|
}
|
|
|
|
|
2018-02-01 17:55:08 +00:00
|
|
|
auto process_it = processes.emplace(processes.end(),
|
2021-12-29 14:25:56 +00:00
|
|
|
query_context, query_, client_info, priorities.insert(settings.priority), ast->getQueryKind());
|
2021-12-12 08:07:06 +00:00
|
|
|
|
2021-12-29 14:25:56 +00:00
|
|
|
increaseQueryKindAmount(ast->getQueryKind());
|
2018-02-01 17:55:08 +00:00
|
|
|
|
|
|
|
res = std::make_shared<Entry>(*this, process_it);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2020-05-01 19:01:02 +00:00
|
|
|
ProcessListForUser & user_process_list = user_to_queries[client_info.current_user];
|
|
|
|
user_process_list.queries.emplace(client_info.current_query_id, &res->get());
|
2018-05-14 16:09:00 +00:00
|
|
|
|
2020-05-01 19:01:02 +00:00
|
|
|
process_it->setUserProcessList(&user_process_list);
|
2018-02-01 17:55:08 +00:00
|
|
|
|
2020-05-01 19:01:02 +00:00
|
|
|
/// Track memory usage for all simultaneously running queries from single user.
|
|
|
|
user_process_list.user_memory_tracker.setOrRaiseHardLimit(settings.max_memory_usage_for_user);
|
|
|
|
user_process_list.user_memory_tracker.setDescription("(for user)");
|
2018-02-01 17:55:08 +00:00
|
|
|
|
2020-05-01 19:01:02 +00:00
|
|
|
/// Actualize thread group info
|
|
|
|
if (auto thread_group = CurrentThread::getGroup())
|
|
|
|
{
|
|
|
|
std::lock_guard lock_thread_group(thread_group->mutex);
|
|
|
|
thread_group->performance_counters.setParent(&user_process_list.user_performance_counters);
|
|
|
|
thread_group->memory_tracker.setParent(&user_process_list.user_memory_tracker);
|
|
|
|
thread_group->query = process_it->query;
|
2021-01-26 14:51:30 +00:00
|
|
|
thread_group->normalized_query_hash = normalizedQueryHash<false>(process_it->query);
|
2018-06-19 20:30:35 +00:00
|
|
|
|
2020-05-01 19:01:02 +00:00
|
|
|
/// Set query-level memory trackers
|
|
|
|
thread_group->memory_tracker.setOrRaiseHardLimit(settings.max_memory_usage);
|
2020-03-03 00:24:44 +00:00
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
if (query_context->hasTraceCollector())
|
2020-05-01 19:01:02 +00:00
|
|
|
{
|
|
|
|
/// Set up memory profiling
|
|
|
|
thread_group->memory_tracker.setProfilerStep(settings.memory_profiler_step);
|
2020-05-01 19:17:56 +00:00
|
|
|
thread_group->memory_tracker.setSampleProbability(settings.memory_profiler_sample_probability);
|
2020-05-01 19:01:02 +00:00
|
|
|
}
|
2020-03-03 00:24:44 +00:00
|
|
|
|
2020-05-01 19:01:02 +00:00
|
|
|
thread_group->memory_tracker.setDescription("(for query)");
|
|
|
|
if (settings.memory_tracker_fault_probability)
|
|
|
|
thread_group->memory_tracker.setFaultProbability(settings.memory_tracker_fault_probability);
|
2018-06-19 20:30:35 +00:00
|
|
|
|
2020-05-01 19:01:02 +00:00
|
|
|
/// NOTE: Do not set the limit for thread-level memory tracker since it could show unreal values
|
|
|
|
/// since allocation and deallocation could happen in different threads
|
2018-06-19 20:30:35 +00:00
|
|
|
|
2020-05-01 19:01:02 +00:00
|
|
|
process_it->thread_group = std::move(thread_group);
|
|
|
|
}
|
2017-08-29 13:23:04 +00:00
|
|
|
|
2020-05-01 19:01:02 +00:00
|
|
|
if (!user_process_list.user_throttler)
|
|
|
|
{
|
|
|
|
if (settings.max_network_bandwidth_for_user)
|
|
|
|
user_process_list.user_throttler = std::make_shared<Throttler>(settings.max_network_bandwidth_for_user, total_network_throttler);
|
|
|
|
else if (settings.max_network_bandwidth_for_all_users)
|
|
|
|
user_process_list.user_throttler = total_network_throttler;
|
2018-03-09 23:04:26 +00:00
|
|
|
}
|
2017-08-29 13:23:04 +00:00
|
|
|
|
2018-03-29 13:24:36 +00:00
|
|
|
if (!total_network_throttler && settings.max_network_bandwidth_for_all_users)
|
2018-03-09 23:04:26 +00:00
|
|
|
{
|
2018-03-29 13:24:36 +00:00
|
|
|
total_network_throttler = std::make_shared<Throttler>(settings.max_network_bandwidth_for_all_users);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return res;
|
2015-06-21 06:06:04 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
ProcessListEntry::~ProcessListEntry()
|
|
|
|
{
|
2018-10-08 05:30:03 +00:00
|
|
|
std::lock_guard lock(parent.mutex);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-03-09 23:04:26 +00:00
|
|
|
String user = it->getClientInfo().current_user;
|
|
|
|
String query_id = it->getClientInfo().current_query_id;
|
2021-12-29 14:25:56 +00:00
|
|
|
IAST::QueryKind query_kind = it->query_kind;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-05-14 16:09:00 +00:00
|
|
|
const QueryStatus * process_list_element_ptr = &*it;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-04-02 17:37:49 +00:00
|
|
|
/// This removes the memory_tracker of one request.
|
2018-02-01 17:55:08 +00:00
|
|
|
parent.processes.erase(it);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-03-09 22:11:42 +00:00
|
|
|
auto user_process_list_it = parent.user_to_queries.find(user);
|
2018-03-09 23:04:26 +00:00
|
|
|
if (user_process_list_it == parent.user_to_queries.end())
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2020-05-30 21:57:37 +00:00
|
|
|
LOG_ERROR(&Poco::Logger::get("ProcessList"), "Logical error: cannot find user in ProcessList");
|
2018-03-09 23:04:26 +00:00
|
|
|
std::terminate();
|
|
|
|
}
|
2018-03-09 22:11:42 +00:00
|
|
|
|
2018-03-09 23:04:26 +00:00
|
|
|
ProcessListForUser & user_process_list = user_process_list_it->second;
|
2018-03-09 22:11:42 +00:00
|
|
|
|
2018-03-09 23:04:26 +00:00
|
|
|
bool found = false;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-06-30 13:17:27 +00:00
|
|
|
if (auto running_query = user_process_list.queries.find(query_id); running_query != user_process_list.queries.end())
|
2018-03-09 23:04:26 +00:00
|
|
|
{
|
2019-06-30 13:17:27 +00:00
|
|
|
if (running_query->second == process_list_element_ptr)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2019-06-30 13:17:27 +00:00
|
|
|
user_process_list.queries.erase(running_query->first);
|
|
|
|
found = true;
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2018-03-09 23:04:26 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2018-03-09 23:04:26 +00:00
|
|
|
if (!found)
|
|
|
|
{
|
2020-05-30 21:57:37 +00:00
|
|
|
LOG_ERROR(&Poco::Logger::get("ProcessList"), "Logical error: cannot find query by query_id and pointer to ProcessListElement in ProcessListForUser");
|
2018-03-09 23:04:26 +00:00
|
|
|
std::terminate();
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2021-12-12 08:07:06 +00:00
|
|
|
|
|
|
|
parent.decreaseQueryKindAmount(query_kind);
|
|
|
|
|
2019-06-30 13:17:27 +00:00
|
|
|
parent.have_space.notify_all();
|
2018-05-14 16:09:00 +00:00
|
|
|
|
2018-03-09 23:04:26 +00:00
|
|
|
/// If there are no more queries for the user, then we will reset memory tracker and network throttler.
|
|
|
|
if (user_process_list.queries.empty())
|
2018-06-19 20:30:35 +00:00
|
|
|
user_process_list.resetTrackers();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2020-04-19 21:43:06 +00:00
|
|
|
/// Reset throttler, similarly (see above).
|
2020-03-08 21:40:00 +00:00
|
|
|
if (parent.processes.empty())
|
2018-03-29 13:24:36 +00:00
|
|
|
parent.total_network_throttler.reset();
|
2015-06-21 06:06:04 +00:00
|
|
|
}
|
|
|
|
|
2015-09-04 20:52:00 +00:00
|
|
|
|
2018-02-01 17:55:08 +00:00
|
|
|
QueryStatus::QueryStatus(
|
2021-12-29 14:25:56 +00:00
|
|
|
ContextPtr context_, const String & query_, const ClientInfo & client_info_, QueryPriorities::Handle && priority_handle_, const IAST::QueryKind & query_kind_)
|
2021-04-10 23:33:54 +00:00
|
|
|
: WithContext(context_)
|
|
|
|
, query(query_)
|
|
|
|
, client_info(client_info_)
|
|
|
|
, priority_handle(std::move(priority_handle_))
|
2021-12-12 08:07:06 +00:00
|
|
|
, query_kind(query_kind_)
|
2018-02-01 17:55:08 +00:00
|
|
|
{
|
2021-11-26 11:44:39 +00:00
|
|
|
auto settings = getContext()->getSettings();
|
|
|
|
limits.max_execution_time = settings.max_execution_time;
|
|
|
|
overflow_mode = settings.timeout_overflow_mode;
|
2018-02-01 17:55:08 +00:00
|
|
|
}
|
|
|
|
|
2021-07-27 09:52:45 +00:00
|
|
|
QueryStatus::~QueryStatus()
|
|
|
|
{
|
|
|
|
assert(executors.empty());
|
|
|
|
}
|
2018-02-01 17:55:08 +00:00
|
|
|
|
2021-09-16 17:40:42 +00:00
|
|
|
CancellationCode QueryStatus::cancelQuery(bool)
|
2017-02-03 16:15:12 +00:00
|
|
|
{
|
2021-09-16 17:40:42 +00:00
|
|
|
if (is_killed.load())
|
2019-02-01 01:48:25 +00:00
|
|
|
return CancellationCode::CancelSent;
|
|
|
|
|
2021-09-21 08:01:00 +00:00
|
|
|
is_killed.store(true);
|
|
|
|
|
2021-09-20 19:07:33 +00:00
|
|
|
std::lock_guard lock(executors_mutex);
|
|
|
|
for (auto * e : executors)
|
|
|
|
e->cancel();
|
2019-02-01 01:48:25 +00:00
|
|
|
|
|
|
|
return CancellationCode::CancelSent;
|
|
|
|
}
|
|
|
|
|
2021-07-27 09:52:45 +00:00
|
|
|
void QueryStatus::addPipelineExecutor(PipelineExecutor * e)
|
|
|
|
{
|
2021-09-16 17:40:42 +00:00
|
|
|
std::lock_guard lock(executors_mutex);
|
2021-07-27 09:52:45 +00:00
|
|
|
assert(std::find(executors.begin(), executors.end(), e) == executors.end());
|
|
|
|
executors.push_back(e);
|
|
|
|
}
|
|
|
|
|
|
|
|
void QueryStatus::removePipelineExecutor(PipelineExecutor * e)
|
|
|
|
{
|
2021-09-16 17:40:42 +00:00
|
|
|
std::lock_guard lock(executors_mutex);
|
2021-07-27 09:52:45 +00:00
|
|
|
assert(std::find(executors.begin(), executors.end(), e) != executors.end());
|
|
|
|
std::erase_if(executors, [e](PipelineExecutor * x) { return x == e; });
|
|
|
|
}
|
|
|
|
|
2021-11-26 11:44:39 +00:00
|
|
|
bool QueryStatus::checkTimeLimit()
|
|
|
|
{
|
|
|
|
if (is_killed.load())
|
|
|
|
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
|
|
|
|
|
|
|
|
return limits.checkTimeLimit(watch, overflow_mode);
|
|
|
|
}
|
|
|
|
|
2021-12-02 13:53:55 +00:00
|
|
|
bool QueryStatus::checkTimeLimitSoft()
|
|
|
|
{
|
|
|
|
if (is_killed.load())
|
|
|
|
return false;
|
|
|
|
|
|
|
|
return limits.checkTimeLimit(watch, OverflowMode::BREAK);
|
|
|
|
}
|
|
|
|
|
2016-11-30 17:31:05 +00:00
|
|
|
|
2018-02-01 17:55:08 +00:00
|
|
|
void QueryStatus::setUserProcessList(ProcessListForUser * user_process_list_)
|
|
|
|
{
|
|
|
|
user_process_list = user_process_list_;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2018-05-14 16:09:00 +00:00
|
|
|
ThrottlerPtr QueryStatus::getUserNetworkThrottler()
|
2015-09-04 20:52:00 +00:00
|
|
|
{
|
2018-03-09 23:04:26 +00:00
|
|
|
if (!user_process_list)
|
|
|
|
return {};
|
|
|
|
return user_process_list->user_throttler;
|
2015-09-04 20:52:00 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2018-02-01 17:55:08 +00:00
|
|
|
QueryStatus * ProcessList::tryGetProcessListElement(const String & current_query_id, const String & current_user)
|
2017-01-24 15:11:36 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
auto user_it = user_to_queries.find(current_user);
|
|
|
|
if (user_it != user_to_queries.end())
|
|
|
|
{
|
|
|
|
const auto & user_queries = user_it->second.queries;
|
|
|
|
auto query_it = user_queries.find(current_query_id);
|
2017-01-24 15:11:36 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (query_it != user_queries.end())
|
|
|
|
return query_it->second;
|
|
|
|
}
|
2017-01-24 15:11:36 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
return nullptr;
|
2017-01-24 15:11:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2019-02-01 01:48:25 +00:00
|
|
|
CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill)
|
2016-11-30 17:31:05 +00:00
|
|
|
{
|
2019-01-02 06:44:36 +00:00
|
|
|
std::lock_guard lock(mutex);
|
2016-11-30 17:31:05 +00:00
|
|
|
|
2018-02-01 17:55:08 +00:00
|
|
|
QueryStatus * elem = tryGetProcessListElement(current_query_id, current_user);
|
2017-01-24 15:11:36 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (!elem)
|
|
|
|
return CancellationCode::NotFound;
|
2017-01-24 15:11:36 +00:00
|
|
|
|
2019-02-01 01:48:25 +00:00
|
|
|
return elem->cancelQuery(kill);
|
2016-11-30 17:31:05 +00:00
|
|
|
}
|
|
|
|
|
2018-02-01 17:55:08 +00:00
|
|
|
|
2019-07-30 23:12:04 +00:00
|
|
|
void ProcessList::killAllQueries()
|
|
|
|
{
|
|
|
|
std::lock_guard lock(mutex);
|
|
|
|
|
|
|
|
for (auto & process : processes)
|
|
|
|
process.cancelQuery(true);
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2018-05-17 16:01:41 +00:00
|
|
|
QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_events, bool get_settings) const
|
|
|
|
{
|
2020-09-24 23:24:58 +00:00
|
|
|
QueryStatusInfo res{};
|
2018-05-17 16:01:41 +00:00
|
|
|
|
|
|
|
res.query = query;
|
|
|
|
res.client_info = client_info;
|
|
|
|
res.elapsed_seconds = watch.elapsedSeconds();
|
|
|
|
res.is_cancelled = is_killed.load(std::memory_order_relaxed);
|
2019-05-20 11:37:41 +00:00
|
|
|
res.read_rows = progress_in.read_rows;
|
|
|
|
res.read_bytes = progress_in.read_bytes;
|
|
|
|
res.total_rows = progress_in.total_rows_to_read;
|
2019-05-21 04:06:36 +00:00
|
|
|
|
2021-12-09 13:00:12 +00:00
|
|
|
res.written_rows = progress_out.written_rows;
|
|
|
|
res.written_bytes = progress_out.written_bytes;
|
2018-05-17 16:01:41 +00:00
|
|
|
|
2018-06-19 20:30:35 +00:00
|
|
|
if (thread_group)
|
2018-05-17 16:01:41 +00:00
|
|
|
{
|
2018-06-19 20:30:35 +00:00
|
|
|
res.memory_usage = thread_group->memory_tracker.get();
|
|
|
|
res.peak_memory_usage = thread_group->memory_tracker.getPeak();
|
2018-05-17 16:01:41 +00:00
|
|
|
|
2018-06-19 20:30:35 +00:00
|
|
|
if (get_thread_list)
|
|
|
|
{
|
2019-02-08 13:23:10 +00:00
|
|
|
std::lock_guard lock(thread_group->mutex);
|
2020-02-02 02:27:15 +00:00
|
|
|
res.thread_ids = thread_group->thread_ids;
|
2018-06-19 20:30:35 +00:00
|
|
|
}
|
2018-05-17 16:01:41 +00:00
|
|
|
|
2018-06-19 20:30:35 +00:00
|
|
|
if (get_profile_events)
|
2021-10-11 15:56:23 +00:00
|
|
|
res.profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_group->performance_counters.getPartiallyAtomicSnapshot());
|
2018-06-19 20:30:35 +00:00
|
|
|
}
|
2018-05-17 16:01:41 +00:00
|
|
|
|
2021-04-10 23:33:54 +00:00
|
|
|
if (get_settings && getContext())
|
2021-03-30 13:35:33 +00:00
|
|
|
{
|
2021-04-10 23:33:54 +00:00
|
|
|
res.query_settings = std::make_shared<Settings>(getContext()->getSettings());
|
|
|
|
res.current_database = getContext()->getCurrentDatabase();
|
2021-03-30 13:35:33 +00:00
|
|
|
}
|
2018-05-17 16:01:41 +00:00
|
|
|
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
ProcessList::Info ProcessList::getInfo(bool get_thread_list, bool get_profile_events, bool get_settings) const
|
|
|
|
{
|
|
|
|
Info per_query_infos;
|
|
|
|
|
2019-01-02 06:44:36 +00:00
|
|
|
std::lock_guard lock(mutex);
|
2018-05-17 16:01:41 +00:00
|
|
|
|
|
|
|
per_query_infos.reserve(processes.size());
|
|
|
|
for (const auto & process : processes)
|
|
|
|
per_query_infos.emplace_back(process.getInfo(get_thread_list, get_profile_events, get_settings));
|
|
|
|
|
|
|
|
return per_query_infos;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2018-06-09 15:29:08 +00:00
|
|
|
ProcessListForUser::ProcessListForUser() = default;
|
2018-02-01 17:55:08 +00:00
|
|
|
|
|
|
|
|
2020-03-20 17:27:17 +00:00
|
|
|
ProcessListForUserInfo ProcessListForUser::getInfo(bool get_profile_events) const
|
|
|
|
{
|
|
|
|
ProcessListForUserInfo res;
|
|
|
|
|
|
|
|
res.memory_usage = user_memory_tracker.get();
|
|
|
|
res.peak_memory_usage = user_memory_tracker.getPeak();
|
|
|
|
|
|
|
|
if (get_profile_events)
|
2021-10-11 15:56:23 +00:00
|
|
|
res.profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(user_performance_counters.getPartiallyAtomicSnapshot());
|
2020-03-20 17:27:17 +00:00
|
|
|
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
ProcessList::UserInfo ProcessList::getUserInfo(bool get_profile_events) const
|
|
|
|
{
|
|
|
|
UserInfo per_user_infos;
|
|
|
|
|
|
|
|
std::lock_guard lock(mutex);
|
|
|
|
|
|
|
|
per_user_infos.reserve(user_to_queries.size());
|
|
|
|
|
|
|
|
for (const auto & [user, user_queries] : user_to_queries)
|
|
|
|
per_user_infos.emplace(user, user_queries.getInfo(get_profile_events));
|
|
|
|
|
|
|
|
return per_user_infos;
|
|
|
|
}
|
|
|
|
|
2021-12-29 14:25:56 +00:00
|
|
|
void ProcessList::increaseQueryKindAmount(const IAST::QueryKind & query_kind) const
|
2021-12-12 08:07:06 +00:00
|
|
|
{
|
2021-12-29 14:25:56 +00:00
|
|
|
if (query_kind == IAST::QueryKind::Insert)
|
|
|
|
query_kind_amounts->insert++;
|
|
|
|
else if (query_kind == IAST::QueryKind::Select)
|
|
|
|
query_kind_amounts->select++;
|
2021-12-12 08:07:06 +00:00
|
|
|
}
|
|
|
|
|
2021-12-29 14:25:56 +00:00
|
|
|
void ProcessList::decreaseQueryKindAmount(const IAST::QueryKind & query_kind) const
|
2021-12-12 08:07:06 +00:00
|
|
|
{
|
2021-12-29 14:25:56 +00:00
|
|
|
if (!(query_kind == IAST::QueryKind::Insert || query_kind == IAST::QueryKind::Select))
|
|
|
|
return;
|
2021-12-12 08:07:06 +00:00
|
|
|
|
2021-12-29 14:25:56 +00:00
|
|
|
QueryAmount amount = getQueryKindAmount(query_kind);
|
|
|
|
if (amount == 0)
|
|
|
|
{
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong query kind amount: decrease to negative on '{}'", query_kind, amount);
|
|
|
|
}
|
|
|
|
if (query_kind == IAST::QueryKind::Insert)
|
|
|
|
query_kind_amounts->insert--;
|
|
|
|
else if (query_kind == IAST::QueryKind::Select)
|
|
|
|
query_kind_amounts->select--;
|
2021-12-12 08:07:06 +00:00
|
|
|
}
|
2021-12-29 14:25:56 +00:00
|
|
|
|
|
|
|
ProcessList::QueryAmount ProcessList::getQueryKindAmount(const IAST::QueryKind & query_kind) const
|
2021-12-12 08:07:06 +00:00
|
|
|
{
|
2021-12-29 14:25:56 +00:00
|
|
|
if (query_kind == IAST::QueryKind::Insert)
|
|
|
|
return query_kind_amounts->insert;
|
|
|
|
else if (query_kind == IAST::QueryKind::Select)
|
|
|
|
return query_kind_amounts->select;
|
|
|
|
else
|
2021-12-12 08:07:06 +00:00
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2015-06-21 06:06:04 +00:00
|
|
|
}
|