From bf7e62ff56dbd3836dbad437b6f59fb86984b61e Mon Sep 17 00:00:00 2001 From: avogar Date: Thu, 23 Mar 2023 19:52:37 +0000 Subject: [PATCH] Refactor RemoteQueryExecutor, make it more thread safe --- src/Client/ConnectionPoolWithFailover.cpp | 22 +++++- src/Client/HedgedConnections.cpp | 2 +- src/Client/HedgedConnectionsFactory.cpp | 2 +- src/QueryPipeline/RemoteQueryExecutor.cpp | 82 +++++++++++++---------- src/QueryPipeline/RemoteQueryExecutor.h | 19 +++--- 5 files changed, 79 insertions(+), 48 deletions(-) diff --git a/src/Client/ConnectionPoolWithFailover.cpp b/src/Client/ConnectionPoolWithFailover.cpp index 1f850a46ab6..f1bbe9de22b 100644 --- a/src/Client/ConnectionPoolWithFailover.cpp +++ b/src/Client/ConnectionPoolWithFailover.cpp @@ -214,8 +214,28 @@ ConnectionPoolWithFailover::tryGetEntry( const QualifiedTableName * table_to_check, AsyncCallback async_callback) { + if (async_callback) + { + ConnectionEstablisherAsync connection_establisher_async(&pool, &timeouts, settings, log, table_to_check); + while (true) + { + connection_establisher_async.resume(); + if (connection_establisher_async.isFinished()) + break; + + async_callback( + connection_establisher_async.getFileDescriptor(), + 0, + AsyncEventTimeoutType::NONE, + "Connection establisher file descriptor", + AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR); + } + + fail_message = connection_establisher_async.getFailMessage(); + return connection_establisher_async.getResult(); + } + ConnectionEstablisher connection_establisher(&pool, &timeouts, settings, log, table_to_check); - connection_establisher.setAsyncCallback(std::move(async_callback)); TryResult result; connection_establisher.run(result, fail_message); return result; diff --git a/src/Client/HedgedConnections.cpp b/src/Client/HedgedConnections.cpp index 8b4b3bbfabc..fe3acd7cc7b 100644 --- a/src/Client/HedgedConnections.cpp +++ b/src/Client/HedgedConnections.cpp @@ -388,7 +388,7 @@ int 
HedgedConnections::getReadyFileDescriptor(AsyncCallback async_callback) { events_count = epoll.getManyReady(1, &event, blocking); if (!events_count && async_callback) - async_callback(epoll.getFileDescriptor(), 0, AsyncEventTimeoutType::NONE, epoll.getDescription(), EPOLLIN | EPOLLPRI | EPOLLERR); + async_callback(epoll.getFileDescriptor(), 0, AsyncEventTimeoutType::NONE, epoll.getDescription(), AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR); } return event.data.fd; } diff --git a/src/Client/HedgedConnectionsFactory.cpp b/src/Client/HedgedConnectionsFactory.cpp index c188bebaee0..eb2a33b1ccc 100644 --- a/src/Client/HedgedConnectionsFactory.cpp +++ b/src/Client/HedgedConnectionsFactory.cpp @@ -266,7 +266,7 @@ int HedgedConnectionsFactory::getReadyFileDescriptor(bool blocking, AsyncCallbac { events_count = epoll.getManyReady(1, &event, !static_cast(async_callback)); if (!events_count && async_callback) - async_callback(epoll.getFileDescriptor(), 0, AsyncEventTimeoutType::NONE, epoll.getDescription(), EPOLLIN | EPOLLPRI | EPOLLERR); + async_callback(epoll.getFileDescriptor(), 0, AsyncEventTimeoutType::NONE, epoll.getDescription(), AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR); } return event.data.fd; } diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index eab1bb8bd7e..619bdadca34 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -245,10 +245,6 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As established = true; - /// Query could be cancelled during creating connections. No need to send a query. 
- if (was_cancelled) - return; - auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); ClientInfo modified_client_info = context->getClientInfo(); modified_client_info.query_kind = query_kind; @@ -271,11 +267,12 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As int RemoteQueryExecutor::sendQueryAsync() { + std::lock_guard lock(was_cancelled_mutex); + if (was_cancelled) + return -1; + if (!read_context) - { - std::lock_guard lock(was_cancelled_mutex); read_context = std::make_unique(*this, /*suspend_when_query_sent*/ true); - } /// If query already sent, do nothing. Note that we cannot use sent_query flag here, /// because we can still be in process of sending scalars or external tables. @@ -322,8 +319,10 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::read() return anything; if (got_duplicated_part_uuids) - return restartQueryWithoutDuplicatedUUIDs(); + break; } + + return restartQueryWithoutDuplicatedUUIDs(); } RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync() @@ -332,38 +331,38 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync() if (!read_context || (resent_query && recreate_read_context)) { std::lock_guard lock(was_cancelled_mutex); + if (was_cancelled) + return ReadResult(Block()); + read_context = std::make_unique(*this); recreate_read_context = false; } while (true) { + std::lock_guard lock(was_cancelled_mutex); + if (was_cancelled) + return ReadResult(Block()); + read_context->resume(); if (needToSkipUnavailableShard()) return ReadResult(Block()); - if (read_context->isCancelled()) - return ReadResult(Block()); - /// Check if packet is not ready yet. if (read_context->isInProgress()) return ReadResult(read_context->getFileDescriptor()); - /// We need to check that query was not cancelled again, - /// to avoid the race between cancel() thread and read() thread. - /// (since cancel() thread will steal the fiber and may update the packet). 
- if (was_cancelled) - return ReadResult(Block()); - auto anything = processPacket(read_context->getPacket()); if (anything.getType() == ReadResult::Type::Data || anything.getType() == ReadResult::Type::ParallelReplicasToken) return anything; if (got_duplicated_part_uuids) - return restartQueryWithoutDuplicatedUUIDs(); + break; } + + return restartQueryWithoutDuplicatedUUIDs(); #else return read(); #endif @@ -372,13 +371,19 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync() RemoteQueryExecutor::ReadResult RemoteQueryExecutor::restartQueryWithoutDuplicatedUUIDs() { - /// Cancel previous query and disconnect before retry. - cancel(); - connections->disconnect(); - - /// Only resend once, otherwise throw an exception - if (!resent_query) { + std::lock_guard lock(was_cancelled_mutex); + if (was_cancelled) + return ReadResult(Block()); + + /// Cancel previous query and disconnect before retry. + cancelUnlocked(); + connections->disconnect(); + + /// Only resend once, otherwise throw an exception + if (resent_query) + throw Exception(ErrorCodes::DUPLICATED_PART_UUIDS, "Found duplicate uuids while processing query"); + if (log) LOG_DEBUG(log, "Found duplicate UUIDs, will retry query without those parts"); @@ -386,13 +391,14 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::restartQueryWithoutDuplicat recreate_read_context = true; sent_query = false; got_duplicated_part_uuids = false; - /// Consecutive read will implicitly send query first. - if (!read_context) - return read(); - else - return readAsync(); + was_cancelled = false; } - throw Exception(ErrorCodes::DUPLICATED_PART_UUIDS, "Found duplicate uuids while processing query"); + + /// Consecutive read will implicitly send query first. 
+ if (!read_context) + return read(); + else + return readAsync(); } RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet) @@ -534,6 +540,8 @@ void RemoteQueryExecutor::processMergeTreeInitialReadAnnounecement(InitialAllRan void RemoteQueryExecutor::finish() { + std::lock_guard guard(was_cancelled_mutex); + /** If one of: * - nothing started to do; * - received all packets before EndOfStream; @@ -590,6 +598,12 @@ void RemoteQueryExecutor::finish() } void RemoteQueryExecutor::cancel() +{ + std::lock_guard guard(was_cancelled_mutex); + cancelUnlocked(); +} + +void RemoteQueryExecutor::cancelUnlocked() { { std::lock_guard lock(external_tables_mutex); @@ -676,10 +690,6 @@ void RemoteQueryExecutor::sendExternalTables() void RemoteQueryExecutor::tryCancel(const char * reason) { - /// Flag was_cancelled is atomic because it is checked in read(), - /// in case of packet had been read by fiber (async_socket_for_remote). - std::lock_guard guard(was_cancelled_mutex); - if (was_cancelled) return; @@ -688,8 +698,8 @@ void RemoteQueryExecutor::tryCancel(const char * reason) if (read_context) read_context->cancel(); - /// Query could be cancelled during connection creation, we should check - /// if connections were already created. + /// Query could be cancelled during connection creation or query sending, + /// we should check if connections were already created and the query was sent. if (connections && sent_query) { connections->sendCancel(); diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 2e731f9ed42..d3e05b2ae31 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -94,8 +94,8 @@ public: int sendQueryAsync(); /// Query is resent to a replica, the query itself can be modified.
- std::atomic resent_query { false }; - std::atomic recreate_read_context { false }; + bool resent_query { false }; + bool recreate_read_context { false }; struct ReadResult { @@ -228,39 +228,39 @@ private: std::mutex external_tables_mutex; /// Connections to replicas are established, but no queries are sent yet - std::atomic established { false }; + bool established = false; /// Query is sent (used before getting first block) - std::atomic sent_query { false }; + bool sent_query { false }; /** All data from all replicas are received, before EndOfStream packet. * To prevent desynchronization, if not all data is read before object * destruction, it's required to send cancel query request to replicas and * read all packets before EndOfStream */ - std::atomic finished { false }; + bool finished = false; /** Cancel query request was sent to all replicas because data is not needed anymore * This behaviour may occur when: * - data size is already satisfactory (when using LIMIT, for example) * - an exception was thrown from client side */ - std::atomic was_cancelled { false }; + bool was_cancelled = false; std::mutex was_cancelled_mutex; /** An exception from replica was received. No need in receiving more packets or * requesting to cancel query execution */ - std::atomic got_exception_from_replica { false }; + bool got_exception_from_replica = false; /** Unknown packet was received from replica. 
No need in receiving more packets or * requesting to cancel query execution */ - std::atomic got_unknown_packet_from_replica { false }; + bool got_unknown_packet_from_replica = false; /** Got duplicated uuids from replica */ - std::atomic got_duplicated_part_uuids{ false }; + bool got_duplicated_part_uuids = false; /// Parts uuids, collected from remote replicas std::mutex duplicated_part_uuids_mutex; @@ -291,6 +291,7 @@ private: ReadResult restartQueryWithoutDuplicatedUUIDs(); /// If wasn't sent yet, send request to cancel all connections to replicas + void cancelUnlocked(); void tryCancel(const char * reason); /// Returns true if query was sent