Merge pull request #36710 from azat/client-wait-insert

Properly cancel INSERT queries in clickhouse-client
This commit is contained in:
Kseniia Sumarokova 2022-04-28 13:19:26 +02:00 committed by GitHub
commit b100ca4b17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 43 additions and 14 deletions

View File

@ -765,21 +765,9 @@ void ClientBase::receiveResult(ASTPtr parsed_query)
/// to avoid losing sync.
if (!cancelled)
{
auto cancel_query = [&] {
connection->sendCancel();
if (is_interactive)
{
progress_indication.clearProgressOutput();
std::cout << "Cancelling query." << std::endl;
}
cancelled = true;
};
/// handler received sigint
if (QueryInterruptHandler::cancelled())
{
cancel_query();
cancelQuery();
}
else
{
@ -790,7 +778,7 @@ void ClientBase::receiveResult(ASTPtr parsed_query)
<< " Waited for " << static_cast<size_t>(elapsed) << " seconds,"
<< " timeout is " << receive_timeout.totalSeconds() << " seconds." << std::endl;
cancel_query();
cancelQuery();
}
}
}
@ -1066,6 +1054,9 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
return;
}
QueryInterruptHandler::start();
SCOPE_EXIT({ QueryInterruptHandler::stop(); });
connection->sendQuery(
connection_parameters.timeouts,
query,
@ -1242,6 +1233,13 @@ try
Block block;
while (executor.pull(block))
{
if (!cancelled && QueryInterruptHandler::cancelled())
{
cancelQuery();
executor.cancel();
return;
}
/// Check if server send Log packet
receiveLogs(parsed_query);
@ -1346,6 +1344,17 @@ bool ClientBase::receiveEndOfQuery()
}
}
void ClientBase::cancelQuery()
{
connection->sendCancel();
if (is_interactive)
{
progress_indication.clearProgressOutput();
std::cout << "Cancelling query." << std::endl;
}
cancelled = true;
}
void ClientBase::processParsedSingleQuery(const String & full_query, const String & query_to_execute,
ASTPtr parsed_query, std::optional<bool> echo_query_, bool report_error)

View File

@ -112,6 +112,7 @@ private:
void receiveLogs(ASTPtr parsed_query);
bool receiveSampleBlock(Block & out, ColumnsDescription & columns_description, ASTPtr parsed_query);
bool receiveEndOfQuery();
void cancelQuery();
void onProgress(const Progress & value);
void onData(Block & block, ASTPtr parsed_query);

View File

@ -0,0 +1 @@
0

View File

@ -0,0 +1,18 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
yes 1 | $CLICKHOUSE_CLIENT --query_id "$CLICKHOUSE_TEST_UNIQUE_NAME" -q "insert into function null('n Int') format TSV" &
client_pid=$!
# wait for the query
while [ "$($CLICKHOUSE_CLIENT -q "select count() from system.processes where query_id = '$CLICKHOUSE_TEST_UNIQUE_NAME'")" = 0 ]; do
sleep 0.1
done
kill -INT $client_pid
wait $client_pid
# if client does not cancel it properly (i.e. cancel the query), then return code will be 2, otherwise 0
echo $?