Support cancellation of executing query via gRPC.

This commit is contained in:
Vitaly Baranov 2020-11-02 03:47:43 +03:00
parent 18ebea5d66
commit 0e3a8840b5
3 changed files with 133 additions and 14 deletions

View File

@ -179,6 +179,8 @@ namespace
str.append(str.empty() ? "" : ", ").append("progress");
if (result.logs_size())
str.append(str.empty() ? "" : ", ").append("logs: ").append(std::to_string(result.logs_size())).append(" entries");
if (result.cancelled())
str.append(str.empty() ? "" : ", ").append("cancelled");
if (result.has_exception())
str.append(str.empty() ? "" : ", ").append("exception");
return str;
@ -468,6 +470,7 @@ namespace
void readQueryInfo();
void throwIfFailedToReadQueryInfo();
bool isQueryCancelled();
void addProgressToResult();
void addTotalsToResult(const Block & totals);
@ -505,6 +508,7 @@ namespace
bool initial_query_info_read = false;
bool finalize = false;
bool responder_finished = false;
bool cancelled = false;
std::optional<ReadBufferFromCallback> read_buffer;
std::optional<WriteBufferFromString> write_buffer;
@ -522,6 +526,8 @@ namespace
std::atomic<bool> reading_query_info = false;
std::atomic<bool> failed_to_read_query_info = false;
GRPCQueryInfo next_query_info_while_reading;
std::atomic<bool> want_to_cancel = false;
std::atomic<bool> check_query_info_contains_cancel_only = false;
std::atomic<bool> sending_result = false;
std::atomic<bool> failed_to_send_result = false;
@ -589,6 +595,9 @@ namespace
readQueryInfo();
if (query_info.cancel())
throw Exception("Initial query info cannot set the 'cancel' field", ErrorCodes::INVALID_GRPC_QUERY_INFO);
LOG_DEBUG(log, "Received initial QueryInfo: {}", getQueryDescription(query_info));
}
@ -777,9 +786,13 @@ namespace
|| !query_info.session_id().empty())
{
throw Exception("Extra query infos can be used only to add more input data. "
"Only the following fields can be set: input_data, next_query_info",
"Only the following fields can be set: input_data, next_query_info, cancel",
ErrorCodes::INVALID_GRPC_QUERY_INFO);
}
if (isQueryCancelled())
break;
LOG_DEBUG(log, "Received extra QueryInfo: input_data: {} bytes", query_info.input_data().size());
need_input_data_from_query_info = true;
}
@ -824,10 +837,23 @@ namespace
block_output_stream = query_context->getOutputFormat(output_format, *write_buffer, async_in.getHeader());
Stopwatch after_send_progress;
/// We are not going to receive input data anymore.
check_query_info_contains_cancel_only = true;
auto check_for_cancel = [&]
{
if (isQueryCancelled())
{
async_in.cancel(false);
return false;
}
return true;
};
async_in.readPrefix();
block_output_stream->writePrefix();
while (true)
while (check_for_cancel())
{
Block block;
if (async_in.poll(interactive_delay / 1000))
@ -838,6 +864,8 @@ namespace
}
throwIfFailedToSendResult();
if (!check_for_cancel())
break;
if (block && !io.null_format)
block_output_stream->write(block);
@ -855,14 +883,19 @@ namespace
sendResult();
throwIfFailedToSendResult();
if (!check_for_cancel())
break;
}
async_in.readSuffix();
block_output_stream->writeSuffix();
addTotalsToResult(io.in->getTotals());
addExtremesToResult(io.in->getExtremes());
addProfileInfoToResult(io.in->getProfileInfo());
if (!isQueryCancelled())
{
addTotalsToResult(io.in->getTotals());
addExtremesToResult(io.in->getExtremes());
addProfileInfoToResult(io.in->getProfileInfo());
}
}
void Call::generateOutputWithProcessors()
@ -876,10 +909,28 @@ namespace
block_output_stream->writePrefix();
Stopwatch after_send_progress;
Block block;
while (executor->pull(block, interactive_delay / 1000))
/// We are not going to receive input data anymore.
check_query_info_contains_cancel_only = true;
auto check_for_cancel = [&]
{
if (isQueryCancelled())
{
executor->cancel();
return false;
}
return true;
};
Block block;
while (check_for_cancel())
{
if (!executor->pull(block, interactive_delay / 1000))
break;
throwIfFailedToSendResult();
if (!check_for_cancel())
break;
if (block && !io.null_format)
block_output_stream->write(block);
@ -897,13 +948,18 @@ namespace
sendResult();
throwIfFailedToSendResult();
if (!check_for_cancel())
break;
}
block_output_stream->writeSuffix();
addTotalsToResult(executor->getTotalsBlock());
addExtremesToResult(executor->getExtremesBlock());
addProfileInfoToResult(executor->getProfileInfo());
if (!isQueryCancelled())
{
addTotalsToResult(executor->getTotalsBlock());
addExtremesToResult(executor->getExtremesBlock());
addProfileInfoToResult(executor->getProfileInfo());
}
}
void Call::finishQuery()
@ -996,7 +1052,22 @@ namespace
responder->read(next_query_info_while_reading, [this](bool ok)
{
/// Called on queue_thread.
if (!ok)
if (ok)
{
const auto & nqi = next_query_info_while_reading;
if (check_query_info_contains_cancel_only)
{
if (!nqi.query().empty() || !nqi.query_id().empty() || !nqi.settings().empty() || !nqi.database().empty()
|| !nqi.input_data().empty() || !nqi.input_data_delimiter().empty() || !nqi.output_format().empty()
|| !nqi.user_name().empty() || !nqi.password().empty() || !nqi.quota().empty() || !nqi.session_id().empty())
{
LOG_WARNING(log, "Cannot add extra information to a query which is already executing. Only the 'cancel' field can be set");
}
}
if (nqi.cancel())
want_to_cancel = true;
}
else
{
/// We cannot throw an exception right here because this code is executed
/// on queue_thread.
@ -1030,7 +1101,7 @@ namespace
/// Maybe it's reading a query info right now. Let it finish.
finish_reading();
if (isInputStreaming(call_type) && query_info.next_query_info())
if (isInputStreaming(call_type))
{
/// Next query info can contain more input data. Now we start reading a next query info,
/// so another call of readQueryInfo() in the future will probably be able to take it.
@ -1049,6 +1120,25 @@ namespace
}
}
bool Call::isQueryCancelled()
{
if (cancelled)
{
result.set_cancelled(true);
return true;
}
if (want_to_cancel)
{
LOG_INFO(log, "Query cancelled");
cancelled = true;
result.set_cancelled(true);
return true;
}
return false;
}
void Call::addProgressToResult()
{
auto values = progress.fetchAndResetPiecewiseAtomically();
@ -1151,7 +1241,7 @@ namespace
return;
/// If output is not streaming then only the final result can be sent.
bool send_final_message = finalize || result.has_exception();
bool send_final_message = finalize || result.has_exception() || result.cancelled();
if (!send_final_message && !isOutputStreaming(call_type))
return;

View File

@ -16,7 +16,8 @@ message QueryInfo {
string session_id = 11;
bool session_check = 12;
uint32 session_timeout = 13;
bool next_query_info = 14;
bool cancel = 14;
bool next_query_info = 15;
}
enum LogsLevel {
@ -72,6 +73,7 @@ message Result {
Progress progress = 5;
Stats stats = 6;
Exception exception = 7;
bool cancelled = 8;
}
service ClickHouse {

View File

@ -2,6 +2,7 @@ import os
import pytest
import subprocess
import sys
import time
import grpc
from helpers.cluster import ClickHouseCluster
from threading import Thread
@ -274,3 +275,29 @@ def test_simultaneous_queries_multiple_channels():
finally:
for thread in threads:
thread.join()
def test_cancel_while_processing_input():
query("CREATE TABLE t (a UInt8) ENGINE = Memory")
def send_query_info():
yield clickhouse_grpc_pb2.QueryInfo(query="INSERT INTO t FORMAT TabSeparated", input_data="1\n2\n3\n", next_query_info=True)
yield clickhouse_grpc_pb2.QueryInfo(input_data="4\n5\n6\n", next_query_info=True)
yield clickhouse_grpc_pb2.QueryInfo(cancel=True)
stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
result = stub.ExecuteQueryWithStreamInput(send_query_info())
assert result.cancelled == True
assert result.progress.written_rows == 6
assert query("SELECT a FROM t ORDER BY a") == "1\n2\n3\n4\n5\n6\n"
def test_cancel_while_generating_output():
def send_query_info():
yield clickhouse_grpc_pb2.QueryInfo(query="SELECT number, sleep(0.2) FROM numbers(10) SETTINGS max_block_size=2")
time.sleep(0.5)
yield clickhouse_grpc_pb2.QueryInfo(cancel=True)
stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
results = list(stub.ExecuteQueryWithStreamIO(send_query_info()))
assert len(results) >= 1
assert results[-1].cancelled == True
output = ''
for result in results:
output += result.output
assert output == '0\t0\n1\t0\n2\t0\n3\t0\n'