rework TCPHandler exception handling

This commit is contained in:
Sema Checherinda 2024-10-22 14:26:06 +02:00
parent 392676b89a
commit fe2aa961b6
10 changed files with 743 additions and 743 deletions

View File

@ -18,7 +18,6 @@
using Poco::Exception;
using Poco::ErrorHandler;
namespace Poco {
@ -31,9 +30,7 @@ TCPServerConnection::TCPServerConnection(const StreamSocket& socket):
}
TCPServerConnection::~TCPServerConnection()
{
}
TCPServerConnection::~TCPServerConnection() = default;
void TCPServerConnection::start()

View File

@ -14,6 +14,7 @@
#include <Client/ClientBase.h>
#include <Client/Connection.h>
#include <Client/ConnectionParameters.h>
#include "Common/StackTrace.h"
#include <Common/ClickHouseRevision.h>
#include <Common/Exception.h>
#include <Common/NetException.h>
@ -892,6 +893,7 @@ void Connection::sendQuery(
void Connection::sendCancel()
{
LOG_DEBUG(log_wrapper.get(), "send cancel from {}", StackTrace().toString());
/// If we already disconnected.
if (!out)
return;

View File

@ -611,6 +611,7 @@
M(730, REFRESH_FAILED) \
M(731, QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE) \
M(733, TABLE_IS_BEING_RESTARTED) \
M(734, CANNOT_WRITE_AFTER_BUFFER_CANCELED) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

View File

@ -17,6 +17,7 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_WRITE_AFTER_END_OF_BUFFER;
extern const int CANNOT_WRITE_AFTER_BUFFER_CANCELED;
extern const int LOGICAL_ERROR;
}
@ -102,7 +103,7 @@ public:
throw Exception{ErrorCodes::LOGICAL_ERROR, "Cannot write to finalized buffer"};
if (canceled)
throw Exception{ErrorCodes::LOGICAL_ERROR, "Cannot write to canceled buffer"};
throw Exception{ErrorCodes::CANNOT_WRITE_AFTER_BUFFER_CANCELED, "Cannot write to canceled buffer"};
size_t bytes_copied = 0;

View File

@ -239,7 +239,6 @@ public:
/// Returns an entry in the ProcessList associated with this QueryStatus. The function can return nullptr.
std::shared_ptr<ProcessListEntry> getProcessListEntry() const;
bool isAllDataSent() const { return is_all_data_sent; }
void setAllDataSent() { is_all_data_sent = true; }
/// Adds a pipeline to the QueryStatus

View File

@ -54,10 +54,12 @@ void BlockIO::onFinish()
pipeline.reset();
}
void BlockIO::onException()
void BlockIO::onException(bool log_as_error)
{
setAllDataSent();
if (exception_callback)
exception_callback(/* log_error */ true);
exception_callback(log_as_error);
pipeline.cancel();
pipeline.reset();

View File

@ -32,7 +32,7 @@ struct BlockIO
bool null_format = false;
void onFinish();
void onException();
void onException(bool log_as_error=true);
void onCancelOrConnectionLoss();
/// Set is_all_data_sent in system.processes for this query.

View File

@ -1,3 +1,4 @@
#include "Common/logger_useful.h"
#include <Common/ConcurrentBoundedQueue.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <QueryPipeline/RemoteQueryExecutorReadContext.h>
@ -750,10 +751,12 @@ void RemoteQueryExecutor::finish()
switch (packet.type)
{
case Protocol::Server::EndOfStream:
LOG_DEBUG(log, "RemoteQueryExecutor::finish EndOfStream");
finished = true;
break;
case Protocol::Server::Exception:
LOG_DEBUG(log, "RemoteQueryExecutor::finish Exception :: {}", packet.exception->what());
got_exception_from_replica = true;
packet.exception->rethrow();
break;

File diff suppressed because it is too large Load Diff

View File

@ -27,6 +27,7 @@
#include "Server/TCPProtocolStackData.h"
#include "Storages/MergeTree/RequestResponse.h"
#include "base/types.h"
#include "base/defines.h"
namespace CurrentMetrics
@ -53,6 +54,8 @@ struct QueryState
/// Identifier of the query.
String query_id;
ContextMutablePtr query_context;
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete;
Protocol::Compression compression = Protocol::Compression::Disable;
@ -82,18 +85,10 @@ struct QueryState
/// Streams of blocks, that are processing the query.
BlockIO io;
enum class CancellationStatus: UInt8
{
FULLY_CANCELLED,
READ_CANCELLED,
NOT_CANCELLED
};
/// Is request cancelled
CancellationStatus cancellation_status = CancellationStatus::NOT_CANCELLED;
bool is_connection_closed = false;
/// empty or not
bool is_empty = true;
bool allow_partial_result_on_first_cancel = false;
bool stop_read_return_partial_result = false;
/// Data was sent.
bool sent_all_data = false;
/// Request requires data from the client (INSERT, but not INSERT SELECT).
@ -113,6 +108,9 @@ struct QueryState
/// If true, the data packets will be skipped instead of reading. Used to recover after errors.
bool skipping_data = false;
bool query_duration_already_logged = false;
ProfileEvents::ThreadIdToCountersSnapshot last_sent_snapshots;
/// To output progress, the difference after the previous sending of progress.
Progress progress;
@ -122,10 +120,7 @@ struct QueryState
/// Timeouts setter for current query
std::unique_ptr<TimeoutSetter> timeout_setter;
void reset()
{
*this = QueryState();
}
std::mutex mutex;
void finalizeOut()
{
@ -141,10 +136,7 @@ struct QueryState
maybe_compressed_out->cancel();
}
bool empty() const
{
return is_empty;
}
bool isEnanbledPartialResultOnFirstCancel() const;
};
@ -220,9 +212,11 @@ private:
Poco::Timespan sleep_after_receiving_query;
std::unique_ptr<Session> session;
ContextMutablePtr query_context;
ClientInfo::QueryKind query_kind = ClientInfo::QueryKind::NO_QUERY;
/// A state got uuids to exclude from a query
std::optional<std::vector<UUID>> part_uuids_to_ignore;
/// Streams for reading/writing from/to client connection socket.
std::shared_ptr<ReadBufferFromPocoSocketChunked> in;
std::shared_ptr<WriteBufferFromPocoSocketChunked> out;
@ -249,21 +243,12 @@ private:
/// `out_mutex` protects `out` (WriteBuffer).
/// So it is used for method sendData(), sendProgress(), sendLogs(), etc.
std::mutex out_mutex;
/// `task_callback_mutex` protects tasks callbacks.
/// Inside these callbacks we might also change cancellation status,
/// so it also protects cancellation status checks.
std::mutex task_callback_mutex;
/// At the moment, only one ongoing query in the connection is supported at a time.
QueryState state;
/// Last block input parameters are saved to be able to receive unexpected data packet sent after exception.
LastBlockInputParameters last_block_in;
CurrentMetrics::Increment metric_increment{CurrentMetrics::TCPConnection};
ProfileEvents::ThreadIdToCountersSnapshot last_sent_snapshots;
/// It is the name of the server that will be sent to the client.
String server_display_name;
String host_name;
@ -277,66 +262,67 @@ private:
bool receiveProxyHeader();
void receiveHello();
void receiveAddendum();
bool receivePacket();
void receiveQuery();
void receiveIgnoredPartUUIDs();
String receiveReadTaskResponseAssumeLocked();
std::optional<ParallelReadResponse> receivePartitionMergeTreeReadTaskResponseAssumeLocked();
bool receiveData(bool scalar);
bool readDataNext();
void readData();
void skipData();
void receiveClusterNameAndSalt();
bool receivePacketsExpectQuery(std::optional<QueryState> & state);
bool receivePacketsExpectData(QueryState & state);
void receivePacketsExpectCancel(QueryState & state);
String receiveReadTaskResponseAssumeLocked(QueryState & state);
std::optional<ParallelReadResponse> receivePartitionMergeTreeReadTaskResponseAssumeLocked(QueryState & state);
bool receiveUnexpectedData(bool throw_exception = true);
[[noreturn]] void receiveUnexpectedQuery();
[[noreturn]] void receiveUnexpectedIgnoredPartUUIDs();
[[noreturn]] void receiveUnexpectedHello();
[[noreturn]] void receiveUnexpectedTablesStatusRequest();
//bool receivePacket(std::optional<QueryState> & state);
void processCancel(QueryState & state);
void processQuery(std::optional<QueryState> & state);
void processIgnoredPartUUIDs();
bool processData(QueryState & state, bool scalar);
void processClusterNameAndSalt();
void readData(QueryState & state);
void skipData(QueryState & state);
bool processUnexpectedData();
[[noreturn]] void processUnexpectedQuery();
[[noreturn]] void processUnexpectedIgnoredPartUUIDs();
[[noreturn]] void processUnexpectedHello();
[[noreturn]] void processUnexpectedTablesStatusRequest();
/// Process INSERT query
void startInsertQuery();
void processInsertQuery();
AsynchronousInsertQueue::PushResult processAsyncInsertQuery(AsynchronousInsertQueue & insert_queue);
void startInsertQuery(QueryState & state);
void processInsertQuery(QueryState & state);
AsynchronousInsertQueue::PushResult processAsyncInsertQuery(QueryState & state, AsynchronousInsertQueue & insert_queue);
/// Process a request that does not require the receiving of data blocks from the client
void processOrdinaryQuery();
void processOrdinaryQuery(QueryState & state);
void processTablesStatusRequest();
void sendHello();
void sendData(const Block & block); /// Write a block to the network.
void sendLogData(const Block & block);
void sendTableColumns(const ColumnsDescription & columns);
void sendException(const Exception & e, bool with_stack_trace);
void sendProgress();
void sendLogs();
void sendEndOfStream();
void sendPartUUIDs();
void sendReadTaskRequestAssumeLocked();
void sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRangesAnnouncement announcement);
void sendMergeTreeReadTaskRequestAssumeLocked(ParallelReadRequest request);
void sendProfileInfo(const ProfileInfo & info);
void sendTotals(const Block & totals);
void sendExtremes(const Block & extremes);
void sendProfileEvents();
void sendSelectProfileEvents();
void sendInsertProfileEvents();
void sendTimezone();
void sendHello() TSA_REQUIRES(out_mutex);
void sendData(QueryState & state, const Block & block) TSA_REQUIRES(out_mutex); /// Write a block to the network.
void sendLogData(QueryState & state, const Block & block) TSA_REQUIRES(out_mutex);
void sendTableColumns(QueryState & state, const ColumnsDescription & columns)TSA_REQUIRES(out_mutex) ;
void sendException(const Exception & e, bool with_stack_trace) TSA_REQUIRES(out_mutex);
void sendProgress(QueryState & state) TSA_REQUIRES(out_mutex);
void sendLogs(QueryState & state) TSA_REQUIRES(out_mutex);
void sendEndOfStream(QueryState & state) TSA_REQUIRES(out_mutex);
void sendPartUUIDs(QueryState & state) TSA_REQUIRES(out_mutex);
void sendReadTaskRequestAssumeLocked() TSA_REQUIRES(out_mutex);
void sendMergeTreeAllRangesAnnouncementAssumeLocked(QueryState & state, InitialAllRangesAnnouncement announcement) TSA_REQUIRES(out_mutex);
void sendMergeTreeReadTaskRequestAssumeLocked(ParallelReadRequest request) TSA_REQUIRES(out_mutex);
void sendProfileInfo(QueryState & state, const ProfileInfo & info) TSA_REQUIRES(out_mutex);
void sendTotals(QueryState & state, const Block & totals) TSA_REQUIRES(out_mutex);
void sendExtremes(QueryState & state, const Block & extremes) TSA_REQUIRES(out_mutex);
void sendProfileEvents(QueryState & state) TSA_REQUIRES(out_mutex);
void sendSelectProfileEvents(QueryState & state) TSA_REQUIRES(out_mutex);
void sendInsertProfileEvents(QueryState & state) TSA_REQUIRES(out_mutex);
void sendTimezone(QueryState & state) TSA_REQUIRES(out_mutex);
/// Creates state.block_in/block_out for blocks read/write, depending on whether compression is enabled.
void initBlockInput();
void initBlockOutput(const Block & block);
void initLogsBlockOutput(const Block & block);
void initProfileEventsBlockOutput(const Block & block);
using CancellationStatus = QueryState::CancellationStatus;
void decreaseCancellationStatus(const std::string & log_message);
CancellationStatus getQueryCancellationStatus();
void initBlockInput(QueryState & state);
void initBlockOutput(QueryState & state, const Block & block) TSA_REQUIRES(out_mutex);
void initLogsBlockOutput(QueryState & state, const Block & block) TSA_REQUIRES(out_mutex);
void initProfileEventsBlockOutput(QueryState & state, const Block & block) TSA_REQUIRES(out_mutex);
/// This function is called from different threads.
void updateProgress(const Progress & value);
void updateProgress(QueryState & state, const Progress & value);
void logQueryDuration(QueryState & state);
Poco::Net::SocketAddress getClientAddress(const ClientInfo & client_info);
};