diff --git a/dbms/src/Interpreters/ProcessList.cpp b/dbms/src/Interpreters/ProcessList.cpp index a4fe438af8f..def39d4d91c 100644 --- a/dbms/src/Interpreters/ProcessList.cpp +++ b/dbms/src/Interpreters/ProcessList.cpp @@ -87,10 +87,9 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as { std::unique_lock lock(mutex); + const auto max_wait_ms = settings.queue_max_wait_ms.totalMilliseconds(); if (!is_unlimited_query && max_size && processes.size() >= max_size) { - auto max_wait_ms = settings.queue_max_wait_ms.totalMilliseconds(); - if (!max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(max_wait_ms), [&]{ return processes.size() < max_size; })) throw Exception("Too many simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES); } @@ -117,20 +116,41 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as + ", maximum: " + settings.max_concurrent_queries_for_user.toString(), ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES); - auto range = user_process_list->second.queries.equal_range(client_info.current_query_id); - if (range.first != range.second) + auto running_query = user_process_list->second.queries.find(client_info.current_query_id); + + if (running_query != user_process_list->second.queries.end()) { if (!settings.replace_running_query) throw Exception("Query with id = " + client_info.current_query_id + " is already running.", ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING); /// Ask queries to cancel. They will check this flag. - for (auto it = range.first; it != range.second; ++it) - it->second->is_killed.store(true, std::memory_order_relaxed); - } + running_query->second->is_killed.store(true, std::memory_order_relaxed); + + if (!max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(max_wait_ms), [&] + { + running_query = user_process_list->second.queries.find(client_info.current_query_id); + if (running_query == user_process_list->second.queries.end()) + return true; + running_query->second->is_killed.store(true, std::memory_order_relaxed); + return false; + })) + throw Exception("Query with id = " + client_info.current_query_id + " is already running and can't be stopped", + ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING); + } } } + /// Check other users running query with our query_id + for (const auto & user_process_list : user_to_queries) + { + if (user_process_list.first == client_info.current_user) + continue; + if (auto running_query = user_process_list.second.queries.find(client_info.current_query_id); running_query != user_process_list.second.queries.end()) + throw Exception("Query with id = " + client_info.current_query_id + " is already running by user " + user_process_list.first, + ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING); + } + auto process_it = processes.emplace(processes.end(), query_, client_info, settings.max_memory_usage, settings.memory_tracker_fault_probability, priorities.insert(settings.priority)); @@ -226,17 +246,12 @@ ProcessListEntry::~ProcessListEntry() bool found = false; - auto range = user_process_list.queries.equal_range(query_id); - if (range.first != range.second) + if (auto running_query = user_process_list.queries.find(query_id); running_query != user_process_list.queries.end()) { - for (auto jt = range.first; jt != range.second; ++jt) + if (running_query->second == process_list_element_ptr) { - if (jt->second == process_list_element_ptr) - { - user_process_list.queries.erase(jt); - found = true; - break; - } + user_process_list.queries.erase(running_query->first); + found = true; } } @@ -245,8 +260,7 @@ ProcessListEntry::~ProcessListEntry() LOG_ERROR(&Logger::get("ProcessList"), "Logical error: cannot find query by query_id and pointer to ProcessListElement in ProcessListForUser"); std::terminate(); } - - parent.have_space.notify_one(); + parent.have_space.notify_all(); /// If there are no more queries for the user, then we will reset memory tracker and network throttler. if (user_process_list.queries.empty()) diff --git a/dbms/src/Interpreters/ProcessList.h b/dbms/src/Interpreters/ProcessList.h index 32f59749450..b75a4e7a730 100644 --- a/dbms/src/Interpreters/ProcessList.h +++ b/dbms/src/Interpreters/ProcessList.h @@ -203,7 +203,7 @@ struct ProcessListForUser ProcessListForUser(); /// query_id -> ProcessListElement(s). There can be multiple queries with the same query_id as long as all queries except one are cancelled. - using QueryToElement = std::unordered_multimap; + using QueryToElement = std::unordered_map; QueryToElement queries; ProfileEvents::Counters user_performance_counters{VariableContext::User, &ProfileEvents::global_counters}; diff --git a/dbms/tests/queries/0_stateless/00600_replace_running_query.reference b/dbms/tests/queries/0_stateless/00600_replace_running_query.reference index 573541ac970..237dd6b5309 100644 --- a/dbms/tests/queries/0_stateless/00600_replace_running_query.reference +++ b/dbms/tests/queries/0_stateless/00600_replace_running_query.reference @@ -1 +1,5 @@ 0 +1 0 +3 0 +2 0 +44 diff --git a/dbms/tests/queries/0_stateless/00600_replace_running_query.sh b/dbms/tests/queries/0_stateless/00600_replace_running_query.sh index 6778bbce149..abe5dd69b8f 100755 --- a/dbms/tests/queries/0_stateless/00600_replace_running_query.sh +++ b/dbms/tests/queries/0_stateless/00600_replace_running_query.sh @@ -9,3 +9,16 @@ $CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL?query_id=hello&replace_running_query=1" -d sleep 0.1 # First query (usually) should be received by the server after this sleep. $CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL?query_id=hello&replace_running_query=1" -d 'SELECT 0' wait + +${CLICKHOUSE_CLIENT} --user=readonly --query_id=42 --query='SELECT 1, sleep(1)' & +sleep 0.1 +( ${CLICKHOUSE_CLIENT} --query_id=42 --query='SELECT 43' ||: ) 2>&1 | grep -F 'is already running by user' > /dev/null +wait + +${CLICKHOUSE_CLIENT} --query='SELECT 3, sleep(1)' & +sleep 0.1 +${CLICKHOUSE_CLIENT} --query_id=42 --query='SELECT 2, sleep(1)' & +sleep 0.1 +( ${CLICKHOUSE_CLIENT} --query_id=42 --replace_running_query=1 --queue_max_wait_ms=500 --query='SELECT 43' ||: ) 2>&1 | grep -F 'cant be stopped' > /dev/null +${CLICKHOUSE_CLIENT} --query_id=42 --replace_running_query=1 --query='SELECT 44' +wait