Merge pull request #45689 from alexX512/master

Add an option to show the partial result on query cancellation
Nikolai Kochetov 2023-03-23 17:34:15 +01:00 committed by GitHub
commit 0f01725d8b
15 changed files with 252 additions and 57 deletions

View File

@ -4049,3 +4049,32 @@ Possible values:
- 1 - enabled
Default value: `0`.
## stop_reading_on_first_cancel {#stop_reading_on_first_cancel}
When set to `true` and the user interrupts a query (for example with `Ctrl+C` on the client), the query continues executing only on the data that has already been read from the table and then returns a partial result for that part of the table. To fully stop the query without a partial result, the user has to send two cancel requests.
**Example without the setting on Ctrl+C**
```sql
SELECT sum(number) FROM numbers(10000000000)
Cancelling query.
Ok.
Query was cancelled.
0 rows in set. Elapsed: 1.334 sec. Processed 52.65 million rows, 421.23 MB (39.48 million rows/s., 315.85 MB/s.)
```
**Example with the setting on Ctrl+C**
```sql
SELECT sum(number) FROM numbers(10000000000) SETTINGS stop_reading_on_first_cancel=true
┌──────sum(number)─┐
│ 1355411451286266 │
└──────────────────┘
1 row in set. Elapsed: 1.331 sec. Processed 52.13 million rows, 417.05 MB (39.17 million rows/s., 313.33 MB/s.)
```
Possible values: `true`, `false`
Default value: `false`
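
The examples above enable the setting per query with a `SETTINGS` clause; as with other settings, it should also be possible to enable it for the whole session with an ordinary `SET` statement (sketch only, the query is illustrative):

```sql
SET stop_reading_on_first_cancel = true;
-- A single Ctrl+C now returns the partial sum instead of discarding the query.
SELECT sum(number) FROM numbers(10000000000);
```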

View File

@ -4084,3 +4084,32 @@ ALTER TABLE test FREEZE SETTINGS alter_partition_verbose_result = 1;
Sets the character that is interpreted as a suffix after the result set of the [CustomSeparated](../../interfaces/formats.md#format-customseparated) format.
Default value: `''`.
## stop_reading_on_first_cancel {#stop_reading_on_first_cancel}
When set to `true` and the user interrupts a query (for example with `Ctrl+C` on the client), the query continues executing only on the data that has already been read from the table and then returns a partial result for that part of the table. To fully stop the query without a partial result, the user has to send two cancel requests.
**Example with the setting disabled on Ctrl+C**
```sql
SELECT sum(number) FROM numbers(10000000000)
Cancelling query.
Ok.
Query was cancelled.
0 rows in set. Elapsed: 1.334 sec. Processed 52.65 million rows, 421.23 MB (39.48 million rows/s., 315.85 MB/s.)
```
**Example with the setting enabled on Ctrl+C**
```sql
SELECT sum(number) FROM numbers(10000000000) SETTINGS stop_reading_on_first_cancel=true
┌──────sum(number)─┐
│ 1355411451286266 │
└──────────────────┘
1 row in set. Elapsed: 1.331 sec. Processed 52.13 million rows, 417.05 MB (39.17 million rows/s., 313.33 MB/s.)
```
Possible values: `true`, `false`
Default value: `false`

View File

@ -261,21 +261,31 @@ static void incrementProfileEventsBlock(Block & dst, const Block & src)
}
std::atomic_flag exit_on_signal;
std::atomic<Int32> exit_after_signals = 0;
class QueryInterruptHandler : private boost::noncopyable
{
public:
static void start() { exit_on_signal.clear(); }
/// Store how many interrupt signals can be received before stopping the query;
/// by default, stop after the first interrupt signal.
static void start(Int32 signals_before_stop = 1) { exit_after_signals.store(signals_before_stop); }
/// Set a value not greater than 0 to mark the query as stopped.
static void stop() { return exit_after_signals.store(0); }
/// Return true if the query was stopped.
static bool stop() { return exit_on_signal.test_and_set(); }
static bool cancelled() { return exit_on_signal.test(); }
/// Query was stopped if it received at least "signals_before_stop" interrupt signals.
static bool try_stop() { return exit_after_signals.fetch_sub(1) <= 0; }
static bool cancelled() { return exit_after_signals.load() <= 0; }
/// Return how many interrupt signals remain before the query is stopped.
static Int32 cancelled_status() { return exit_after_signals.load(); }
};
/// This signal handler is set only for SIGINT.
void interruptSignalHandler(int signum)
{
if (QueryInterruptHandler::stop())
if (QueryInterruptHandler::try_stop())
safeExit(128 + signum);
}
@ -850,12 +860,15 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
}
}
const auto & settings = global_context->getSettingsRef();
const Int32 signals_before_stop = settings.stop_reading_on_first_cancel ? 2 : 1;
int retries_left = 10;
while (retries_left)
{
try
{
QueryInterruptHandler::start();
QueryInterruptHandler::start(signals_before_stop);
SCOPE_EXIT({ QueryInterruptHandler::stop(); });
connection->sendQuery(
@ -872,7 +885,7 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
if (send_external_tables)
sendExternalTables(parsed_query);
receiveResult(parsed_query);
receiveResult(parsed_query, signals_before_stop, settings.stop_reading_on_first_cancel);
break;
}
@ -897,7 +910,7 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
/// Receives and processes packets coming from server.
/// Also checks if query execution should be cancelled.
void ClientBase::receiveResult(ASTPtr parsed_query)
void ClientBase::receiveResult(ASTPtr parsed_query, Int32 signals_before_stop, bool stop_reading_on_first_cancel)
{
// TODO: get the poll_interval from commandline.
const auto receive_timeout = connection_parameters.timeouts.receive_timeout;
@ -921,7 +934,13 @@ void ClientBase::receiveResult(ASTPtr parsed_query)
/// to avoid losing sync.
if (!cancelled)
{
if (QueryInterruptHandler::cancelled())
if (stop_reading_on_first_cancel && QueryInterruptHandler::cancelled_status() == signals_before_stop - 1)
{
connection->sendCancel();
/// The first cancel-reading request has been sent; any further request will be a full cancel.
stop_reading_on_first_cancel = false;
}
else if (QueryInterruptHandler::cancelled())
{
cancelQuery();
}
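
The new handler replaces the old single-shot flag with a signal budget: every `SIGINT` decrements an atomic counter, the client reacts to the first decrement by cancelling only the reading, and a full cancel (or a hard exit for any further signal) happens once the budget is spent. Below is a minimal standalone sketch of that counting pattern, independent of the ClickHouse classes; names, messages, and the polling loop are illustrative only, and it assumes lock-free `int` atomics.

```cpp
#include <atomic>
#include <chrono>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <thread>

// Remaining interrupt budget; <= 0 means the query is fully cancelled.
static std::atomic<int> exit_after_signals{0};

// SIGINT handler: spend one unit of the budget. If the budget was already
// exhausted, terminate with the conventional 128 + signal exit code.
// (Assumes std::atomic<int> is lock-free, hence async-signal-safe.)
static void interrupt_handler(int signum)
{
    if (exit_after_signals.fetch_sub(1) <= 0)
        std::_Exit(128 + signum);
}

int main()
{
    // Two signals before a full stop, mirroring stop_reading_on_first_cancel = true.
    const int signals_before_stop = 2;
    exit_after_signals.store(signals_before_stop);
    std::signal(SIGINT, interrupt_handler);

    std::puts("running; press Ctrl+C once for a soft cancel, twice for a full cancel");
    bool reading_cancelled = false;
    while (true)
    {
        const int remaining = exit_after_signals.load();
        if (remaining <= 0)
        {
            std::puts("second Ctrl+C: full cancel, no result");
            break;
        }
        if (!reading_cancelled && remaining == signals_before_stop - 1)
        {
            std::puts("first Ctrl+C: stop reading, keep processing what was already read");
            reading_cancelled = true;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
    return 0;
}
```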

View File

@ -131,7 +131,7 @@ protected:
private:
void receiveResult(ASTPtr parsed_query);
void receiveResult(ASTPtr parsed_query, Int32 signals_before_stop, bool stop_reading_on_first_cancel);
bool receiveAndProcessPacket(ASTPtr parsed_query, bool cancelled_);
void receiveLogsAndProfileEvents(ASTPtr parsed_query);
bool receiveSampleBlock(Block & out, ColumnsDescription & columns_description, ASTPtr parsed_query);

View File

@ -281,6 +281,7 @@ class IColumn;
\
M(Bool, final, false, "Query with the FINAL modifier by default. If the engine does not support final, it does not have any effect. On queries with multiple tables final is applied only on those that support it. It also works on distributed tables", 0) \
\
M(Bool, stop_reading_on_first_cancel, false, "Allows query to return a partial result after cancel.", 0) \
/** Settings for testing hedged requests */ \
M(Milliseconds, sleep_in_send_tables_status_ms, 0, "Time to sleep in sending tables status response in TCPHandler", 0) \
M(Milliseconds, sleep_in_send_data_ms, 0, "Time to sleep in sending data in TCPHandler", 0) \

View File

@ -16,6 +16,7 @@ ExecutingGraph::ExecutingGraph(std::shared_ptr<Processors> processors_, bool pro
{
uint64_t num_processors = processors->size();
nodes.reserve(num_processors);
source_processors.reserve(num_processors);
/// Create nodes.
for (uint64_t node = 0; node < num_processors; ++node)
@ -23,6 +24,9 @@ ExecutingGraph::ExecutingGraph(std::shared_ptr<Processors> processors_, bool pro
IProcessor * proc = processors->at(node).get();
processors_map[proc] = node;
nodes.emplace_back(std::make_unique<Node>(proc, node));
bool is_source = proc->getInputs().empty();
source_processors.emplace_back(is_source);
}
/// Create edges.
@ -117,6 +121,14 @@ bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
return false;
}
processors->insert(processors->end(), new_processors.begin(), new_processors.end());
source_processors.reserve(source_processors.size() + new_processors.size());
for (auto & proc: new_processors)
{
bool is_source = proc->getInputs().empty();
source_processors.emplace_back(is_source);
}
}
uint64_t num_processors = processors->size();
@ -390,17 +402,25 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue
return true;
}
void ExecutingGraph::cancel()
void ExecutingGraph::cancel(bool cancel_all_processors)
{
std::exception_ptr exception_ptr;
{
std::lock_guard guard(processors_mutex);
for (auto & processor : *processors)
uint64_t num_processors = processors->size();
for (uint64_t proc = 0; proc < num_processors; ++proc)
{
try
{
processor->cancel();
/// Stop all processors in the general case, but in a specific case
/// where the pipeline needs to return a result on a partially read table,
/// stop only the processors that read from the source
if (cancel_all_processors || source_processors.at(proc))
{
IProcessor * processor = processors->at(proc).get();
processor->cancel();
}
}
catch (...)
{
@ -415,7 +435,8 @@ void ExecutingGraph::cancel()
tryLogCurrentException("ExecutingGraph");
}
}
cancelled = true;
if (cancel_all_processors)
cancelled = true;
}
if (exception_ptr)
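
The core change above is that the graph now remembers which processors are sources (those without input ports) and `cancel(false)` stops only those, letting the rest of the pipeline drain whatever was already read. A simplified, self-contained model of that idea follows; the `Processor`/`Graph` types and processor names are illustrative, not the real ClickHouse interfaces.

```cpp
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Minimal stand-in for a pipeline processor: only what the sketch needs.
struct Processor
{
    std::string name;
    bool has_inputs = false;   // a source processor has no input ports
    bool cancelled = false;
    void cancel() { cancelled = true; }
};

struct Graph
{
    std::vector<std::shared_ptr<Processor>> processors;
    std::vector<bool> is_source;   // filled when processors are registered

    void add(std::shared_ptr<Processor> proc)
    {
        is_source.push_back(!proc->has_inputs);
        processors.push_back(std::move(proc));
    }

    // cancel_all = false stops only the sources, so downstream processors
    // can still finish with the data that was already read.
    void cancel(bool cancel_all)
    {
        for (size_t i = 0; i < processors.size(); ++i)
            if (cancel_all || is_source[i])
                processors[i]->cancel();
    }
};

int main()
{
    Graph graph;
    graph.add(std::make_shared<Processor>(Processor{"ReadFromTable", /*has_inputs*/ false}));
    graph.add(std::make_shared<Processor>(Processor{"Aggregating", /*has_inputs*/ true}));
    graph.add(std::make_shared<Processor>(Processor{"Output", /*has_inputs*/ true}));

    graph.cancel(/*cancel_all*/ false);   // the "stop reading" cancellation
    for (const auto & proc : graph.processors)
        std::cout << proc->name << ": " << (proc->cancelled ? "cancelled" : "running") << '\n';
}
```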

View File

@ -6,6 +6,7 @@
#include <mutex>
#include <queue>
#include <stack>
#include <vector>
namespace DB
@ -137,7 +138,7 @@ public:
/// If processor wants to be expanded, lock will be upgraded to get write access to pipeline.
bool updateNode(uint64_t pid, Queue & queue, Queue & async_queue);
void cancel();
void cancel(bool cancel_all_processors = true);
private:
/// Add single edge to edges list. Check processor is known.
@ -152,6 +153,7 @@ private:
bool expandPipeline(std::stack<uint64_t> & stack, uint64_t pid);
std::shared_ptr<Processors> processors;
std::vector<bool> source_processors;
std::mutex processors_mutex;
UpgradableMutex nodes_mutex;

View File

@ -74,6 +74,15 @@ void PipelineExecutor::cancel()
graph->cancel();
}
void PipelineExecutor::cancelReading()
{
if (!cancelled_reading)
{
cancelled_reading = true;
graph->cancel(/*cancel_all_processors*/ false);
}
}
void PipelineExecutor::finish()
{
tasks.finish();
@ -148,6 +157,7 @@ bool PipelineExecutor::checkTimeLimitSoft()
// so that the "break" is faster and doesn't wait for long events
if (!continuing)
cancel();
return continuing;
}

View File

@ -50,6 +50,9 @@ public:
/// Cancel execution. May be called from another thread.
void cancel();
/// Cancel the processors that only read data from the source. May be called from another thread.
void cancelReading();
/// Checks the query time limits (cancelled or timeout). Throws on cancellation or when time limit is reached and the query uses "break"
bool checkTimeLimit();
/// Same as checkTimeLimit but it never throws. It returns false on cancellation or time limit reached
@ -78,6 +81,7 @@ private:
bool trace_processors = false;
std::atomic_bool cancelled = false;
std::atomic_bool cancelled_reading = false;
Poco::Logger * log = &Poco::Logger::get("PipelineExecutor");

View File

@ -179,10 +179,41 @@ void PullingAsyncPipelineExecutor::cancel()
return;
/// Cancel execution if it wasn't finished.
try
cancelWithExceptionHandling([&]()
{
if (!data->is_finished && data->executor)
data->executor->cancel();
});
/// The following code is needed to rethrow exception from PipelineExecutor.
/// It could have been thrown from pull(), but we will not likely call it again.
/// Join thread here to wait for possible exception.
if (data->thread.joinable())
data->thread.join();
/// Rethrow exception to not swallow it in destructor.
data->rethrowExceptionIfHas();
}
void PullingAsyncPipelineExecutor::cancelReading()
{
if (!data)
return;
/// Stop reading from source if pipeline wasn't finished.
cancelWithExceptionHandling([&]()
{
if (!data->is_finished && data->executor)
data->executor->cancelReading();
});
}
void PullingAsyncPipelineExecutor::cancelWithExceptionHandling(CancelFunc && cancel_func)
{
try
{
cancel_func();
}
catch (...)
{
@ -194,16 +225,6 @@ void PullingAsyncPipelineExecutor::cancel()
data->has_exception = true;
}
}
/// The following code is needed to rethrow exception from PipelineExecutor.
/// It could have been thrown from pull(), but we will not likely call it again.
/// Join thread here to wait for possible exception.
if (data->thread.joinable())
data->thread.join();
/// Rethrow exception to not swallow it in destructor.
data->rethrowExceptionIfHas();
}
Chunk PullingAsyncPipelineExecutor::getTotals()
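
The refactoring above extracts the shared try/catch into `cancelWithExceptionHandling`, so `cancel()` and `cancelReading()` differ only in the callable they pass in. A small sketch of that wrapper pattern in isolation; class and method names are illustrative, and the real executor stores the exception in its shared `Data` state rather than in the class itself.

```cpp
#include <exception>
#include <functional>
#include <iostream>
#include <stdexcept>

// Both cancel() and cancelReading() run a different action, but share the
// same "never let an exception escape, remember it instead" wrapper.
class Executor
{
public:
    void cancel()        { cancelWithExceptionHandling([&] { std::cout << "cancel all processors\n"; }); }
    void cancelReading() { cancelWithExceptionHandling([&] { throw std::runtime_error("source failed to stop"); }); }

    void rethrowIfHas()
    {
        if (exception)
            std::rethrow_exception(exception);
    }

private:
    using CancelFunc = std::function<void()>;

    void cancelWithExceptionHandling(CancelFunc && cancel_func)
    {
        try
        {
            cancel_func();
        }
        catch (...)
        {
            // Remember the failure; the caller decides when to rethrow it.
            exception = std::current_exception();
        }
    }

    std::exception_ptr exception;
};

int main()
{
    Executor executor;
    executor.cancel();          // prints, no exception
    executor.cancelReading();   // the exception is captured, not thrown here
    try
    {
        executor.rethrowIfHas();
    }
    catch (const std::exception & e)
    {
        std::cout << "deferred error: " << e.what() << '\n';
    }
}
```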

View File

@ -1,4 +1,5 @@
#pragma once
#include <functional>
#include <memory>
namespace DB
@ -32,9 +33,12 @@ public:
bool pull(Chunk & chunk, uint64_t milliseconds = 0);
bool pull(Block & block, uint64_t milliseconds = 0);
/// Stop execution. It is not necessary, but helps to stop execution before executor is destroyed.
/// Stop execution of all processors. It is not necessary, but helps to stop execution before executor is destroyed.
void cancel();
/// Stop the processors that only read data from the source.
void cancelReading();
/// Get totals and extremes. Returns empty chunk if doesn't have any.
Chunk getTotals();
Chunk getExtremes();
@ -49,6 +53,11 @@ public:
/// Internal executor data.
struct Data;
private:
using CancelFunc = std::function<void()>;
void cancelWithExceptionHandling(CancelFunc && cancel_func);
private:
QueryPipeline & pipeline;
std::shared_ptr<LazyOutputFormat> lazy_format;

View File

@ -376,7 +376,7 @@ void TCPHandler::runImpl()
std::lock_guard lock(task_callback_mutex);
if (state.is_cancelled)
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
return {};
sendReadTaskRequestAssumeLocked();
@ -392,7 +392,7 @@ void TCPHandler::runImpl()
CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::MergeTreeAllRangesAnnouncementsSent);
std::lock_guard lock(task_callback_mutex);
if (state.is_cancelled)
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
return;
sendMergeTreeAllRangesAnnounecementAssumeLocked(announcement);
@ -406,7 +406,7 @@ void TCPHandler::runImpl()
CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::MergeTreeReadTaskRequestsSent);
std::lock_guard lock(task_callback_mutex);
if (state.is_cancelled)
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
return std::nullopt;
sendMergeTreeReadTaskRequestAssumeLocked(std::move(request));
@ -424,7 +424,7 @@ void TCPHandler::runImpl()
auto finish_or_cancel = [this]()
{
if (state.is_cancelled)
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
state.io.onCancelOrConnectionLoss();
else
state.io.onFinish();
@ -454,7 +454,7 @@ void TCPHandler::runImpl()
{
std::scoped_lock lock(task_callback_mutex, fatal_error_mutex);
if (isQueryCancelled())
if (getQueryCancellationStatus() == CancellationStatus::FULLY_CANCELLED)
return true;
sendProgress();
@ -673,7 +673,7 @@ bool TCPHandler::readDataNext()
{
LOG_INFO(log, "Client has dropped the connection, cancel the query.");
state.is_connection_closed = true;
state.is_cancelled = true;
state.cancellation_status = CancellationStatus::FULLY_CANCELLED;
break;
}
@ -718,7 +718,7 @@ void TCPHandler::readData()
while (readDataNext())
;
if (state.is_cancelled)
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
}
@ -731,7 +731,7 @@ void TCPHandler::skipData()
while (readDataNext())
;
if (state.is_cancelled)
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
}
@ -769,7 +769,7 @@ void TCPHandler::processInsertQuery()
while (readDataNext())
executor.push(std::move(state.block_for_insert));
if (state.is_cancelled)
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
executor.cancel();
else
executor.finish();
@ -823,7 +823,8 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
{
std::unique_lock lock(task_callback_mutex);
if (isQueryCancelled())
auto cancellation_status = getQueryCancellationStatus();
if (cancellation_status == CancellationStatus::FULLY_CANCELLED)
{
/// Several callback like callback for parallel reading could be called from inside the pipeline
/// and we have to unlock the mutex from our side to prevent deadlock.
@ -832,6 +833,10 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
executor.cancel();
break;
}
else if (cancellation_status == CancellationStatus::READ_CANCELLED)
{
executor.cancelReading();
}
if (after_send_progress.elapsed() / 1000 >= interactive_delay)
{
@ -862,7 +867,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
* because we have not read all the data yet,
* and there could be ongoing calculations in other threads at the same time.
*/
if (!isQueryCancelled())
if (getQueryCancellationStatus() != CancellationStatus::FULLY_CANCELLED)
{
sendTotals(executor.getTotalsBlock());
sendExtremes(executor.getExtremesBlock());
@ -1352,8 +1357,7 @@ bool TCPHandler::receivePacket()
return false;
case Protocol::Client::Cancel:
LOG_INFO(log, "Received 'Cancel' packet from the client, canceling the query");
state.is_cancelled = true;
decreaseCancellationStatus("Received 'Cancel' packet from the client, canceling the query.");
return false;
case Protocol::Client::Hello:
@ -1394,8 +1398,7 @@ String TCPHandler::receiveReadTaskResponseAssumeLocked()
{
if (packet_type == Protocol::Client::Cancel)
{
LOG_INFO(log, "Received 'Cancel' packet from the client, canceling the read task");
state.is_cancelled = true;
decreaseCancellationStatus("Received 'Cancel' packet from the client, canceling the read task.");
return {};
}
else
@ -1422,8 +1425,7 @@ std::optional<ParallelReadResponse> TCPHandler::receivePartitionMergeTreeReadTas
{
if (packet_type == Protocol::Client::Cancel)
{
LOG_INFO(log, "Received 'Cancel' packet from the client, canceling the MergeTree read task");
state.is_cancelled = true;
decreaseCancellationStatus("Received 'Cancel' packet from the client, canceling the MergeTree read task.");
return std::nullopt;
}
else
@ -1812,14 +1814,37 @@ void TCPHandler::initProfileEventsBlockOutput(const Block & block)
}
}
bool TCPHandler::isQueryCancelled()
void TCPHandler::decreaseCancellationStatus(const std::string & log_message)
{
if (state.is_cancelled || state.sent_all_data)
return true;
auto prev_status = magic_enum::enum_name(state.cancellation_status);
bool stop_reading_on_first_cancel = false;
if (query_context)
{
const auto & settings = query_context->getSettingsRef();
stop_reading_on_first_cancel = settings.stop_reading_on_first_cancel;
}
if (stop_reading_on_first_cancel && state.cancellation_status == CancellationStatus::NOT_CANCELLED)
{
state.cancellation_status = CancellationStatus::READ_CANCELLED;
}
else
{
state.cancellation_status = CancellationStatus::FULLY_CANCELLED;
}
auto current_status = magic_enum::enum_name(state.cancellation_status);
LOG_INFO(log, "Change cancellation status from {} to {}. Log message: {}", prev_status, current_status, log_message);
}
QueryState::CancellationStatus TCPHandler::getQueryCancellationStatus()
{
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED || state.sent_all_data)
return CancellationStatus::FULLY_CANCELLED;
if (after_check_cancelled.elapsed() / 1000 < interactive_delay)
return false;
return state.cancellation_status;
after_check_cancelled.restart();
@ -1829,9 +1854,9 @@ bool TCPHandler::isQueryCancelled()
if (in->eof())
{
LOG_INFO(log, "Client has dropped the connection, cancel the query.");
state.is_cancelled = true;
state.cancellation_status = CancellationStatus::FULLY_CANCELLED;
state.is_connection_closed = true;
return true;
return CancellationStatus::FULLY_CANCELLED;
}
UInt64 packet_type = 0;
@ -1842,16 +1867,17 @@ bool TCPHandler::isQueryCancelled()
case Protocol::Client::Cancel:
if (state.empty())
throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet Cancel received from client");
LOG_INFO(log, "Query was cancelled.");
state.is_cancelled = true;
return true;
decreaseCancellationStatus("Query was cancelled.");
return state.cancellation_status;
default:
throw NetException(ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT, "Unknown packet from client {}", toString(packet_type));
}
}
return false;
return state.cancellation_status;
}
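
The boolean `is_cancelled` flag becomes a three-state status, and each `Cancel` packet moves it one step down: with `stop_reading_on_first_cancel` enabled the first packet only cancels reading, otherwise (or on the second packet) the query is fully cancelled. A compact sketch of just that transition, reusing the same enum values; the free functions are illustrative.

```cpp
#include <cstdint>
#include <iostream>
#include <string>

enum class CancellationStatus : uint8_t
{
    FULLY_CANCELLED,
    READ_CANCELLED,
    NOT_CANCELLED,
};

static std::string toName(CancellationStatus status)
{
    switch (status)
    {
        case CancellationStatus::FULLY_CANCELLED: return "FULLY_CANCELLED";
        case CancellationStatus::READ_CANCELLED:  return "READ_CANCELLED";
        case CancellationStatus::NOT_CANCELLED:   return "NOT_CANCELLED";
    }
    return "UNKNOWN";
}

// Each 'Cancel' packet moves the status one step down. With the setting
// disabled, the first packet already fully cancels the query.
static CancellationStatus decrease(CancellationStatus status, bool stop_reading_on_first_cancel)
{
    if (stop_reading_on_first_cancel && status == CancellationStatus::NOT_CANCELLED)
        return CancellationStatus::READ_CANCELLED;
    return CancellationStatus::FULLY_CANCELLED;
}

int main()
{
    auto status = CancellationStatus::NOT_CANCELLED;
    for (int cancel_packet = 1; cancel_packet <= 2; ++cancel_packet)
    {
        status = decrease(status, /*stop_reading_on_first_cancel*/ true);
        std::cout << "after cancel packet " << cancel_packet << ": " << toName(status) << '\n';
    }
    // Prints READ_CANCELLED after the first packet, FULLY_CANCELLED after the second.
}
```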

View File

@ -76,8 +76,17 @@ struct QueryState
/// Streams of blocks, that are processing the query.
BlockIO io;
enum class CancellationStatus: UInt8
{
FULLY_CANCELLED,
READ_CANCELLED,
NOT_CANCELLED
};
static std::string cancellationStatusToName(CancellationStatus status);
/// Is request cancelled
bool is_cancelled = false;
CancellationStatus cancellation_status = CancellationStatus::NOT_CANCELLED;
bool is_connection_closed = false;
/// empty or not
bool is_empty = true;
@ -272,7 +281,10 @@ private:
void initLogsBlockOutput(const Block & block);
void initProfileEventsBlockOutput(const Block & block);
bool isQueryCancelled();
using CancellationStatus = QueryState::CancellationStatus;
void decreaseCancellationStatus(const std::string & log_message);
CancellationStatus getQueryCancellationStatus();
/// This function is called from different threads.
void updateProgress(const Progress & value);

View File

@ -0,0 +1,11 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# Run a long query in the background with the setting enabled.
$CLICKHOUSE_CLIENT -n --query="SELECT sum(number * 0) FROM numbers(10000000000) SETTINGS stop_reading_on_first_cancel=true;" &
pid=$!
# Let the query start reading data, then send a single SIGINT.
sleep 2
kill -SIGINT $pid
# The client should finish with a partial result instead of aborting.
wait $pid