mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
dbms: Server: add improvements and the ability of waiting for a read event on several connections. [#METR-14410]
This commit is contained in:
parent
218bfa5316
commit
edbe789d12
@ -133,6 +133,9 @@ public:
|
||||
size_t outBytesCount() const { return !out.isNull() ? out->count() : 0; }
|
||||
size_t inBytesCount() const { return !in.isNull() ? in->count() : 0; }
|
||||
|
||||
/// Ждать изменение статуса нескольких соединений. Возвращает соединение готовое к чтению.
|
||||
static Connection * waitForReadEvent(const std::vector<Connection *> & connections, size_t timeout_microseconds = 0);
|
||||
|
||||
private:
|
||||
String host;
|
||||
UInt16 port;
|
||||
|
@ -28,7 +28,7 @@ public:
|
||||
typedef PoolBase<Connection>::Entry Entry;
|
||||
virtual Entry get(Settings * settings = nullptr) = 0;
|
||||
|
||||
virtual std::vector<Entry> getMany(unsigned max_connections, Settings * settings = nullptr)
|
||||
virtual std::vector<Entry> getMany(Settings * settings = nullptr)
|
||||
{
|
||||
return std::vector<Entry>{ get(settings) };
|
||||
}
|
||||
|
@ -57,11 +57,13 @@ public:
|
||||
return Base::get(settings);
|
||||
}
|
||||
|
||||
/** Выделяет до указанного количества соединений. */
|
||||
std::vector<Entry> getMany(unsigned max_connections, Settings * settings = nullptr) override
|
||||
/** Выделяет до указанного количества соединений для работы.
|
||||
* Соединения предоставляют доступ к разным репликам одного шарда.
|
||||
*/
|
||||
std::vector<Entry> getMany(Settings * settings = nullptr) override
|
||||
{
|
||||
applyLoadBalancing(settings);
|
||||
return Base::getMany(max_connections, settings);
|
||||
return Base::getMany(settings);
|
||||
}
|
||||
|
||||
protected:
|
||||
|
@ -73,6 +73,11 @@ public:
|
||||
return pos == working_buffer.end() && !next();
|
||||
}
|
||||
|
||||
inline bool hasPendingBytes() const
|
||||
{
|
||||
return pos != working_buffer.end();
|
||||
}
|
||||
|
||||
void ignore()
|
||||
{
|
||||
if (!eof())
|
||||
|
@ -80,6 +80,26 @@ void Connection::disconnect()
|
||||
connected = false;
|
||||
}
|
||||
|
||||
Connection * Connection::waitForReadEvent(const std::vector<Connection *> & connections, size_t timeout_microseconds)
|
||||
{
|
||||
for (auto & connection : connections)
|
||||
{
|
||||
const auto & buffer = static_cast<ReadBufferFromPocoSocket &>(*(connection->in));
|
||||
if (buffer.hasPendingBytes())
|
||||
return connection;
|
||||
}
|
||||
|
||||
Poco::Net::Socket::SocketList readList(connections.size());
|
||||
Poco::Net::Socket::SocketList writeList;
|
||||
Poco::Net::Socket::SocketList exceptList;
|
||||
|
||||
std::transform(connections.begin(), connections.end(), readList.begin(), [](Connection * conn){ return conn->socket; });
|
||||
int n = Poco::Net::Socket::select(readList, writeList, exceptList, Poco::Timespan(timeout_microseconds));
|
||||
if (n > 0)
|
||||
return connections[0];
|
||||
else
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void Connection::sendHello()
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user