diff --git a/dbms/include/DB/Client/Connection.h b/dbms/include/DB/Client/Connection.h index 450ab5f46e3..2667cdd344d 100644 --- a/dbms/include/DB/Client/Connection.h +++ b/dbms/include/DB/Client/Connection.h @@ -133,6 +133,9 @@ public: size_t outBytesCount() const { return !out.isNull() ? out->count() : 0; } size_t inBytesCount() const { return !in.isNull() ? in->count() : 0; } + /// Ждать изменение статуса нескольких соединений. Возвращает соединение готовое к чтению. + static Connection * waitForReadEvent(const std::vector & connections, size_t timeout_microseconds = 0); + private: String host; UInt16 port; diff --git a/dbms/include/DB/Client/ConnectionPool.h b/dbms/include/DB/Client/ConnectionPool.h index aece5616f36..6e0acd1354c 100644 --- a/dbms/include/DB/Client/ConnectionPool.h +++ b/dbms/include/DB/Client/ConnectionPool.h @@ -28,7 +28,7 @@ public: typedef PoolBase::Entry Entry; virtual Entry get(Settings * settings = nullptr) = 0; - virtual std::vector getMany(unsigned max_connections, Settings * settings = nullptr) + virtual std::vector getMany(Settings * settings = nullptr) { return std::vector{ get(settings) }; } diff --git a/dbms/include/DB/Client/ConnectionPoolWithFailover.h b/dbms/include/DB/Client/ConnectionPoolWithFailover.h index d468e10f5a9..04c7ed296f1 100644 --- a/dbms/include/DB/Client/ConnectionPoolWithFailover.h +++ b/dbms/include/DB/Client/ConnectionPoolWithFailover.h @@ -57,11 +57,13 @@ public: return Base::get(settings); } - /** Выделяет до указанного количества соединений. */ - std::vector getMany(unsigned max_connections, Settings * settings = nullptr) override + /** Выделяет до указанного количества соединений для работы. + * Соединения предоставляют доступ к разным репликам одного шарда. + */ + std::vector getMany(Settings * settings = nullptr) override { applyLoadBalancing(settings); - return Base::getMany(max_connections, settings); + return Base::getMany(settings); } protected: diff --git a/dbms/include/DB/IO/ReadBuffer.h b/dbms/include/DB/IO/ReadBuffer.h index f1429ac80ab..7b22863e715 100644 --- a/dbms/include/DB/IO/ReadBuffer.h +++ b/dbms/include/DB/IO/ReadBuffer.h @@ -73,6 +73,11 @@ public: return pos == working_buffer.end() && !next(); } + inline bool hasPendingBytes() const + { + return pos != working_buffer.end(); + } + void ignore() { if (!eof()) diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index 3f14786085d..fff521bf5ff 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -80,6 +80,26 @@ void Connection::disconnect() connected = false; } +Connection * Connection::waitForReadEvent(const std::vector & connections, size_t timeout_microseconds) +{ + for (auto & connection : connections) + { + const auto & buffer = static_cast(*(connection->in)); + if (buffer.hasPendingBytes()) + return connection; + } + + Poco::Net::Socket::SocketList readList(connections.size()); + Poco::Net::Socket::SocketList writeList; + Poco::Net::Socket::SocketList exceptList; + + std::transform(connections.begin(), connections.end(), readList.begin(), [](Connection * conn){ return conn->socket; }); + int n = Poco::Net::Socket::select(readList, writeList, exceptList, Poco::Timespan(timeout_microseconds)); + if (n > 0) + return connections[0]; + else + return nullptr; +} void Connection::sendHello() {