diff --git a/base/poco/Net/src/TCPServerConnection.cpp b/base/poco/Net/src/TCPServerConnection.cpp
index 1072b182c23..2148af4267b 100644
--- a/base/poco/Net/src/TCPServerConnection.cpp
+++ b/base/poco/Net/src/TCPServerConnection.cpp
@@ -18,7 +18,6 @@
 using Poco::Exception;
-using Poco::ErrorHandler;
 
 
 namespace Poco {
@@ -31,9 +30,7 @@ TCPServerConnection::TCPServerConnection(const StreamSocket& socket):
 }
 
 
-TCPServerConnection::~TCPServerConnection()
-{
-}
+TCPServerConnection::~TCPServerConnection() = default;
 
 
 void TCPServerConnection::start()
diff --git a/src/Client/Connection.cpp b/src/Client/Connection.cpp
index 8301eda9334..1a51c9ce0fe 100644
--- a/src/Client/Connection.cpp
+++ b/src/Client/Connection.cpp
@@ -14,6 +14,7 @@
 #include
 #include
 #include
+#include "Common/StackTrace.h"
 #include
 #include
 #include
@@ -892,6 +893,7 @@ void Connection::sendQuery(
 
 void Connection::sendCancel()
 {
+    LOG_DEBUG(log_wrapper.get(), "send cancel from {}", StackTrace().toString());
     /// If we already disconnected.
     if (!out)
         return;
diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp
index 3f4a75fae3c..72cc4c8d89d 100644
--- a/src/Common/ErrorCodes.cpp
+++ b/src/Common/ErrorCodes.cpp
@@ -611,6 +611,7 @@
     M(730, REFRESH_FAILED) \
     M(731, QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE) \
     M(733, TABLE_IS_BEING_RESTARTED) \
+    M(734, CANNOT_WRITE_AFTER_BUFFER_CANCELED) \
     \
     M(900, DISTRIBUTED_CACHE_ERROR) \
     M(901, CANNOT_USE_DISTRIBUTED_CACHE) \
diff --git a/src/IO/WriteBuffer.h b/src/IO/WriteBuffer.h
index 8f500da7d49..5c1ef1e63e8 100644
--- a/src/IO/WriteBuffer.h
+++ b/src/IO/WriteBuffer.h
@@ -17,6 +17,7 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int CANNOT_WRITE_AFTER_END_OF_BUFFER;
+    extern const int CANNOT_WRITE_AFTER_BUFFER_CANCELED;
     extern const int LOGICAL_ERROR;
 }
 
@@ -102,7 +103,7 @@ public:
             throw Exception{ErrorCodes::LOGICAL_ERROR, "Cannot write to finalized buffer"};
 
         if (canceled)
-            throw Exception{ErrorCodes::LOGICAL_ERROR, "Cannot write to canceled buffer"};
+            throw Exception{ErrorCodes::CANNOT_WRITE_AFTER_BUFFER_CANCELED, "Cannot write to canceled buffer"};
 
         size_t bytes_copied = 0;
diff --git a/src/Interpreters/ProcessList.h b/src/Interpreters/ProcessList.h
index f171fe8f4d4..d721cc6a008 100644
--- a/src/Interpreters/ProcessList.h
+++ b/src/Interpreters/ProcessList.h
@@ -239,7 +239,6 @@ public:
     /// Returns an entry in the ProcessList associated with this QueryStatus. The function can return nullptr.
     std::shared_ptr getProcessListEntry() const;
 
-    bool isAllDataSent() const { return is_all_data_sent; }
     void setAllDataSent() { is_all_data_sent = true; }
 
     /// Adds a pipeline to the QueryStatus
diff --git a/src/QueryPipeline/BlockIO.cpp b/src/QueryPipeline/BlockIO.cpp
index bcf3830db01..9281dffcc61 100644
--- a/src/QueryPipeline/BlockIO.cpp
+++ b/src/QueryPipeline/BlockIO.cpp
@@ -54,10 +54,12 @@ void BlockIO::onFinish()
     pipeline.reset();
 }
 
-void BlockIO::onException()
+void BlockIO::onException(bool log_as_error)
 {
+    setAllDataSent();
+
     if (exception_callback)
-        exception_callback(/* log_error */ true);
+        exception_callback(log_as_error);
 
     pipeline.cancel();
     pipeline.reset();
diff --git a/src/QueryPipeline/BlockIO.h b/src/QueryPipeline/BlockIO.h
index ff85a0d6772..a84894255a1 100644
--- a/src/QueryPipeline/BlockIO.h
+++ b/src/QueryPipeline/BlockIO.h
@@ -32,7 +32,7 @@ struct BlockIO
     bool null_format = false;
 
     void onFinish();
-    void onException();
+    void onException(bool log_as_error = true);
     void onCancelOrConnectionLoss();
 
     /// Set is_all_data_sent in system.processes for this query.
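The WriteBuffer change above splits "write after cancel" out of LOGICAL_ERROR into its own error code, so writing to a canceled buffer becomes an expected runtime condition (a concurrently canceled query) rather than a programming error. A minimal self-contained sketch of the guard pattern; the class and exception types here are illustrative, not ClickHouse's API:

```cpp
#include <cstddef>
#include <stdexcept>

// Illustrative stand-ins for the two error categories used in the diff above.
struct LogicalError : std::logic_error { using std::logic_error::logic_error; };
struct WriteAfterCancel : std::runtime_error { using std::runtime_error::runtime_error; };

class FinalizingBuffer
{
public:
    void cancel() noexcept { canceled = true; }
    void finalize() { finalized = true; }

    void write(const char * /*data*/, size_t /*size*/)
    {
        // Writing to a finalized buffer is a programming error ...
        if (finalized)
            throw LogicalError("Cannot write to finalized buffer");
        // ... but writing to a canceled one can legitimately happen when a
        // query is canceled concurrently, so it gets its own error kind.
        if (canceled)
            throw WriteAfterCancel("Cannot write to canceled buffer");
        // actual copying into the internal buffer would go here
    }

private:
    bool finalized = false;
    bool canceled = false;
};
```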
diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp
index 5faae03bc8f..5314e691daa 100644
--- a/src/QueryPipeline/RemoteQueryExecutor.cpp
+++ b/src/QueryPipeline/RemoteQueryExecutor.cpp
@@ -1,3 +1,4 @@
+#include "Common/logger_useful.h"
 #include
 #include
 #include
@@ -750,10 +751,12 @@ void RemoteQueryExecutor::finish()
         switch (packet.type)
         {
             case Protocol::Server::EndOfStream:
+                LOG_DEBUG(log, "RemoteQueryExecutor::finish EndOfStream");
                 finished = true;
                 break;
 
             case Protocol::Server::Exception:
+                LOG_DEBUG(log, "RemoteQueryExecutor::finish Exception :: {}", packet.exception->what());
                 got_exception_from_replica = true;
                 packet.exception->rethrow();
                 break;
diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp
index 4cc6cf8402b..a285b097270 100644
--- a/src/Server/TCPHandler.cpp
+++ b/src/Server/TCPHandler.cpp
@@ -2,6 +2,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -46,6 +47,9 @@
 #include
 #include
 #include
+#include "Core/Settings.h"
+#include "base/defines.h"
+#include "base/scope_guard.h"
 #include
 #include
@@ -131,23 +135,24 @@ namespace ProfileEvents
 
 namespace DB::ErrorCodes
 {
-    extern const int LOGICAL_ERROR;
+    extern const int ABORTED;
     extern const int ATTEMPT_TO_READ_AFTER_EOF;
+    extern const int AUTHENTICATION_FAILED;
     extern const int CLIENT_HAS_CONNECTED_TO_WRONG_PORT;
+    extern const int CLIENT_INFO_DOES_NOT_MATCH;
+    extern const int LOGICAL_ERROR;
+    extern const int NETWORK_ERROR;
+    extern const int POCO_EXCEPTION;
+    extern const int QUERY_WAS_CANCELLED;
+    extern const int SOCKET_TIMEOUT;
+    extern const int SUPPORT_IS_DISABLED;
+    extern const int TIMEOUT_EXCEEDED;
+    extern const int UNEXPECTED_PACKET_FROM_CLIENT;
     extern const int UNKNOWN_EXCEPTION;
     extern const int UNKNOWN_PACKET_FROM_CLIENT;
-    extern const int POCO_EXCEPTION;
-    extern const int SOCKET_TIMEOUT;
-    extern const int UNEXPECTED_PACKET_FROM_CLIENT;
     extern const int UNKNOWN_PROTOCOL;
-    extern const int AUTHENTICATION_FAILED;
-    extern const int QUERY_WAS_CANCELLED;
-    extern const int CLIENT_INFO_DOES_NOT_MATCH;
-    extern const int TIMEOUT_EXCEEDED;
-    extern const int SUPPORT_IS_DISABLED;
     extern const int UNSUPPORTED_METHOD;
     extern const int USER_EXPIRED;
-    extern const int NETWORK_ERROR;
 }
 
 namespace
@@ -211,6 +216,26 @@ void validateClientInfo(const ClientInfo & session_client_info, const ClientInfo
         // os_user, quota_key, client_trace_context can be different.
     }
 }
+
+struct TurnOffBoolSettingTemporary
+{
+    bool & setting;
+    bool prev_val;
+
+    explicit TurnOffBoolSettingTemporary(bool & setting_)
+        : setting(setting_)
+        , prev_val(setting_)
+    {
+        if (prev_val)
+            setting = false;
+    }
+
+    ~TurnOffBoolSettingTemporary()
+    {
+        if (prev_val)
+            setting = true;
+    }
+};
+
 }
 
 namespace DB
@@ -262,8 +287,10 @@ TCPHandler::TCPHandler(
         LOG_TRACE(log, "Forwarded client address: {}", forwarded_for);
 }
 
+
 TCPHandler::~TCPHandler() = default;
 
+
 void TCPHandler::runImpl()
 {
     setThreadName("TCPHandler");
@@ -299,7 +326,11 @@
         if (!is_interserver_mode)
             session->makeSessionContext();
 
-        sendHello();
+        {
+            std::lock_guard lock(out_mutex);
+            sendHello();
+        }
+
         if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM)
             receiveAddendum();
 
@@ -385,6 +416,10 @@
 
     while (tcp_server.isOpen())
     {
+        /// We don't really have session in interserver mode, new one is created for each query. It's better to reset it now.
+        if (is_interserver_mode)
+            session.reset();
+
         /// We are waiting for a packet from the client. Thus, every `poll_interval` seconds check whether we need to shut down.
         {
             Stopwatch idle_time;
@@ -394,8 +429,6 @@
             if (idle_time.elapsedSeconds() > idle_connection_timeout)
             {
                 LOG_TRACE(log, "Closing idle connection");
-                state.cancelOut();
-                state.reset();
                 return;
             }
         }
@@ -405,36 +438,23 @@
         if (!tcp_server.isOpen() || server.isCancelled() || in->eof())
         {
             LOG_TEST(log, "Closing connection (open: {}, cancelled: {}, eof: {})", tcp_server.isOpen(), server.isCancelled(), in->eof());
-            state.cancelOut();
-            state.reset();
             return;
         }
 
-        state.reset();
-
-        /// Initialized later.
-        std::optional query_scope;
-
         /// This is a bug-prone part: the lifetime of thread_trace_context has to be longer than the Spans inside `QueryState::BlockIO`.
         OpenTelemetry::TracingContextHolderPtr thread_trace_context;
 
+        /// Initialized later. It has to be destroyed after query_state is destroyed.
+        std::optional query_scope;
+
+        /// QueryState should be cleared before QueryScope, since otherwise
+        /// the MemoryTracker will be wrong for possible deallocations.
+        /// (i.e. deallocations from the Aggregator with two-level aggregation)
+        /// Also it resets socket's timeouts.
+        std::optional<QueryState> query_state;
 
         /** An exception during the execution of request (it must be sent over the network to the client).
          *  The client will be able to accept it, if it did not happen while sending another packet and the client has not disconnected yet.
          */
         std::unique_ptr exception;
-        bool network_error = false;
-        bool user_expired = false;
-        bool query_duration_already_logged = false;
-        auto log_query_duration = [this, &query_duration_already_logged]()
-        {
-            if (query_duration_already_logged)
-                return;
-            query_duration_already_logged = true;
-            auto elapsed_sec = state.watch.elapsedSeconds();
-            /// We already logged more detailed info if we read some rows
-            if (elapsed_sec < 1.0 && state.progress.read_rows)
-                return;
-            LOG_DEBUG(log, "Processed in {} sec.", elapsed_sec);
-        };
+        bool close_connection = false;
 
         try
         {
@@ -442,213 +462,204 @@ void TCPHandler::runImpl()
             * There may come settings for a separate query that modify `query_context`.
             * It's possible to receive part uuids packet before the query, so then receivePacket has to be called twice.
             */
-            if (!receivePacket())
+            if (!receivePacketsExpectQuery(query_state))
                 continue;
 
             /** If part_uuids got received in previous packet, trying to read again. */
-            if (state.empty() && state.part_uuids_to_ignore && !receivePacket())
+            if (part_uuids_to_ignore.has_value() && !receivePacketsExpectQuery(query_state))
                 continue;
 
+            chassert(query_state.has_value());
+
             /// Set up tracing context for this query on current thread
             thread_trace_context = std::make_unique("TCPHandler",
-                query_context->getClientInfo().client_trace_context,
-                query_context->getSettingsRef(),
-                query_context->getOpenTelemetrySpanLog());
+                query_state->query_context->getClientInfo().client_trace_context,
+                query_state->query_context->getSettingsRef(),
+                query_state->query_context->getOpenTelemetrySpanLog());
             thread_trace_context->root_span.kind = OpenTelemetry::SpanKind::SERVER;
 
-            query_scope.emplace(query_context, /* fatal_error_callback */ [this]
+            query_scope.emplace(query_state->query_context, /* fatal_error_callback */ [this, &query_state]
             {
                 std::lock_guard lock(out_mutex);
-                sendLogs();
+                sendLogs(query_state.value());
             });
 
             /// If query received, then settings in query_context has been updated.
             /// So it's better to update the connection settings for flexibility.
-            extractConnectionSettingsFromContext(query_context);
+            extractConnectionSettingsFromContext(query_state->query_context);
 
             /// Sync timeouts on client and server during current query to avoid dangling queries on server.
             /// It should be reset at the end of query.
-            state.timeout_setter = std::make_unique(socket(), send_timeout, receive_timeout);
+            query_state->timeout_setter = std::make_unique(socket(), send_timeout, receive_timeout);
+
+            SCOPE_EXIT(logQueryDuration(query_state.value()));
 
             /// Should we send internal logs to client?
-            const auto client_logs_level = query_context->getSettingsRef()[Setting::send_logs_level];
+            const auto client_logs_level = query_state->query_context->getSettingsRef()[Setting::send_logs_level];
             if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_LOGS
                 && client_logs_level != LogsLevel::none)
             {
-                state.logs_queue = std::make_shared();
-                state.logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString());
-                state.logs_queue->setSourceRegexp(query_context->getSettingsRef()[Setting::send_logs_source_regexp]);
-                CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level);
+                query_state->logs_queue = std::make_shared();
+                query_state->logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString());
+                query_state->logs_queue->setSourceRegexp(query_state->query_context->getSettingsRef()[Setting::send_logs_source_regexp]);
+                CurrentThread::attachInternalTextLogsQueue(query_state->logs_queue, client_logs_level);
             }
             if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
             {
-                state.profile_queue = std::make_shared(std::numeric_limits::max());
-                CurrentThread::attachInternalProfileEventsQueue(state.profile_queue);
+                query_state->profile_queue = std::make_shared(std::numeric_limits::max());
+                CurrentThread::attachInternalProfileEventsQueue(query_state->profile_queue);
             }
 
             if (!is_interserver_mode)
                 session->checkIfUserIsStillValid();
 
-            query_context->setExternalTablesInitializer([this] (ContextPtr context)
+            query_state->query_context->setExternalTablesInitializer([this, &query_state] (ContextPtr context)
             {
-                if (context != query_context)
+                if (context != query_state->query_context)
                     throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected context in external tables initializer");
 
                 /// Get blocks of temporary tables
-                readData();
+                readData(query_state.value());
 
                 /// Reset the input stream, as we received an empty block while receiving external table data.
                 /// So, the stream has been marked as cancelled and we can't read from it anymore.
-                state.block_in.reset();
-                state.maybe_compressed_in.reset(); /// For more accurate accounting by MemoryTracker.
+                query_state->block_in.reset();
+                query_state->maybe_compressed_in.reset(); /// For more accurate accounting by MemoryTracker.
             });
 
             /// Send structure of columns to client for function input()
-            query_context->setInputInitializer([this] (ContextPtr context, const StoragePtr & input_storage)
+            query_state->query_context->setInputInitializer([this, &query_state] (ContextPtr context, const StoragePtr & input_storage)
             {
-                if (context != query_context)
+
+                if (context != query_state->query_context)
                     throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected context in Input initializer");
 
                 auto metadata_snapshot = input_storage->getInMemoryMetadataPtr();
-                state.need_receive_data_for_input = true;
+                query_state->need_receive_data_for_input = true;
+
+                std::lock_guard lock(out_mutex);
 
                 /// Send ColumnsDescription for input storage.
                 if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA
-                    && query_context->getSettingsRef()[Setting::input_format_defaults_for_omitted_fields])
+                    && query_state->query_context->getSettingsRef()[Setting::input_format_defaults_for_omitted_fields])
                 {
-                    sendTableColumns(metadata_snapshot->getColumns());
+                    sendTableColumns(query_state.value(), metadata_snapshot->getColumns());
                 }
 
                 /// Send block to the client - input storage structure.
-                state.input_header = metadata_snapshot->getSampleBlock();
-                sendData(state.input_header);
-                sendTimezone();
+                query_state->input_header = metadata_snapshot->getSampleBlock();
+                sendData(query_state.value(), query_state->input_header);
+                sendTimezone(query_state.value());
             });
 
-            query_context->setInputBlocksReaderCallback([this] (ContextPtr context) -> Block
+            query_state->query_context->setInputBlocksReaderCallback([this, &query_state] (ContextPtr context) -> Block
             {
-                if (context != query_context)
+                if (context != query_state->query_context)
                     throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected context in InputBlocksReader");
 
-                if (!readDataNext())
-                {
-                    state.block_in.reset();
-                    state.maybe_compressed_in.reset();
-                    return Block();
-                }
-                return state.block_for_input;
+                if (receivePacketsExpectData(query_state.value()))
+                    return query_state->block_for_input;
+
+                query_state->read_all_data = true;
+                query_state->block_in.reset();
+                query_state->maybe_compressed_in.reset();
+                return {};
             });
 
-            customizeContext(query_context);
+            customizeContext(query_state->query_context);
 
             /// This callback is needed for requesting read tasks inside pipeline for distributed processing
-            query_context->setReadTaskCallback([this]() -> String
+            query_state->query_context->setReadTaskCallback([this, &query_state]() -> String
             {
                 Stopwatch watch;
                 CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::ReadTaskRequestsSent);
 
-                std::scoped_lock lock(out_mutex, task_callback_mutex);
-
-                if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
-                    return {};
-
-                sendReadTaskRequestAssumeLocked();
+                {
+                    std::lock_guard lock(out_mutex);
+                    sendReadTaskRequestAssumeLocked();
+                }
+
                 ProfileEvents::increment(ProfileEvents::ReadTaskRequestsSent);
-                auto res = receiveReadTaskResponseAssumeLocked();
+                auto res = receiveReadTaskResponseAssumeLocked(query_state.value());
                 ProfileEvents::increment(ProfileEvents::ReadTaskRequestsSentElapsedMicroseconds, watch.elapsedMicroseconds());
                 return res;
             });
 
-            query_context->setMergeTreeAllRangesCallback([this](InitialAllRangesAnnouncement announcement)
+            query_state->query_context->setMergeTreeAllRangesCallback([this, &query_state](InitialAllRangesAnnouncement announcement)
             {
                 Stopwatch watch;
                 CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::MergeTreeAllRangesAnnouncementsSent);
 
-                std::scoped_lock lock(out_mutex, task_callback_mutex);
-                if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
-                    return;
-
-                sendMergeTreeAllRangesAnnouncementAssumeLocked(announcement);
+                std::lock_guard lock(out_mutex);
+                sendMergeTreeAllRangesAnnouncementAssumeLocked(query_state.value(), announcement);
                 ProfileEvents::increment(ProfileEvents::MergeTreeAllRangesAnnouncementsSent);
                 ProfileEvents::increment(ProfileEvents::MergeTreeAllRangesAnnouncementsSentElapsedMicroseconds, watch.elapsedMicroseconds());
             });
 
-            query_context->setMergeTreeReadTaskCallback([this](ParallelReadRequest request) -> std::optional
+            query_state->query_context->setMergeTreeReadTaskCallback([this, &query_state](ParallelReadRequest request) -> std::optional
             {
                 Stopwatch watch;
                 CurrentMetrics::Increment callback_metric_increment(CurrentMetrics::MergeTreeReadTaskRequestsSent);
 
-                std::scoped_lock lock(out_mutex, task_callback_mutex);
-                if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
-                    return std::nullopt;
+                {
+                    std::lock_guard lock(out_mutex);
+                    sendMergeTreeReadTaskRequestAssumeLocked(std::move(request));
+                }
 
-                sendMergeTreeReadTaskRequestAssumeLocked(std::move(request));
                 ProfileEvents::increment(ProfileEvents::MergeTreeReadTaskRequestsSent);
-                auto res = receivePartitionMergeTreeReadTaskResponseAssumeLocked();
+                auto res = receivePartitionMergeTreeReadTaskResponseAssumeLocked(query_state.value());
                 ProfileEvents::increment(ProfileEvents::MergeTreeReadTaskRequestsSentElapsedMicroseconds, watch.elapsedMicroseconds());
                 return res;
             });
 
             /// Processing Query
-            std::tie(state.parsed_query, state.io) = executeQuery(state.query, query_context, QueryFlags{}, state.stage);
+            std::tie(query_state->parsed_query, query_state->io) = executeQuery(query_state->query, query_state->query_context, QueryFlags{}, query_state->stage);
 
             after_check_cancelled.restart();
             after_send_progress.restart();
 
-            auto finish_or_cancel = [this]()
-            {
-                if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
-                    state.io.onCancelOrConnectionLoss();
-                else
-                    state.io.onFinish();
-            };
-
-            if (state.io.pipeline.pushing())
+            if (query_state->io.pipeline.pushing())
             {
                 /// FIXME: check explicitly that insert query suggests to receive data via native protocol,
-                state.need_receive_data_for_insert = true;
-                processInsertQuery();
-                finish_or_cancel();
+                query_state->need_receive_data_for_insert = true;
+                processInsertQuery(query_state.value());
+                query_state->io.onFinish();
             }
-            else if (state.io.pipeline.pulling())
+            else if (query_state->io.pipeline.pulling())
             {
-                processOrdinaryQuery();
-                finish_or_cancel();
+                processOrdinaryQuery(query_state.value());
+                query_state->io.onFinish();
             }
-            else if (state.io.pipeline.completed())
+            else if (query_state->io.pipeline.completed())
             {
                 {
-                    CompletedPipelineExecutor executor(state.io.pipeline);
+                    CompletedPipelineExecutor executor(query_state->io.pipeline);
 
                     /// Should not check for cancel in case of input.
-                    if (!state.need_receive_data_for_input)
+                    if (!query_state->need_receive_data_for_input)
                     {
-                        auto callback = [this]()
+                        auto callback = [this, &query_state]()
                         {
-                            std::scoped_lock lock(out_mutex, task_callback_mutex);
+                            receivePacketsExpectCancel(query_state.value());
 
-                            if (getQueryCancellationStatus() == CancellationStatus::FULLY_CANCELLED)
-                            {
+                            if (query_state->stop_read_return_partial_result)
                                 return true;
-                            }
-
-                            sendProgress();
-                            sendSelectProfileEvents();
-                            sendLogs();
 
+                            std::lock_guard lock1(out_mutex);
+                            sendProgress(query_state.value());
+                            sendSelectProfileEvents(query_state.value());
+                            sendLogs(query_state.value());
                             return false;
                         };
 
-                        executor.setCancelCallback(callback, interactive_delay / 1000);
+                        executor.setCancelCallback(std::move(callback), interactive_delay / 1000);
                     }
                     executor.execute();
                 }
-                finish_or_cancel();
+                query_state->io.onFinish();
 
-                if (!state.empty() && state.cancellation_status != CancellationStatus::FULLY_CANCELLED)
                 {
                     std::lock_guard lock(out_mutex);
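A recurring change in the hunks above and below: every write to the client socket is now guarded by a plain std::lock_guard on out_mutex, replacing the old scoped_lock over out_mutex plus task_callback_mutex and the cancellation-status checks. A self-contained sketch of the serialization pattern, assuming only that several threads (query thread plus callbacks) share one output stream; the Sender type is illustrative:

```cpp
#include <mutex>
#include <string>
#include <vector>

// Illustrative sketch: serialize all writes to one client connection,
// the way the send* helpers in this diff take out_mutex before touching `out`.
class Sender
{
public:
    // Progress updates, logs and data blocks may be sent from the query
    // thread and from task callbacks; the mutex keeps packets from interleaving.
    void sendPacket(const std::string & packet)
    {
        std::lock_guard lock(out_mutex);
        wire.push_back(packet); // stands in for "varint type + payload + flush"
    }

private:
    std::mutex out_mutex;
    std::vector<std::string> wire;
};
```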
@@ -657,53 +668,34 @@ void TCPHandler::runImpl()
                     /// NOTE: we cannot send Progress for regular INSERT (with VALUES)
                     /// without breaking protocol compatibility, but it can be done
                     /// by increasing revision.
-                    sendProgress();
-                    sendSelectProfileEvents();
+                    sendProgress(query_state.value());
+                    sendSelectProfileEvents(query_state.value());
                 }
             }
             else
             {
-                finish_or_cancel();
+                query_state->io.onFinish();
             }
 
             /// Do it before sending end of stream, to have a chance to show log message in client.
             query_scope->logPeakMemoryUsage();
 
-            log_query_duration();
-
-            if (state.is_connection_closed)
-            {
-                state.cancelOut();
-                state.reset();
-                break;
-            }
-
             {
                 std::lock_guard lock(out_mutex);
-                sendLogs();
-                sendEndOfStream();
+                sendLogs(query_state.value());
+                sendEndOfStream(query_state.value());
             }
 
-            if (state.empty() || state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
-            {
-                state.cancelOut();
-            }
-            else
-            {
-                state.finalizeOut();
-            }
-
-
-            /// QueryState should be cleared before QueryScope, since otherwise
-            /// the MemoryTracker will be wrong for possible deallocations.
-            /// (i.e. deallocations from the Aggregator with two-level aggregation)
-            /// Also it resets socket's timeouts.
-            state.reset();
-            last_sent_snapshots = ProfileEvents::ThreadIdToCountersSnapshot{};
-            query_scope.reset();
-            thread_trace_context.reset();
+            query_state->finalizeOut();
+        }
+        catch (const NetException & e)
+        {
+            LOG_DEBUG(log, "XX NetException: {}", e.what());
+            exception.reset(e.clone());
         }
         catch (const Exception & e)
         {
+            LOG_DEBUG(log, "XX Exception: {}", e.what());
+
             /// Authentication failure with interserver secret
             /// - early exit without trying to send the exception to the client.
             /// Because the server should not try to skip (parse, decompress) the remaining packets sent by the client,
             /// as it will lead to additional work and unneeded exposure to unauthenticated connections.
@@ -716,41 +708,40 @@ void TCPHandler::runImpl()
             /// In this case, the user is already authenticated with this server,
             /// is_interserver_mode is false, and we can send the exception to the client normally.
+
             if (is_interserver_mode && e.code() == ErrorCodes::AUTHENTICATION_FAILED)
             {
-                state.cancelOut();
-                state.reset();
-                throw;
+                if (!query_state.has_value())
+                    return;
+                query_state->io.onException();
+                query_state->cancelOut();
+                return;
             }
 
-            state.io.onException();
-            exception.reset(e.clone());
-
-            /// In case of exception state was not reset, so socket's timouts must be reset explicitly
-            state.timeout_setter.reset();
-
             if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT)
             {
-                state.cancelOut();
-                state.reset();
-                throw;
+                LOG_DEBUG(log, "XX Exception UNKNOWN_PACKET_FROM_CLIENT: {}", e.what());
+
+                if (!query_state.has_value())
+                    return;
+                query_state->io.onException();
+                query_state->cancelOut();
+                return;
             }
 
+            exception.reset(e.clone());
+
             /// If there is UNEXPECTED_PACKET_FROM_CLIENT emulate network_error
             /// to break the loop, but do not throw to send the exception to
             /// the client.
             if (e.code() == ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT)
-                network_error = true;
-
-            /// If a timeout occurred, try to inform client about it and close the session
-            if (e.code() == ErrorCodes::SOCKET_TIMEOUT)
-                network_error = true;
+                close_connection = true;
 
             if (e.code() == ErrorCodes::USER_EXPIRED)
-                user_expired = true;
+                close_connection = true;
 
-            if (network_error || user_expired)
-                LOG_TEST(log, "Going to close connection due to exception: {}", e.message());
+            // if (e.code() == ErrorCodes::QUERY_WAS_CANCELLED)
+            //     close_connection = true;
         }
         catch (const Poco::Net::NetException & e)
         {
@@ -760,12 +751,10 @@ void TCPHandler::runImpl()
             * Although in one of them, we have to send exception to the client, but in the other - we can not.
             * We will try to send exception to the client in any case - see below.
             */
-            state.io.onException();
             exception = std::make_unique(Exception::CreateFromPocoTag{}, e);
         }
         catch (const Poco::Exception & e)
         {
-            state.io.onException();
             exception = std::make_unique(Exception::CreateFromPocoTag{}, e);
         }
// Server should die on std logic errors in debug, like with assert()
@@ -774,7 +763,8 @@
#ifdef DEBUG_OR_SANITIZER_BUILD
         catch (const std::logic_error & e)
         {
-            state.io.onException();
+            if (query_state.has_value())
+                query_state->io.onException();
             exception = std::make_unique(Exception::CreateFromSTDTag{}, e);
             sendException(*exception, send_exception_with_stack_trace);
             std::abort();
         }
@@ -782,94 +772,112 @@
#endif
         catch (const std::exception & e)
         {
-            state.io.onException();
             exception = std::make_unique(Exception::CreateFromSTDTag{}, e);
         }
         catch (...)
         {
-            state.io.onException();
             exception = std::make_unique(Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Unknown exception"));
         }
 
-        /// In case of exception state was not reset, so socket's timouts must be reset explicitly
-        state.timeout_setter.reset();
+        LOG_DEBUG(log, "we have an exception {}", bool(exception));
 
-        try
+        if (exception)
         {
-            if (exception)
+            bool log_as_error = exception->code() != ErrorCodes::QUERY_WAS_CANCELLED;
+
+            if (!query_state.has_value())
             {
-                if (thread_trace_context)
+                LOG_DEBUG(log, "we do not have a query state");
+
+                if (log_as_error)
+                    LOG_ERROR(log, getExceptionMessageAndPattern(*exception, send_exception_with_stack_trace));
+                else
+                    LOG_INFO(log, getExceptionMessageAndPattern(*exception, send_exception_with_stack_trace));
+                return;
+            }
+
+            try
+            {
+                exception->rethrow();
+            }
+            catch (...)
+            {
+                LOG_DEBUG(log, "query_state->io.onException()");
+                query_state->io.onException(log_as_error);
+            }
+
+            if (thread_trace_context)
                 thread_trace_context->root_span.addAttribute(*exception);
 
-                try
+            LOG_DEBUG(log, "is_interserver_mode && !is_interserver_authenticated: {}", is_interserver_mode && !is_interserver_authenticated);
+            /// Interserver authentication is done only after we read the query.
+            /// This fact can be abused by producing exception before or while we read the query.
+            /// To avoid any potential exploits, we simply close connection on any exceptions
+            /// that happen before the first query is authenticated with the cluster secret.
+            if (is_interserver_mode && !is_interserver_authenticated)
+                return;
+
+            try
+            {
+                LOG_DEBUG(log, "try send logs");
+                /// Try to send logs to client, but it could be risky too
+                /// Assume that we can't break output here
+                std::lock_guard lock(out_mutex);
+                sendLogs(query_state.value());
+
+                LOG_DEBUG(log, "try skip data");
+                /// A query packet is always followed by one or more data packets.
+                /// If some of those data packets are left, try to skip them.
+                if (!query_state->read_all_data)
+                    skipData(query_state.value());
+
+                if (log_as_error)
                 {
-                    /// Try to send logs to client, but it could be risky too
-                    /// Assume that we can't break output here
-                    sendLogs();
+                    LOG_DEBUG(log, "try send exception");
+                    sendException(*exception, send_exception_with_stack_trace);
                 }
-                catch (...)
+                else
                 {
-                    tryLogCurrentException(log, "Can't send logs to client");
+                    LOG_DEBUG(log, "try send EndOfStream");
+                    sendEndOfStream(query_state.value());
                 }
 
-                const auto & e = *exception;
-                LOG_ERROR(log, getExceptionMessageAndPattern(e, send_exception_with_stack_trace));
-                sendException(*exception, send_exception_with_stack_trace);
+                LOG_DEBUG(log, "Logs and exception have been sent");
+            }
+            catch (...)
+            {
+                LOG_DEBUG(log, "failed");
+                query_state->cancelOut();
+                tryLogCurrentException(log, "Can't send logs or exception to client. Close connection.");
+                return;
+            }
+
+            if (close_connection)
+            {
+                LOG_DEBUG(log, "Going to close connection due to exception: {}", exception->message());
+                query_state->finalizeOut();
+                return;
+            }
         }
-        catch (...)
-        {
-            /** Could not send exception information to the client. */
-            network_error = true;
-            LOG_WARNING(log, "Client has gone away.");
-        }
-
-        /// Interserver authentication is done only after we read the query.
-        /// This fact can be abused by producing exception before or while we read the query.
-        /// To avoid any potential exploits, we simply close connection on any exceptions
-        /// that happen before the first query is authenticated with the cluster secret.
-        if (is_interserver_mode && exception && !is_interserver_authenticated)
-            exception->rethrow();
-
-        try
-        {
-            /// A query packet is always followed by one or more data packets.
-            /// If some of those data packets are left, try to skip them.
-            if (exception && !state.empty() && !state.read_all_data)
-                skipData();
-        }
-        catch (...)
-        {
-            network_error = true;
-            LOG_WARNING(log, "Can't skip data packets after query failure.");
-        }
-
-        log_query_duration();
-
-        /// QueryState should be cleared before QueryScope, since otherwise
-        /// the MemoryTracker will be wrong for possible deallocations.
-        /// (i.e. deallocations from the Aggregator with two-level aggregation)
-
-        state.cancelOut();
-        state.reset();
-        query_scope.reset();
-        thread_trace_context.reset();
-
-        /// It is important to destroy query context here. We do not want it to live arbitrarily longer than the query.
-        query_context.reset();
-
-        if (is_interserver_mode)
-        {
-            /// We don't really have session in interserver mode, new one is created for each query. It's better to reset it now.
-            session.reset();
-        }
-
-        if (network_error || user_expired)
-            break;
+        query_state->finalizeOut();
     }
 }
 
+void TCPHandler::logQueryDuration(QueryState & state)
+{
+    if (state.query_duration_already_logged)
+        return;
+    state.query_duration_already_logged = true;
+    auto elapsed_sec = state.watch.elapsedSeconds();
+    /// We already logged more detailed info if we read some rows
+    if (elapsed_sec < 1.0 && state.progress.read_rows)
+        return;
+    LOG_DEBUG(log, "Processed in {} sec.", elapsed_sec);
+}
+
+
 void TCPHandler::extractConnectionSettingsFromContext(const ContextPtr & context)
 {
     const auto & settings = context->getSettingsRef();
@@ -885,120 +893,192 @@
 }
 
-bool TCPHandler::readDataNext()
+bool TCPHandler::receivePacketsExpectQuery(std::optional<QueryState> & state)
 {
-    Stopwatch watch(CLOCK_MONOTONIC_COARSE);
+    UInt64 packet_type = 0;
+    readVarUInt(packet_type, *in);
+
+    LOG_DEBUG(log, "receivePacketsExpectQuery got {}", Protocol::Client::toString(packet_type));
+
+    switch (packet_type)
+    {
+        case Protocol::Client::Hello:
+            processUnexpectedHello();
+
+        case Protocol::Client::Data:
+        case Protocol::Client::Scalar:
+            processUnexpectedData();
+            throw Exception(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet Data received from client");
+
+        case Protocol::Client::Ping:
+            writeVarUInt(Protocol::Server::Pong, *out);
+            out->finishChunk();
+            out->next();
+            return false;
+
+        case Protocol::Client::Cancel:
+            return false;
+
+        case Protocol::Client::TablesStatusRequest:
+            processTablesStatusRequest();
+            return false;
+
+        case Protocol::Client::IgnoredPartUUIDs:
+            /// Part uuids packet if any comes before query.
+            processIgnoredPartUUIDs();
+            return true;
+
+        case Protocol::Client::Query:
+            processQuery(state);
+            return true;
+
+        default:
+            throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT, "Unknown packet {} from client", toString(packet_type));
+    }
+
+    chassert(server.isCancelled() || !tcp_server.isOpen());
+    throw Exception(ErrorCodes::ABORTED, "Server shutdown is called");
+}
+
+
+bool TCPHandler::receivePacketsExpectData(QueryState & state)
+{
     /// Poll interval should not be greater than receive_timeout
     constexpr UInt64 min_timeout_us = 5000; // 5 ms
-    UInt64 timeout_us
-        = std::max(min_timeout_us, std::min(poll_interval * 1000000, static_cast(receive_timeout.totalMicroseconds())));
-    bool read_ok = false;
+    UInt64 timeout_us = std::max(
+        min_timeout_us,
+        std::min(
+            poll_interval * 1000000,
+            static_cast(receive_timeout.totalMicroseconds())));
 
-    /// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down.
-    while (true)
+    Stopwatch watch;
+    while (!server.isCancelled() && tcp_server.isOpen())
     {
-        if (in->poll(timeout_us))
+        if (!in->poll(timeout_us))
         {
-            /// If client disconnected.
-            if (in->eof())
+            size_t elapsed = size_t(watch.elapsedSeconds());
+            if (elapsed > size_t(receive_timeout.totalSeconds()))
             {
-                LOG_INFO(log, "Client has dropped the connection, cancel the query.");
-                state.is_connection_closed = true;
-                state.cancelOut();
-                state.cancellation_status = CancellationStatus::FULLY_CANCELLED;
-                break;
+                throw NetException(ErrorCodes::SOCKET_TIMEOUT,
+                    "Timeout exceeded while receiving data from client. Waited for {} seconds, timeout is {} seconds.",
+                    elapsed, receive_timeout.totalSeconds());
             }
 
-            /// We accept and process data.
-            read_ok = receivePacket();
-            /// Reset the timeout on Ping packet (NOTE: there is no Ping for INSERT queries yet).
-            watch.restart();
-            break;
+            continue;
         }
 
-        /// Do we need to shut down?
-        if (server.isCancelled())
-            break;
+        UInt64 packet_type = 0;
+        readVarUInt(packet_type, *in);
 
-        /** Have we waited for data for too long?
-         *  If we periodically poll, the receive_timeout of the socket itself does not work.
-         *  Therefore, an additional check is added.
-         */
-        Float64 elapsed = watch.elapsedSeconds();
-        if (elapsed > static_cast(receive_timeout.totalSeconds()))
+        LOG_DEBUG(log, "receivePacketsExpectData got {}", Protocol::Client::toString(packet_type));
+
+        switch (packet_type)
         {
-            throw Exception(ErrorCodes::SOCKET_TIMEOUT,
-                            "Timeout exceeded while receiving data from client. Waited for {} seconds, timeout is {} seconds.",
-                            static_cast(elapsed), receive_timeout.totalSeconds());
+            case Protocol::Client::IgnoredPartUUIDs:
+                processUnexpectedIgnoredPartUUIDs();
+
+            case Protocol::Client::Query:
+                processUnexpectedQuery();
+
+            case Protocol::Client::Hello:
+                processUnexpectedHello();
+
+            case Protocol::Client::TablesStatusRequest:
+                processUnexpectedTablesStatusRequest();
+
+            case Protocol::Client::Data:
+            case Protocol::Client::Scalar:
+                if (state.skipping_data)
+                    return processUnexpectedData();
+                return processData(state, packet_type == Protocol::Client::Scalar);
+
+            case Protocol::Client::Ping:
+                writeVarUInt(Protocol::Server::Pong, *out);
+                out->finishChunk();
+                out->next();
+                continue;
+
+            case Protocol::Client::Cancel:
+                processCancel(state);
+                return false; // We return false from this function as if no more data received
+
+            default:
+                throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT, "Unknown packet {} from client", toString(packet_type));
         }
     }
 
-    if (read_ok)
-    {
-        sendLogs();
-        sendInsertProfileEvents();
-    }
-    else
-        state.read_all_data = true;
-
-    return read_ok;
+    chassert(server.isCancelled() || !tcp_server.isOpen());
+    throw Exception(ErrorCodes::ABORTED, "Server shutdown is called");
 }
 
 
-void TCPHandler::readData()
+void TCPHandler::readData(QueryState & state)
 {
-    sendLogs();
+    {
+        std::lock_guard lock(out_mutex);
+        sendLogs(state);
+    }
 
-    while (readDataNext())
-        ;
+    /// No sense in the partial_result_on_first_cancel setting when temporary data is read.
+    auto off_setting_guard = TurnOffBoolSettingTemporary(state.allow_partial_result_on_first_cancel);
 
-    if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
-        throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
+    while (receivePacketsExpectData(state))
+    {
+        std::lock_guard lock(out_mutex);
+        sendLogs(state);
+        sendInsertProfileEvents(state);
+    }
+
+    state.read_all_data = true;
 }
 
 
-void TCPHandler::skipData()
+void TCPHandler::skipData(QueryState & state)
 {
     state.skipping_data = true;
     SCOPE_EXIT({ state.skipping_data = false; });
 
-    while (readDataNext())
-        ;
+    while (receivePacketsExpectData(state))
+    {
+        /// no op
+    }
 
-    if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
-        throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled");
+    state.read_all_data = true;
 }
 
 
-void TCPHandler::startInsertQuery()
+
+void TCPHandler::startInsertQuery(QueryState & state)
 {
     /// Send ColumnsDescription for insertion table
     if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA)
     {
-        const auto & table_id = query_context->getInsertionTable();
-        if (query_context->getSettingsRef()[Setting::input_format_defaults_for_omitted_fields])
+        const auto & table_id = state.query_context->getInsertionTable();
+        if (state.query_context->getSettingsRef()[Setting::input_format_defaults_for_omitted_fields])
        {
             if (!table_id.empty())
             {
-                auto storage_ptr = DatabaseCatalog::instance().getTable(table_id, query_context);
-                sendTableColumns(storage_ptr->getInMemoryMetadataPtr()->getColumns());
+                auto storage_ptr = DatabaseCatalog::instance().getTable(table_id, state.query_context);
+                std::lock_guard lock(out_mutex);
+                sendTableColumns(state, storage_ptr->getInMemoryMetadataPtr()->getColumns());
             }
         }
     }
 
-    /// Send block to the client - table structure.
-    sendData(state.io.pipeline.getHeader());
-    sendLogs();
+    {
+        std::lock_guard lock(out_mutex);
+        /// Send block to the client - table structure.
+        sendData(state, state.io.pipeline.getHeader());
+        sendLogs(state);
+    }
 }
 
 
-AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(AsynchronousInsertQueue & insert_queue)
+
+AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(QueryState & state, AsynchronousInsertQueue & insert_queue)
 {
     using PushResult = AsynchronousInsertQueue::PushResult;
 
-    startInsertQuery();
-    Squashing squashing(state.input_header, 0, query_context->getSettingsRef()[Setting::async_insert_max_data_size]);
+    startInsertQuery(state);
+    Squashing squashing(state.input_header, 0, state.query_context->getSettingsRef()[Setting::async_insert_max_data_size]);
 
-    while (readDataNext())
+    while (receivePacketsExpectData(state))
     {
         squashing.setHeader(state.block_for_insert.cloneEmpty());
         auto result_chunk = Squashing::squash(squashing.add({state.block_for_insert.getColumns(), state.block_for_insert.rows()}));
@@ -1013,52 +1093,60 @@ AsynchronousInsertQueue::PushResult TCPHandler::processAsyncInsertQuery(Asynchro
         }
     }
 
+    state.read_all_data = true;
+
     Chunk result_chunk = Squashing::squash(squashing.flush());
     if (!result_chunk)
     {
-        return insert_queue.pushQueryWithBlock(state.parsed_query, squashing.getHeader(), query_context);
+        return insert_queue.pushQueryWithBlock(state.parsed_query, squashing.getHeader(), state.query_context);
     }
 
     auto result = squashing.getHeader().cloneWithColumns(result_chunk.detachColumns());
-    return insert_queue.pushQueryWithBlock(state.parsed_query, std::move(result), query_context);
+    return insert_queue.pushQueryWithBlock(state.parsed_query, std::move(result), state.query_context);
 }
 
 
-void TCPHandler::processInsertQuery()
+
+void TCPHandler::processInsertQuery(QueryState & state)
 {
     size_t num_threads = state.io.pipeline.getNumThreads();
 
     auto run_executor = [&](auto & executor, Block processed_data)
     {
-        /// Made above the rest of the lines,
-        /// so that in case of `start` function throws an exception,
-        /// client receive exception before sending data.
-        executor.start();
+        try
+        {
+            /// Made above the rest of the lines,
+            /// so that in case of `start` function throws an exception,
+            /// client receive exception before sending data.
+            executor.start();
 
-        if (processed_data)
-            executor.push(std::move(processed_data));
-        else
-            startInsertQuery();
+            if (processed_data)
+                executor.push(std::move(processed_data));
+            else
+                startInsertQuery(state);
 
-        while (readDataNext())
-            executor.push(std::move(state.block_for_insert));
+            while (receivePacketsExpectData(state))
+                executor.push(std::move(state.block_for_insert));
 
-        if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
+            state.read_all_data = true;
+
+            executor.finish();
+        }
+        catch (...)
         {
             executor.cancel();
+            throw;
         }
-        else
-            executor.finish();
     };
 
     Block processed_block;
-    const auto & settings = query_context->getSettingsRef();
+    const auto & settings = state.query_context->getSettingsRef();
 
-    auto * insert_queue = query_context->tryGetAsynchronousInsertQueue();
+    auto * insert_queue = state.query_context->tryGetAsynchronousInsertQueue();
 
     const auto & insert_query = assert_cast(*state.parsed_query);
 
     bool async_insert_enabled = settings[Setting::async_insert];
     if (insert_query.table_id)
-        if (auto table = DatabaseCatalog::instance().tryGetTable(insert_query.table_id, query_context))
+        if (auto table = DatabaseCatalog::instance().tryGetTable(insert_query.table_id, state.query_context))
             async_insert_enabled |= table->areAsynchronousInsertsEnabled();
 
     if (insert_queue && async_insert_enabled && !insert_query.select)
     {
@@ -1078,7 +1166,7 @@ void TCPHandler::processInsertQuery()
                 "Deduplication in dependent materialized view cannot work together with async inserts. "
                 "Please disable either `deduplicate_blocks_in_dependent_materialized_views` or `async_insert` setting.");
 
-        auto result = processAsyncInsertQuery(*insert_queue);
+        auto result = processAsyncInsertQuery(state, *insert_queue);
         if (result.status == AsynchronousInsertQueue::PushResult::OK)
         {
             /// Reset pipeline because it may hold write lock for some storages.
@@ -1097,7 +1185,8 @@ void TCPHandler::processInsertQuery()
                 result.future.get();
             }
 
-            sendInsertProfileEvents();
+            std::lock_guard lock(out_mutex);
+            sendInsertProfileEvents(state);
             return;
         }
         if (result.status == AsynchronousInsertQueue::PushResult::TOO_MUCH_DATA)
        {
@@ -1118,18 +1207,19 @@ void TCPHandler::processInsertQuery()
         run_executor(executor, std::move(processed_block));
     }
 
-    sendInsertProfileEvents();
+    std::lock_guard lock(out_mutex);
+    sendInsertProfileEvents(state);
 }
 
 
-void TCPHandler::processOrdinaryQuery()
+void TCPHandler::processOrdinaryQuery(QueryState & state)
 {
     auto & pipeline = state.io.pipeline;
 
-    if (query_context->getSettingsRef()[Setting::allow_experimental_query_deduplication])
+    if (state.query_context->getSettingsRef()[Setting::allow_experimental_query_deduplication])
     {
         std::lock_guard lock(out_mutex);
-        sendPartUUIDs();
+        sendPartUUIDs(state);
     }
 
     /// Send header-block, to allow client to prepare output format for data to send.
     {
@@ -1139,74 +1229,53 @@ void TCPHandler::processOrdinaryQuery()
         if (header)
         {
             std::lock_guard lock(out_mutex);
-            sendData(header);
+            sendData(state, header);
         }
     }
 
-    /// Defer locking to cover a part of the scope below and everything after it
-    std::unique_lock out_lock(out_mutex, std::defer_lock);
-
     {
         PullingAsyncPipelineExecutor executor(pipeline);
         pipeline.setConcurrencyControl(query_context->getSettingsRef()[Setting::use_concurrency_control]);
         CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
 
-        /// The following may happen:
-        /// * current thread is holding the lock
-        /// * because of the exception we unwind the stack and call the destructor of `executor`
-        /// * the destructor calls cancel() and waits for all query threads to finish
-        /// * at the same time one of the query threads is trying to acquire the lock, e.g. inside `merge_tree_read_task_callback`
-        /// * deadlock
-        SCOPE_EXIT({
-            if (out_lock.owns_lock())
-                out_lock.unlock();
-        });
-
-        Block block;
-        while (executor.pull(block, interactive_delay / 1000))
+        try
         {
-            std::unique_lock lock(task_callback_mutex);
-
-            auto cancellation_status = getQueryCancellationStatus();
-            if (cancellation_status == CancellationStatus::FULLY_CANCELLED)
+            Block block;
+            while (executor.pull(block, interactive_delay / 1000))
             {
-                /// Several callback like callback for parallel reading could be called from inside the pipeline
-                /// and we have to unlock the mutex from our side to prevent deadlock.
-                lock.unlock();
-                /// A packet was received requesting to stop execution of the request.
-                executor.cancel();
-                break;
+                receivePacketsExpectCancel(state);
+
+                if (state.stop_read_return_partial_result)
+                {
+                    executor.cancelReading();
+                }
+
+                {
+                    std::lock_guard lock(out_mutex);
+
+                    if (after_send_progress.elapsed() / 1000 >= interactive_delay)
+                    {
+                        /// Some time passed and there is a progress.
+                        after_send_progress.restart();
+                        sendProgress(state);
+                        sendSelectProfileEvents(state);
+                    }
+
+                    sendLogs(state);
+
+                    if (block)
+                    {
+                        if (!state.io.null_format)
+                            sendData(state, block);
+                    }
+                }
             }
-
-            if (cancellation_status == CancellationStatus::READ_CANCELLED)
-            {
-                executor.cancelReading();
-            }
-
-            lock.unlock();
-            out_lock.lock();
-
-            if (after_send_progress.elapsed() / 1000 >= interactive_delay)
-            {
-                /// Some time passed and there is a progress.
-                after_send_progress.restart();
-                sendProgress();
-                sendSelectProfileEvents();
-            }
-
-            sendLogs();
-
-            if (block)
-            {
-                if (!state.io.null_format)
-                    sendData(block);
-            }
-
-            out_lock.unlock();
         }
-
-        /// This lock wasn't acquired before and we make .lock() call here
-        /// so everything under this line is covered.
-        out_lock.lock();
+        catch (...)
+        {
+            executor.cancel();
+            throw;
+        }
 
         /** If data has run out, we will send the profiling data and total values to
          *  the last zero block to be able to use
         *  these information in the suffix output of stream.
         *  If the request was interrupted, then `sendTotals` and `sendExtremes` were not called,
         *  because we have not read all the data yet,
         *  and there could be ongoing calculations in other threads at the same time.
         */
-        if (getQueryCancellationStatus() != CancellationStatus::FULLY_CANCELLED)
+
+        receivePacketsExpectCancel(state);
+
         {
-            sendTotals(executor.getTotalsBlock());
-            sendExtremes(executor.getExtremesBlock());
-            sendProfileInfo(executor.getProfileInfo());
-            sendProgress();
-            sendLogs();
-            sendSelectProfileEvents();
+            std::lock_guard lock(out_mutex);
+            sendTotals(state, executor.getTotalsBlock());
+            sendExtremes(state, executor.getExtremesBlock());
+            sendProfileInfo(state, executor.getProfileInfo());
+            sendProgress(state);
+            sendLogs(state);
+            sendSelectProfileEvents(state);
+
+            sendData(state, {});
+
+            sendProgress(state);
         }
-
-        if (state.is_connection_closed)
-            return;
-
-        sendData({});
-        last_sent_snapshots.clear();
     }
-
-    out_lock.lock();
-    sendProgress();
 }
 
@@ -1281,8 +1348,8 @@
     }
 
-    if (out->isCanceled())
-        return;
+    // if (out->isCanceled())
+    //     return;
 
     writeVarUInt(Protocol::Server::TablesStatusResponse, *out);
 
@@ -1297,30 +1364,33 @@
     response.write(*out, client_tcp_protocol_version);
 
     out->finishChunk();
+    out->next();
 }
 
 
-void TCPHandler::receiveUnexpectedTablesStatusRequest()
+
+void TCPHandler::processUnexpectedTablesStatusRequest()
 {
     TablesStatusRequest skip_request;
     skip_request.read(*in, client_tcp_protocol_version);
 
-    throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet TablesStatusRequest received from client");
+    throw Exception(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet TablesStatusRequest received from client");
 }
 
 
-void TCPHandler::sendPartUUIDs()
+
+void TCPHandler::sendPartUUIDs(QueryState & state)
 {
-    auto uuids = query_context->getPartUUIDs()->get();
-    if (!uuids.empty())
-    {
-        for (const auto & uuid : uuids)
-            LOG_TRACE(log, "Sending UUID: {}", toString(uuid));
+    auto uuids = state.query_context->getPartUUIDs()->get();
+    if (uuids.empty())
+        return;
 
-        writeVarUInt(Protocol::Server::PartUUIDs, *out);
-        writeVectorBinary(uuids, *out);
+    for (const auto & uuid : uuids)
+        LOG_TRACE(log, "Sending UUID: {}", toString(uuid));
 
-        out->finishChunk();
-        out->next();
-    }
+    writeVarUInt(Protocol::Server::PartUUIDs, *out);
+    writeVectorBinary(uuids, *out);
+
+    out->finishChunk();
+    out->next();
 }
 
 
@@ -1333,7 +1403,7 @@
 }
 
 
-void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRangesAnnouncement announcement)
+void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(QueryState &, InitialAllRangesAnnouncement announcement)
 {
     writeVarUInt(Protocol::Server::MergeTreeAllRangesAnnouncement, *out);
     announcement.serialize(*out, client_parallel_replicas_protocol_version);
 
@@ -1353,7 +1423,7 @@ void TCPHandler::sendMergeTreeReadTaskRequestAssumeLocked(ParallelReadRequest re
 }
 
 
-void TCPHandler::sendProfileInfo(const ProfileInfo & info)
+void TCPHandler::sendProfileInfo(QueryState &, const ProfileInfo & info)
 {
     writeVarUInt(Protocol::Server::ProfileInfo, *out);
     info.write(*out, client_tcp_protocol_version);
 
@@ -1363,48 +1433,49 @@
 }
 
 
-void TCPHandler::sendTotals(const Block & totals)
+void TCPHandler::sendTotals(QueryState & state, const Block & totals)
 {
-    if (totals)
-    {
-        initBlockOutput(totals);
+    if (!totals)
+        return;
 
-        writeVarUInt(Protocol::Server::Totals, *out);
-        writeStringBinary("", *out);
+    initBlockOutput(state, totals);
 
-        state.block_out->write(totals);
-        state.maybe_compressed_out->next();
+    writeVarUInt(Protocol::Server::Totals, *out);
+    writeStringBinary("", *out);
 
-        out->finishChunk();
-        out->next();
-    }
+    state.block_out->write(totals);
+    state.maybe_compressed_out->next();
+
+    out->finishChunk();
+    out->next();
 }
 
 
-void TCPHandler::sendExtremes(const Block & extremes)
+void TCPHandler::sendExtremes(QueryState & state, const Block & extremes)
 {
-    if (extremes)
-    {
-        initBlockOutput(extremes);
+    if (!extremes)
+        return;
 
-        writeVarUInt(Protocol::Server::Extremes, *out);
-        writeStringBinary("", *out);
+    initBlockOutput(state, extremes);
 
-        state.block_out->write(extremes);
-        state.maybe_compressed_out->next();
+    writeVarUInt(Protocol::Server::Extremes, *out);
+    writeStringBinary("", *out);
 
-        out->finishChunk();
-        out->next();
-    }
+    state.block_out->write(extremes);
+    state.maybe_compressed_out->next();
+
+    out->finishChunk();
+    out->next();
 }
 
 
-void TCPHandler::sendProfileEvents()
+
+void TCPHandler::sendProfileEvents(QueryState & state)
 {
     Stopwatch stopwatch;
-    Block block = ProfileEvents::getProfileEvents(host_name, state.profile_queue, last_sent_snapshots);
+    Block block = ProfileEvents::getProfileEvents(host_name, state.profile_queue, state.last_sent_snapshots);
     if (block.rows() != 0)
     {
-        initProfileEventsBlockOutput(block);
+        initProfileEventsBlockOutput(state, block);
 
         writeVarUInt(Protocol::Server::ProfileEvents, *out);
         writeStringBinary("", *out);
@@ -1421,30 +1492,33 @@
     }
 }
 
-void TCPHandler::sendSelectProfileEvents()
+
+void TCPHandler::sendSelectProfileEvents(QueryState & state)
 {
     if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
         return;
 
-    sendProfileEvents();
+    sendProfileEvents(state);
 }
 
-void TCPHandler::sendInsertProfileEvents()
+
+void TCPHandler::sendInsertProfileEvents(QueryState & state)
 {
     if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT)
         return;
     if (query_kind != ClientInfo::QueryKind::INITIAL_QUERY)
         return;
 
-    sendProfileEvents();
+    sendProfileEvents(state);
 }
 
-void TCPHandler::sendTimezone()
+
+void TCPHandler::sendTimezone(QueryState & state)
 {
     if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_TIMEZONE_UPDATES)
         return;
 
-    const String & tz = query_context->getSettingsRef()[Setting::session_timezone].value;
+    const String & tz = state.query_context->getSettingsRef()[Setting::session_timezone].value;
 
     LOG_DEBUG(log, "TCPHandler::sendTimezone(): {}", tz);
     writeVarUInt(Protocol::Server::TimezoneUpdate, *out);
@@ -1549,6 +1623,7 @@ std::string formatHTTPErrorResponseWhenUserIsConnectedToWrongPort(const Poco::Ut
 }
 
 
+
 std::unique_ptr TCPHandler::makeSession()
 {
     auto interface = is_interserver_mode ? ClientInfo::Interface::TCP_INTERSERVER : ClientInfo::Interface::TCP;
@@ -1564,6 +1639,7 @@
     return res;
 }
 
+
 void TCPHandler::receiveHello()
 {
     /// Receive `hello` packet.
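The send* helpers above all follow one framing sequence: write the packet-type varint, write the payload, close the chunk, flush. A condensed sketch of that shape; ChunkedOut is an illustrative stand-in for the chunked socket write buffer, only finishChunk()/next() mirror the calls in the diff:

```cpp
#include <cstdint>
#include <functional>

// Minimal stand-in for the chunked output buffer interface used above.
struct ChunkedOut
{
    void writeVarUInt(uint64_t) {}   // varint encoding of the packet type
    void finishChunk() {}            // chunk boundary of the chunked protocol
    void next() {}                   // flush buffered bytes to the socket
};

// Shape shared by sendPartUUIDs / sendTotals / sendExtremes / sendProfileEvents:
void sendPacket(ChunkedOut & out, uint64_t packet_type,
                const std::function<void(ChunkedOut &)> & write_payload)
{
    out.writeVarUInt(packet_type); // 1. packet type
    write_payload(out);            // 2. packet body (blocks, strings, ...)
    out.finishChunk();             // 3. close the chunk so the client sees a whole packet
    out.next();                    // 4. flush
}
```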
@@ -1585,8 +1661,9 @@ void TCPHandler::receiveHello()
             out->next();
             throw Exception(ErrorCodes::CLIENT_HAS_CONNECTED_TO_WRONG_PORT, "Client has connected to wrong port");
         }
-        throw NetException(
-            ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet from client (expected Hello, got {})", packet_type);
+        else
+            throw Exception(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT,
+                "Unexpected packet from client (expected Hello, got {})", packet_type);
     }
 
     readStringBinary(client_name, *in);
@@ -1601,7 +1678,7 @@
     readStringBinary(password, *in);
 
     if (user.empty())
-        throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet from client (no user in Hello package)");
+        throw Exception(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet from client (no user in Hello package)");
 
     LOG_DEBUG(log, "Connected {} version {}.{}.{}, revision: {}{}{}.",
         client_name,
@@ -1617,7 +1694,7 @@
         if (client_tcp_protocol_version < DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2)
             LOG_WARNING(LogFrequencyLimiter(log, 10), "Using deprecated interserver protocol because the client is too old. Consider upgrading all nodes in cluster.");
 
-        receiveClusterNameAndSalt();
+        processClusterNameAndSalt();
         return;
     }
 
@@ -1715,6 +1792,7 @@
         session->authenticate(user, password, getClientAddress(client_info));
 }
 
+
 void TCPHandler::receiveAddendum()
 {
     if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY)
@@ -1734,7 +1812,7 @@
 }
 
 
-void TCPHandler::receiveUnexpectedHello()
+void TCPHandler::processUnexpectedHello()
 {
     UInt64 skip_uint_64;
     String skip_string;
@@ -1747,7 +1825,7 @@
     readStringBinary(skip_string, *in);
     readStringBinary(skip_string, *in);
 
-    throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet Hello received from client");
+    throw Exception(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet Hello received from client");
 }
 
 
@@ -1793,137 +1871,97 @@
 }
 
 
-bool TCPHandler::receivePacket()
+void TCPHandler::processIgnoredPartUUIDs()
 {
-    UInt64 packet_type = 0;
-    readVarUInt(packet_type, *in);
-
-    switch (packet_type)
-    {
-        case Protocol::Client::IgnoredPartUUIDs:
-            /// Part uuids packet if any comes before query.
-            if (!state.empty() || state.part_uuids_to_ignore)
-                receiveUnexpectedIgnoredPartUUIDs();
-            receiveIgnoredPartUUIDs();
-            return true;
-
-        case Protocol::Client::Query:
-            if (!state.empty())
-                receiveUnexpectedQuery();
-            receiveQuery();
-            return true;
-
-        case Protocol::Client::Data:
-        case Protocol::Client::Scalar:
-            if (state.skipping_data)
-                return receiveUnexpectedData(false);
-            if (state.empty())
-                receiveUnexpectedData(true);
-            return receiveData(packet_type == Protocol::Client::Scalar);
-
-        case Protocol::Client::Ping:
-            writeVarUInt(Protocol::Server::Pong, *out);
-            out->finishChunk();
-            out->next();
-            return false;
-
-        case Protocol::Client::Cancel:
-            decreaseCancellationStatus("Received 'Cancel' packet from the client, canceling the query.");
-            return false;
-
-        case Protocol::Client::Hello:
-            receiveUnexpectedHello();
-
-        case Protocol::Client::TablesStatusRequest:
-            if (!state.empty())
-                receiveUnexpectedTablesStatusRequest();
-            processTablesStatusRequest();
-            out->next();
-            return false;
-
-        default:
-            throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT, "Unknown packet {} from client", toString(packet_type));
-    }
+    readVectorBinary(part_uuids_to_ignore.emplace(), *in);
 }
 
 
-void TCPHandler::receiveIgnoredPartUUIDs()
-{
-    readVectorBinary(state.part_uuids_to_ignore.emplace(), *in);
-}
-
-
-void TCPHandler::receiveUnexpectedIgnoredPartUUIDs()
+void TCPHandler::processUnexpectedIgnoredPartUUIDs()
 {
     std::vector skip_part_uuids;
     readVectorBinary(skip_part_uuids, *in);
-    throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet IgnoredPartUUIDs received from client");
+    throw Exception(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet IgnoredPartUUIDs received from client");
 }
 
 
-String TCPHandler::receiveReadTaskResponseAssumeLocked()
+String TCPHandler::receiveReadTaskResponseAssumeLocked(QueryState & state)
 {
     UInt64 packet_type = 0;
     readVarUInt(packet_type, *in);
-    if (packet_type != Protocol::Client::ReadTaskResponse)
+
+    LOG_DEBUG(log, "receiveReadTaskResponseAssumeLocked got {}", Protocol::Client::toString(packet_type));
+
+    switch (packet_type)
     {
-        if (packet_type == Protocol::Client::Cancel)
-        {
-            decreaseCancellationStatus("Received 'Cancel' packet from the client, canceling the read task.");
+        case Protocol::Client::Cancel:
+            processCancel(state);
             return {};
+
+        case Protocol::Client::ReadTaskResponse:
+        {
+            UInt64 version = 0;
+            readVarUInt(version, *in);
+            if (version != DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION)
+                throw Exception(ErrorCodes::UNKNOWN_PROTOCOL, "Protocol version for distributed processing mismatched");
+            String response;
+            readStringBinary(response, *in);
+            return response;
         }
 
-        throw Exception(
-            ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT,
-            "Received {} packet after requesting read task",
-            Protocol::Client::toString(packet_type));
+        default:
+            throw Exception(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Received {} packet after requesting read task",
+                Protocol::Client::toString(packet_type));
     }
-
-    UInt64 version;
-    readVarUInt(version, *in);
-    if (version != DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION)
-        throw Exception(ErrorCodes::UNKNOWN_PROTOCOL, "Protocol version for distributed processing mismatched");
-    String response;
-    readStringBinary(response, *in);
-    return response;
 }
 
 
-std::optional TCPHandler::receivePartitionMergeTreeReadTaskResponseAssumeLocked()
+std::optional TCPHandler::receivePartitionMergeTreeReadTaskResponseAssumeLocked(QueryState & state)
 {
     UInt64 packet_type = 0;
     readVarUInt(packet_type, *in);
(packet_type != Protocol::Client::MergeTreeReadTaskResponse) + + LOG_DEBUG(log, "receivePartitionMergeTreeReadTaskResponseAssumeLocked got {}", Protocol::Client::toString(packet_type)); + + switch (packet_type) { - if (packet_type == Protocol::Client::Cancel) + case Protocol::Client::Cancel: + processCancel(state); + return {}; + + case Protocol::Client::MergeTreeReadTaskResponse: { - decreaseCancellationStatus("Received 'Cancel' packet from the client, canceling the MergeTree read task."); - return std::nullopt; + ParallelReadResponse response; + response.deserialize(*in); + return response; } - throw Exception( - ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, - "Received {} packet after requesting read task", - Protocol::Client::toString(packet_type)); + default: + throw Exception(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, + "Received {} packet after requesting read task", + Protocol::Client::toString(packet_type)); } - ParallelReadResponse response; - response.deserialize(*in); - return response; } -void TCPHandler::receiveClusterNameAndSalt() +void TCPHandler::processClusterNameAndSalt() { readStringBinary(cluster, *in); readStringBinary(salt, *in, 32); } -void TCPHandler::receiveQuery() + +void TCPHandler::processQuery(std::optional<QueryState> & state) { UInt64 stage = 0; UInt64 compression = 0; - state.is_empty = false; - readStringBinary(state.query_id, *in); + state.emplace(); + + if (part_uuids_to_ignore.has_value()) + state->part_uuids_to_ignore = std::move(part_uuids_to_ignore); + + readStringBinary(state->query_id, *in); /// In interserver mode, /// initial_user can be empty in case of Distributed INSERT via Buffer/Kafka, @@ -1951,6 +1989,7 @@ void TCPHandler::receiveQuery() auto settings_format = (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) ? SettingsWriteFormat::STRINGS_WITH_FLAGS : SettingsWriteFormat::BINARY; + Settings passed_settings; passed_settings.read(*in, settings_format); @@ -1962,13 +2001,13 @@ void TCPHandler::receiveQuery() } readVarUInt(stage, *in); - state.stage = QueryProcessingStage::Enum(stage); + state->stage = QueryProcessingStage::Enum(stage); readVarUInt(compression, *in); - state.compression = static_cast<Protocol::Compression>(compression); - last_block_in.compression = state.compression; + state->compression = static_cast<Protocol::Compression>(compression); + last_block_in.compression = state->compression; - readStringBinary(state.query, *in); + readStringBinary(state->query, *in); Settings passed_params; if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS) @@ -2010,8 +2049,8 @@ void TCPHandler::receiveQuery() if (nonce.has_value()) data += std::to_string(nonce.value()); data += cluster_secret; - data += state.query; - data += state.query_id; + data += state->query; + data += state->query_id; data += client_info.initial_user; std::string calculated_hash = encodeSHA256(data); @@ -2051,17 +2090,19 @@ void TCPHandler::receiveQuery() #endif } - query_context = session->makeQueryContext(client_info); + state->query_context = session->makeQueryContext(client_info); /// Sets the default database if it wasn't set earlier for the session context. 
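/// (In interserver mode the session, and with it the session context, is recreated for every query, so the connection-level default database has to be re-applied to each new query context here.)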
if (is_interserver_mode && !default_database.empty()) - query_context->setCurrentDatabase(default_database); + state->query_context->setCurrentDatabase(default_database); - if (state.part_uuids_to_ignore) - query_context->getIgnoredPartUUIDs()->add(*state.part_uuids_to_ignore); + if (state->part_uuids_to_ignore) + state->query_context->getIgnoredPartUUIDs()->add(*state->part_uuids_to_ignore); - query_context->setProgressCallback([this] (const Progress & value) { this->updateProgress(value); }); - query_context->setFileProgressCallback([this](const FileProgress & value) { this->updateProgress(Progress(value)); }); + state->query_context->setProgressCallback( + [this, &state] (const Progress & value) { this->updateProgress(state.value(), value); }); + state->query_context->setFileProgressCallback( + [this, &state](const FileProgress & value) { this->updateProgress(state.value(), Progress(value)); }); /// /// Settings @@ -2076,18 +2117,18 @@ void TCPHandler::receiveQuery() passed_settings.set("allow_experimental_analyzer", false); auto settings_changes = passed_settings.changes(); - query_kind = query_context->getClientInfo().query_kind; + query_kind = state->query_context->getClientInfo().query_kind; if (query_kind == ClientInfo::QueryKind::INITIAL_QUERY) { /// Throw an exception if the passed settings violate the constraints. - query_context->checkSettingsConstraints(settings_changes, SettingSource::QUERY); + state->query_context->checkSettingsConstraints(settings_changes, SettingSource::QUERY); } else { /// Quietly clamp to the constraints if it's not an initial query. - query_context->clampToSettingsConstraints(settings_changes, SettingSource::QUERY); + state->query_context->clampToSettingsConstraints(settings_changes, SettingSource::QUERY); } - query_context->applySettingsChanges(settings_changes); + state->query_context->applySettingsChanges(settings_changes); /// Use the received query id, or generate a random default. It is convenient /// to also generate the default OpenTelemetry trace id at the same time, and @@ -2098,9 +2139,11 @@ void TCPHandler::receiveQuery() /// 2) There is the opentelemetry_start_trace_probability setting that /// controls when we start a new trace. It can be changed via Native protocol, /// so we have to apply the changes first. 
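/// The same ordering matters for the partial-result flag below: allow_partial_result_on_first_cancel is snapshotted into the query state only after the passed settings changes have been applied, so a per-query override of partial_result_on_first_cancel takes effect.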
- query_context->setCurrentQueryId(state.query_id); + state->query_context->setCurrentQueryId(state->query_id); - query_context->addQueryParameters(passed_params.toNameToNameMap()); + state->query_context->addQueryParameters(passed_params.toNameToNameMap()); + + state->allow_partial_result_on_first_cancel = state->query_context->getSettingsRef()[Setting::partial_result_on_first_cancel]; /// For testing hedged requests if (unlikely(sleep_after_receiving_query.totalMilliseconds())) @@ -2110,7 +2153,7 @@ void TCPHandler::receiveQuery() } } -void TCPHandler::receiveUnexpectedQuery() +void TCPHandler::processUnexpectedQuery() { UInt64 skip_uint_64; String skip_string; @@ -2141,12 +2184,12 @@ void TCPHandler::receiveUnexpectedQuery() if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS) skip_settings.read(*in, settings_format); - throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet Query received from client"); + throw Exception(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet Query received from client"); } -bool TCPHandler::receiveData(bool scalar) +bool TCPHandler::processData(QueryState & state, bool scalar) { - initBlockInput(); + initBlockInput(state); /// The name of the temporary table for writing data, default to empty string auto temporary_id = StorageID::createEmpty(); @@ -2156,37 +2199,34 @@ bool TCPHandler::receiveData(bool scalar) Block block = state.block_in->read(); if (!block) - { - state.read_all_data = true; return false; - } if (scalar) { /// Scalar value - query_context->addScalar(temporary_id.table_name, block); + state.query_context->addScalar(temporary_id.table_name, block); } else if (!state.need_receive_data_for_insert && !state.need_receive_data_for_input) { /// Data for external tables - auto resolved = query_context->tryResolveStorageID(temporary_id, Context::ResolveExternal); + auto resolved = state.query_context->tryResolveStorageID(temporary_id, Context::ResolveExternal); StoragePtr storage; /// If such a table does not exist, create it. if (resolved) { - storage = DatabaseCatalog::instance().getTable(resolved, query_context); + storage = DatabaseCatalog::instance().getTable(resolved, state.query_context); } else { NamesAndTypesList columns = block.getNamesAndTypesList(); - auto temporary_table = TemporaryTableHolder(query_context, ColumnsDescription{columns}, {}); + auto temporary_table = TemporaryTableHolder(state.query_context, ColumnsDescription{columns}, {}); storage = temporary_table.getTable(); - query_context->addExternalTable(temporary_id.table_name, std::move(temporary_table)); + state.query_context->addExternalTable(temporary_id.table_name, std::move(temporary_table)); } auto metadata_snapshot = storage->getInMemoryMetadataPtr(); /// The data will be written directly to the table. - QueryPipeline temporary_table_out(storage->write(ASTPtr(), metadata_snapshot, query_context, /*async_insert=*/false)); + QueryPipeline temporary_table_out(storage->write(ASTPtr(), metadata_snapshot, state.query_context, /*async_insert=*/false)); PushingPipelineExecutor executor(temporary_table_out); executor.start(); executor.push(block); @@ -2202,11 +2242,12 @@ bool TCPHandler::receiveData(bool scalar) /// INSERT query. 
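/// The block is only stashed in the per-query state at this point; processInsertQuery() pushes it into the INSERT pipeline later.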
state.block_for_insert = block; } + return true; } -bool TCPHandler::receiveUnexpectedData(bool throw_exception) +bool TCPHandler::processUnexpectedData() { String skip_external_table_name; readStringBinary(skip_external_table_name, *in); @@ -2220,16 +2261,11 @@ bool TCPHandler::receiveUnexpectedData(bool throw_exception) auto skip_block_in = std::make_shared<NativeReader>(*maybe_compressed_in, client_tcp_protocol_version); bool read_ok = !!skip_block_in->read(); - if (!read_ok) - state.read_all_data = true; - - if (throw_exception) - throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet Data received from client"); - return read_ok; } -void TCPHandler::initBlockInput() + +void TCPHandler::initBlockInput(QueryState & state) { if (!state.block_in) { @@ -2255,13 +2291,13 @@ void TCPHandler::initBlockInput() } -void TCPHandler::initBlockOutput(const Block & block) +void TCPHandler::initBlockOutput(QueryState & state, const Block & block) { if (!state.block_out) { state.raw_out = out; - const Settings & query_settings = query_context->getSettingsRef(); + const Settings & query_settings = state.query_context->getSettingsRef(); if (!state.maybe_compressed_out) { std::string method = Poco::toUpper(query_settings[Setting::network_compression_method].toString()); @@ -2294,59 +2330,50 @@ void TCPHandler::initBlockOutput(const Block & block) } } -void TCPHandler::initLogsBlockOutput(const Block & block) + +void TCPHandler::initLogsBlockOutput(QueryState & state, const Block & block) { if (!state.logs_block_out) { /// Use uncompressed stream since log blocks usually contain only one row - const Settings & query_settings = query_context->getSettingsRef(); + const Settings & query_settings = state.query_context->getSettingsRef(); state.logs_block_out = std::make_unique<NativeWriter>( *out, client_tcp_protocol_version, block.cloneEmpty(), std::nullopt, !query_settings[Setting::low_cardinality_allow_in_native_format]); } } -void TCPHandler::initProfileEventsBlockOutput(const Block & block) +void TCPHandler::initProfileEventsBlockOutput(QueryState & state, const Block & block) { if (!state.profile_events_block_out) { - const Settings & query_settings = query_context->getSettingsRef(); + const Settings & query_settings = state.query_context->getSettingsRef(); state.profile_events_block_out = std::make_unique<NativeWriter>( *out, client_tcp_protocol_version, block.cloneEmpty(), std::nullopt, !query_settings[Setting::low_cardinality_allow_in_native_format]); } } -void TCPHandler::decreaseCancellationStatus(const std::string & log_message) + +void TCPHandler::processCancel(QueryState & state) { - auto prev_status = magic_enum::enum_name(state.cancellation_status); + LOG_DEBUG(log, "processCancel"); - bool partial_result_on_first_cancel = false; - if (query_context) + if (state.allow_partial_result_on_first_cancel && !state.stop_read_return_partial_result) { - const auto & settings = query_context->getSettingsRef(); - partial_result_on_first_cancel = settings[Setting::partial_result_on_first_cancel]; + state.stop_read_return_partial_result = true; + LOG_INFO(log, "Received 'Cancel' packet from the client, returning partial result."); + return; } - if (partial_result_on_first_cancel && state.cancellation_status == CancellationStatus::NOT_CANCELLED) - { - state.cancellation_status = CancellationStatus::READ_CANCELLED; - } - else - { - state.cancellation_status = CancellationStatus::FULLY_CANCELLED; - } - - auto current_status = magic_enum::enum_name(state.cancellation_status); - LOG_INFO(log, "Change cancellation status from {} 
to {}. Log message: {}", prev_status, current_status, log_message); + state.read_all_data = true; + throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Received 'Cancel' packet from the client, canceling the query."); } -QueryState::CancellationStatus TCPHandler::getQueryCancellationStatus() -{ - if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED || state.sent_all_data) - return CancellationStatus::FULLY_CANCELLED; +void TCPHandler::receivePacketsExpectCancel(QueryState & state) +{ if (after_check_cancelled.elapsed() / 1000 < interactive_delay) - return state.cancellation_status; + return; after_check_cancelled.restart(); @@ -2354,38 +2381,29 @@ QueryState::CancellationStatus TCPHandler::getQueryCancellationStatus() if (in->poll(0)) { if (in->eof()) - { - LOG_INFO(log, "Client has dropped the connection, cancel the query."); - state.cancellation_status = CancellationStatus::FULLY_CANCELLED; - state.is_connection_closed = true; - return CancellationStatus::FULLY_CANCELLED; - } + throw NetException(ErrorCodes::ABORTED, "Client has dropped the connection, cancel the query."); UInt64 packet_type = 0; readVarUInt(packet_type, *in); + LOG_DEBUG(log, "receivePacketsExpectCancel got {}", Protocol::Client::toString(packet_type)); + switch (packet_type) { case Protocol::Client::Cancel: - if (state.empty()) - throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet Cancel received from client"); - - decreaseCancellationStatus("Query was cancelled."); - - return state.cancellation_status; + processCancel(state); + break; default: throw NetException(ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT, "Unknown packet from client {}", toString(packet_type)); } } - - return state.cancellation_status; } -void TCPHandler::sendData(const Block & block) +void TCPHandler::sendData(QueryState & state, const Block & block) { - initBlockOutput(block); + initBlockOutput(state, block); size_t prev_bytes_written_out = out->count(); size_t prev_bytes_written_compressed_out = state.maybe_compressed_out->count(); @@ -2404,7 +2422,7 @@ void TCPHandler::sendData(const Block & block) writeVarUInt(Protocol::Server::Data, *out); /// For testing hedged requests - if (block.rows() > 0 && query_context->getSettingsRef()[Setting::sleep_in_send_data_ms].totalMilliseconds()) + if (block.rows() > 0 && state.query_context->getSettingsRef()[Setting::sleep_in_send_data_ms].totalMilliseconds()) { /// This strange sequence is needed in case of chunked protocol is enabled, in order for client not to /// hang on receiving of at least packet type - chunk will not be processed unless either chunk footer @@ -2414,7 +2432,7 @@ void TCPHandler::sendData(const Block & block) /// Send external table name (empty name is the main table) writeStringBinary("", *out); out->next(); - std::chrono::milliseconds ms(query_context->getSettingsRef()[Setting::sleep_in_send_data_ms].totalMilliseconds()); + std::chrono::milliseconds ms(state.query_context->getSettingsRef()[Setting::sleep_in_send_data_ms].totalMilliseconds()); std::this_thread::sleep_for(ms); } else @@ -2457,9 +2475,9 @@ void TCPHandler::sendData(const Block & block) } -void TCPHandler::sendLogData(const Block & block) +void TCPHandler::sendLogData(QueryState & state, const Block & block) { - initLogsBlockOutput(block); + initLogsBlockOutput(state, block); if (out->isCanceled()) return; @@ -2474,7 +2492,8 @@ void TCPHandler::sendLogData(const Block & block) out->next(); } -void TCPHandler::sendTableColumns(const ColumnsDescription & columns) + +void 
TCPHandler::sendTableColumns(QueryState &, const ColumnsDescription & columns) { writeVarUInt(Protocol::Server::TableColumns, *out); @@ -2486,10 +2505,9 @@ void TCPHandler::sendTableColumns(const ColumnsDescription & columns) out->next(); } + void TCPHandler::sendException(const Exception & e, bool with_stack_trace) { - state.io.setAllDataSent(); - if (out->isCanceled()) return; @@ -2501,14 +2519,11 @@ void TCPHandler::sendException(const Exception & e, bool with_stack_trace) } -void TCPHandler::sendEndOfStream() +void TCPHandler::sendEndOfStream(QueryState & state) { state.sent_all_data = true; state.io.setAllDataSent(); - if (out->isCanceled()) - return; - writeVarUInt(Protocol::Server::EndOfStream, *out); out->finishChunk(); @@ -2516,13 +2531,13 @@ void TCPHandler::sendEndOfStream() } -void TCPHandler::updateProgress(const Progress & value) +void TCPHandler::updateProgress(QueryState & state, const Progress & value) { state.progress.incrementPiecewiseAtomically(value); } -void TCPHandler::sendProgress() +void TCPHandler::sendProgress(QueryState & state) { writeVarUInt(Protocol::Server::Progress, *out); auto increment = state.progress.fetchValuesAndResetPiecewiseAtomically(); @@ -2536,7 +2551,7 @@ void TCPHandler::sendProgress() } -void TCPHandler::sendLogs() +void TCPHandler::sendLogs(QueryState & state) { if (!state.logs_queue) return; @@ -2562,7 +2577,7 @@ void TCPHandler::sendLogs() { Block block = InternalTextLogsQueue::getSampleBlock(); block.setColumns(std::move(logs_columns)); - sendLogData(block); + sendLogData(state, block); } } @@ -2572,23 +2587,16 @@ void TCPHandler::run() try { runImpl(); - LOG_DEBUG(log, "Done processing connection."); } - catch (Poco::Exception & e) + catch (...) { - state.cancelOut(); - - /// Timeout - not an error. - if (e.what() == "Timeout"sv) - { - LOG_DEBUG(log, "Poco::Exception. Code: {}, e.code() = {}, e.displayText() = {}, e.what() = {}", ErrorCodes::POCO_EXCEPTION, e.code(), e.displayText(), e.what()); - } - else - throw; + tryLogCurrentException(log, "TCPHandler"); + throw; } } + Poco::Net::SocketAddress TCPHandler::getClientAddress(const ClientInfo & client_info) { /// Extract the last entry from comma separated list of forwarded_for addresses. @@ -2599,4 +2607,5 @@ Poco::Net::SocketAddress TCPHandler::getClientAddress(const ClientInfo & client_ return socket().peerAddress(); } + } diff --git a/src/Server/TCPHandler.h b/src/Server/TCPHandler.h index a249e4a7bda..42d2ad349c1 100644 --- a/src/Server/TCPHandler.h +++ b/src/Server/TCPHandler.h @@ -27,6 +27,7 @@ #include "Server/TCPProtocolStackData.h" #include "Storages/MergeTree/RequestResponse.h" #include "base/types.h" +#include "base/defines.h" namespace CurrentMetrics @@ -53,6 +54,8 @@ struct QueryState /// Identifier of the query. String query_id; + ContextMutablePtr query_context; + QueryProcessingStage::Enum stage = QueryProcessingStage::Complete; Protocol::Compression compression = Protocol::Compression::Disable; @@ -82,18 +85,10 @@ struct QueryState /// Streams of blocks, that are processing the query. BlockIO io; - enum class CancellationStatus: UInt8 - { - FULLY_CANCELLED, - READ_CANCELLED, - NOT_CANCELLED - }; - /// Is request cancelled - CancellationStatus cancellation_status = CancellationStatus::NOT_CANCELLED; - bool is_connection_closed = false; - /// empty or not - bool is_empty = true; + bool allow_partial_result_on_first_cancel = false; + bool stop_read_return_partial_result = false; + /// Data was sent. 
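/// (set in sendEndOfStream(), which also marks is_all_data_sent in the process list).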
bool sent_all_data = false; /// Request requires data from the client (INSERT, but not INSERT SELECT). @@ -113,6 +108,9 @@ struct QueryState /// If true, the data packets will be skipped instead of reading. Used to recover after errors. bool skipping_data = false; + bool query_duration_already_logged = false; + + ProfileEvents::ThreadIdToCountersSnapshot last_sent_snapshots; /// To output progress, the difference after the previous sending of progress. Progress progress; @@ -122,10 +120,7 @@ struct QueryState /// Timeouts setter for current query std::unique_ptr<TimeoutSetter> timeout_setter; - void reset() - { - *this = QueryState(); - } + std::mutex mutex; void finalizeOut() { @@ -141,10 +136,7 @@ struct QueryState maybe_compressed_out->cancel(); } - bool empty() const - { - return is_empty; - } + bool isEnabledPartialResultOnFirstCancel() const; }; @@ -220,9 +212,11 @@ private: Poco::Timespan sleep_after_receiving_query; std::unique_ptr<Session> session; - ContextMutablePtr query_context; ClientInfo::QueryKind query_kind = ClientInfo::QueryKind::NO_QUERY; + /// Part UUIDs received before the Query packet; they are moved into the next query's state so they can be excluded from processing. + std::optional<std::vector<UUID>> part_uuids_to_ignore; + /// Streams for reading/writing from/to client connection socket. std::shared_ptr<ReadBufferFromPocoSocketChunked> in; std::shared_ptr<WriteBufferFromPocoSocketChunked> out; @@ -249,21 +243,12 @@ private: /// `out_mutex` protects `out` (WriteBuffer). /// So it is used for method sendData(), sendProgress(), sendLogs(), etc. std::mutex out_mutex; - /// `task_callback_mutex` protects tasks callbacks. - /// Inside these callbacks we might also change cancellation status, - /// so it also protects cancellation status checks. - std::mutex task_callback_mutex; - - /// At the moment, only one ongoing query in the connection is supported at a time. - QueryState state; /// Last block input parameters are saved to be able to receive unexpected data packet sent after exception. LastBlockInputParameters last_block_in; CurrentMetrics::Increment metric_increment{CurrentMetrics::TCPConnection}; - ProfileEvents::ThreadIdToCountersSnapshot last_sent_snapshots; - /// It is the name of the server that will be sent to the client. 
String server_display_name; String host_name; @@ -277,66 +262,67 @@ private: bool receiveProxyHeader(); void receiveHello(); void receiveAddendum(); - bool receivePacket(); - void receiveQuery(); - void receiveIgnoredPartUUIDs(); - String receiveReadTaskResponseAssumeLocked(); - std::optional<ParallelReadResponse> receivePartitionMergeTreeReadTaskResponseAssumeLocked(); - bool receiveData(bool scalar); - bool readDataNext(); - void readData(); - void skipData(); - void receiveClusterNameAndSalt(); + bool receivePacketsExpectQuery(std::optional<QueryState> & state); + bool receivePacketsExpectData(QueryState & state); + void receivePacketsExpectCancel(QueryState & state); + String receiveReadTaskResponseAssumeLocked(QueryState & state); + std::optional<ParallelReadResponse> receivePartitionMergeTreeReadTaskResponseAssumeLocked(QueryState & state); - bool receiveUnexpectedData(bool throw_exception = true); - [[noreturn]] void receiveUnexpectedQuery(); - [[noreturn]] void receiveUnexpectedIgnoredPartUUIDs(); - [[noreturn]] void receiveUnexpectedHello(); - [[noreturn]] void receiveUnexpectedTablesStatusRequest(); + void processCancel(QueryState & state); + void processQuery(std::optional<QueryState> & state); + void processIgnoredPartUUIDs(); + bool processData(QueryState & state, bool scalar); + void processClusterNameAndSalt(); + + void readData(QueryState & state); + void skipData(QueryState & state); + + bool processUnexpectedData(); + [[noreturn]] void processUnexpectedQuery(); + [[noreturn]] void processUnexpectedIgnoredPartUUIDs(); + [[noreturn]] void processUnexpectedHello(); + [[noreturn]] void processUnexpectedTablesStatusRequest(); /// Process INSERT query - void startInsertQuery(); - void processInsertQuery(); - AsynchronousInsertQueue::PushResult processAsyncInsertQuery(AsynchronousInsertQueue & insert_queue); + void startInsertQuery(QueryState & state); + void processInsertQuery(QueryState & state); + AsynchronousInsertQueue::PushResult processAsyncInsertQuery(QueryState & state, AsynchronousInsertQueue & insert_queue); /// Process a request that does not require the receiving of data blocks from the client - void processOrdinaryQuery(); + void processOrdinaryQuery(QueryState & state); void processTablesStatusRequest(); - void sendHello(); - void sendData(const Block & block); /// Write a block to the network. - void sendLogData(const Block & block); - void sendTableColumns(const ColumnsDescription & columns); - void sendException(const Exception & e, bool with_stack_trace); - void sendProgress(); - void sendLogs(); - void sendEndOfStream(); - void sendPartUUIDs(); - void sendReadTaskRequestAssumeLocked(); - void sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRangesAnnouncement announcement); - void sendMergeTreeReadTaskRequestAssumeLocked(ParallelReadRequest request); - void sendProfileInfo(const ProfileInfo & info); - void sendTotals(const Block & totals); - void sendExtremes(const Block & extremes); - void sendProfileEvents(); - void sendSelectProfileEvents(); - void sendInsertProfileEvents(); - void sendTimezone(); + void sendHello() TSA_REQUIRES(out_mutex); + void sendData(QueryState & state, const Block & block) TSA_REQUIRES(out_mutex); /// Write a block to the network. 
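/// The TSA_REQUIRES(out_mutex) annotations below are compile-time contracts checked by Clang's Thread Safety Analysis (the TSA_* macros from base/defines.h wrap the corresponding attributes): with -Wthread-safety, the compiler rejects any caller that does not provably hold out_mutex. A compilable sketch of the mechanism follows at the end of this diff.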
+ void sendLogData(QueryState & state, const Block & block) TSA_REQUIRES(out_mutex); + void sendTableColumns(QueryState & state, const ColumnsDescription & columns) TSA_REQUIRES(out_mutex); + void sendException(const Exception & e, bool with_stack_trace) TSA_REQUIRES(out_mutex); + void sendProgress(QueryState & state) TSA_REQUIRES(out_mutex); + void sendLogs(QueryState & state) TSA_REQUIRES(out_mutex); + void sendEndOfStream(QueryState & state) TSA_REQUIRES(out_mutex); + void sendPartUUIDs(QueryState & state) TSA_REQUIRES(out_mutex); + void sendReadTaskRequestAssumeLocked() TSA_REQUIRES(out_mutex); + void sendMergeTreeAllRangesAnnouncementAssumeLocked(QueryState & state, InitialAllRangesAnnouncement announcement) TSA_REQUIRES(out_mutex); + void sendMergeTreeReadTaskRequestAssumeLocked(ParallelReadRequest request) TSA_REQUIRES(out_mutex); + void sendProfileInfo(QueryState & state, const ProfileInfo & info) TSA_REQUIRES(out_mutex); + void sendTotals(QueryState & state, const Block & totals) TSA_REQUIRES(out_mutex); + void sendExtremes(QueryState & state, const Block & extremes) TSA_REQUIRES(out_mutex); + void sendProfileEvents(QueryState & state) TSA_REQUIRES(out_mutex); + void sendSelectProfileEvents(QueryState & state) TSA_REQUIRES(out_mutex); + void sendInsertProfileEvents(QueryState & state) TSA_REQUIRES(out_mutex); + void sendTimezone(QueryState & state) TSA_REQUIRES(out_mutex); /// Creates state.block_in/block_out for blocks read/write, depending on whether compression is enabled. - void initBlockInput(); - void initBlockOutput(const Block & block); - void initLogsBlockOutput(const Block & block); - void initProfileEventsBlockOutput(const Block & block); - - using CancellationStatus = QueryState::CancellationStatus; - - void decreaseCancellationStatus(const std::string & log_message); - CancellationStatus getQueryCancellationStatus(); + void initBlockInput(QueryState & state); + void initBlockOutput(QueryState & state, const Block & block) TSA_REQUIRES(out_mutex); + void initLogsBlockOutput(QueryState & state, const Block & block) TSA_REQUIRES(out_mutex); + void initProfileEventsBlockOutput(QueryState & state, const Block & block) TSA_REQUIRES(out_mutex); /// This function is called from different threads. - void updateProgress(const Progress & value); + void updateProgress(QueryState & state, const Progress & value); + void logQueryDuration(QueryState & state); Poco::Net::SocketAddress getClientAddress(const ClientInfo & client_info); };
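
Note on the TSA_REQUIRES(out_mutex) contract above: it is enforced at compile time, not at runtime. The following minimal sketch is illustrative only, not ClickHouse code; Mutex, Sender and bytes_sent are invented names, and the macros spell out the Clang attributes that the TSA_* macros in base/defines.h are assumed to wrap. Compile with: clang++ -std=c++17 -fsyntax-only -Wthread-safety tsa_sketch.cpp

#include <mutex>

// Attribute wrappers, analogous in spirit to TSA_* in base/defines.h.
#define CAPABILITY(x) __attribute__((capability(x)))
#define ACQUIRE(...)  __attribute__((acquire_capability(__VA_ARGS__)))
#define RELEASE(...)  __attribute__((release_capability(__VA_ARGS__)))
#define REQUIRES(...) __attribute__((requires_capability(__VA_ARGS__)))
#define GUARDED_BY(x) __attribute__((guarded_by(x)))

/// A mutex type the analysis understands as a lockable "capability".
class CAPABILITY("mutex") Mutex
{
public:
    void lock() ACQUIRE() { impl.lock(); }
    void unlock() RELEASE() { impl.unlock(); }

private:
    std::mutex impl;
};

class Sender
{
public:
    /// Like TCPHandler::sendData() and friends: callers must hold out_mutex.
    void sendData(int block) REQUIRES(out_mutex)
    {
        bytes_sent += block; /// OK: out_mutex is held by contract.
    }

    void run()
    {
        out_mutex.lock();
        sendData(1);        /// OK: the lock is provably held here.
        out_mutex.unlock();

        /// Uncommenting the next line makes clang warn that calling
        /// 'sendData' requires holding mutex 'out_mutex'.
        // sendData(2);
    }

private:
    Mutex out_mutex;
    int bytes_sent GUARDED_BY(out_mutex) = 0;
};

int main()
{
    Sender s;
    s.run();
    return 0;
}

This is the class of bug the annotated declarations above are meant to catch: a sender invoked on the shared connection output without first locking out_mutex fails the build instead of corrupting the stream at runtime.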