diff --git a/dbms/include/DB/Client/ReplicasConnections.h b/dbms/include/DB/Client/ReplicasConnections.h index d5cfe3b0aa7..b27a287ad00 100644 --- a/dbms/include/DB/Client/ReplicasConnections.h +++ b/dbms/include/DB/Client/ReplicasConnections.h @@ -5,59 +5,84 @@ namespace DB { class IConnectionPool; - - class ReplicasConnections - { - public: - struct ConnectionInfo - { - ConnectionInfo(Connection * connection_) : connection(connection_) {} - Connection * connection; - int packet_number = 0; - bool can_read = false; - bool is_valid = true; - }; + /** + * Множество реплик одного шарда. + */ + class ReplicasConnections final + { public: ReplicasConnections(IConnectionPool * pool_, Settings * settings_); ~ReplicasConnections() = default; ReplicasConnections(const ReplicasConnections &) = delete; - ReplicasConnections & operator=(const ReplicasConnections &) = delete; - int waitForReadEvent(); - - ConnectionInfo & pickConnection(); - + /// Получить пакет от какой-нибудь реплики. Connection::Packet receivePacket(); + /// Отправить запрос ко всем репликам. void sendQuery(const String & query, const String & query_id = "", UInt64 stage = QueryProcessingStage::Complete, const Settings * settings_ = nullptr, bool with_pending_data = false); + /// Разорвать соединения ко всем репликам void disconnect(); + /// Отменить запросы у всех реплик void sendCancel(); + /// Для каждой реплики получить оставшиеся пакеты при отмене запроса. void drainResidualPackets(); + /// Получить адреса всех реплик в виде строки. std::string dumpAddresses() const; - size_t size() const; + /// Возвращает количесто реплик. + size_t size() const + { + return replica_hash.size(); + } + /// Отправить ко всем репликам всё содержимое внешних таблиц. void sendExternalTablesData(std::vector & data); private: - using ConnectionHash = std::unordered_map; + /// Описание реплики. + struct Replica + { + Replica(Connection * connection_) : connection(connection_) {} + + /// Соединение к реплике + Connection * connection; + + /// Номер следующего ожиданного пакета. + int next_packet_number = 0; + + /// Есть ли данные, которые можно прочитать? + bool can_read = false; + + /// Является ли реплика валидной для чтения? + bool is_valid = true; + }; + + /// Реплики хэшированные по id сокета + using ReplicaHash = std::unordered_map; + + private: + /// Выбрать реплику, на которой можно прочитать данные. + Replica & pickConnection(); + + /// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах. + int waitForReadEvent(); private: Settings * settings; - Poco::Timespan select_timeout; - ConnectionHash connection_hash; - size_t valid_connections_count; + + ReplicaHash replica_hash; + size_t valid_replicas_count; + + /// Номер следующего ожиданного пакета. int next_packet_number = 0; - Poco::Net::Socket::SocketList write_list; - Poco::Net::Socket::SocketList except_list; }; } diff --git a/dbms/src/Client/ReplicasConnections.cpp b/dbms/src/Client/ReplicasConnections.cpp index 425646a713a..77bfa8c907e 100644 --- a/dbms/src/Client/ReplicasConnections.cpp +++ b/dbms/src/Client/ReplicasConnections.cpp @@ -4,65 +4,67 @@ namespace DB { ReplicasConnections::ReplicasConnections(IConnectionPool * pool_, Settings * settings_) : - settings(settings_), - select_timeout(settings->poll_interval * 1000000) + settings(settings_) { auto entries = pool_->getMany(settings); - valid_connections_count = entries.size(); - connection_hash.reserve(valid_connections_count); + valid_replicas_count = entries.size(); + replica_hash.reserve(valid_replicas_count); for (auto & entry : entries) { Connection * connection = &*entry; - connection_hash.insert(std::make_pair(connection->socket.impl()->sockfd(), ConnectionInfo(connection))); + replica_hash.insert(std::make_pair(connection->socket.impl()->sockfd(), Replica(connection))); } } int ReplicasConnections::waitForReadEvent() { - if (valid_connections_count == 0) + if (valid_replicas_count == 0) return 0; - Poco::Net::Socket::SocketList read_list; - read_list.reserve(valid_connections_count); + Poco::Net::Socket::SocketList write_list; + Poco::Net::Socket::SocketList except_list; - for (auto & e : connection_hash) + Poco::Net::Socket::SocketList read_list; + read_list.reserve(valid_replicas_count); + + for (auto & e : replica_hash) { - ConnectionInfo & info = e.second; - info.can_read = false; - if (info.is_valid) - read_list.push_back(info.connection->socket); + Replica & replica = e.second; + replica.can_read = false; + if (replica.is_valid) + read_list.push_back(replica.connection->socket); } - int n = Poco::Net::Socket::select(read_list, write_list, except_list, select_timeout); + int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->poll_interval * 1000000); for (const auto & socket : read_list) { - auto it = connection_hash.find(socket.impl()->sockfd()); - if (it == connection_hash.end()) + auto it = replica_hash.find(socket.impl()->sockfd()); + if (it == replica_hash.end()) throw Exception("Unexpected replica", ErrorCodes::UNEXPECTED_REPLICA); - ConnectionInfo & info = it->second; - info.can_read = true; + Replica & replica = it->second; + replica.can_read = true; } return n; } - ReplicasConnections::ConnectionInfo & ReplicasConnections::pickConnection() + ReplicasConnections::Replica & ReplicasConnections::pickConnection() { - ConnectionInfo * res = nullptr; + Replica * res = nullptr; int n = waitForReadEvent(); if (n > 0) { int max_packet_number = -1; - for (auto & e : connection_hash) + for (auto & e : replica_hash) { - ConnectionInfo & info = e.second; - if (info.can_read && (info.packet_number > max_packet_number)) + Replica & replica = e.second; + if (replica.can_read && (replica.next_packet_number > max_packet_number)) { - max_packet_number = info.packet_number; - res = &info; + max_packet_number = replica.next_packet_number; + res = &replica; } } } @@ -77,12 +79,12 @@ namespace DB { while (true) { - ConnectionInfo & info = pickConnection(); + Replica & replica = pickConnection(); bool retry = false; - while (info.is_valid) + while (replica.is_valid) { - Connection::Packet packet = info.connection->receivePacket(); + Connection::Packet packet = replica.connection->receivePacket(); switch (packet.type) { @@ -95,9 +97,9 @@ namespace DB case Protocol::Server::EndOfStream: case Protocol::Server::Exception: - info.is_valid = false; - --valid_connections_count; - /// Больше ничего не читаем. Закрываем все оставшиеся валидные соединения, + replica.is_valid = false; + --valid_replicas_count; + /// Больше ничего не читаем. Отменяем выполнение всех оставшихся запросов, /// затем получаем оставшиеся пакеты, чтобы не было рассинхронизации с /// репликами. sendCancel(); @@ -107,22 +109,22 @@ namespace DB default: /// Мы получили инвалидный пакет от реплики. Повторим попытку /// c другой реплики, если такая найдется. - info.is_valid = false; - --valid_connections_count; - if (valid_connections_count > 0) + replica.is_valid = false; + --valid_replicas_count; + if (valid_replicas_count > 0) retry = true; break; } - if ((info.packet_number == next_packet_number) && !retry) + if ((replica.next_packet_number == next_packet_number) && !retry) { - ++info.packet_number; + ++replica.next_packet_number; ++next_packet_number; return packet; } else { - ++info.packet_number; + ++replica.next_packet_number; retry = false; } } @@ -132,7 +134,7 @@ namespace DB void ReplicasConnections::sendQuery(const String & query, const String & query_id, UInt64 stage, const Settings * settings_, bool with_pending_data) { - for (auto & e : connection_hash) + for (auto & e : replica_hash) { Connection * connection = e.second.connection; connection->sendQuery(query, query_id, stage, settings_, with_pending_data); @@ -141,12 +143,12 @@ namespace DB void ReplicasConnections::disconnect() { - for (auto & e : connection_hash) + for (auto & e : replica_hash) { - ConnectionInfo & info = e.second; - if (info.is_valid) + Replica & replica = e.second; + if (replica.is_valid) { - Connection * connection = info.connection; + Connection * connection = replica.connection; connection->disconnect(); } } @@ -154,12 +156,12 @@ namespace DB void ReplicasConnections::sendCancel() { - for (auto & e : connection_hash) + for (auto & e : replica_hash) { - ConnectionInfo & info = e.second; - if (info.is_valid) + Replica & replica = e.second; + if (replica.is_valid) { - Connection * connection = info.connection; + Connection * connection = replica.connection; connection->sendCancel(); } } @@ -169,12 +171,12 @@ namespace DB { bool caught_exceptions = false; - for (auto & e : connection_hash) + for (auto & e : replica_hash) { - ConnectionInfo & info = e.second; - if (info.is_valid) + Replica & replica = e.second; + if (replica.is_valid) { - Connection * connection = info.connection; + Connection * connection = replica.connection; bool again = true; while (again) @@ -195,13 +197,13 @@ namespace DB continue; case Protocol::Server::Exception: - // Accumulate info from packet.exception + // Accumulate replica from packet.exception caught_exceptions = true; again = false; continue; default: - // Accumulate info (server address) + // Accumulate replica (server address) caught_exceptions = true; again = false; continue; @@ -212,22 +214,23 @@ namespace DB if (caught_exceptions) { + // XXX Что выкидываем? } } std::string ReplicasConnections::dumpAddresses() const { - if (valid_connections_count == 0) + if (valid_replicas_count == 0) return ""; std::ostringstream os; - for (auto & e : connection_hash) + for (auto & e : replica_hash) { char prefix = '\0'; - const ConnectionInfo & info = e.second; - if (info.is_valid) + const Replica & replica = e.second; + if (replica.is_valid) { - const Connection * connection = info.connection; + const Connection * connection = replica.connection; os << prefix << connection->getServerAddress(); if (prefix == '\0') prefix = ';'; @@ -237,18 +240,13 @@ namespace DB return os.str(); } - size_t ReplicasConnections::size() const - { - return connection_hash.size(); - } - void ReplicasConnections::sendExternalTablesData(std::vector & data) { - if (data.size() != connection_hash.size()) + if (data.size() != replica_hash.size()) throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES); auto it = data.begin(); - for (auto & e : connection_hash) + for (auto & e : replica_hash) { Connection * connection = e.second.connection; connection->sendExternalTablesData(*it);