Set is_all_data_sent on exceptions too

This will avoid clickhouse-test complains about left queries after the
test, like in [1].

  [1]: https://s3.amazonaws.com/clickhouse-test-reports/36258/9646487c093a75dc31e3e818417cfad83580b40f/stateful_tests__memory__actions_.html

Follow-up for: #36649
This commit is contained in:
Azat Khuzhin 2022-04-30 03:04:12 +03:00
parent 65f33a3089
commit d82f2ff162
4 changed files with 18 additions and 7 deletions

View File

@ -102,7 +102,8 @@ protected:
std::atomic<bool> is_killed { false };
/// All data to the client already had been sent. Including EndOfStream.
/// All data to the client already had been sent.
/// Including EndOfStream or Exception.
std::atomic<bool> is_all_data_sent { false };
void setUserProcessList(ProcessListForUser * user_process_list_);

View File

@ -47,5 +47,15 @@ BlockIO::~BlockIO()
reset();
}
void BlockIO::setAllDataSent() const
{
/// The following queries does not have process_list_entry:
/// - internal
/// - SHOW PROCESSLIST
if (process_list_entry)
(*process_list_entry)->setAllDataSent();
}
}

View File

@ -48,6 +48,9 @@ struct BlockIO
pipeline.reset();
}
/// Set is_all_data_sent in system.processes for this query.
void setAllDataSent() const;
private:
void reset();
};

View File

@ -31,7 +31,6 @@
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/Session.h>
#include <Interpreters/ProcessList.h>
#include <Server/TCPServer.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
@ -1702,6 +1701,8 @@ void TCPHandler::sendTableColumns(const ColumnsDescription & columns)
void TCPHandler::sendException(const Exception & e, bool with_stack_trace)
{
state.io.setAllDataSent();
writeVarUInt(Protocol::Server::Exception, *out);
writeException(e, *out, with_stack_trace);
out->next();
@ -1711,11 +1712,7 @@ void TCPHandler::sendException(const Exception & e, bool with_stack_trace)
void TCPHandler::sendEndOfStream()
{
state.sent_all_data = true;
/// The following queries does not have process_list_entry:
/// - internal
/// - SHOW PROCESSLIST
if (state.io.process_list_entry)
(*state.io.process_list_entry)->setAllDataSent();
state.io.setAllDataSent();
writeVarUInt(Protocol::Server::EndOfStream, *out);
out->next();