CLICKHOUSE-4514 Unique query_id among all users (#5430)

* CLICKHOUSE-4514 Unique query_id among all users

* try 1

* Fix

* fix

* use condvar

* fix style

* Update ProcessList.cpp
This commit is contained in:
proller 2019-06-30 16:17:27 +03:00 committed by alexey-milovidov
parent 557886ef26
commit a69990ce27
4 changed files with 50 additions and 19 deletions

View File

@ -87,10 +87,9 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
{
std::unique_lock lock(mutex);
const auto max_wait_ms = settings.queue_max_wait_ms.totalMilliseconds();
if (!is_unlimited_query && max_size && processes.size() >= max_size)
{
auto max_wait_ms = settings.queue_max_wait_ms.totalMilliseconds();
if (!max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(max_wait_ms), [&]{ return processes.size() < max_size; }))
throw Exception("Too many simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
}
@ -117,20 +116,41 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
+ ", maximum: " + settings.max_concurrent_queries_for_user.toString(),
ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
auto range = user_process_list->second.queries.equal_range(client_info.current_query_id);
if (range.first != range.second)
auto running_query = user_process_list->second.queries.find(client_info.current_query_id);
if (running_query != user_process_list->second.queries.end())
{
if (!settings.replace_running_query)
throw Exception("Query with id = " + client_info.current_query_id + " is already running.",
ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
/// Ask queries to cancel. They will check this flag.
for (auto it = range.first; it != range.second; ++it)
it->second->is_killed.store(true, std::memory_order_relaxed);
}
running_query->second->is_killed.store(true, std::memory_order_relaxed);
if (!max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(max_wait_ms), [&]
{
running_query = user_process_list->second.queries.find(client_info.current_query_id);
if (running_query == user_process_list->second.queries.end())
return true;
running_query->second->is_killed.store(true, std::memory_order_relaxed);
return false;
}))
throw Exception("Query with id = " + client_info.current_query_id + " is already running and can't be stopped",
ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
}
}
}
/// Check other users running query with our query_id
for (const auto & user_process_list : user_to_queries)
{
if (user_process_list.first == client_info.current_user)
continue;
if (auto running_query = user_process_list.second.queries.find(client_info.current_query_id); running_query != user_process_list.second.queries.end())
throw Exception("Query with id = " + client_info.current_query_id + " is already running by user " + user_process_list.first,
ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
}
auto process_it = processes.emplace(processes.end(),
query_, client_info, settings.max_memory_usage, settings.memory_tracker_fault_probability, priorities.insert(settings.priority));
@ -226,17 +246,12 @@ ProcessListEntry::~ProcessListEntry()
bool found = false;
auto range = user_process_list.queries.equal_range(query_id);
if (range.first != range.second)
if (auto running_query = user_process_list.queries.find(query_id); running_query != user_process_list.queries.end())
{
for (auto jt = range.first; jt != range.second; ++jt)
if (running_query->second == process_list_element_ptr)
{
if (jt->second == process_list_element_ptr)
{
user_process_list.queries.erase(jt);
found = true;
break;
}
user_process_list.queries.erase(running_query->first);
found = true;
}
}
@ -245,8 +260,7 @@ ProcessListEntry::~ProcessListEntry()
LOG_ERROR(&Logger::get("ProcessList"), "Logical error: cannot find query by query_id and pointer to ProcessListElement in ProcessListForUser");
std::terminate();
}
parent.have_space.notify_one();
parent.have_space.notify_all();
/// If there are no more queries for the user, then we will reset memory tracker and network throttler.
if (user_process_list.queries.empty())

View File

@ -203,7 +203,7 @@ struct ProcessListForUser
ProcessListForUser();
/// query_id -> ProcessListElement(s). There can be multiple queries with the same query_id as long as all queries except one are cancelled.
using QueryToElement = std::unordered_multimap<String, QueryStatus *>;
using QueryToElement = std::unordered_map<String, QueryStatus *>;
QueryToElement queries;
ProfileEvents::Counters user_performance_counters{VariableContext::User, &ProfileEvents::global_counters};

View File

@ -9,3 +9,16 @@ $CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL?query_id=hello&replace_running_query=1" -d
sleep 0.1 # First query (usually) should be received by the server after this sleep.
$CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL?query_id=hello&replace_running_query=1" -d 'SELECT 0'
wait
${CLICKHOUSE_CLIENT} --user=readonly --query_id=42 --query='SELECT 1, sleep(1)' &
sleep 0.1
( ${CLICKHOUSE_CLIENT} --query_id=42 --query='SELECT 43' ||: ) 2>&1 | grep -F 'is already running by user' > /dev/null
wait
${CLICKHOUSE_CLIENT} --query='SELECT 3, sleep(1)' &
sleep 0.1
${CLICKHOUSE_CLIENT} --query_id=42 --query='SELECT 2, sleep(1)' &
sleep 0.1
( ${CLICKHOUSE_CLIENT} --query_id=42 --replace_running_query=1 --queue_max_wait_ms=500 --query='SELECT 43' ||: ) 2>&1 | grep -F 'cant be stopped' > /dev/null
${CLICKHOUSE_CLIENT} --query_id=42 --replace_running_query=1 --query='SELECT 44'
wait