From 2c6d3efc615596a723d78383cb3c6512f2f50e1f Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Thu, 15 Jan 2015 15:09:26 +0300 Subject: [PATCH] dbms: Server: queries with several replicas: development [#METR-14410] --- dbms/include/DB/Client/ReplicasConnections.h | 8 ++ .../DB/DataStreams/RemoteBlockInputStream.h | 94 ++++++++++---- dbms/src/Client/ReplicasConnections.cpp | 116 ++++++++++++++++-- 3 files changed, 185 insertions(+), 33 deletions(-) diff --git a/dbms/include/DB/Client/ReplicasConnections.h b/dbms/include/DB/Client/ReplicasConnections.h index c4e1f736db5..ed4403e24ee 100644 --- a/dbms/include/DB/Client/ReplicasConnections.h +++ b/dbms/include/DB/Client/ReplicasConnections.h @@ -36,6 +36,14 @@ namespace DB void sendQuery(const String & query, const String & query_id = "", UInt64 stage = QueryProcessingStage::Complete, const Settings * settings_ = nullptr, bool with_pending_data = false); + void disconnect(); + + void sendCancel(); + + void drainResidualPackets(); + + std::string dumpAddresses() const; + private: using ConnectionHash = std::unordered_map; diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index d65f3a9c62c..250ba1dfde5 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -85,10 +85,20 @@ public: if (sent_query && !was_cancelled && !finished && !got_exception_from_server) { - LOG_TRACE(log, "(" + connection->getServerAddress() + ") Cancelling query"); + std::string addresses; + if (use_many_replicas) + addresses = replicas_connections->dumpAddresses(); + else + addresses = connection->getServerAddress(); + + LOG_TRACE(log, "(" + addresses + ") Cancelling query"); /// Если запрошено прервать запрос - попросим удалённый сервер тоже прервать запрос. - connection->sendCancel(); + if (use_many_replicas) + replicas_connections->sendCancel(); + else + connection->sendCancel(); + was_cancelled = true; } } @@ -100,7 +110,12 @@ public: * чтобы оно не осталось висеть в рассихронизированном состоянии. */ if (sent_query && !finished) - connection->disconnect(); + { + if (use_many_replicas) + replicas_connections->disconnect(); + else + connection->disconnect(); + } } protected: @@ -167,11 +182,27 @@ protected: case Protocol::Server::Exception: got_exception_from_server = true; + + if (use_many_replicas) + { + // Cancel and drain all the remaining connections. + replicas_connections->sendCancel(); + replicas_connections->drainResidualPackets(); + } + packet.exception->rethrow(); break; case Protocol::Server::EndOfStream: finished = true; + + if (use_many_replicas) + { + // Cancel and drain all the remaining connections. + replicas_connections->sendCancel(); + replicas_connections->drainResidualPackets(); + } + return Block(); case Protocol::Server::Progress: @@ -224,36 +255,51 @@ protected: /// Отправим просьбу прервать выполнение запроса, если ещё не отправляли. if (!was_cancelled) { - LOG_TRACE(log, "(" + connection->getServerAddress() + ") Cancelling query because enough data has been read"); + std::string addresses; + if (use_many_replicas) + addresses = replicas_connections->dumpAddresses(); + else + addresses = connection->getServerAddress(); + + LOG_TRACE(log, "(" + addresses + ") Cancelling query because enough data has been read"); was_cancelled = true; - connection->sendCancel(); + + if (use_many_replicas) + replicas_connections->sendCancel(); + else + connection->sendCancel(); } - /// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединении с сервером. - while (true) + if (use_many_replicas) + replicas_connections->drainResidualPackets(); + else { - Connection::Packet packet = connection->receivePacket(); - - switch (packet.type) + /// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединении с сервером. + while (true) { - case Protocol::Server::Data: - case Protocol::Server::Progress: - case Protocol::Server::ProfileInfo: - case Protocol::Server::Totals: - case Protocol::Server::Extremes: - break; + Connection::Packet packet = connection->receivePacket(); - case Protocol::Server::EndOfStream: - return; + switch (packet.type) + { + case Protocol::Server::Data: + case Protocol::Server::Progress: + case Protocol::Server::ProfileInfo: + case Protocol::Server::Totals: + case Protocol::Server::Extremes: + break; - case Protocol::Server::Exception: - got_exception_from_server = true; - packet.exception->rethrow(); - break; + case Protocol::Server::EndOfStream: + return; - default: - throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); + case Protocol::Server::Exception: + got_exception_from_server = true; + packet.exception->rethrow(); + break; + + default: + throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); + } } } diff --git a/dbms/src/Client/ReplicasConnections.cpp b/dbms/src/Client/ReplicasConnections.cpp index 7a19dcc8ff9..2d1110c85c2 100644 --- a/dbms/src/Client/ReplicasConnections.cpp +++ b/dbms/src/Client/ReplicasConnections.cpp @@ -83,13 +83,6 @@ namespace DB { Connection::Packet packet = info.connection->receivePacket(); - if (info.packet_number == next_packet_number) - { - ++info.packet_number; - ++next_packet_number; - return packet; - } - switch (packet.type) { case Protocol::Server::Data: @@ -97,8 +90,6 @@ namespace DB case Protocol::Server::ProfileInfo: case Protocol::Server::Totals: case Protocol::Server::Extremes: - case Protocol::Server::Exception: - ++info.packet_number; break; default: @@ -106,6 +97,15 @@ namespace DB --valid_connections_count; break; } + + if (info.packet_number == next_packet_number) + { + ++info.packet_number; + ++next_packet_number; + return packet; + } + else + ++info.packet_number; } } } @@ -119,4 +119,102 @@ namespace DB connection->sendQuery(query, query_id, stage, settings_, with_pending_data); } } + + void ReplicasConnections::disconnect() + { + for (auto & e : connection_hash) + { + ConnectionInfo & info = e.second; + if (info.is_valid) + { + Connection * connection = info.connection; + connection->disconnect(); + } + } + } + + void ReplicasConnections::sendCancel() + { + for (auto & e : connection_hash) + { + ConnectionInfo & info = e.second; + if (info.is_valid) + { + Connection * connection = info.connection; + connection->sendCancel(); + } + } + } + + void ReplicasConnections::drainResidualPackets() + { + bool caught_exceptions = false; + + for (auto & e : connection_hash) + { + ConnectionInfo & info = e.second; + if (info.is_valid) + { + Connection * connection = info.connection; + bool again = true; + + while (again) + { + Connection::Packet packet = connection->receivePacket(); + + switch (packet.type) + { + case Protocol::Server::Data: + case Protocol::Server::Progress: + case Protocol::Server::ProfileInfo: + case Protocol::Server::Totals: + case Protocol::Server::Extremes: + break; + + case Protocol::Server::EndOfStream: + again = false; + continue; + + case Protocol::Server::Exception: + // Accumulate info from packet.exception + caught_exceptions = true; + again = false; + continue; + + default: + // Accumulate info (server address) + caught_exceptions = true; + again = false; + continue; + } + } + } + } + + if (caught_exceptions) + { + } + } + + std::string ReplicasConnections::dumpAddresses() const + { + if (valid_connections_count == 0) + return ""; + + std::ostringstream os; + for (auto & e : connection_hash) + { + char prefix = '\0'; + const ConnectionInfo & info = e.second; + if (info.is_valid) + { + const Connection * connection = info.connection; + os << prefix << connection->getServerAddress(); + if (prefix == '\0') + prefix = ';'; + } + } + + return os.str(); + } }