mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-23 10:10:50 +00:00
dbms: Server: add improvements and the ability of waiting for a read event on several connections. [#METR-14410]
This commit is contained in:
parent
5513fe2b7a
commit
148ea7e610
@ -130,6 +130,9 @@ public:
|
|||||||
size_t outBytesCount() const { return !out.isNull() ? out->count() : 0; }
|
size_t outBytesCount() const { return !out.isNull() ? out->count() : 0; }
|
||||||
size_t inBytesCount() const { return !in.isNull() ? in->count() : 0; }
|
size_t inBytesCount() const { return !in.isNull() ? in->count() : 0; }
|
||||||
|
|
||||||
|
/// Ждать изменение статуса нескольких соединений. Возвращает соединение готовое к чтению.
|
||||||
|
static Connection * waitForReadEvent(const std::vector<Connection *> & connections, size_t timeout_microseconds = 0);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
String host;
|
String host;
|
||||||
UInt16 port;
|
UInt16 port;
|
||||||
|
@ -28,7 +28,7 @@ public:
|
|||||||
typedef PoolBase<Connection>::Entry Entry;
|
typedef PoolBase<Connection>::Entry Entry;
|
||||||
virtual Entry get(Settings * settings = nullptr) = 0;
|
virtual Entry get(Settings * settings = nullptr) = 0;
|
||||||
|
|
||||||
virtual std::vector<Entry> getMany(unsigned max_connections, Settings * settings = nullptr)
|
virtual std::vector<Entry> getMany(Settings * settings = nullptr)
|
||||||
{
|
{
|
||||||
return std::vector<Entry>{ get(settings) };
|
return std::vector<Entry>{ get(settings) };
|
||||||
}
|
}
|
||||||
|
@ -57,11 +57,13 @@ public:
|
|||||||
return Base::get(settings);
|
return Base::get(settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Выделяет до указанного количества соединений. */
|
/** Выделяет до указанного количества соединений для работы.
|
||||||
std::vector<Entry> getMany(unsigned max_connections, Settings * settings = nullptr) override
|
* Соединения предоставляют доступ к разным репликам одного шарда.
|
||||||
|
*/
|
||||||
|
std::vector<Entry> getMany(Settings * settings = nullptr) override
|
||||||
{
|
{
|
||||||
applyLoadBalancing(settings);
|
applyLoadBalancing(settings);
|
||||||
return Base::getMany(max_connections, settings);
|
return Base::getMany(settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
@ -73,6 +73,11 @@ public:
|
|||||||
return pos == working_buffer.end() && !next();
|
return pos == working_buffer.end() && !next();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline bool hasPendingBytes() const
|
||||||
|
{
|
||||||
|
return pos != working_buffer.end();
|
||||||
|
}
|
||||||
|
|
||||||
void ignore()
|
void ignore()
|
||||||
{
|
{
|
||||||
if (!eof())
|
if (!eof())
|
||||||
|
@ -80,6 +80,26 @@ void Connection::disconnect()
|
|||||||
connected = false;
|
connected = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Connection * Connection::waitForReadEvent(const std::vector<Connection *> & connections, size_t timeout_microseconds)
|
||||||
|
{
|
||||||
|
for (auto & connection : connections)
|
||||||
|
{
|
||||||
|
const auto & buffer = static_cast<ReadBufferFromPocoSocket &>(*(connection->in));
|
||||||
|
if (buffer.hasPendingBytes())
|
||||||
|
return connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
Poco::Net::Socket::SocketList readList(connections.size());
|
||||||
|
Poco::Net::Socket::SocketList writeList;
|
||||||
|
Poco::Net::Socket::SocketList exceptList;
|
||||||
|
|
||||||
|
std::transform(connections.begin(), connections.end(), readList.begin(), [](Connection * conn){ return conn->socket; });
|
||||||
|
int n = Poco::Net::Socket::select(readList, writeList, exceptList, Poco::Timespan(timeout_microseconds));
|
||||||
|
if (n > 0)
|
||||||
|
return connections[0];
|
||||||
|
else
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
void Connection::sendHello()
|
void Connection::sendHello()
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user