Merge pull request #36816 from azat/is_all_data_sent-on-exception

Set is_all_data_sent on exceptions too
This commit is contained in:
Alexey Milovidov 2022-05-01 14:22:06 +03:00 committed by GitHub
commit 321514a7f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 18 additions and 7 deletions

View File

@ -102,7 +102,8 @@ protected:
std::atomic<bool> is_killed { false };
/// All data to the client already had been sent. Including EndOfStream.
/// All data to the client already had been sent.
/// Including EndOfStream or Exception.
std::atomic<bool> is_all_data_sent { false };
void setUserProcessList(ProcessListForUser * user_process_list_);

View File

@ -47,5 +47,15 @@ BlockIO::~BlockIO()
reset();
}
void BlockIO::setAllDataSent() const
{
/// The following queries does not have process_list_entry:
/// - internal
/// - SHOW PROCESSLIST
if (process_list_entry)
(*process_list_entry)->setAllDataSent();
}
}

View File

@ -48,6 +48,9 @@ struct BlockIO
pipeline.reset();
}
/// Set is_all_data_sent in system.processes for this query.
void setAllDataSent() const;
private:
void reset();
};

View File

@ -31,7 +31,6 @@
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/Session.h>
#include <Interpreters/ProcessList.h>
#include <Server/TCPServer.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
@ -1702,6 +1701,8 @@ void TCPHandler::sendTableColumns(const ColumnsDescription & columns)
void TCPHandler::sendException(const Exception & e, bool with_stack_trace)
{
state.io.setAllDataSent();
writeVarUInt(Protocol::Server::Exception, *out);
writeException(e, *out, with_stack_trace);
out->next();
@ -1711,11 +1712,7 @@ void TCPHandler::sendException(const Exception & e, bool with_stack_trace)
void TCPHandler::sendEndOfStream()
{
state.sent_all_data = true;
/// The following queries does not have process_list_entry:
/// - internal
/// - SHOW PROCESSLIST
if (state.io.process_list_entry)
(*state.io.process_list_entry)->setAllDataSent();
state.io.setAllDataSent();
writeVarUInt(Protocol::Server::EndOfStream, *out);
out->next();