From 8c0a540350d579ccd78555b7915dc3500a4d332f Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Mon, 9 Feb 2015 00:34:43 +0300 Subject: [PATCH] dbms: Server: queries with several replicas: fixes [#METR-14410] --- dbms/include/DB/Client/ParallelReplicas.h | 3 -- .../DB/DataStreams/RemoteBlockInputStream.h | 48 +++++++++++-------- dbms/src/Client/ParallelReplicas.cpp | 13 ----- 3 files changed, 28 insertions(+), 36 deletions(-) diff --git a/dbms/include/DB/Client/ParallelReplicas.h b/dbms/include/DB/Client/ParallelReplicas.h index a13de8a7baf..47b4ade5a1f 100644 --- a/dbms/include/DB/Client/ParallelReplicas.h +++ b/dbms/include/DB/Client/ParallelReplicas.h @@ -29,9 +29,6 @@ namespace DB /// Получить пакет от какой-нибудь реплики. Connection::Packet receivePacket(); - /// Разорвать соединения к репликам - void disconnect(); - /// Отменить запросы к репликам void sendCancel(); diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index f7b2e0ae4e3..2d281eb9675 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -82,7 +82,7 @@ public: if (!__sync_bool_compare_and_swap(&is_cancelled, false, true)) return; - if (sent_query && !was_cancelled && !finished && !got_exception_from_replica) + if (isInProgress() && !hasThrownException() && !was_cancelled) { std::string addresses = parallel_replicas->dumpAddresses(); LOG_TRACE(log, "(" + addresses + ") Cancelling query"); @@ -96,11 +96,12 @@ public: ~RemoteBlockInputStream() override { - /** Если прервались в середине цикла общения с репликами, то закрываем соединения, - * чтобы они не остались висеть в рассихронизированном состоянии. + /** Если прервались в середине цикла общения с репликами, то прервываем + * все соединения, затем читаем и пропускаем оставшиеся пакеты чтобы + * эти соединения не остались висеть в рассихронизированном состоянии. */ - if (sent_query && !finished) - parallel_replicas->disconnect(); + if (isInProgress()) + abort(); } protected: @@ -156,7 +157,6 @@ protected: case Protocol::Server::Exception: got_exception_from_replica = true; - abort(); packet.exception->rethrow(); break; @@ -177,7 +177,7 @@ protected: */ progressImpl(packet.progress); - if (!was_cancelled && !finished && isCancelled()) + if (!was_cancelled && isInProgress() && isCancelled()) cancel(); break; @@ -196,7 +196,6 @@ protected: default: got_unknown_packet_from_replica = true; - abort(); throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); } } @@ -207,10 +206,11 @@ protected: /** Если одно из: * - ничего не начинали делать; * - получили все пакеты до EndOfStream; - * - получили с сервера эксепшен; + * - получили с одной реплики эксепшен; + * - получили с одной реплики неизвестный пакет; * - то больше читать ничего не нужно. */ - if (!sent_query || finished || got_exception_from_replica) + if (!isInProgress() || hasThrownException()) return; /** Если ещё прочитали не все данные, но они больше не нужны. @@ -256,20 +256,28 @@ protected: parallel_replicas = ext::make_unique(pool, parallel_replicas_settings); } - /** Если с одной реплики был получен пакет Exception или неизвестный пакет, отменить запросы на всех - * остальных репликах. Читать и пропускать все оставшиеся пакеты до EndOfStream или Exception, чтобы - * не было рассинхронизации в соединениях с репликами. + /** Отменить запросы на всех репликах. Читать и пропускать все оставшиеся пакеты + * до EndOfStream или Exception, чтоб не было рассинхронизации в соединениях с репликами. */ void abort() { - if (got_exception_from_replica || got_unknown_packet_from_replica) - { - std::string addresses = parallel_replicas->dumpAddresses(); - LOG_TRACE(log, "(" + addresses + ") Aborting query"); + std::string addresses = parallel_replicas->dumpAddresses(); + LOG_TRACE(log, "(" + addresses + ") Aborting query"); - parallel_replicas->sendCancel(); - (void) parallel_replicas->drain(); - } + parallel_replicas->sendCancel(); + (void) parallel_replicas->drain(); + } + + /// Возвращает true, если запрос отправлен, а ещё не выполнен. + bool isInProgress() const + { + return sent_query && !finished; + } + + /// Возвращает true, если исключение было выкинуто. + bool hasThrownException() const + { + return got_exception_from_replica || got_unknown_packet_from_replica; } private: diff --git a/dbms/src/Client/ParallelReplicas.cpp b/dbms/src/Client/ParallelReplicas.cpp index 24624ba0585..a1a0dafe43e 100644 --- a/dbms/src/Client/ParallelReplicas.cpp +++ b/dbms/src/Client/ParallelReplicas.cpp @@ -133,19 +133,6 @@ namespace DB return packet; } - void ParallelReplicas::disconnect() - { - for (auto it = replica_map.begin(); it != replica_map.end(); ++it) - { - Connection * connection = it->second; - if (connection != nullptr) - { - connection->disconnect(); - invalidateReplica(it); - } - } - } - void ParallelReplicas::sendCancel() { if (!sent_query || cancelled)