diff --git a/dbms/include/DB/Client/Connection.h b/dbms/include/DB/Client/Connection.h index 975a123a64d..ec691394539 100644 --- a/dbms/include/DB/Client/Connection.h +++ b/dbms/include/DB/Client/Connection.h @@ -26,7 +26,7 @@ namespace DB using Poco::SharedPtr; -class ReplicasConnections; +class ShardReplicas; /// Поток блоков читающих из таблицы и ее имя typedef std::pair ExternalTableData; @@ -42,7 +42,7 @@ typedef std::vector ExternalTablesData; */ class Connection : private boost::noncopyable { - friend class ReplicasConnections; + friend class ShardReplicas; public: Connection(const String & host_, UInt16 port_, const String & default_database_, diff --git a/dbms/include/DB/Client/ReplicasConnections.h b/dbms/include/DB/Client/ShardReplicas.h similarity index 89% rename from dbms/include/DB/Client/ReplicasConnections.h rename to dbms/include/DB/Client/ShardReplicas.h index b27a287ad00..cb60c087c26 100644 --- a/dbms/include/DB/Client/ReplicasConnections.h +++ b/dbms/include/DB/Client/ShardReplicas.h @@ -9,15 +9,15 @@ namespace DB /** * Множество реплик одного шарда. */ - class ReplicasConnections final + class ShardReplicas final { public: - ReplicasConnections(IConnectionPool * pool_, Settings * settings_); + ShardReplicas(IConnectionPool * pool_, Settings * settings_); - ~ReplicasConnections() = default; + ~ShardReplicas() = default; - ReplicasConnections(const ReplicasConnections &) = delete; - ReplicasConnections & operator=(const ReplicasConnections &) = delete; + ShardReplicas(const ShardReplicas &) = delete; + ShardReplicas & operator=(const ShardReplicas &) = delete; /// Получить пакет от какой-нибудь реплики. Connection::Packet receivePacket(); @@ -71,7 +71,7 @@ namespace DB private: /// Выбрать реплику, на которой можно прочитать данные. - Replica & pickConnection(); + Replica & pickReplica(); /// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах. int waitForReadEvent(); diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index cea246d923c..a04af5e11bb 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -8,7 +8,7 @@ #include #include -#include +#include namespace DB @@ -87,7 +87,7 @@ public: { std::string addresses; if (use_many_replicas) - addresses = replicas_connections->dumpAddresses(); + addresses = shard_replicas->dumpAddresses(); else addresses = connection->getServerAddress(); @@ -95,7 +95,7 @@ public: /// Если запрошено прервать запрос - попросим удалённый сервер тоже прервать запрос. if (use_many_replicas) - replicas_connections->sendCancel(); + shard_replicas->sendCancel(); else connection->sendCancel(); @@ -112,7 +112,7 @@ public: if (sent_query && !finished) { if (use_many_replicas) - replicas_connections->disconnect(); + shard_replicas->disconnect(); else connection->disconnect(); } @@ -122,7 +122,7 @@ protected: /// Отправить на удаленные сервера все временные таблицы void sendExternalTables() { - size_t count = use_many_replicas ? replicas_connections->size() : 1; + size_t count = use_many_replicas ? shard_replicas->size() : 1; std::vector instances; instances.reserve(count); @@ -145,7 +145,7 @@ protected: } if (use_many_replicas) - replicas_connections->sendExternalTablesData(instances); + shard_replicas->sendExternalTablesData(instances); else connection->sendExternalTablesData(instances[0]); } @@ -156,8 +156,8 @@ protected: { if (use_many_replicas) { - replicas_connections.reset(new ReplicasConnections(pool, &settings)); - replicas_connections->sendQuery(query, "", stage, &settings, true); + shard_replicas.reset(new ShardReplicas(pool, &settings)); + shard_replicas->sendQuery(query, "", stage, &settings, true); } else { @@ -177,7 +177,7 @@ protected: while (true) { - Connection::Packet packet = use_many_replicas ? replicas_connections->receivePacket() : connection->receivePacket(); + Connection::Packet packet = use_many_replicas ? shard_replicas->receivePacket() : connection->receivePacket(); switch (packet.type) { @@ -248,7 +248,7 @@ protected: { std::string addresses; if (use_many_replicas) - addresses = replicas_connections->dumpAddresses(); + addresses = shard_replicas->dumpAddresses(); else addresses = connection->getServerAddress(); @@ -257,13 +257,13 @@ protected: was_cancelled = true; if (use_many_replicas) - replicas_connections->sendCancel(); + shard_replicas->sendCancel(); else connection->sendCancel(); } if (use_many_replicas) - replicas_connections->drainResidualPackets(); + shard_replicas->drainResidualPackets(); else { /// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединении с сервером. @@ -302,7 +302,7 @@ private: ConnectionPool::Entry pool_entry; Connection * connection = nullptr; - std::unique_ptr replicas_connections; + std::unique_ptr shard_replicas; const String query; bool send_settings; diff --git a/dbms/src/Client/ReplicasConnections.cpp b/dbms/src/Client/ShardReplicas.cpp similarity index 87% rename from dbms/src/Client/ReplicasConnections.cpp rename to dbms/src/Client/ShardReplicas.cpp index 77bfa8c907e..75949a63e11 100644 --- a/dbms/src/Client/ReplicasConnections.cpp +++ b/dbms/src/Client/ShardReplicas.cpp @@ -1,9 +1,9 @@ -#include +#include #include namespace DB { - ReplicasConnections::ReplicasConnections(IConnectionPool * pool_, Settings * settings_) : + ShardReplicas::ShardReplicas(IConnectionPool * pool_, Settings * settings_) : settings(settings_) { auto entries = pool_->getMany(settings); @@ -17,7 +17,7 @@ namespace DB } } - int ReplicasConnections::waitForReadEvent() + int ShardReplicas::waitForReadEvent() { if (valid_replicas_count == 0) return 0; @@ -50,7 +50,7 @@ namespace DB return n; } - ReplicasConnections::Replica & ReplicasConnections::pickConnection() + ShardReplicas::Replica & ShardReplicas::pickReplica() { Replica * res = nullptr; @@ -75,11 +75,11 @@ namespace DB return *res; } - Connection::Packet ReplicasConnections::receivePacket() + Connection::Packet ShardReplicas::receivePacket() { while (true) { - Replica & replica = pickConnection(); + Replica & replica = pickReplica(); bool retry = false; while (replica.is_valid) @@ -131,7 +131,7 @@ namespace DB } } - void ReplicasConnections::sendQuery(const String & query, const String & query_id, UInt64 stage, + void ShardReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, const Settings * settings_, bool with_pending_data) { for (auto & e : replica_hash) @@ -141,7 +141,7 @@ namespace DB } } - void ReplicasConnections::disconnect() + void ShardReplicas::disconnect() { for (auto & e : replica_hash) { @@ -154,7 +154,7 @@ namespace DB } } - void ReplicasConnections::sendCancel() + void ShardReplicas::sendCancel() { for (auto & e : replica_hash) { @@ -167,7 +167,7 @@ namespace DB } } - void ReplicasConnections::drainResidualPackets() + void ShardReplicas::drainResidualPackets() { bool caught_exceptions = false; @@ -197,13 +197,13 @@ namespace DB continue; case Protocol::Server::Exception: - // Accumulate replica from packet.exception + // XXX Что делать? caught_exceptions = true; again = false; continue; default: - // Accumulate replica (server address) + // XXX Что делать? caught_exceptions = true; again = false; continue; @@ -218,7 +218,7 @@ namespace DB } } - std::string ReplicasConnections::dumpAddresses() const + std::string ShardReplicas::dumpAddresses() const { if (valid_replicas_count == 0) return ""; @@ -240,7 +240,7 @@ namespace DB return os.str(); } - void ReplicasConnections::sendExternalTablesData(std::vector & data) + void ShardReplicas::sendExternalTablesData(std::vector & data) { if (data.size() != replica_hash.size()) throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);