2015-06-21 06:06:04 +00:00
|
|
|
|
#include <DB/Interpreters/ProcessList.h>
|
2017-01-21 04:24:28 +00:00
|
|
|
|
#include <DB/Interpreters/Settings.h>
|
|
|
|
|
#include <DB/Common/Exception.h>
|
|
|
|
|
#include <DB/IO/WriteHelpers.h>
|
2016-11-30 17:31:05 +00:00
|
|
|
|
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
2017-01-21 04:24:28 +00:00
|
|
|
|
|
2015-06-21 06:06:04 +00:00
|
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
2016-01-11 21:46:36 +00:00
|
|
|
|
namespace ErrorCodes
|
|
|
|
|
{
|
|
|
|
|
extern const int TOO_MUCH_SIMULTANEOUS_QUERIES;
|
|
|
|
|
extern const int QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING;
|
|
|
|
|
}
|
|
|
|
|
|
2015-06-21 06:06:04 +00:00
|
|
|
|
|
|
|
|
|
ProcessList::EntryPtr ProcessList::insert(
|
2016-10-24 21:40:39 +00:00
|
|
|
|
const String & query_, const ClientInfo & client_info, const Settings & settings)
|
2015-06-21 06:06:04 +00:00
|
|
|
|
{
|
|
|
|
|
EntryPtr res;
|
|
|
|
|
|
|
|
|
|
{
|
2016-05-28 10:15:36 +00:00
|
|
|
|
std::lock_guard<std::mutex> lock(mutex);
|
2015-06-21 06:06:04 +00:00
|
|
|
|
|
2015-09-08 21:01:43 +00:00
|
|
|
|
if (max_size && cur_size >= max_size
|
|
|
|
|
&& (!settings.queue_max_wait_ms.totalMilliseconds() || !have_space.tryWait(mutex, settings.queue_max_wait_ms.totalMilliseconds())))
|
2015-06-21 06:06:04 +00:00
|
|
|
|
throw Exception("Too much simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MUCH_SIMULTANEOUS_QUERIES);
|
|
|
|
|
|
2016-10-25 05:07:29 +00:00
|
|
|
|
/** Why we use current user?
|
|
|
|
|
* Because initial one is passed by client and credentials for it is not verified,
|
|
|
|
|
* and using initial_user for limits will be insecure.
|
2016-10-24 22:46:27 +00:00
|
|
|
|
*
|
2016-10-25 05:07:29 +00:00
|
|
|
|
* Why we use current_query_id?
|
|
|
|
|
* Because we want to allow distributed queries that will run multiple secondary queries on same server,
|
2016-10-24 22:46:27 +00:00
|
|
|
|
* like SELECT count() FROM remote('127.0.0.{1,2}', system.numbers)
|
|
|
|
|
* so they must have different query_ids.
|
|
|
|
|
*/
|
|
|
|
|
|
2015-06-21 06:06:04 +00:00
|
|
|
|
{
|
2016-11-30 17:31:05 +00:00
|
|
|
|
auto user_process_list = user_to_queries.find(client_info.current_user);
|
2015-06-21 06:06:04 +00:00
|
|
|
|
|
2016-01-13 02:38:30 +00:00
|
|
|
|
if (user_process_list != user_to_queries.end())
|
2015-06-21 06:06:04 +00:00
|
|
|
|
{
|
2016-11-30 17:31:05 +00:00
|
|
|
|
if (settings.max_concurrent_queries_for_user
|
|
|
|
|
&& user_process_list->second.queries.size() >= settings.max_concurrent_queries_for_user)
|
2016-10-25 05:07:29 +00:00
|
|
|
|
throw Exception("Too much simultaneous queries for user " + client_info.current_user
|
2016-01-13 02:38:30 +00:00
|
|
|
|
+ ". Current: " + toString(user_process_list->second.queries.size())
|
|
|
|
|
+ ", maximum: " + toString(settings.max_concurrent_queries_for_user),
|
|
|
|
|
ErrorCodes::TOO_MUCH_SIMULTANEOUS_QUERIES);
|
|
|
|
|
|
2016-10-24 22:46:27 +00:00
|
|
|
|
if (!client_info.current_query_id.empty())
|
2015-06-21 06:06:04 +00:00
|
|
|
|
{
|
2016-11-30 17:31:05 +00:00
|
|
|
|
auto element = user_process_list->second.queries.find(client_info.current_query_id);
|
2016-01-13 02:38:30 +00:00
|
|
|
|
if (element != user_process_list->second.queries.end())
|
|
|
|
|
{
|
|
|
|
|
if (!settings.replace_running_query)
|
2016-10-24 22:46:27 +00:00
|
|
|
|
throw Exception("Query with id = " + client_info.current_query_id + " is already running.",
|
2016-01-13 02:38:30 +00:00
|
|
|
|
ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
|
|
|
|
|
|
|
|
|
|
element->second->is_cancelled = true;
|
|
|
|
|
/// В случае если запрос отменяется, данные о нем удаляются из мапа в момент отмены.
|
|
|
|
|
user_process_list->second.queries.erase(element);
|
|
|
|
|
}
|
2015-06-21 06:06:04 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
++cur_size;
|
|
|
|
|
|
2016-09-23 20:33:06 +00:00
|
|
|
|
res = std::make_shared<Entry>(*this, cont.emplace(cont.end(),
|
2016-10-24 21:40:39 +00:00
|
|
|
|
query_, client_info,
|
2015-12-23 07:39:28 +00:00
|
|
|
|
settings.limits.max_memory_usage, settings.memory_tracker_fault_probability,
|
2016-09-23 20:33:06 +00:00
|
|
|
|
priorities.insert(settings.priority)));
|
2015-06-21 06:06:04 +00:00
|
|
|
|
|
2016-10-24 22:46:27 +00:00
|
|
|
|
if (!client_info.current_query_id.empty())
|
2016-01-13 02:38:30 +00:00
|
|
|
|
{
|
2016-10-25 05:07:29 +00:00
|
|
|
|
ProcessListForUser & user_process_list = user_to_queries[client_info.current_user];
|
2016-10-24 22:46:27 +00:00
|
|
|
|
user_process_list.queries[client_info.current_query_id] = &res->get();
|
2016-01-13 02:38:30 +00:00
|
|
|
|
|
|
|
|
|
if (current_memory_tracker)
|
|
|
|
|
{
|
|
|
|
|
/// Отслеживаем суммарное потребление оперативки на одновременно выполняющиеся запросы одного пользователя.
|
|
|
|
|
user_process_list.user_memory_tracker.setLimit(settings.limits.max_memory_usage_for_user);
|
|
|
|
|
user_process_list.user_memory_tracker.setDescription("(for user)");
|
|
|
|
|
current_memory_tracker->setNext(&user_process_list.user_memory_tracker);
|
2016-01-13 03:59:24 +00:00
|
|
|
|
|
|
|
|
|
/// Отслеживаем суммарное потребление оперативки на все одновременно выполняющиеся запросы.
|
|
|
|
|
total_memory_tracker.setLimit(settings.limits.max_memory_usage_for_all_queries);
|
|
|
|
|
total_memory_tracker.setDescription("(total)");
|
|
|
|
|
user_process_list.user_memory_tracker.setNext(&total_memory_tracker);
|
2016-01-13 02:38:30 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
2015-06-21 06:06:04 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return res;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ProcessListEntry::~ProcessListEntry()
|
|
|
|
|
{
|
2016-05-28 10:15:36 +00:00
|
|
|
|
std::lock_guard<std::mutex> lock(parent.mutex);
|
2015-06-21 06:06:04 +00:00
|
|
|
|
|
2016-01-13 03:59:24 +00:00
|
|
|
|
/// Важен порядок удаления memory_tracker-ов.
|
|
|
|
|
|
2016-10-25 05:07:29 +00:00
|
|
|
|
String user = it->client_info.current_user;
|
2016-10-24 22:46:27 +00:00
|
|
|
|
String query_id = it->client_info.current_query_id;
|
2016-01-18 21:33:05 +00:00
|
|
|
|
bool is_cancelled = it->is_cancelled;
|
|
|
|
|
|
2016-01-13 03:59:24 +00:00
|
|
|
|
/// Здесь удаляется memory_tracker одного запроса.
|
|
|
|
|
parent.cont.erase(it);
|
|
|
|
|
|
2016-01-18 21:33:05 +00:00
|
|
|
|
ProcessList::UserToQueries::iterator user_process_list = parent.user_to_queries.find(user);
|
2016-01-13 03:59:24 +00:00
|
|
|
|
if (user_process_list != parent.user_to_queries.end())
|
2015-06-21 06:06:04 +00:00
|
|
|
|
{
|
2016-01-13 03:59:24 +00:00
|
|
|
|
/// В случае, если запрос отменяется, данные о нем удаляются из мапа в момент отмены, а не здесь.
|
2016-01-18 21:33:05 +00:00
|
|
|
|
if (!is_cancelled && !query_id.empty())
|
2015-06-21 06:06:04 +00:00
|
|
|
|
{
|
2016-01-18 21:33:05 +00:00
|
|
|
|
ProcessListForUser::QueryToElement::iterator element = user_process_list->second.queries.find(query_id);
|
2016-01-13 02:38:30 +00:00
|
|
|
|
if (element != user_process_list->second.queries.end())
|
|
|
|
|
user_process_list->second.queries.erase(element);
|
2015-06-21 06:06:04 +00:00
|
|
|
|
}
|
2016-01-13 03:59:24 +00:00
|
|
|
|
|
|
|
|
|
/// Здесь удаляется memory_tracker на пользователя. В это время, ссылающийся на него memory_tracker одного запроса не живёт.
|
|
|
|
|
|
|
|
|
|
/// Если запросов для пользователя больше нет, то удаляем запись.
|
|
|
|
|
/// При этом также очищается MemoryTracker на пользователя, и сообщение о потреблении памяти выводится в лог.
|
|
|
|
|
/// Важно иногда сбрасывать MemoryTracker, так как в нём может накапливаться смещённость
|
|
|
|
|
/// в следствие того, что есть случаи, когда память может быть выделена при обработке запроса, а освобождена - позже.
|
|
|
|
|
if (user_process_list->second.queries.empty())
|
|
|
|
|
parent.user_to_queries.erase(user_process_list);
|
2015-06-21 06:06:04 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
--parent.cur_size;
|
|
|
|
|
parent.have_space.signal();
|
2016-01-13 03:59:24 +00:00
|
|
|
|
|
|
|
|
|
/// Здесь удаляется memory_tracker на все запросы. В это время никакие другие memory_tracker-ы не живут.
|
|
|
|
|
if (parent.cur_size == 0)
|
|
|
|
|
{
|
|
|
|
|
/// Сбрасываем MemoryTracker, аналогично (см. выше).
|
|
|
|
|
parent.total_memory_tracker.logPeakMemoryUsage();
|
|
|
|
|
parent.total_memory_tracker.reset();
|
|
|
|
|
}
|
2015-06-21 06:06:04 +00:00
|
|
|
|
}
|
|
|
|
|
|
2015-09-04 20:52:00 +00:00
|
|
|
|
|
2016-11-30 17:31:05 +00:00
|
|
|
|
void ProcessListElement::setQueryStreams(const BlockIO & io)
|
|
|
|
|
{
|
|
|
|
|
query_stream_in = io.in;
|
|
|
|
|
query_stream_out = io.out;
|
|
|
|
|
query_streams_initialized = true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool ProcessListElement::tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStreamPtr & out) const
|
|
|
|
|
{
|
|
|
|
|
if (!query_streams_initialized)
|
|
|
|
|
return false;
|
|
|
|
|
|
|
|
|
|
in = query_stream_in;
|
|
|
|
|
out = query_stream_out;
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2015-09-04 20:52:00 +00:00
|
|
|
|
void ProcessList::addTemporaryTable(ProcessListElement & elem, const String & table_name, StoragePtr storage)
|
|
|
|
|
{
|
2016-05-28 10:15:36 +00:00
|
|
|
|
std::lock_guard<std::mutex> lock(mutex);
|
2015-09-04 20:52:00 +00:00
|
|
|
|
|
|
|
|
|
elem.temporary_tables[table_name] = storage;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
StoragePtr ProcessList::tryGetTemporaryTable(const String & query_id, const String & table_name) const
|
|
|
|
|
{
|
2016-05-28 10:15:36 +00:00
|
|
|
|
std::lock_guard<std::mutex> lock(mutex);
|
2015-09-04 20:52:00 +00:00
|
|
|
|
|
|
|
|
|
/// NOTE Ищем по всем user-ам. То есть, нет изоляции, и сложность O(users).
|
|
|
|
|
for (const auto & user_queries : user_to_queries)
|
|
|
|
|
{
|
2016-01-13 02:38:30 +00:00
|
|
|
|
auto it = user_queries.second.queries.find(query_id);
|
|
|
|
|
if (user_queries.second.queries.end() == it)
|
2015-09-04 20:52:00 +00:00
|
|
|
|
continue;
|
|
|
|
|
|
|
|
|
|
auto jt = (*it->second).temporary_tables.find(table_name);
|
|
|
|
|
if ((*it->second).temporary_tables.end() == jt)
|
|
|
|
|
continue;
|
|
|
|
|
|
|
|
|
|
return jt->second;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return {};
|
|
|
|
|
}
|
|
|
|
|
|
2016-11-30 17:31:05 +00:00
|
|
|
|
|
|
|
|
|
ProcessList::CancellationCode ProcessList::sendCancelToQuery(const String & current_query_id, const String & current_user)
|
|
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> lock(mutex);
|
|
|
|
|
|
|
|
|
|
BlockInputStreamPtr input_stream;
|
|
|
|
|
BlockOutputStreamPtr output_stream;
|
|
|
|
|
IProfilingBlockInputStream * input_stream_casted;
|
|
|
|
|
|
|
|
|
|
for (const auto & elem : cont)
|
|
|
|
|
{
|
|
|
|
|
if (elem.client_info.current_query_id == current_query_id && elem.client_info.current_user == current_user)
|
|
|
|
|
{
|
|
|
|
|
if (elem.tryGetQueryStreams(input_stream, output_stream))
|
|
|
|
|
{
|
|
|
|
|
if (input_stream && (input_stream_casted = dynamic_cast<IProfilingBlockInputStream *>(input_stream.get())))
|
|
|
|
|
{
|
|
|
|
|
input_stream_casted->cancel();
|
|
|
|
|
return CancellationCode::CancelSended;
|
|
|
|
|
}
|
|
|
|
|
return CancellationCode::CancelCannotBeSended;
|
|
|
|
|
}
|
|
|
|
|
return CancellationCode::QueryIsNotInitializedYet;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return CancellationCode::NotFound;
|
|
|
|
|
}
|
|
|
|
|
|
2015-06-21 06:06:04 +00:00
|
|
|
|
}
|