dbms: Server: queries with several replicas: development [#METR-14410]

This commit is contained in:
Alexey Arno 2015-01-14 13:06:30 +03:00
parent e26f52ae6e
commit cd4bbadb12
4 changed files with 200 additions and 7 deletions

View File

@ -26,6 +26,8 @@ namespace DB
using Poco::SharedPtr;
class ReplicasConnections;
/// Поток блоков читающих из таблицы и ее имя
typedef std::pair<BlockInputStreamPtr, std::string> ExternalTableData;
/// Вектор пар, описывающих таблицы
@ -40,6 +42,8 @@ typedef std::vector<ExternalTableData> ExternalTablesData;
*/
class Connection : private boost::noncopyable
{
friend class ReplicasConnections;
public:
Connection(const String & host_, UInt16 port_, const String & default_database_,
const String & user_, const String & password_,

View File

@ -0,0 +1,50 @@
#pragma once
#include <DB/Client/Connection.h>
#include <DB/Client/ConnectionPool.h>
namespace DB
{
/** A group of connections obtained from a connection pool.
  * Offers sending one query over every connection of the group and
  * reading packets from a connection that select() reported as readable.
  * Not copyable.
  */
class ReplicasConnections
{
public:
    /// Per-connection bookkeeping stored in the socket -> info map.
    struct ConnectionInfo
    {
        ConnectionInfo(Connection * connection_) : connection(connection_) {}

        /// Raw pointer; this struct never deletes it.
        Connection * connection;
        /// Counter compared against the group's next_packet_number in receivePacket().
        int packet_number = 0;
        /// Reset and then set again by every call to waitForReadEvent().
        bool can_read = false;
        /// Connections with this flag cleared are not passed to select() any more.
        bool is_valid = true;
    };

    /// Fills the group with the connections returned by pool_->getMany(settings_).
    /// timeout_microseconds_ is the timeout handed to select() in waitForReadEvent().
    ReplicasConnections(IConnectionPool * pool_, Settings * settings_, size_t timeout_microseconds_ = 0);

    ReplicasConnections(const ReplicasConnections &) = delete;
    ReplicasConnections & operator=(const ReplicasConnections &) = delete;
    ~ReplicasConnections() = default;

    /// Runs select() over the valid connections; returns the value select() returned,
    /// or 0 when there was nothing to wait for.
    int waitForReadEvent();

    /// Chooses one readable connection; throws when none is readable.
    ConnectionInfo & pickConnection();

    /// Reads packets from picked connections until one can be handed to the caller.
    Connection::Packet receivePacket();

    /// Forwards the query, with identical arguments, to every connection of the group.
    void sendQuery(const String & query, const String & query_id = "", UInt64 stage = QueryProcessingStage::Complete,
        const Settings * settings_ = nullptr, bool with_pending_data = false);

private:
    using Connections = std::map<Poco::Net::StreamSocket, ConnectionInfo>;

    IConnectionPool * pool;
    Settings * settings;
    size_t timeout_microseconds;

    /// Passed to select() as the write and exception sets; nothing in this class fills them.
    Poco::Net::Socket::SocketList write_list;
    Poco::Net::Socket::SocketList except_list;

    Connections connections;

    /// Incremented each time receivePacket() returns a packet.
    int next_packet_number = 0;
};
}

View File

@ -8,6 +8,7 @@
#include <DB/Interpreters/Context.h>
#include <DB/Client/ConnectionPool.h>
#include <DB/Client/ReplicasConnections.h>
namespace DB
@ -24,10 +25,12 @@ private:
{
send_settings = true;
settings = *settings_;
use_many_replicas = (pool != nullptr) && UInt64(settings.max_parallel_replicas) > 1;
}
else
send_settings = false;
}
public:
/// Принимает готовое соединение.
RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_,
@ -116,28 +119,45 @@ protected:
else
res.push_back(std::make_pair(input[0], table.first));
}
connection->sendExternalTablesData(res);
if (use_many_replicas)
{
/// XXX Какой из этих вариантов правильный?
/// 1. Выбрать одно соединение, например connection[0], и к нему применить sendExternalTablesData(res)?
/// 2. Отправить res по всем соединениям? <- this one!!!
//replicas_connections->sendExternalTablesData(res);
}
else
connection->sendExternalTablesData(res);
}
Block readImpl() override
{
if (!sent_query)
{
/// Если надо - достаём соединение из пула.
if (pool)
if (use_many_replicas)
{
pool_entry = pool->get(send_settings ? &settings : nullptr);
connection = &*pool_entry;
replicas_connections.reset(new ReplicasConnections(pool, &settings));
replicas_connections->sendQuery(query, "", stage, &settings, true);
}
else
{
/// Если надо - достаём соединение из пула.
if (pool)
{
pool_entry = pool->get(send_settings ? &settings : nullptr);
connection = &*pool_entry;
}
connection->sendQuery(query, "", stage, send_settings ? &settings : nullptr, true);
}
connection->sendQuery(query, "", stage, send_settings ? &settings : nullptr, true);
sendExternalTables();
sent_query = true;
}
while (true)
{
Connection::Packet packet = connection->receivePacket();
Connection::Packet packet = use_many_replicas ? replicas_connections->receivePacket() : connection->receivePacket();
switch (packet.type)
{
@ -247,6 +267,8 @@ private:
ConnectionPool::Entry pool_entry;
Connection * connection = nullptr;
std::unique_ptr<ReplicasConnections> replicas_connections;
const String query;
bool send_settings;
Settings settings;
@ -255,6 +277,8 @@ private:
QueryProcessingStage::Enum stage;
Context context;
bool use_many_replicas = false;
/// Отправили запрос (это делается перед получением первого блока).
bool sent_query = false;

View File

@ -0,0 +1,115 @@
#include <DB/Client/ReplicasConnections.h>
namespace DB
{
/// Requests connections from the pool with getMany() and registers each of them
/// in `connections`, keyed by the connection's socket.
ReplicasConnections::ReplicasConnections(IConnectionPool * pool_, Settings * settings_, size_t timeout_microseconds_)
    : pool(pool_)
    , settings(settings_)
    , timeout_microseconds(timeout_microseconds_)
{
    /// NOTE(review): the pool entries live only in this local variable, while raw
    /// Connection pointers taken from them are stored in `connections`. If an entry
    /// gives its connection back to the pool when destroyed, the stored pointers
    /// outlive that ownership - confirm against IConnectionPool::getMany().
    auto replica_entries = pool->getMany(settings);

    for (auto & replica_entry : replica_entries)
    {
        Connection * const replica = &*replica_entry;
        connections.insert(Connections::value_type(replica->socket, ConnectionInfo(replica)));
    }
}
/** Waits until at least one valid connection has data to read.
  *
  * Clears can_read on every connection, runs select() over the sockets of the
  * connections that are still valid, and sets can_read on those select() left
  * in the read list.
  *
  * Returns the value returned by select(), or 0 without calling select() when
  * no valid connection is left.
  *
  * NOTE(review): the timeout defaults to 0 microseconds, which presumably makes
  * select() poll without blocking - confirm that this is intended.
  */
int ReplicasConnections::waitForReadEvent()
{
    /// Collect only the sockets of valid connections. The list must not be pre-sized:
    /// default-constructed placeholder sockets would be handed to select() and would
    /// make the empty() check below useless.
    Poco::Net::Socket::SocketList read_list;
    read_list.reserve(connections.size());

    for (auto & e : connections)
    {
        ConnectionInfo & info = e.second;
        info.can_read = false;
        if (info.is_valid)
            read_list.push_back(e.first);
    }

    if (read_list.empty())
        return 0;

    int n = Poco::Net::Socket::select(read_list, write_list, except_list, Poco::Timespan(timeout_microseconds));

    /// After select() the list holds only the sockets that are ready for reading.
    for (const auto & socket : read_list)
    {
        auto place = connections.find(socket);
        if (place != connections.end())
            place->second.can_read = true;
    }

    return n;
}
/** Chooses the connection to read from next.
  *
  * Calls waitForReadEvent() and, among the connections flagged can_read, returns
  * the one with the greatest packet_number. On a tie the first such connection in
  * map iteration order wins, because the comparison is strict.
  *
  * Throws Exception with code UNKNOWN_PACKET_FROM_SERVER when waitForReadEvent()
  * returns 0 or when no connection is flagged as readable.
  */
ReplicasConnections::ConnectionInfo & ReplicasConnections::pickConnection()
{
    int n = waitForReadEvent();
    if (n == 0)
        throw Exception("No replica connection is ready for reading", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);

    int max_packet_number = -1;
    ConnectionInfo * res = nullptr;

    for (auto & e : connections)
    {
        ConnectionInfo & info = e.second;
        if (info.can_read && (info.packet_number > max_packet_number))
        {
            max_packet_number = info.packet_number;
            res = &info;
        }
    }

    if (res == nullptr)
        throw Exception("No readable replica connection found after select", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);

    return *res;
}
/** Returns the next packet to hand to the caller.
  *
  * Repeatedly picks a connection and reads packets from it while it stays valid.
  * A packet is returned (and next_packet_number advanced) when the connection's
  * packet_number equals next_packet_number at the moment the packet was read.
  * Otherwise the packet is not returned: a Data, Progress, ProfileInfo, Totals or
  * Extremes packet advances the connection's packet_number, and a packet of any
  * other type marks the connection as invalid, which ends reading from it.
  *
  * Exceptions thrown by pickConnection() or Connection::receivePacket() propagate.
  */
Connection::Packet ReplicasConnections::receivePacket()
{
    for (;;)
    {
        ConnectionInfo & chosen = pickConnection();

        while (chosen.is_valid)
        {
            Connection::Packet packet = chosen.connection->receivePacket();

            /// The counters are compared before the packet type is looked at.
            if (chosen.packet_number == next_packet_number)
            {
                ++next_packet_number;
                return packet;
            }

            const bool advances_counter =
                packet.type == Protocol::Server::Data
                || packet.type == Protocol::Server::Progress
                || packet.type == Protocol::Server::ProfileInfo
                || packet.type == Protocol::Server::Totals
                || packet.type == Protocol::Server::Extremes;

            if (advances_counter)
                ++chosen.packet_number;
            else
                chosen.is_valid = false;
        }
    }
}
/// Sends the same query, with the same arguments, over every connection of the group.
/// The is_valid flag is not consulted here. Exceptions from Connection::sendQuery propagate,
/// so a failure on one connection leaves the remaining ones without the query.
void ReplicasConnections::sendQuery(const String & query, const String & query_id, UInt64 stage,
    const Settings * settings_, bool with_pending_data)
{
    for (auto it = connections.begin(); it != connections.end(); ++it)
        it->second.connection->sendQuery(query, query_id, stage, settings_, with_pending_data);
}
}