dbms: Server: add improvements and the ability of waiting for a read event on several connections. [#METR-14410]

This commit is contained in:
Alexey Arno 2015-01-12 15:08:11 +03:00
parent 2359d02472
commit 537c2d0040
5 changed files with 34 additions and 4 deletions

View File

@ -134,6 +134,9 @@ public:
size_t outBytesCount() const { return !out.isNull() ? out->count() : 0; }
size_t inBytesCount() const { return !in.isNull() ? in->count() : 0; }
/// Ждать изменение статуса нескольких соединений. Возвращает соединение готовое к чтению.
static Connection * waitForReadEvent(const std::vector<Connection *> & connections, size_t timeout_microseconds = 0);
private:
String host;
UInt16 port;

View File

@ -28,7 +28,7 @@ public:
typedef PoolBase<Connection>::Entry Entry;
virtual Entry get(Settings * settings = nullptr) = 0;
virtual std::vector<Entry> getMany(unsigned max_connections, Settings * settings = nullptr)
virtual std::vector<Entry> getMany(Settings * settings = nullptr)
{
return std::vector<Entry>{ get(settings) };
}

View File

@ -57,11 +57,13 @@ public:
return Base::get(settings);
}
/** Выделяет до указанного количества соединений. */
std::vector<Entry> getMany(unsigned max_connections, Settings * settings = nullptr) override
/** Выделяет до указанного количества соединений для работы.
* Соединения предоставляют доступ к разным репликам одного шарда.
*/
std::vector<Entry> getMany(Settings * settings = nullptr) override
{
applyLoadBalancing(settings);
return Base::getMany(max_connections, settings);
return Base::getMany(settings);
}
protected:

View File

@ -73,6 +73,11 @@ public:
return pos == working_buffer.end() && !next();
}
inline bool hasPendingBytes() const
{
return pos != working_buffer.end();
}
void ignore()
{
if (!eof())

View File

@ -80,6 +80,26 @@ void Connection::disconnect()
connected = false;
}
Connection * Connection::waitForReadEvent(const std::vector<Connection *> & connections, size_t timeout_microseconds)
{
for (auto & connection : connections)
{
const auto & buffer = static_cast<ReadBufferFromPocoSocket &>(*(connection->in));
if (buffer.hasPendingBytes())
return connection;
}
Poco::Net::Socket::SocketList readList(connections.size());
Poco::Net::Socket::SocketList writeList;
Poco::Net::Socket::SocketList exceptList;
std::transform(connections.begin(), connections.end(), readList.begin(), [](Connection * conn){ return conn->socket; });
int n = Poco::Net::Socket::select(readList, writeList, exceptList, Poco::Timespan(timeout_microseconds));
if (n > 0)
return connections[0];
else
return nullptr;
}
void Connection::sendHello()
{