dbms: Server: queries with several replicas: development [#METR-14410]

This commit is contained in:
Alexey Arno 2015-01-14 13:06:30 +03:00
parent f026502223
commit e18d33dabf
6 changed files with 200 additions and 35 deletions

View File

@ -26,6 +26,8 @@ namespace DB
using Poco::SharedPtr; using Poco::SharedPtr;
class ReplicasConnections;
/// Поток блоков читающих из таблицы и ее имя /// Поток блоков читающих из таблицы и ее имя
typedef std::pair<BlockInputStreamPtr, std::string> ExternalTableData; typedef std::pair<BlockInputStreamPtr, std::string> ExternalTableData;
/// Вектор пар, описывающих таблицы /// Вектор пар, описывающих таблицы
@ -40,6 +42,8 @@ typedef std::vector<ExternalTableData> ExternalTablesData;
*/ */
class Connection : private boost::noncopyable class Connection : private boost::noncopyable
{ {
friend class ReplicasConnections;
public: public:
Connection(const String & host_, UInt16 port_, const String & default_database_, Connection(const String & host_, UInt16 port_, const String & default_database_,
const String & user_, const String & password_, const String & user_, const String & password_,
@ -130,9 +134,6 @@ public:
size_t outBytesCount() const { return !out.isNull() ? out->count() : 0; } size_t outBytesCount() const { return !out.isNull() ? out->count() : 0; }
size_t inBytesCount() const { return !in.isNull() ? in->count() : 0; } size_t inBytesCount() const { return !in.isNull() ? in->count() : 0; }
/// Ждать изменение статуса нескольких соединений. Возвращает соединение готовое к чтению.
static Connection * waitForReadEvent(const std::vector<Connection *> & connections, size_t timeout_microseconds = 0);
private: private:
String host; String host;
UInt16 port; UInt16 port;

View File

@ -0,0 +1,50 @@
#pragma once
#include <DB/Client/Connection.h>
#include <DB/Client/ConnectionPool.h>
namespace DB
{
/** A group of connections to replicas, all obtained from one connection pool.
  * Lets the caller send the same query over every connection and then read
  * packets from whichever connection becomes readable.
  * The object is neither copyable nor assignable.
  */
class ReplicasConnections
{
public:
    /// Bookkeeping kept for every connection of the group.
    struct ConnectionInfo
    {
        /// explicit: a bare Connection pointer must not silently convert into a ConnectionInfo.
        explicit ConnectionInfo(Connection * connection_) : connection(connection_) {}

        /// The connection itself; this struct never deletes it.
        Connection * connection;
        /// Per-connection packet counter maintained by receivePacket();
        /// it is compared against ReplicasConnections::next_packet_number.
        int packet_number = 0;
        /// Refreshed by waitForReadEvent(): true if the socket was reported readable by the last select.
        bool can_read = false;
        /// Cleared by receivePacket(); connections with is_valid == false are not polled by waitForReadEvent().
        bool is_valid = true;
    };

    /** pool_                 - pool the connections are taken from (via getMany).
      * settings_             - settings passed to the pool when taking the connections.
      * timeout_microseconds_ - timeout handed to Poco::Net::Socket::select in waitForReadEvent().
      */
    ReplicasConnections(IConnectionPool * pool_, Settings * settings_, size_t timeout_microseconds_ = 0);

    ~ReplicasConnections() = default;

    ReplicasConnections(const ReplicasConnections &) = delete;
    ReplicasConnections & operator=(const ReplicasConnections &) = delete;

    /// Polls the sockets of the valid connections and updates their can_read flags.
    /// Returns the value returned by select, or 0 if there was nothing to poll.
    int waitForReadEvent();

    /// Calls waitForReadEvent() and returns the readable connection with the greatest packet_number.
    /// Throws if no connection is readable.
    ConnectionInfo & pickConnection();

    /// Receives the next packet of the group; see the implementation for how connections are chosen.
    Connection::Packet receivePacket();

    /// Sends the query, with the same arguments, over every connection of the group.
    void sendQuery(const String & query, const String & query_id = "", UInt64 stage = QueryProcessingStage::Complete,
        const Settings * settings_ = nullptr, bool with_pending_data = false);

private:
    /// Connections are keyed by their socket so that sockets returned by select can be mapped back.
    using Connections = std::map<Poco::Net::StreamSocket, ConnectionInfo>;

    IConnectionPool * pool;
    Settings * settings;
    size_t timeout_microseconds;

    /// Passed to select together with the read list; nothing in this class adds sockets to them.
    Poco::Net::Socket::SocketList write_list;
    Poco::Net::Socket::SocketList except_list;

    Connections connections;

    /// Counter that receivePacket() compares with ConnectionInfo::packet_number and advances on each returned packet.
    int next_packet_number = 0;
};
}

View File

@ -8,6 +8,7 @@
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/Client/ConnectionPool.h> #include <DB/Client/ConnectionPool.h>
#include <DB/Client/ReplicasConnections.h>
namespace DB namespace DB
@ -24,10 +25,12 @@ private:
{ {
send_settings = true; send_settings = true;
settings = *settings_; settings = *settings_;
use_many_replicas = (pool != nullptr) && UInt64(settings.max_parallel_replicas) > 1;
} }
else else
send_settings = false; send_settings = false;
} }
public: public:
/// Принимает готовое соединение. /// Принимает готовое соединение.
RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_, RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_,
@ -116,28 +119,45 @@ protected:
else else
res.push_back(std::make_pair(input[0], table.first)); res.push_back(std::make_pair(input[0], table.first));
} }
connection->sendExternalTablesData(res); if (use_many_replicas)
{
/// XXX Какой из этих вариантов правильный?
/// 1. Выбрать одно соединение, например connection[0], и к нему применить sendExternalTablesData(res)?
/// 2. Отправить res по всем соединениям? <- this one!!!
//replicas_connections->sendExternalTablesData(res);
}
else
connection->sendExternalTablesData(res);
} }
Block readImpl() override Block readImpl() override
{ {
if (!sent_query) if (!sent_query)
{ {
/// Если надо - достаём соединение из пула. if (use_many_replicas)
if (pool)
{ {
pool_entry = pool->get(send_settings ? &settings : nullptr); replicas_connections.reset(new ReplicasConnections(pool, &settings));
connection = &*pool_entry; replicas_connections->sendQuery(query, "", stage, &settings, true);
}
else
{
/// Если надо - достаём соединение из пула.
if (pool)
{
pool_entry = pool->get(send_settings ? &settings : nullptr);
connection = &*pool_entry;
}
connection->sendQuery(query, "", stage, send_settings ? &settings : nullptr, true);
} }
connection->sendQuery(query, "", stage, send_settings ? &settings : nullptr, true);
sendExternalTables(); sendExternalTables();
sent_query = true; sent_query = true;
} }
while (true) while (true)
{ {
Connection::Packet packet = connection->receivePacket(); Connection::Packet packet = use_many_replicas ? replicas_connections->receivePacket() : connection->receivePacket();
switch (packet.type) switch (packet.type)
{ {
@ -247,6 +267,8 @@ private:
ConnectionPool::Entry pool_entry; ConnectionPool::Entry pool_entry;
Connection * connection = nullptr; Connection * connection = nullptr;
std::unique_ptr<ReplicasConnections> replicas_connections;
const String query; const String query;
bool send_settings; bool send_settings;
Settings settings; Settings settings;
@ -255,6 +277,8 @@ private:
QueryProcessingStage::Enum stage; QueryProcessingStage::Enum stage;
Context context; Context context;
bool use_many_replicas = false;
/// Отправили запрос (это делается перед получением первого блока). /// Отправили запрос (это делается перед получением первого блока).
bool sent_query = false; bool sent_query = false;

View File

@ -73,11 +73,6 @@ public:
return pos == working_buffer.end() && !next(); return pos == working_buffer.end() && !next();
} }
/// True while the read position has not yet reached the end of the working buffer,
/// i.e. there are already-buffered bytes that can be consumed without calling next().
inline bool hasPendingBytes() const
{
    return !(pos == working_buffer.end());
}
void ignore() void ignore()
{ {
if (!eof()) if (!eof())

View File

@ -80,26 +80,6 @@ void Connection::disconnect()
connected = false; connected = false;
} }
/** Waits until one of several connections can be read from.
  *
  * connections          - candidates; each must have an initialized `in` buffer.
  * timeout_microseconds - timeout passed to Poco::Net::Socket::select.
  *
  * Returns a connection that is ready for reading, or nullptr if none became ready.
  */
Connection * Connection::waitForReadEvent(const std::vector<Connection *> & connections, size_t timeout_microseconds)
{
    /// A connection whose read buffer already holds unread bytes is ready without touching the socket.
    for (auto & connection : connections)
    {
        const auto & buffer = static_cast<ReadBufferFromPocoSocket &>(*(connection->in));
        if (buffer.hasPendingBytes())
            return connection;
    }

    Poco::Net::Socket::SocketList readList(connections.size());
    Poco::Net::Socket::SocketList writeList;
    Poco::Net::Socket::SocketList exceptList;

    std::transform(connections.begin(), connections.end(), readList.begin(), [](Connection * conn){ return conn->socket; });

    int n = Poco::Net::Socket::select(readList, writeList, exceptList, Poco::Timespan(timeout_microseconds));
    if (n <= 0)
        return nullptr;

    /// select leaves only the ready sockets in readList. Return the connection that owns one of them;
    /// previously connections[0] was returned even when a different socket was the ready one.
    for (Connection * connection : connections)
    {
        if (std::find(readList.begin(), readList.end(), connection->socket) != readList.end())
            return connection;
    }

    return nullptr;
}
void Connection::sendHello() void Connection::sendHello()
{ {

View File

@ -0,0 +1,115 @@
#include <DB/Client/ReplicasConnections.h>
namespace DB
{
/// Takes several connections from the pool (IConnectionPool::getMany) and registers each of them
/// in `connections`, keyed by its socket.
ReplicasConnections::ReplicasConnections(IConnectionPool * pool_, Settings * settings_, size_t timeout_microseconds_)
    : pool(pool_)
    , settings(settings_)
    , timeout_microseconds(timeout_microseconds_)
{
    /// NOTE(review): `pooled` lives only until the end of this constructor, while raw Connection
    /// pointers taken from it are stored in `connections`. If a pool entry gives its connection
    /// back to the pool when destroyed, those pointers outlive the lease - confirm against getMany.
    auto pooled = pool->getMany(settings);

    for (auto & pool_entry : pooled)
    {
        Connection * raw = &*pool_entry;
        connections.insert(Connections::value_type(raw->socket, ConnectionInfo(raw)));
    }
}
/** Polls the sockets of all still-valid connections.
  *
  * Resets can_read on every connection, runs Poco::Net::Socket::select over the valid ones
  * with the configured timeout and sets can_read on the connections whose sockets remain
  * in the read list afterwards.
  *
  * Returns the value returned by select, or 0 when there is no valid connection to poll.
  */
int ReplicasConnections::waitForReadEvent()
{
    /// Build the list from valid connections only. Pre-sizing the list to connections.size()
    /// would leave default-constructed sockets at its tail whenever some connection is invalid,
    /// and would make the emptiness check below impossible to satisfy.
    Poco::Net::Socket::SocketList read_list;
    read_list.reserve(connections.size());

    for (auto & e : connections)
    {
        ConnectionInfo & info = e.second;
        info.can_read = false;
        if (info.is_valid)
            read_list.push_back(e.first);
    }

    if (read_list.empty())
        return 0;

    int n = Poco::Net::Socket::select(read_list, write_list, except_list, Poco::Timespan(timeout_microseconds));

    /// After select, read_list holds only the sockets that are ready for reading.
    for (const auto & socket : read_list)
    {
        auto place = connections.find(socket);
        if (place != connections.end())
            place->second.can_read = true;
    }

    return n;
}
/** Chooses the connection to read from next.
  *
  * Calls waitForReadEvent() and, among the connections flagged can_read, returns the one
  * with the greatest packet_number (the first such connection in map order on ties).
  *
  * Throws Exception with code UNKNOWN_PACKET_FROM_SERVER if waitForReadEvent() reports
  * nothing ready or no connection is flagged readable.
  */
ReplicasConnections::ConnectionInfo & ReplicasConnections::pickConnection()
{
    int n = waitForReadEvent();
    if (n == 0)
        throw Exception("ReplicasConnections: no connection is ready for reading", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);

    int max_packet_number = -1;
    ConnectionInfo * res = nullptr;

    for (auto & e : connections)
    {
        ConnectionInfo & info = e.second;
        if (info.can_read && (info.packet_number > max_packet_number))
        {
            max_packet_number = info.packet_number;
            res = &info;
        }
    }

    if (res == nullptr)
        throw Exception("ReplicasConnections: no readable connection found", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);

    return *res;
}
/** Returns the next packet of the group.
  *
  * Repeatedly picks a connection with pickConnection() and reads packets from it.
  * A packet is returned to the caller when the connection's packet_number equals
  * next_packet_number; both counters are then advanced. A packet read while the
  * connection is at a different packet_number is discarded: for Data, Progress,
  * ProfileInfo, Totals and Extremes the connection's counter is advanced and reading
  * continues; for any other packet type the connection is marked invalid and another
  * connection is picked.
  *
  * NOTE(review): this looks like de-duplication of identical packet streams coming
  * from several replicas - confirm the intended protocol.
  */
Connection::Packet ReplicasConnections::receivePacket()
{
    while (true)
    {
        ConnectionInfo & info = pickConnection();

        while (info.is_valid)
        {
            Connection::Packet packet = info.connection->receivePacket();

            if (info.packet_number == next_packet_number)
            {
                /// The delivered packet must be counted for this connection as well.
                /// Otherwise the connection falls one behind next_packet_number and its
                /// very next packet is thrown away as stale (with a single connection
                /// every second packet was lost).
                ++info.packet_number;
                ++next_packet_number;
                return packet;
            }

            switch (packet.type)
            {
                case Protocol::Server::Data:
                case Protocol::Server::Progress:
                case Protocol::Server::ProfileInfo:
                case Protocol::Server::Totals:
                case Protocol::Server::Extremes:
                    /// Packet discarded; move this connection one step forward.
                    ++info.packet_number;
                    break;

                default:
                    /// Any other packet type stops reading from this connection.
                    info.is_valid = false;
                    break;
            }
        }
    }
}
/// Forwards the query, with identical arguments, to every connection of the group
/// in the iteration order of the `connections` map.
void ReplicasConnections::sendQuery(const String & query, const String & query_id, UInt64 stage,
    const Settings * settings_, bool with_pending_data)
{
    for (auto it = connections.begin(); it != connections.end(); ++it)
        it->second.connection->sendQuery(query, query_id, stage, settings_, with_pending_data);
}
}