2017-04-01 09:19:00 +00:00
|
|
|
#include <Interpreters/ProcessList.h>
|
|
|
|
#include <Interpreters/Settings.h>
|
2018-05-17 16:01:41 +00:00
|
|
|
#include <Interpreters/Context.h>
|
2018-04-17 17:08:15 +00:00
|
|
|
#include <Parsers/ASTSelectWithUnionQuery.h>
|
|
|
|
#include <Parsers/ASTSelectQuery.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Parsers/ASTKillQueryQuery.h>
|
2018-04-17 17:08:15 +00:00
|
|
|
#include <Parsers/ASTIdentifier.h>
|
2018-05-17 16:01:41 +00:00
|
|
|
#include <Common/typeid_cast.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Common/Exception.h>
|
2018-05-29 18:14:31 +00:00
|
|
|
#include <Common/CurrentThread.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <IO/WriteHelpers.h>
|
|
|
|
#include <DataStreams/IProfilingBlockInputStream.h>
|
2018-02-01 17:55:08 +00:00
|
|
|
#include <common/logger_useful.h>
|
|
|
|
#include <pthread.h>
|
2018-04-18 20:18:18 +00:00
|
|
|
#include <chrono>
|
|
|
|
|
2017-01-21 04:24:28 +00:00
|
|
|
|
2015-06-21 06:06:04 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2016-01-11 21:46:36 +00:00
|
|
|
/// Error codes referenced in this file; they are only declared here (extern) and defined elsewhere.
namespace ErrorCodes
{
    /// Internal invariant violation (used when a query arrives without a query id).
    extern const int LOGICAL_ERROR;

    /// Limits on concurrently running queries.
    extern const int TOO_MANY_SIMULTANEOUS_QUERIES;
    extern const int QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING;
}
|
|
|
|
|
2015-06-21 06:06:04 +00:00
|
|
|
|
2018-04-17 17:08:15 +00:00
|
|
|
/// Should we execute the query even if max_concurrent_queries limit is exhausted
|
|
|
|
static bool isUnlimitedQuery(const IAST * ast)
|
|
|
|
{
|
|
|
|
if (!ast)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
/// It is KILL QUERY
|
|
|
|
if (typeid_cast<const ASTKillQueryQuery *>(ast))
|
|
|
|
return true;
|
|
|
|
|
|
|
|
/// It is SELECT FROM system.processes
|
2018-04-18 21:14:47 +00:00
|
|
|
/// NOTE: This is very rough check.
|
|
|
|
/// False negative: USE system; SELECT * FROM processes;
|
|
|
|
/// False positive: SELECT * FROM system.processes CROSS JOIN (SELECT ...)
|
|
|
|
|
2018-04-17 17:08:15 +00:00
|
|
|
if (auto ast_selects = typeid_cast<const ASTSelectWithUnionQuery *>(ast))
|
|
|
|
{
|
|
|
|
if (!ast_selects->list_of_selects || ast_selects->list_of_selects->children.empty())
|
|
|
|
return false;
|
|
|
|
|
|
|
|
auto ast_select = typeid_cast<ASTSelectQuery *>(ast_selects->list_of_selects->children[0].get());
|
|
|
|
|
|
|
|
if (!ast_select)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
auto ast_database = ast_select->database();
|
|
|
|
if (!ast_database)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
auto ast_table = ast_select->table();
|
|
|
|
if (!ast_table)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
auto ast_database_id = typeid_cast<const ASTIdentifier *>(ast_database.get());
|
|
|
|
if (!ast_database_id)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
auto ast_table_id = typeid_cast<const ASTIdentifier *>(ast_table.get());
|
|
|
|
if (!ast_table_id)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
return ast_database_id->name == "system" && ast_table_id->name == "processes";
|
|
|
|
}
|
|
|
|
|
|
|
|
return false;
|
2016-01-11 21:46:36 +00:00
|
|
|
}
|
|
|
|
|
2015-06-21 06:06:04 +00:00
|
|
|
|
2018-05-17 16:01:41 +00:00
|
|
|
/** Registers a query in the process list and returns the entry that represents it.
  *
  * query_        - text of the query, stored in the created QueryStatus.
  * ast           - parsed query (may be null); only used to detect "unlimited" queries
  *                 (see isUnlimitedQuery), which bypass both the global and the per-user concurrency limits.
  * query_context - supplies ClientInfo and Settings; its address is remembered in the created QueryStatus.
  *
  * Throws:
  *  - LOGICAL_ERROR if the query id in ClientInfo is empty;
  *  - TOO_MANY_SIMULTANEOUS_QUERIES if the global limit is reached and no slot frees up within
  *    queue_max_wait_ms (or the wait time is zero), or if the per-user limit is reached;
  *  - QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING if a query of the same user with the same id exists
  *    and replace_running_query is not set.
  *
  * All bookkeeping is done under `mutex`.
  */
ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * ast, Context & query_context)
{
    EntryPtr res;

    const ClientInfo & client_info = query_context.getClientInfo();
    const Settings & settings = query_context.getSettingsRef();

    if (client_info.current_query_id.empty())
        throw Exception("Query id cannot be empty", ErrorCodes::LOGICAL_ERROR);

    bool is_unlimited_query = isUnlimitedQuery(ast);

    {
        std::unique_lock lock(mutex);

        /// Global limit: wait (bounded by queue_max_wait_ms) until some query finishes and frees a slot.
        if (!is_unlimited_query && max_size && processes.size() >= max_size)
        {
            auto max_wait_ms = settings.queue_max_wait_ms.totalMilliseconds();

            if (!max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(max_wait_ms), [&]{ return processes.size() < max_size; }))
                throw Exception("Too many simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
        }

        /** Why we use current user?
          * Because initial one is passed by client and credentials for it is not verified,
          *  and using initial_user for limits will be insecure.
          *
          * Why we use current_query_id?
          * Because we want to allow distributed queries that will run multiple secondary queries on same server,
          *  like SELECT count() FROM remote('127.0.0.{1,2}', system.numbers)
          *  so they must have different query_ids.
          */

        {
            auto user_process_list = user_to_queries.find(client_info.current_user);

            if (user_process_list != user_to_queries.end())
            {
                if (!is_unlimited_query && settings.max_concurrent_queries_for_user
                    && user_process_list->second.queries.size() >= settings.max_concurrent_queries_for_user)
                    throw Exception("Too many simultaneous queries for user " + client_info.current_user
                        + ". Current: " + toString(user_process_list->second.queries.size())
                        + ", maximum: " + settings.max_concurrent_queries_for_user.toString(),
                        ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);

                auto range = user_process_list->second.queries.equal_range(client_info.current_query_id);
                if (range.first != range.second)
                {
                    if (!settings.replace_running_query)
                        throw Exception("Query with id = " + client_info.current_query_id + " is already running.",
                            ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);

                    /// Ask queries to cancel. They will check this flag.
                    for (auto it = range.first; it != range.second; ++it)
                        it->second->is_killed.store(true, std::memory_order_relaxed);
                }
            }
        }

        auto process_it = processes.emplace(processes.end(),
            query_, client_info, settings.max_memory_usage, settings.memory_tracker_fault_probability, priorities.insert(settings.priority));

        res = std::make_shared<Entry>(*this, process_it);

        process_it->query_context = &query_context;

        /// The query id is known to be non-empty here (checked at the top of the function),
        /// so the query is always registered in the per-user list.
        ProcessListForUser & user_process_list = user_to_queries[client_info.current_user];
        user_process_list.queries.emplace(client_info.current_query_id, &res->get());

        process_it->setUserProcessList(&user_process_list);

        /// Limits are only raised (to be more relaxed) or set to something instead of zero,
        ///  because settings for different queries will interfere each other:
        ///  setting from one query effectively sets values for all other queries.

        /// Track memory usage for all simultaneously running queries.
        /// You should specify this value in configuration for default profile,
        ///  not for specific users, sessions or queries,
        ///  because this setting is effectively global.
        total_memory_tracker.setOrRaiseLimit(settings.max_memory_usage_for_all_queries);
        total_memory_tracker.setDescription("(total)");

        /// Track memory usage for all simultaneously running queries from single user.
        user_process_list.user_memory_tracker.setParent(&total_memory_tracker);
        user_process_list.user_memory_tracker.setOrRaiseLimit(settings.max_memory_usage_for_user);
        user_process_list.user_memory_tracker.setDescription("(for user)");

        /// Query-level memory tracker is already set in the QueryStatus constructor

        /// Attach master thread
        CurrentThread::attachQuery(&*process_it);
        CurrentThread::getMemoryTracker().setOrRaiseLimit(settings.max_memory_usage);
        CurrentThread::getMemoryTracker().setDescription("(for thread)");

        /// The total throttler must exist BEFORE the per-user throttler is set up below:
        /// the per-user code either passes it to the new Throttler or uses it directly.
        /// Creating it afterwards would hand a null pointer to the first query's user throttler.
        if (!total_network_throttler && settings.max_network_bandwidth_for_all_users)
        {
            total_network_throttler = std::make_shared<Throttler>(settings.max_network_bandwidth_for_all_users);
        }

        if (!user_process_list.user_throttler)
        {
            if (settings.max_network_bandwidth_for_user)
                user_process_list.user_throttler = std::make_shared<Throttler>(settings.max_network_bandwidth_for_user, total_network_throttler);
            else if (settings.max_network_bandwidth_for_all_users)
                user_process_list.user_throttler = total_network_throttler;
        }
    }

    return res;
}
|
|
|
|
|
|
|
|
|
|
|
|
/** Removes the query from the parent ProcessList.
  *
  * Steps, in order:
  *  1. release the query streams (before taking the ProcessList mutex);
  *  2. detach the current thread from the query and clean all recorded thread statuses;
  *  3. under parent.mutex: erase the QueryStatus, erase its record from the per-user list,
  *     wake one waiter on have_space, and reset per-user / total trackers when nothing is left.
  *
  * If the per-user bookkeeping is inconsistent, an error is logged and the process is terminated.
  */
ProcessListEntry::~ProcessListEntry()
{
    /// Destroy all streams to avoid long lock of ProcessList
    it->releaseQueryStreams();

    /// Finalize all threads statuses
    {
        CurrentThread::detachQuery();

        std::shared_lock threads_lock(it->threads_mutex);
        for (auto & thread_status : it->thread_statuses)
            thread_status.second->clean();
    }

    std::lock_guard parent_lock(parent.mutex);

    /// Copy what is needed for the per-user lookup: the element itself is erased right below.
    String user_name = it->getClientInfo().current_user;
    String finished_query_id = it->getClientInfo().current_query_id;

    /// Only the address is used afterwards (for comparison), the object is never dereferenced.
    const QueryStatus * finished_query = &*it;

    /// This removes the memory_tracker of one request.
    parent.processes.erase(it);

    auto user_pos = parent.user_to_queries.find(user_name);
    if (user_pos == parent.user_to_queries.end())
    {
        LOG_ERROR(&Logger::get("ProcessList"), "Logical error: cannot find user in ProcessList");
        std::terminate();
    }

    ProcessListForUser & queries_of_user = user_pos->second;

    /// Several queries may share the same id; pick the record that points to this very element.
    bool erased_from_user_list = false;
    auto same_id_range = queries_of_user.queries.equal_range(finished_query_id);
    for (auto candidate = same_id_range.first; candidate != same_id_range.second; ++candidate)
    {
        if (candidate->second != finished_query_ptr_guard(finished_query))
            continue;

        queries_of_user.queries.erase(candidate);
        erased_from_user_list = true;
        break;
    }

    if (!erased_from_user_list)
    {
        LOG_ERROR(&Logger::get("ProcessList"), "Logical error: cannot find query by query_id and pointer to ProcessListElement in ProcessListForUser");
        std::terminate();
    }

    parent.have_space.notify_one();

    /// If there are no more queries for the user, then we will reset memory tracker and network throttler.
    if (queries_of_user.queries.empty())
        queries_of_user.reset();

    /// This removes memory_tracker for all requests. At this time, no other memory_trackers live.
    if (parent.processes.empty())
    {
        /// Reset MemoryTracker, similarly (see above).
        parent.total_memory_tracker.logPeakMemoryUsage();
        parent.total_memory_tracker.reset();
        parent.total_network_throttler.reset();
    }
}
|
|
|
|
|
2015-09-04 20:52:00 +00:00
|
|
|
|
2018-02-01 17:55:08 +00:00
|
|
|
QueryStatus::QueryStatus(
|
|
|
|
const String & query_,
|
|
|
|
const ClientInfo & client_info_,
|
|
|
|
size_t max_memory_usage,
|
|
|
|
double memory_tracker_fault_probability,
|
|
|
|
QueryPriorities::Handle && priority_handle_)
|
|
|
|
:
|
|
|
|
query(query_),
|
|
|
|
client_info(client_info_),
|
|
|
|
priority_handle(std::move(priority_handle_)),
|
|
|
|
performance_counters(ProfileEvents::Level::Process),
|
|
|
|
num_queries_increment{CurrentMetrics::Query}
|
|
|
|
{
|
|
|
|
memory_tracker.setOrRaiseLimit(max_memory_usage);
|
|
|
|
memory_tracker.setDescription("(for query)");
|
|
|
|
|
|
|
|
if (memory_tracker_fault_probability)
|
|
|
|
memory_tracker.setFaultProbability(memory_tracker_fault_probability);
|
|
|
|
}
|
|
|
|
|
2018-05-31 15:54:08 +00:00
|
|
|
/// Writes a debug-level trace line (function signature and source line) when the record is destroyed.
QueryStatus::~QueryStatus()
{
    Poco::Logger * trace_log = &Poco::Logger::get("QueryStatus");
    LOG_DEBUG(trace_log, __PRETTY_FUNCTION__ << ":" << __LINE__);
}
|
2018-02-01 17:55:08 +00:00
|
|
|
|
|
|
|
void QueryStatus::setQueryStreams(const BlockIO & io)
|
2016-11-30 17:31:05 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
std::lock_guard<std::mutex> lock(query_streams_mutex);
|
2017-02-07 10:40:29 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
query_stream_in = io.in;
|
|
|
|
query_stream_out = io.out;
|
2018-04-17 15:16:32 +00:00
|
|
|
query_streams_status = QueryStreamsStatus::Initialized;
|
2017-02-03 16:15:12 +00:00
|
|
|
}
|
|
|
|
|
2018-02-01 17:55:08 +00:00
|
|
|
void QueryStatus::releaseQueryStreams()
|
2017-02-03 16:15:12 +00:00
|
|
|
{
|
2018-04-17 15:16:32 +00:00
|
|
|
BlockInputStreamPtr in;
|
|
|
|
BlockOutputStreamPtr out;
|
2017-02-03 16:15:12 +00:00
|
|
|
|
2018-04-17 15:16:32 +00:00
|
|
|
{
|
|
|
|
std::lock_guard<std::mutex> lock(query_streams_mutex);
|
|
|
|
|
|
|
|
query_streams_status = QueryStreamsStatus::Released;
|
|
|
|
in = std::move(query_stream_in);
|
|
|
|
out = std::move(query_stream_out);
|
|
|
|
}
|
2017-02-03 16:15:12 +00:00
|
|
|
|
2018-04-17 15:16:32 +00:00
|
|
|
/// Destroy streams outside the mutex lock
|
2017-02-03 16:15:12 +00:00
|
|
|
}
|
|
|
|
|
2018-02-01 17:55:08 +00:00
|
|
|
bool QueryStatus::streamsAreReleased()
|
2017-02-03 16:15:12 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
std::lock_guard<std::mutex> lock(query_streams_mutex);
|
2017-02-03 16:15:12 +00:00
|
|
|
|
2018-04-17 15:16:32 +00:00
|
|
|
return query_streams_status == QueryStreamsStatus::Released;
|
2016-11-30 17:31:05 +00:00
|
|
|
}
|
|
|
|
|
2018-02-01 17:55:08 +00:00
|
|
|
/// Copies the stored stream pointers into `in` and `out`.
/// Returns false (leaving `in` and `out` untouched) unless the streams are in the Initialized state,
/// i.e. when they were not set yet or were already released.
bool QueryStatus::tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStreamPtr & out) const
{
    std::lock_guard streams_lock(query_streams_mutex);

    const bool streams_available = (query_streams_status == QueryStreamsStatus::Initialized);
    if (streams_available)
    {
        in = query_stream_in;
        out = query_stream_out;
    }

    return streams_available;
}
|
|
|
|
|
|
|
|
|
2018-02-01 17:55:08 +00:00
|
|
|
/// Binds the query to the per-user process list: remembers the pointer and makes the user-level
/// performance counters and memory tracker the parents of the query-level ones.
void QueryStatus::setUserProcessList(ProcessListForUser * user_process_list_)
{
    user_process_list = user_process_list_;

    auto & user_level_counters = user_process_list->user_performance_counters;
    auto & user_level_memory = user_process_list->user_memory_tracker;

    performance_counters.setParent(&user_level_counters);
    memory_tracker.setParent(&user_level_memory);
}
|
|
|
|
|
|
|
|
|
2018-05-14 16:09:00 +00:00
|
|
|
/// Returns the network throttler of the user this query belongs to,
/// or an empty pointer when the query is not bound to a per-user process list.
ThrottlerPtr QueryStatus::getUserNetworkThrottler()
{
    if (user_process_list != nullptr)
        return user_process_list->user_throttler;

    return ThrottlerPtr{};
}
|
|
|
|
|
|
|
|
|
2018-02-01 17:55:08 +00:00
|
|
|
/// Looks up a running query by user name and query id.
/// Returns nullptr when the user has no record or no query with that id.
/// The function takes no lock itself; the caller visible in this file (sendCancelToQuery) holds `mutex`.
QueryStatus * ProcessList::tryGetProcessListElement(const String & current_query_id, const String & current_user)
{
    const auto user_pos = user_to_queries.find(current_user);
    if (user_pos == user_to_queries.end())
        return nullptr;

    const auto & queries_of_user = user_pos->second.queries;

    const auto query_pos = queries_of_user.find(current_query_id);
    if (query_pos == queries_of_user.end())
        return nullptr;

    return query_pos->second;
}
|
|
|
|
|
|
|
|
|
2018-03-05 21:09:39 +00:00
|
|
|
/** Tries to cancel the query of `current_user` with id `current_query_id`.
  *
  * `kill` is forwarded to the input stream's cancel().
  *
  * Returns:
  *  - NotFound                 - no such query;
  *  - CancelSent               - cancel() was called on the input stream, or the streams are already released;
  *  - CancelCannotBeSent       - streams exist, but there is no input stream of type IProfilingBlockInputStream;
  *  - QueryIsNotInitializedYet - streams were not set yet.
  *
  * The whole operation is done under `mutex`.
  */
ProcessList::CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill)
{
    std::lock_guard list_lock(mutex);

    QueryStatus * target = tryGetProcessListElement(current_query_id, current_user);
    if (target == nullptr)
        return CancellationCode::NotFound;

    /// Streams are destroyed, and ProcessListElement will be deleted from ProcessList soon. We need wait a little bit
    if (target->streamsAreReleased())
        return CancellationCode::CancelSent;

    BlockInputStreamPtr in;
    BlockOutputStreamPtr out;

    if (!target->tryGetQueryStreams(in, out))
        return CancellationCode::QueryIsNotInitializedYet;

    /// Only profiling input streams can be cancelled.
    IProfilingBlockInputStream * cancellable_in = nullptr;
    if (in)
        cancellable_in = dynamic_cast<IProfilingBlockInputStream *>(in.get());

    if (cancellable_in == nullptr)
        return CancellationCode::CancelCannotBeSent;

    cancellable_in->cancel(kill);
    return CancellationCode::CancelSent;
}
|
|
|
|
|
2018-02-01 17:55:08 +00:00
|
|
|
|
2018-05-17 16:01:41 +00:00
|
|
|
/** Builds a snapshot of the query state.
  *
  * Always filled: query text, client info, elapsed time, cancellation flag, read/written progress, memory usage.
  * Optional parts:
  *  get_thread_list    - thread numbers of all recorded thread statuses (read under a shared lock of threads_mutex);
  *  get_profile_events - a snapshot of the performance counters;
  *  get_settings       - a copy of the settings of the query context, if the context pointer is set.
  */
QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_events, bool get_settings) const
{
    QueryStatusInfo snapshot;

    /// Identity of the query.
    snapshot.query = query;
    snapshot.client_info = client_info;

    /// Execution state.
    snapshot.elapsed_seconds = watch.elapsedSeconds();
    snapshot.is_cancelled = is_killed.load(std::memory_order_relaxed);

    /// Progress of reading.
    snapshot.read_rows = progress_in.rows;
    snapshot.read_bytes = progress_in.bytes;
    snapshot.total_rows = progress_in.total_rows;

    /// Progress of writing.
    snapshot.written_rows = progress_out.rows;
    snapshot.written_bytes = progress_out.bytes;

    /// Memory consumption.
    snapshot.memory_usage = memory_tracker.get();
    snapshot.peak_memory_usage = memory_tracker.getPeak();

    if (get_thread_list)
    {
        std::shared_lock threads_lock(threads_mutex);

        snapshot.thread_numbers.reserve(thread_statuses.size());
        for (auto & status_entry : thread_statuses)
            snapshot.thread_numbers.emplace_back(status_entry.second->thread_number);
    }

    if (get_profile_events)
    {
        snapshot.profile_counters = std::make_shared<ProfileEvents::Counters>(
            performance_counters.getPartiallyAtomicSnapshot());
    }

    if (get_settings && query_context)
    {
        snapshot.query_settings = std::make_shared<Settings>(query_context->getSettingsRef());
    }

    return snapshot;
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Collects snapshots (see QueryStatus::getInfo) of all queries currently in the list.
/// The three flags are forwarded to every per-query getInfo call. The list is traversed under `mutex`.
ProcessList::Info ProcessList::getInfo(bool get_thread_list, bool get_profile_events, bool get_settings) const
{
    Info snapshots;

    std::lock_guard list_lock(mutex);

    snapshots.reserve(processes.size());
    for (const auto & query_status : processes)
    {
        snapshots.emplace_back(query_status.getInfo(get_thread_list, get_profile_events, get_settings));
    }

    return snapshots;
}
|
|
|
|
|
|
|
|
|
2018-02-01 17:55:08 +00:00
|
|
|
/// Creates an empty per-user record. The user-level performance counters are constructed
/// with level User and a pointer to the global counters.
ProcessListForUser::ProcessListForUser()
    : user_performance_counters(ProfileEvents::Level::User, &ProfileEvents::global_counters)
{
}
|
|
|
|
|
|
|
|
|
2015-06-21 06:06:04 +00:00
|
|
|
}
|