diff --git a/dbms/include/DB/Client/ShardReplicas.h b/dbms/include/DB/Client/ShardReplicas.h index aac66856166..41d1ad61d06 100644 --- a/dbms/include/DB/Client/ShardReplicas.h +++ b/dbms/include/DB/Client/ShardReplicas.h @@ -18,38 +18,35 @@ namespace DB ShardReplicas(const ShardReplicas &) = delete; ShardReplicas & operator=(const ShardReplicas &) = delete; - /// Получить пакет от какой-нибудь реплики. - Connection::Packet receivePacket(); + /// Отправить на реплики всё содержимое внешних таблиц. + void sendExternalTablesData(std::vector & data); - /// Отправить запрос ко всем репликам. + /// Отправить запрос на реплики. void sendQuery(const String & query, const String & query_id = "", UInt64 stage = QueryProcessingStage::Complete, bool with_pending_data = false); - /// Разорвать соединения ко всем репликам + /// Получить пакет от какой-нибудь реплики. + Connection::Packet receivePacket(); + + /// Разорвать соединения к репликам void disconnect(); - /// Отменить запросы у всех реплик + /// Отменить запросы к репликам void sendCancel(); - /// Для каждой реплики получить оставшиеся пакеты при отмене запроса. - void drainResidualPackets(); + /// Для каждой реплики получить оставшиеся пакеты после отмена запроса. + Connection::Packet drain(); - /// Получить адреса всех реплик в виде строки. + /// Получить адреса реплик в виде строки. std::string dumpAddresses() const; /// Возвращает количесто реплик. - size_t size() const - { - return replica_hash.size(); - } - - /// Отправить ко всем репликам всё содержимое внешних таблиц. - void sendExternalTablesData(std::vector & data); + size_t size() const { return replica_hash.size(); } private: /// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах. /// Возвращает соединение на реплику, с которой можно прочитать данные, если такая есть. - Connection * waitForReadEvent(); + Connection ** waitForReadEvent(); private: /// Реплики хэшированные по id сокета @@ -58,5 +55,8 @@ namespace DB private: const Settings & settings; ReplicaHash replica_hash; + size_t active_connection_count = 0; + bool sent_query = false; + bool cancelled = false; }; } diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index 0a58abf3a2f..a55d931c4a6 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -158,9 +158,10 @@ protected: { auto entries = pool->getMany(&settings); if (entries.size() > 1) - shard_replicas.reset(new ShardReplicas(entries, settings)); - else if (entries.size() == 1) + shard_replicas = ext::make_unique(entries, settings); + else { + /// NOTE IConnectionPool::getMany() всегда возвращает как минимум одно соединение. use_many_replicas = false; connection = &*entries[0]; } @@ -272,7 +273,22 @@ protected: } if (use_many_replicas) - shard_replicas->drainResidualPackets(); + { + Connection::Packet packet = shard_replicas->drain(); + switch (packet.type) + { + case Protocol::Server::EndOfStream: + break; + + case Protocol::Server::Exception: + got_exception_from_server = true; + packet.exception->rethrow(); + break; + + default: + throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); + } + } else { /// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединении с сервером. diff --git a/dbms/src/Client/ShardReplicas.cpp b/dbms/src/Client/ShardReplicas.cpp index b961e2861f0..9c8510aa44f 100644 --- a/dbms/src/Client/ShardReplicas.cpp +++ b/dbms/src/Client/ShardReplicas.cpp @@ -3,7 +3,8 @@ namespace DB { ShardReplicas::ShardReplicas(std::vector & entries, const Settings & settings_) : - settings(settings_) + settings(settings_), + active_connection_count(entries.size()) { replica_hash.reserve(entries.size()); @@ -14,18 +15,187 @@ namespace DB } } - Connection * ShardReplicas::waitForReadEvent() + void ShardReplicas::sendExternalTablesData(std::vector & data) { - Poco::Net::Socket::SocketList write_list; - Poco::Net::Socket::SocketList except_list; + if (sent_query) + throw Exception("Cannot send external tables data: query already sent."); - Poco::Net::Socket::SocketList read_list; - read_list.reserve(replica_hash.size()); + if (data.size() < active_connection_count) + throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES); + + auto it = data.begin(); + for (auto & e : replica_hash) + { + Connection * connection = e.second; + if (connection != nullptr) + connection->sendExternalTablesData(*it); + ++it; + } + } + + void ShardReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data) + { + if (sent_query) + throw Exception("Query already sent."); + + Settings query_settings = settings; + query_settings.parallel_replicas_count = replica_hash.size(); + UInt64 offset = 0; for (auto & e : replica_hash) { Connection * connection = e.second; - read_list.push_back(connection->socket); + if (connection != nullptr) + { + connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data); + query_settings.parallel_replica_offset = offset; + ++offset; + } + } + + sent_query = true; + } + + Connection::Packet ShardReplicas::receivePacket() + { + if (!sent_query) + throw Exception("Cannot receive packets: no query sent."); + + Connection ** connection = waitForReadEvent(); + if (connection == nullptr) + throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA); + + Connection::Packet packet = (*connection)->receivePacket(); + + switch (packet.type) + { + case Protocol::Server::Data: + case Protocol::Server::Progress: + case Protocol::Server::ProfileInfo: + case Protocol::Server::Totals: + case Protocol::Server::Extremes: + break; + + case Protocol::Server::EndOfStream: + *connection = nullptr; + --active_connection_count; + if (active_connection_count > 0) + { + Connection::Packet empty_packet; + empty_packet.type = Protocol::Server::Data; + return empty_packet; + } + break; + + case Protocol::Server::Exception: + default: + *connection = nullptr; + --active_connection_count; + if (!cancelled) + { + sendCancel(); + (void) drain(); + } + break; + } + + return packet; + } + + void ShardReplicas::disconnect() + { + for (auto & e : replica_hash) + { + Connection * & connection = e.second; + if (connection != nullptr) + { + connection->disconnect(); + connection = nullptr; + --active_connection_count; + } + } + } + + void ShardReplicas::sendCancel() + { + if (!sent_query || cancelled) + throw Exception("Cannot cancel. Either no query sent or already cancelled."); + + for (auto & e : replica_hash) + { + Connection * connection = e.second; + if (connection != nullptr) + connection->sendCancel(); + } + + cancelled = true; + } + + Connection::Packet ShardReplicas::drain() + { + if (!cancelled) + throw Exception("Cannot drain connections: cancel first."); + + Connection::Packet res; + res.type = Protocol::Server::EndOfStream; + + while (active_connection_count > 0) + { + Connection::Packet packet = receivePacket(); + + switch (packet.type) + { + case Protocol::Server::Data: + case Protocol::Server::Progress: + case Protocol::Server::ProfileInfo: + case Protocol::Server::Totals: + case Protocol::Server::Extremes: + break; + + case Protocol::Server::EndOfStream: + return res; + + case Protocol::Server::Exception: + default: + res = packet; + break; + } + } + + return res; + } + + std::string ShardReplicas::dumpAddresses() const + { + std::ostringstream os; + for (auto & e : replica_hash) + { + char prefix = '\0'; + const Connection * connection = e.second; + if (connection != nullptr) + { + os << prefix << connection->getServerAddress(); + if (prefix == '\0') + prefix = ';'; + } + } + + return os.str(); + } + + Connection ** ShardReplicas::waitForReadEvent() + { + Poco::Net::Socket::SocketList read_list; + Poco::Net::Socket::SocketList write_list; + Poco::Net::Socket::SocketList except_list; + + read_list.reserve(active_connection_count); + + for (auto & e : replica_hash) + { + Connection * connection = e.second; + if (connection != nullptr) + read_list.push_back(connection->socket); } int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.poll_interval * 1000000); @@ -36,125 +206,6 @@ namespace DB auto it = replica_hash.find(socket.impl()->sockfd()); if (it == replica_hash.end()) throw Exception("Unexpected replica", ErrorCodes::UNEXPECTED_REPLICA); - return it->second; - } - - Connection::Packet ShardReplicas::receivePacket() - { - Connection * connection = waitForReadEvent(); - if (connection == nullptr) - throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA); - - Connection::Packet packet = connection->receivePacket(); - return packet; - } - - void ShardReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data) - { - Settings query_settings = settings; - query_settings.parallel_replicas_count = replica_hash.size(); - UInt64 offset = 0; - - for (auto & e : replica_hash) - { - Connection * connection = e.second; - connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data); - query_settings.parallel_replica_offset = offset; - ++offset; - } - } - - void ShardReplicas::disconnect() - { - for (auto & e : replica_hash) - { - Connection * connection = e.second; - connection->disconnect(); - } - } - - void ShardReplicas::sendCancel() - { - for (auto & e : replica_hash) - { - Connection * connection = e.second; - connection->sendCancel(); - } - } - - void ShardReplicas::drainResidualPackets() - { - bool caught_exceptions = false; - - for (auto & e : replica_hash) - { - Connection * connection = e.second; - bool again = true; - - while (again) - { - Connection::Packet packet = connection->receivePacket(); - - switch (packet.type) - { - case Protocol::Server::Data: - case Protocol::Server::Progress: - case Protocol::Server::ProfileInfo: - case Protocol::Server::Totals: - case Protocol::Server::Extremes: - break; - - case Protocol::Server::EndOfStream: - again = false; - continue; - - case Protocol::Server::Exception: - // XXX Что делать? - caught_exceptions = true; - again = false; - continue; - - default: - // XXX Что делать? - caught_exceptions = true; - again = false; - continue; - } - } - } - - if (caught_exceptions) - { - // XXX Что выкидываем? - } - } - - std::string ShardReplicas::dumpAddresses() const - { - std::ostringstream os; - for (auto & e : replica_hash) - { - char prefix = '\0'; - const Connection * connection = e.second; - os << prefix << connection->getServerAddress(); - if (prefix == '\0') - prefix = ';'; - } - - return os.str(); - } - - void ShardReplicas::sendExternalTablesData(std::vector & data) - { - if (data.size() != replica_hash.size()) - throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES); - - auto it = data.begin(); - for (auto & e : replica_hash) - { - Connection * connection = e.second; - connection->sendExternalTablesData(*it); - ++it; - } + return &(it->second); } }