mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Merge pull request #3517 from yandex/kill_pending_queries
CLICKHOUSE-4094: Add ability to kill pending queries
This commit is contained in:
commit
c19b1d919e
@ -23,6 +23,7 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int READONLY;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int CANNOT_KILL;
|
||||
}
|
||||
|
||||
|
||||
@ -138,13 +139,17 @@ public:
|
||||
|
||||
auto code = process_list.sendCancelToQuery(curr_process.query_id, curr_process.user, true);
|
||||
|
||||
if (code != CancellationCode::QueryIsNotInitializedYet && code != CancellationCode::CancelSent)
|
||||
/// Raise exception if this query is immortal, user have to know
|
||||
/// This could happen only if query generate streams that don't implement IProfilingBlockInputStream
|
||||
if (code == CancellationCode::CancelCannotBeSent)
|
||||
throw Exception("Can't kill query '" + curr_process.query_id + "' it consits of unkillable stages", ErrorCodes::CANNOT_KILL);
|
||||
else if (code != CancellationCode::QueryIsNotInitializedYet && code != CancellationCode::CancelSent)
|
||||
{
|
||||
curr_process.processed = true;
|
||||
insertResultRow(curr_process.source_num, code, processes_block, res_sample_block, columns);
|
||||
++num_processed_queries;
|
||||
}
|
||||
/// Wait if QueryIsNotInitializedYet or CancelSent
|
||||
/// Wait if CancelSent
|
||||
}
|
||||
|
||||
/// KILL QUERY could be killed also
|
||||
@ -194,6 +199,12 @@ BlockIO InterpreterKillQueryQuery::execute()
|
||||
for (const auto & query_desc : queries_to_stop)
|
||||
{
|
||||
auto code = (query.test) ? CancellationCode::Unknown : process_list.sendCancelToQuery(query_desc.query_id, query_desc.user, true);
|
||||
|
||||
/// Raise exception if this query is immortal, user have to know
|
||||
/// This could happen only if query generate streams that don't implement IProfilingBlockInputStream
|
||||
if (code == CancellationCode::CancelCannotBeSent)
|
||||
throw Exception("Can't kill query '" + query_desc.query_id + "' it consits of unkillable stages", ErrorCodes::CANNOT_KILL);
|
||||
|
||||
insertResultRow(query_desc.source_num, code, processes_block, header, res_columns);
|
||||
}
|
||||
|
||||
|
@ -382,8 +382,9 @@ ProcessList::CancellationCode ProcessList::sendCancelToQuery(const String & curr
|
||||
}
|
||||
return CancellationCode::CancelCannotBeSent;
|
||||
}
|
||||
|
||||
return CancellationCode::QueryIsNotInitializedYet;
|
||||
/// Query is not even started
|
||||
elem->is_killed.store(true);
|
||||
return CancellationCode::CancelSent;
|
||||
}
|
||||
|
||||
|
||||
|
@ -191,6 +191,8 @@ public:
|
||||
|
||||
/// Get query in/out pointers from BlockIO
|
||||
bool tryGetQueryStreams(BlockInputStreamPtr & in, BlockOutputStreamPtr & out) const;
|
||||
|
||||
bool isKilled() const { return is_killed; }
|
||||
};
|
||||
|
||||
|
||||
|
@ -33,6 +33,7 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int QUERY_IS_TOO_LARGE;
|
||||
extern const int INTO_OUTFILE_NOT_ALLOWED;
|
||||
extern const int QUERY_WAS_CANCELLED;
|
||||
}
|
||||
|
||||
|
||||
@ -204,9 +205,15 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
auto interpreter = InterpreterFactory::get(ast, context, stage);
|
||||
res = interpreter->execute();
|
||||
|
||||
/// Delayed initialization of query streams (required for KILL QUERY purposes)
|
||||
if (process_list_entry)
|
||||
{
|
||||
/// Query was killed before execution
|
||||
if ((*process_list_entry)->isKilled())
|
||||
throw Exception("Query '" + (*process_list_entry)->getInfo().client_info.current_query_id + "' is killed in pending state",
|
||||
ErrorCodes::QUERY_WAS_CANCELLED);
|
||||
else
|
||||
(*process_list_entry)->setQueryStreams(res);
|
||||
}
|
||||
|
||||
/// Hold element of process list till end of query execution.
|
||||
res.process_list_entry = process_list_entry;
|
||||
|
@ -1,2 +1,3 @@
|
||||
SELECT sleep(1) FROM system.numbers LIMIT 4
|
||||
SELECT sleep(1) FROM system.numbers LIMIT 5
|
||||
0
|
||||
|
@ -19,3 +19,30 @@ $CLICKHOUSE_CLIENT -q "KILL QUERY WHERE 0 ASYNC"
|
||||
$CLICKHOUSE_CLIENT -q "KILL QUERY WHERE 0 FORMAT TabSeparated"
|
||||
$CLICKHOUSE_CLIENT -q "KILL QUERY WHERE 0 SYNC FORMAT TabSeparated"
|
||||
$CLICKHOUSE_CLIENT -q "KILL QUERY WHERE 1 TEST" &>/dev/null
|
||||
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test.cannot_kill_query"
|
||||
$CLICKHOUSE_CLIENT -q "CREATE TABLE test.cannot_kill_query (x UInt64) ENGINE = MergeTree ORDER BY x" &> /dev/null
|
||||
$CLICKHOUSE_CLIENT -q "INSERT INTO test.cannot_kill_query SELECT * FROM numbers(10000000)" &> /dev/null
|
||||
|
||||
query_for_pending="SELECT count() FROM test.cannot_kill_query WHERE NOT ignore(sleep(1)) SETTINGS max_threads=1"
|
||||
$CLICKHOUSE_CLIENT -q "$query_for_pending" &>/dev/null &
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "ALTER TABLE test.cannot_kill_query MODIFY COLUMN x UInt64" &>/dev/null &
|
||||
|
||||
query_to_kill="SELECT sum(1) FROM test.cannot_kill_query WHERE NOT ignore(sleep(1)) SETTINGS max_threads=1"
|
||||
$CLICKHOUSE_CLIENT -q "$query_to_kill" &>/dev/null &
|
||||
|
||||
sleep 1 # just to be sure that 'KILL ...' will be executed after 'SELECT ... WHERE NOT ignore(sleep(1))'
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "KILL QUERY WHERE query='$query_to_kill'" &>/dev/null &
|
||||
|
||||
# 'SELECT ... WHERE NOT ignore(sleep(1))' is executing much longer than 3 secs, so this sleep doesn't fail test logic
|
||||
# but guarantees to eliminate flaps, when SELECT from system.process is executed before KILL is completed
|
||||
sleep 3
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes where query='$query_to_kill'"
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "KILL QUERY WHERE query='$query_for_pending'" &>/dev/null & # kill pending query
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test.cannot_kill_query" &>/dev/null
|
||||
|
Loading…
Reference in New Issue
Block a user