From 7fdc23c5f237adbe5c10f9fc2e8fc1c561d12043 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Wed, 25 Feb 2015 15:43:06 +0300 Subject: [PATCH] dbms: Server: fix race conditions [#METR-14410] --- .../DB/DataStreams/RemoteBlockInputStream.h | 78 ++++++++++--------- 1 file changed, 41 insertions(+), 37 deletions(-) diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index 18803fea6f8..f134781ee89 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -22,6 +22,12 @@ class RemoteBlockInputStream : public IProfilingBlockInputStream private: void init(const Settings * settings_) { + sent_query.store(false, std::memory_order_release); + finished.store(false, std::memory_order_release); + got_exception_from_replica.store(false, std::memory_order_release); + got_unknown_packet_from_replica.store(false, std::memory_order_release); + was_cancelled.store(false, std::memory_order_release); + if (settings_) { send_settings = true; @@ -83,14 +89,13 @@ public: if (!__sync_bool_compare_and_swap(&is_cancelled, false, true)) return; - if (isQueryInProgress() && !hasThrownException()) + if (hasNoQueryInProgress() || hasThrownException()) + return; + + if (tryCancel()) { std::string addresses = parallel_replicas->dumpAddresses(); LOG_TRACE(log, "(" + addresses + ") Cancelling query"); - - /// Если запрошено прервать запрос - попросим удалённые реплики тоже прервать запрос. - was_cancelled = true; - parallel_replicas->sendCancel(); } } @@ -136,13 +141,14 @@ protected: Block readImpl() override { - if (!sent_query) + if (!sent_query.load(std::memory_order_acquire)) { createParallelReplicas(); - setEstablishedState(); + established = true; parallel_replicas->sendQuery(query, "", stage, true); + established = false; + sent_query.store(true, std::memory_order_release); sendExternalTables(); - setSentQueryState(); } while (true) @@ -158,14 +164,14 @@ protected: break; /// Если блок пустой - получим другие пакеты до EndOfStream. case Protocol::Server::Exception: - got_exception_from_replica = true; + got_exception_from_replica.store(true, std::memory_order_release); packet.exception->rethrow(); break; case Protocol::Server::EndOfStream: if (!parallel_replicas->hasActiveReplicas()) { - finished = true; + finished.store(true, std::memory_order_release); return Block(); } break; @@ -197,7 +203,7 @@ protected: break; default: - got_unknown_packet_from_replica = true; + got_unknown_packet_from_replica.store(true, std::memory_order_release); throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); } } @@ -220,13 +226,10 @@ protected: */ /// Отправим просьбу прервать выполнение запроса, если ещё не отправляли. - if (!was_cancelled) + if (tryCancel()) { std::string addresses = parallel_replicas->dumpAddresses(); LOG_TRACE(log, "(" + addresses + ") Cancelling query because enough data has been read"); - - was_cancelled = true; - parallel_replicas->sendCancel(); } /// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединениях с репликами. @@ -234,16 +237,16 @@ protected: switch (packet.type) { case Protocol::Server::EndOfStream: - finished = true; + finished.store(true, std::memory_order_release); break; case Protocol::Server::Exception: - got_exception_from_replica = true; + got_exception_from_replica.store(true, std::memory_order_release); packet.exception->rethrow(); break; default: - got_unknown_packet_from_replica = true; + got_unknown_packet_from_replica.store(true, std::memory_order_release); throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); } } @@ -261,19 +264,19 @@ protected: /// Возвращает true, если запрос отправлен, а ещё не выполнен. bool isQueryInProgress() const { - return sent_query && !finished && !was_cancelled; + return sent_query.load(std::memory_order_acquire) && !finished.load(std::memory_order_acquire) && !was_cancelled.load(std::memory_order_acquire); } /// Возвращает true, если никакой запрос не отправлен или один запрос уже выполнен. bool hasNoQueryInProgress() const { - return !sent_query || finished; + return !sent_query.load(std::memory_order_acquire) || finished.load(std::memory_order_acquire); } /// Возвращает true, если исключение было выкинуто. bool hasThrownException() const { - return got_exception_from_replica || got_unknown_packet_from_replica; + return got_exception_from_replica.load(std::memory_order_acquire) || got_unknown_packet_from_replica.load(std::memory_order_acquire); } private: @@ -284,17 +287,18 @@ private: return instance; } - /// Перейти в состояние established (т.е. соединения с репликами установлены). - void NO_INLINE setEstablishedState() + /// Отправить запрос на отмену всех соединений к репликам, если такой запрос ещё не был отправлен. + bool tryCancel() { - established = true; - } - - /// Перейти в состояние sent_query (т.е. запрос отправлен). - void NO_INLINE setSentQueryState() - { - established = false; - sent_query = true; + bool old_val = false; + bool new_val = true; + if (was_cancelled.compare_exchange_strong(old_val, new_val, std::memory_order_release, std::memory_order_relaxed)) + { + parallel_replicas->sendCancel(); + return true; + } + else + return false; } private: @@ -315,33 +319,33 @@ private: Context context; /// Установили соединения с репликами, но ещё не отправили запрос. - bool established = false; + volatile bool established = false; /// Отправили запрос (это делается перед получением первого блока). - bool sent_query = false; + std::atomic sent_query; /** Получили все данные от всех реплик, до пакета EndOfStream. * Если при уничтожении объекта, ещё не все данные считаны, * то для того, чтобы не было рассинхронизации, на реплики отправляются просьбы прервать выполнение запроса, * и после этого считываются все пакеты до EndOfStream. */ - bool finished = false; + std::atomic finished; /** На каждую реплику была отправлена просьба прервать выполнение запроса, так как данные больше не нужны. * Это может быть из-за того, что данных достаточно (например, при использовании LIMIT), * или если на стороне клиента произошло исключение. */ - bool was_cancelled = false; + std::atomic was_cancelled; /** С одной репилки было получено исключение. В этом случае получать больше пакетов или * просить прервать запрос на этой реплике не нужно. */ - bool got_exception_from_replica = false; + std::atomic got_exception_from_replica; /** С одной реплики был получен неизвестный пакет. В этом случае получать больше пакетов или * просить прервать запрос на этой реплике не нужно. */ - bool got_unknown_packet_from_replica = false; + std::atomic got_unknown_packet_from_replica; Logger * log = &Logger::get("RemoteBlockInputStream"); };