From 0e3a8840b565164c9046445e56208e6b1dc5fd48 Mon Sep 17 00:00:00 2001
From: Vitaly Baranov
Date: Mon, 2 Nov 2020 03:47:43 +0300
Subject: [PATCH] Support cancellation of executing query via gRPC.

---
 src/Server/GRPCServer.cpp                    | 116 ++++++++++++++++---
 src/Server/grpc_protos/clickhouse_grpc.proto |   4 +-
 tests/integration/test_grpc_protocol/test.py |  27 +++++
 3 files changed, 133 insertions(+), 14 deletions(-)

diff --git a/src/Server/GRPCServer.cpp b/src/Server/GRPCServer.cpp
index c1b99a809df..79e1630931a 100644
--- a/src/Server/GRPCServer.cpp
+++ b/src/Server/GRPCServer.cpp
@@ -179,6 +179,8 @@ namespace
             str.append(str.empty() ? "" : ", ").append("progress");
         if (result.logs_size())
             str.append(str.empty() ? "" : ", ").append("logs: ").append(std::to_string(result.logs_size())).append(" entries");
+        if (result.cancelled())
+            str.append(str.empty() ? "" : ", ").append("cancelled");
         if (result.has_exception())
             str.append(str.empty() ? "" : ", ").append("exception");
         return str;
@@ -468,6 +470,7 @@ namespace
 
         void readQueryInfo();
         void throwIfFailedToReadQueryInfo();
+        bool isQueryCancelled();
 
         void addProgressToResult();
         void addTotalsToResult(const Block & totals);
@@ -505,6 +508,7 @@ namespace
         bool initial_query_info_read = false;
         bool finalize = false;
         bool responder_finished = false;
+        bool cancelled = false;
 
         std::optional read_buffer;
         std::optional write_buffer;
@@ -522,6 +526,8 @@ namespace
         std::atomic<bool> reading_query_info = false;
         std::atomic<bool> failed_to_read_query_info = false;
        GRPCQueryInfo next_query_info_while_reading;
+        std::atomic<bool> want_to_cancel = false;
+        std::atomic<bool> check_query_info_contains_cancel_only = false;
 
         std::atomic<bool> sending_result = false;
         std::atomic<bool> failed_to_send_result = false;
@@ -589,6 +595,9 @@ namespace
 
         readQueryInfo();
 
+        if (query_info.cancel())
+            throw Exception("Initial query info cannot set the 'cancel' field", ErrorCodes::INVALID_GRPC_QUERY_INFO);
+
         LOG_DEBUG(log, "Received initial QueryInfo: {}", getQueryDescription(query_info));
     }
 
@@ -777,9 +786,13 @@ namespace
                 || !query_info.session_id().empty())
             {
                 throw Exception("Extra query infos can be used only to add more input data. "
-                                "Only the following fields can be set: input_data, next_query_info",
+                                "Only the following fields can be set: input_data, next_query_info, cancel",
                                 ErrorCodes::INVALID_GRPC_QUERY_INFO);
             }
+
+            if (isQueryCancelled())
+                break;
+
             LOG_DEBUG(log, "Received extra QueryInfo: input_data: {} bytes", query_info.input_data().size());
             need_input_data_from_query_info = true;
         }
@@ -824,10 +837,23 @@ namespace
         block_output_stream = query_context->getOutputFormat(output_format, *write_buffer, async_in.getHeader());
         Stopwatch after_send_progress;
 
+        /// We are not going to receive input data anymore.
+        check_query_info_contains_cancel_only = true;
+
+        auto check_for_cancel = [&]
+        {
+            if (isQueryCancelled())
+            {
+                async_in.cancel(false);
+                return false;
+            }
+            return true;
+        };
+
         async_in.readPrefix();
         block_output_stream->writePrefix();
 
-        while (true)
+        while (check_for_cancel())
         {
             Block block;
             if (async_in.poll(interactive_delay / 1000))
@@ -838,6 +864,8 @@ namespace
             }
 
             throwIfFailedToSendResult();
+            if (!check_for_cancel())
+                break;
 
             if (block && !io.null_format)
                 block_output_stream->write(block);
@@ -855,14 +883,19 @@ namespace
                 sendResult();
 
             throwIfFailedToSendResult();
+            if (!check_for_cancel())
+                break;
         }
 
         async_in.readSuffix();
         block_output_stream->writeSuffix();
 
-        addTotalsToResult(io.in->getTotals());
-        addExtremesToResult(io.in->getExtremes());
-        addProfileInfoToResult(io.in->getProfileInfo());
+        if (!isQueryCancelled())
+        {
+            addTotalsToResult(io.in->getTotals());
+            addExtremesToResult(io.in->getExtremes());
+            addProfileInfoToResult(io.in->getProfileInfo());
+        }
     }
 
     void Call::generateOutputWithProcessors()
@@ -876,10 +909,28 @@ namespace
         block_output_stream->writePrefix();
         Stopwatch after_send_progress;
 
-        Block block;
-        while (executor->pull(block, interactive_delay / 1000))
+        /// We are not going to receive input data anymore.
+        check_query_info_contains_cancel_only = true;
+
+        auto check_for_cancel = [&]
         {
+            if (isQueryCancelled())
+            {
+                executor->cancel();
+                return false;
+            }
+            return true;
+        };
+
+        Block block;
+        while (check_for_cancel())
+        {
+            if (!executor->pull(block, interactive_delay / 1000))
+                break;
+
             throwIfFailedToSendResult();
+            if (!check_for_cancel())
+                break;
 
             if (block && !io.null_format)
                 block_output_stream->write(block);
@@ -897,13 +948,18 @@ namespace
                 sendResult();
 
             throwIfFailedToSendResult();
+            if (!check_for_cancel())
+                break;
         }
 
         block_output_stream->writeSuffix();
 
-        addTotalsToResult(executor->getTotalsBlock());
-        addExtremesToResult(executor->getExtremesBlock());
-        addProfileInfoToResult(executor->getProfileInfo());
+        if (!isQueryCancelled())
+        {
+            addTotalsToResult(executor->getTotalsBlock());
+            addExtremesToResult(executor->getExtremesBlock());
+            addProfileInfoToResult(executor->getProfileInfo());
+        }
     }
 
     void Call::finishQuery()
@@ -996,7 +1052,22 @@ namespace
         responder->read(next_query_info_while_reading, [this](bool ok)
        {
             /// Called on queue_thread.
-            if (!ok)
+            if (ok)
+            {
+                const auto & nqi = next_query_info_while_reading;
+                if (check_query_info_contains_cancel_only)
+                {
+                    if (!nqi.query().empty() || !nqi.query_id().empty() || !nqi.settings().empty() || !nqi.database().empty()
+                        || !nqi.input_data().empty() || !nqi.input_data_delimiter().empty() || !nqi.output_format().empty()
+                        || !nqi.user_name().empty() || !nqi.password().empty() || !nqi.quota().empty() || !nqi.session_id().empty())
+                    {
+                        LOG_WARNING(log, "Cannot add extra information to a query which is already executing. Only the 'cancel' field can be set");
+                    }
+                }
+                if (nqi.cancel())
+                    want_to_cancel = true;
+            }
+            else
             {
                 /// We cannot throw an exception right here because this code is executed
                 /// on queue_thread.
@@ -1030,7 +1101,7 @@ namespace
         /// Maybe it's reading a query info right now. Let it finish.
         finish_reading();
 
-        if (isInputStreaming(call_type) && query_info.next_query_info())
+        if (isInputStreaming(call_type))
         {
             /// Next query info can contain more input data. Now we start reading a next query info,
             /// so another call of readQueryInfo() in the future will probably be able to take it.
@@ -1049,6 +1120,25 @@ namespace
         }
     }
 
+    bool Call::isQueryCancelled()
+    {
+        if (cancelled)
+        {
+            result.set_cancelled(true);
+            return true;
+        }
+
+        if (want_to_cancel)
+        {
+            LOG_INFO(log, "Query cancelled");
+            cancelled = true;
+            result.set_cancelled(true);
+            return true;
+        }
+
+        return false;
+    }
+
     void Call::addProgressToResult()
     {
         auto values = progress.fetchAndResetPiecewiseAtomically();
@@ -1151,7 +1241,7 @@ namespace
             return;
 
         /// If output is not streaming then only the final result can be sent.
-        bool send_final_message = finalize || result.has_exception();
+        bool send_final_message = finalize || result.has_exception() || result.cancelled();
 
         if (!send_final_message && !isOutputStreaming(call_type))
             return;
diff --git a/src/Server/grpc_protos/clickhouse_grpc.proto b/src/Server/grpc_protos/clickhouse_grpc.proto
index 665f3247dbb..745e12d2dc4 100644
--- a/src/Server/grpc_protos/clickhouse_grpc.proto
+++ b/src/Server/grpc_protos/clickhouse_grpc.proto
@@ -16,7 +16,8 @@ message QueryInfo {
    string session_id = 11;
    bool session_check = 12;
    uint32 session_timeout = 13;
-   bool next_query_info = 14;
+   bool cancel = 14;
+   bool next_query_info = 15;
 }
 
 enum LogsLevel {
@@ -72,6 +73,7 @@ message Result {
    Progress progress = 5;
    Stats stats = 6;
    Exception exception = 7;
+   bool cancelled = 8;
 }
 
 service ClickHouse {
diff --git a/tests/integration/test_grpc_protocol/test.py b/tests/integration/test_grpc_protocol/test.py
index 6e324151a5e..d242947f953 100644
--- a/tests/integration/test_grpc_protocol/test.py
+++ b/tests/integration/test_grpc_protocol/test.py
@@ -2,6 +2,7 @@ import os
 import pytest
 import subprocess
 import sys
+import time
 import grpc
 from helpers.cluster import ClickHouseCluster
 from threading import Thread
@@ -274,3 +275,29 @@ def test_simultaneous_queries_multiple_channels():
     finally:
         for thread in threads:
             thread.join()
+
+def test_cancel_while_processing_input():
+    query("CREATE TABLE t (a UInt8) ENGINE = Memory")
+    def send_query_info():
+        yield clickhouse_grpc_pb2.QueryInfo(query="INSERT INTO t FORMAT TabSeparated", input_data="1\n2\n3\n", next_query_info=True)
+        yield clickhouse_grpc_pb2.QueryInfo(input_data="4\n5\n6\n", next_query_info=True)
+        yield clickhouse_grpc_pb2.QueryInfo(cancel=True)
+    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
+    result = stub.ExecuteQueryWithStreamInput(send_query_info())
+    assert result.cancelled == True
+    assert result.progress.written_rows == 6
+    assert query("SELECT a FROM t ORDER BY a") == "1\n2\n3\n4\n5\n6\n"
+
+def test_cancel_while_generating_output():
+    def send_query_info():
+        yield clickhouse_grpc_pb2.QueryInfo(query="SELECT number, sleep(0.2) FROM numbers(10) SETTINGS max_block_size=2")
+        time.sleep(0.5)
+        yield clickhouse_grpc_pb2.QueryInfo(cancel=True)
+    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
+    results = list(stub.ExecuteQueryWithStreamIO(send_query_info()))
+    assert len(results) >= 1
+    assert results[-1].cancelled == True
+    output = ''
+    for result in results:
+        output += result.output
+    assert output == '0\t0\n1\t0\n2\t0\n3\t0\n'
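For reference, a minimal client-side sketch of the cancellation flow added by this patch (illustrative only, not part of the patch itself; it assumes the server's grpc_port is 9100 and that clickhouse_grpc_pb2 / clickhouse_grpc_pb2_grpc have been generated from clickhouse_grpc.proto with protoc):

    import time
    import grpc
    import clickhouse_grpc_pb2
    import clickhouse_grpc_pb2_grpc

    def run_and_cancel():
        # 9100 is an assumed grpc_port; adjust to the server configuration.
        with grpc.insecure_channel('localhost:9100') as channel:
            stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel)

            def query_infos():
                # The first QueryInfo starts the query and must not set 'cancel'.
                yield clickhouse_grpc_pb2.QueryInfo(query="SELECT number, sleep(0.2) FROM numbers(100) SETTINGS max_block_size=1")
                time.sleep(1)
                # A later QueryInfo with only 'cancel' set asks the server to stop the query;
                # the final Result then carries 'cancelled' = true.
                yield clickhouse_grpc_pb2.QueryInfo(cancel=True)

            for result in stub.ExecuteQueryWithStreamIO(query_infos()):
                if result.cancelled:
                    print("query was cancelled by the server")

    if __name__ == '__main__':
        run_and_cancel()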