diff --git a/dbms/include/DB/Client/Connection.h b/dbms/include/DB/Client/Connection.h index d6f073912ae..5c9a5bfce49 100644 --- a/dbms/include/DB/Client/Connection.h +++ b/dbms/include/DB/Client/Connection.h @@ -26,6 +26,8 @@ namespace DB using Poco::SharedPtr; +class ReplicasConnections; + /// Поток блоков читающих из таблицы и ее имя typedef std::pair ExternalTableData; /// Вектор пар, описывающих таблицы @@ -40,6 +42,8 @@ typedef std::vector ExternalTablesData; */ class Connection : private boost::noncopyable { + friend class ReplicasConnections; + public: Connection(const String & host_, UInt16 port_, const String & default_database_, const String & user_, const String & password_, @@ -130,9 +134,6 @@ public: size_t outBytesCount() const { return !out.isNull() ? out->count() : 0; } size_t inBytesCount() const { return !in.isNull() ? in->count() : 0; } - /// Ждать изменение статуса нескольких соединений. Возвращает соединение готовое к чтению. - static Connection * waitForReadEvent(const std::vector & connections, size_t timeout_microseconds = 0); - private: String host; UInt16 port; diff --git a/dbms/include/DB/Client/ReplicasConnections.h b/dbms/include/DB/Client/ReplicasConnections.h new file mode 100644 index 00000000000..e808756681e --- /dev/null +++ b/dbms/include/DB/Client/ReplicasConnections.h @@ -0,0 +1,50 @@ +#pragma once + +#include +#include + +namespace DB +{ + class ReplicasConnections + { + public: + struct ConnectionInfo + { + ConnectionInfo(Connection * connection_) : connection(connection_) {} + Connection * connection; + int packet_number = 0; + bool can_read = false; + bool is_valid = true; + }; + + public: + ReplicasConnections(IConnectionPool * pool_, Settings * settings_, size_t timeout_microseconds_ = 0); + + ~ReplicasConnections() = default; + + ReplicasConnections(const ReplicasConnections &) = delete; + + ReplicasConnections & operator=(const ReplicasConnections &) = delete; + + int waitForReadEvent(); + + ConnectionInfo & pickConnection(); + + Connection::Packet receivePacket(); + + void sendQuery(const String & query, const String & query_id = "", UInt64 stage = QueryProcessingStage::Complete, + const Settings * settings_ = nullptr, bool with_pending_data = false); + + private: + using Connections = std::map; + + private: + IConnectionPool * pool; + Settings * settings; + size_t timeout_microseconds; + Poco::Net::Socket::SocketList write_list; + Poco::Net::Socket::SocketList except_list; + Connections connections; + int next_packet_number = 0; + }; +} diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index d31f1e68ea7..d161b8aa170 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -8,6 +8,7 @@ #include #include +#include namespace DB @@ -24,10 +25,12 @@ private: { send_settings = true; settings = *settings_; + use_many_replicas = (pool != nullptr) && UInt64(settings.max_parallel_replicas) > 1; } else send_settings = false; } + public: /// Принимает готовое соединение. RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_, @@ -116,28 +119,45 @@ protected: else res.push_back(std::make_pair(input[0], table.first)); } - connection->sendExternalTablesData(res); + if (use_many_replicas) + { + /// XXX Какой из этих вариантов правильный? + /// 1. Выбрать одно соединение, например connection[0], и к нему применить sendExternalTablesData(res)? + /// 2. Отправить res по всем соединениям? <- this one!!! + //replicas_connections->sendExternalTablesData(res); + } + else + connection->sendExternalTablesData(res); } Block readImpl() override { if (!sent_query) { - /// Если надо - достаём соединение из пула. - if (pool) + if (use_many_replicas) { - pool_entry = pool->get(send_settings ? &settings : nullptr); - connection = &*pool_entry; + replicas_connections.reset(new ReplicasConnections(pool, &settings)); + replicas_connections->sendQuery(query, "", stage, &settings, true); + } + else + { + /// Если надо - достаём соединение из пула. + if (pool) + { + pool_entry = pool->get(send_settings ? &settings : nullptr); + connection = &*pool_entry; + } + + connection->sendQuery(query, "", stage, send_settings ? &settings : nullptr, true); } - connection->sendQuery(query, "", stage, send_settings ? &settings : nullptr, true); sendExternalTables(); sent_query = true; } while (true) { - Connection::Packet packet = connection->receivePacket(); + Connection::Packet packet = use_many_replicas ? replicas_connections->receivePacket() : connection->receivePacket(); switch (packet.type) { @@ -247,6 +267,8 @@ private: ConnectionPool::Entry pool_entry; Connection * connection = nullptr; + std::unique_ptr replicas_connections; + const String query; bool send_settings; Settings settings; @@ -255,6 +277,8 @@ private: QueryProcessingStage::Enum stage; Context context; + bool use_many_replicas = false; + /// Отправили запрос (это делается перед получением первого блока). bool sent_query = false; diff --git a/dbms/include/DB/IO/ReadBuffer.h b/dbms/include/DB/IO/ReadBuffer.h index 7b22863e715..f1429ac80ab 100644 --- a/dbms/include/DB/IO/ReadBuffer.h +++ b/dbms/include/DB/IO/ReadBuffer.h @@ -73,11 +73,6 @@ public: return pos == working_buffer.end() && !next(); } - inline bool hasPendingBytes() const - { - return pos != working_buffer.end(); - } - void ignore() { if (!eof()) diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index fff521bf5ff..3f14786085d 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -80,26 +80,6 @@ void Connection::disconnect() connected = false; } -Connection * Connection::waitForReadEvent(const std::vector & connections, size_t timeout_microseconds) -{ - for (auto & connection : connections) - { - const auto & buffer = static_cast(*(connection->in)); - if (buffer.hasPendingBytes()) - return connection; - } - - Poco::Net::Socket::SocketList readList(connections.size()); - Poco::Net::Socket::SocketList writeList; - Poco::Net::Socket::SocketList exceptList; - - std::transform(connections.begin(), connections.end(), readList.begin(), [](Connection * conn){ return conn->socket; }); - int n = Poco::Net::Socket::select(readList, writeList, exceptList, Poco::Timespan(timeout_microseconds)); - if (n > 0) - return connections[0]; - else - return nullptr; -} void Connection::sendHello() { diff --git a/dbms/src/Client/ReplicasConnections.cpp b/dbms/src/Client/ReplicasConnections.cpp new file mode 100644 index 00000000000..fad763bea31 --- /dev/null +++ b/dbms/src/Client/ReplicasConnections.cpp @@ -0,0 +1,115 @@ +#include + +namespace DB +{ + ReplicasConnections::ReplicasConnections(IConnectionPool * pool_, Settings * settings_, size_t timeout_microseconds_) : + pool(pool_), + settings(settings_), + timeout_microseconds(timeout_microseconds_) + { + auto entries = pool->getMany(settings); + for (auto & entry : entries) + { + Connection * connection = &*entry; + connections.insert(std::make_pair(connection->socket, ConnectionInfo(connection))); + } + } + + int ReplicasConnections::waitForReadEvent() + { + Poco::Net::Socket::SocketList read_list(connections.size()); + + auto it = read_list.begin(); + for (auto & e : connections) + { + ConnectionInfo & info = e.second; + info.can_read = false; + if (info.is_valid) + { + *it = e.first; + ++it; + } + } + + if (read_list.empty()) + return 0; + + int n = Poco::Net::Socket::select(read_list, write_list, except_list, Poco::Timespan(timeout_microseconds)); + + for (const auto & socket : read_list) + { + auto place = connections.find(socket); + ConnectionInfo & info = place->second; + info.can_read = true; + } + + return n; + } + + ReplicasConnections::ConnectionInfo & ReplicasConnections::pickConnection() + { + int n = waitForReadEvent(); + if (n == 0) + throw Exception("", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); + + int max_packet_number = -1; + ConnectionInfo * res = nullptr; + + for (auto & e : connections) { + ConnectionInfo & info = e.second; + if (info.can_read && + (info.packet_number > max_packet_number)) { + max_packet_number = info.packet_number; + res = &info; + } + } + if (res == nullptr) + throw Exception("", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); + + return *res; + } + + Connection::Packet ReplicasConnections::receivePacket() + { + while (true) + { + ConnectionInfo & info = pickConnection(); + + while (info.is_valid) + { + Connection::Packet packet = info.connection->receivePacket(); + + if (info.packet_number == next_packet_number) + { + ++next_packet_number; + return packet; + } + + switch (packet.type) + { + case Protocol::Server::Data: + case Protocol::Server::Progress: + case Protocol::Server::ProfileInfo: + case Protocol::Server::Totals: + case Protocol::Server::Extremes: + ++info.packet_number; + break; + + default: + info.is_valid = false; + break; + } + } + } + } + + void ReplicasConnections::sendQuery(const String & query, const String & query_id, UInt64 stage, + const Settings * settings_, bool with_pending_data) + { + for (auto & e : connections) + { + Connection * connection = e.second.connection; + connection->sendQuery(query, query_id, stage, settings_, with_pending_data); + } + } +}