From 77f262f8a833fe38c80452a2807ea9e2e4de3825 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Mon, 9 Feb 2015 13:51:36 +0300 Subject: [PATCH] dbms: Server: queries with several replicas: fixes [#METR-14410] --- .../DB/DataStreams/RemoteBlockInputStream.h | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index 2d281eb9675..5d37b757df3 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -82,14 +82,14 @@ public: if (!__sync_bool_compare_and_swap(&is_cancelled, false, true)) return; - if (isInProgress() && !hasThrownException() && !was_cancelled) + if (isQueryInProgress() && !hasThrownException()) { std::string addresses = parallel_replicas->dumpAddresses(); LOG_TRACE(log, "(" + addresses + ") Cancelling query"); /// Если запрошено прервать запрос - попросим удалённые реплики тоже прервать запрос. - parallel_replicas->sendCancel(); was_cancelled = true; + parallel_replicas->sendCancel(); } } @@ -100,8 +100,14 @@ public: * все соединения, затем читаем и пропускаем оставшиеся пакеты чтобы * эти соединения не остались висеть в рассихронизированном состоянии. */ - if (isInProgress()) - abort(); + if (isQueryInProgress()) + { + std::string addresses = parallel_replicas->dumpAddresses(); + LOG_TRACE(log, "(" + addresses + ") Aborting query"); + + parallel_replicas->sendCancel(); + (void) parallel_replicas->drain(); + } } protected: @@ -177,7 +183,7 @@ protected: */ progressImpl(packet.progress); - if (!was_cancelled && isInProgress() && isCancelled()) + if (isQueryInProgress() && isCancelled()) cancel(); break; @@ -210,7 +216,7 @@ protected: * - получили с одной реплики неизвестный пакет; * - то больше читать ничего не нужно. */ - if (!isInProgress() || hasThrownException()) + if (hasNoQueryInProgress() || hasThrownException()) return; /** Если ещё прочитали не все данные, но они больше не нужны. @@ -256,22 +262,16 @@ protected: parallel_replicas = ext::make_unique(pool, parallel_replicas_settings); } - /** Отменить запросы на всех репликах. Читать и пропускать все оставшиеся пакеты - * до EndOfStream или Exception, чтоб не было рассинхронизации в соединениях с репликами. - */ - void abort() + /// Возвращает true, если запрос отправлен, а ещё не выполнен. + bool isQueryInProgress() const { - std::string addresses = parallel_replicas->dumpAddresses(); - LOG_TRACE(log, "(" + addresses + ") Aborting query"); - - parallel_replicas->sendCancel(); - (void) parallel_replicas->drain(); + return sent_query && !finished && !was_cancelled; } - /// Возвращает true, если запрос отправлен, а ещё не выполнен. - bool isInProgress() const + /// Возвращает true, если никакой запрос не отправлен или один запрос уже выполнен. + bool hasNoQueryInProgress() const { - return sent_query && !finished; + return !sent_query || finished; } /// Возвращает true, если исключение было выкинуто.