Merge pull request #26675 from Algunenano/kill_better

Handle KILL requests while running pipeline executors
This commit is contained in:
Nikolai Kochetov 2021-08-02 17:37:54 +03:00 committed by GitHub
commit d63a5e1c96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 101 additions and 2 deletions

View File

@ -6,6 +6,7 @@
#include <Parsers/ASTSelectQuery.h> #include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTKillQueryQuery.h> #include <Parsers/ASTKillQueryQuery.h>
#include <Parsers/queryNormalization.h> #include <Parsers/queryNormalization.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
@ -297,7 +298,10 @@ QueryStatus::QueryStatus(
{ {
} }
QueryStatus::~QueryStatus() = default; QueryStatus::~QueryStatus()
{
assert(executors.empty());
}
void QueryStatus::setQueryStreams(const BlockIO & io) void QueryStatus::setQueryStreams(const BlockIO & io)
{ {
@ -351,6 +355,11 @@ CancellationCode QueryStatus::cancelQuery(bool kill)
BlockInputStreamPtr input_stream; BlockInputStreamPtr input_stream;
BlockOutputStreamPtr output_stream; BlockOutputStreamPtr output_stream;
SCOPE_EXIT({
std::lock_guard lock(query_streams_mutex);
for (auto * e : executors)
e->cancel();
});
if (tryGetQueryStreams(input_stream, output_stream)) if (tryGetQueryStreams(input_stream, output_stream))
{ {
@ -366,6 +375,20 @@ CancellationCode QueryStatus::cancelQuery(bool kill)
return CancellationCode::CancelSent; return CancellationCode::CancelSent;
} }
/// Registers a PipelineExecutor with this query so that cancelQuery() can cancel it.
/// The raw pointer is stored as-is. Asserts that `e` has not been registered already.
void QueryStatus::addPipelineExecutor(PipelineExecutor * e)
{
    /// `executors` is guarded by the same mutex cancelQuery() holds while iterating over it.
    std::lock_guard lock(query_streams_mutex);
    assert(std::count(executors.begin(), executors.end(), e) == 0);
    executors.emplace_back(e);
}
/// Unregisters a PipelineExecutor that was previously passed to addPipelineExecutor().
/// Asserts that `e` is currently registered.
void QueryStatus::removePipelineExecutor(PipelineExecutor * e)
{
    /// `executors` is guarded by the same mutex cancelQuery() holds while iterating over it.
    std::lock_guard lock(query_streams_mutex);
    assert(std::count(executors.begin(), executors.end(), e) > 0);
    /// Drops every entry equal to `e`.
    std::erase(executors, e);
}
void QueryStatus::setUserProcessList(ProcessListForUser * user_process_list_) void QueryStatus::setUserProcessList(ProcessListForUser * user_process_list_)
{ {

View File

@ -22,6 +22,7 @@
#include <mutex> #include <mutex>
#include <shared_mutex> #include <shared_mutex>
#include <unordered_map> #include <unordered_map>
#include <vector>
namespace CurrentMetrics namespace CurrentMetrics
@ -34,6 +35,7 @@ namespace DB
struct Settings; struct Settings;
class IAST; class IAST;
class PipelineExecutor;
struct ProcessListForUser; struct ProcessListForUser;
class QueryStatus; class QueryStatus;
@ -109,6 +111,9 @@ protected:
BlockInputStreamPtr query_stream_in; BlockInputStreamPtr query_stream_in;
BlockOutputStreamPtr query_stream_out; BlockOutputStreamPtr query_stream_out;
/// Array of PipelineExecutors to be cancelled when a cancelQuery is received
std::vector<PipelineExecutor *> executors;
enum QueryStreamsStatus enum QueryStreamsStatus
{ {
NotInitialized, NotInitialized,
@ -183,6 +188,12 @@ public:
CancellationCode cancelQuery(bool kill); CancellationCode cancelQuery(bool kill);
bool isKilled() const { return is_killed; } bool isKilled() const { return is_killed; }
/// Adds a pipeline to the QueryStatus
void addPipelineExecutor(PipelineExecutor * e);
/// Removes a pipeline from the QueryStatus
void removePipelineExecutor(PipelineExecutor * e);
}; };

View File

@ -45,6 +45,8 @@ PipelineExecutor::PipelineExecutor(Processors & processors_, QueryStatus * elem)
try try
{ {
graph = std::make_unique<ExecutingGraph>(processors); graph = std::make_unique<ExecutingGraph>(processors);
if (process_list_element)
process_list_element->addPipelineExecutor(this);
} }
catch (Exception & exception) catch (Exception & exception)
{ {
@ -59,6 +61,12 @@ PipelineExecutor::PipelineExecutor(Processors & processors_, QueryStatus * elem)
} }
} }
/// Unregisters this executor from the QueryStatus it was constructed with, if any.
/// QueryStatus asserts in its own destructor that no executors are still registered.
PipelineExecutor::~PipelineExecutor()
{
    /// Executors created without a process list element were never registered.
    if (!process_list_element)
        return;

    process_list_element->removePipelineExecutor(this);
}
void PipelineExecutor::addChildlessProcessorsToStack(Stack & stack) void PipelineExecutor::addChildlessProcessorsToStack(Stack & stack)
{ {
UInt64 num_processors = processors.size(); UInt64 num_processors = processors.size();

View File

@ -31,6 +31,7 @@ public:
/// ///
/// Explicit graph representation is built in constructor. Throws if graph is not correct. /// Explicit graph representation is built in constructor. Throws if graph is not correct.
explicit PipelineExecutor(Processors & processors_, QueryStatus * elem = nullptr); explicit PipelineExecutor(Processors & processors_, QueryStatus * elem = nullptr);
~PipelineExecutor();
/// Execute pipeline in multiple threads. Must be called once. /// Execute pipeline in multiple threads. Must be called once.
/// In case of exception during execution throws any occurred. /// In case of exception during execution throws any occurred.
@ -127,7 +128,7 @@ private:
ProcessorsMap processors_map; ProcessorsMap processors_map;
/// Now it's used to check if query was killed. /// Now it's used to check if query was killed.
QueryStatus * process_list_element = nullptr; QueryStatus * const process_list_element = nullptr;
/// Graph related methods. /// Graph related methods.
bool expandPipeline(Stack & stack, UInt64 pid); bool expandPipeline(Stack & stack, UInt64 pid);

View File

@ -0,0 +1,2 @@
finished test_01948_tcp_default default SELECT * FROM\n (\n SELECT a.name as n\n FROM\n (\n SELECT \'Name\' as name, number FROM system.numbers LIMIT 2000000\n ) AS a,\n (\n SELECT \'Name\' as name, number FROM system.numbers LIMIT 2000000\n ) as b\n GROUP BY n\n )\n LIMIT 20\n FORMAT Null
finished test_01948_http_default default SELECT * FROM\n (\n SELECT a.name as n\n FROM\n (\n SELECT \'Name\' as name, number FROM system.numbers LIMIT 2000000\n ) AS a,\n (\n SELECT \'Name\' as name, number FROM system.numbers LIMIT 2000000\n ) as b\n GROUP BY n\n )\n LIMIT 20\n FORMAT Null

View File

@ -0,0 +1,54 @@
#!/usr/bin/env bash
# Test: start a query (once through the native client, once over HTTP), wait until it is
# listed in system.processes, then stop it with KILL QUERY ... SYNC.
# Directory containing this script; used to locate the shared test configuration.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# NOTE(review): shell_config.sh presumably defines the CLICKHOUSE_* variables used below
# (CLICKHOUSE_CLIENT, CLICKHOUSE_CURL, CLICKHOUSE_CURL_COMMAND, CLICKHOUSE_URL,
# CLICKHOUSE_DATABASE) - none of them is set in this file.
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# Abort on the first failing command; a pipeline fails if any of its stages fails.
set -e -o pipefail
# Block until a query with the given query_id is listed in system.processes.
#
# Arguments:
#   $1 - query_id to look for.
#   $2 - optional timeout in seconds (default: 60).
# Returns:
#   0 once the query is listed; 1 (with a message on stderr) if it has not shown up
#   before the timeout. Without the timeout, a query that never starts would make
#   the test poll forever instead of failing.
function wait_for_query_to_start()
{
    local query_id="$1"
    local timeout="${2:-60}"
    # SECONDS is bash's built-in count of seconds since the shell started.
    local deadline=$((SECONDS + timeout))

    while [[ $($CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes WHERE query_id = '$query_id'") == 0 ]]
    do
        if (( SECONDS >= deadline ))
        then
            echo "Timed out after ${timeout}s waiting for query '$query_id' to start" >&2
            return 1
        fi
        sleep 0.1
    done
}
# TCP CLIENT
# Launch the query through the native client in the background (trailing `&`) under a
# known query_id so it can be looked up and killed; all of its output is discarded.
$CLICKHOUSE_CLIENT --max_execution_time 10 --query_id "test_01948_tcp_$CLICKHOUSE_DATABASE" -q \
"SELECT * FROM
(
SELECT a.name as n
FROM
(
SELECT 'Name' as name, number FROM system.numbers LIMIT 2000000
) AS a,
(
SELECT 'Name' as name, number FROM system.numbers LIMIT 2000000
) as b
GROUP BY n
)
LIMIT 20
FORMAT Null" > /dev/null 2>&1 &
# Only try to kill the query once it is listed in system.processes.
wait_for_query_to_start "test_01948_tcp_$CLICKHOUSE_DATABASE"
# Kill the background query by its query_id. This is the only output the TCP half of the
# test prints. NOTE(review): SYNC presumably makes KILL wait until the query has
# stopped - confirm against the KILL QUERY documentation.
$CLICKHOUSE_CLIENT --max_execution_time 10 -q "KILL QUERY WHERE query_id = 'test_01948_tcp_$CLICKHOUSE_DATABASE' SYNC"
# HTTP CLIENT
# Same scenario over the HTTP interface: the query_id is passed as a URL parameter and the
# query text as the request body; the request runs in the background with output discarded.
${CLICKHOUSE_CURL_COMMAND} -q --max-time 10 -sS "$CLICKHOUSE_URL&query_id=test_01948_http_$CLICKHOUSE_DATABASE" -d \
"SELECT * FROM
(
SELECT a.name as n
FROM
(
SELECT 'Name' as name, number FROM system.numbers LIMIT 2000000
) AS a,
(
SELECT 'Name' as name, number FROM system.numbers LIMIT 2000000
) as b
GROUP BY n
)
LIMIT 20
FORMAT Null" > /dev/null 2>&1 &
# Only try to kill the query once it is listed in system.processes.
wait_for_query_to_start "test_01948_http_$CLICKHOUSE_DATABASE"
# Kill the HTTP-started query, this time sending the KILL statement over HTTP as well.
$CLICKHOUSE_CURL --max-time 10 -sS "$CLICKHOUSE_URL" -d "KILL QUERY WHERE query_id = 'test_01948_http_$CLICKHOUSE_DATABASE' SYNC"