dbms: Server: queries with several replicas: development [#METR-14410]

This commit is contained in:
Alexey Arno 2015-01-23 00:54:16 +03:00
parent f618f02d23
commit 6f77a12018
3 changed files with 213 additions and 146 deletions

View File

@ -18,38 +18,35 @@ namespace DB
ShardReplicas(const ShardReplicas &) = delete;
ShardReplicas & operator=(const ShardReplicas &) = delete;
/// Получить пакет от какой-нибудь реплики.
Connection::Packet receivePacket();
/// Отправить на реплики всё содержимое внешних таблиц.
void sendExternalTablesData(std::vector<ExternalTablesData> & data);
/// Отправить запрос ко всем репликам.
/// Отправить запрос на реплики.
void sendQuery(const String & query, const String & query_id = "",
UInt64 stage = QueryProcessingStage::Complete, bool with_pending_data = false);
/// Разорвать соединения ко всем репликам
/// Получить пакет от какой-нибудь реплики.
Connection::Packet receivePacket();
/// Разорвать соединения к репликам
void disconnect();
/// Отменить запросы у всех реплик
/// Отменить запросы к репликам
void sendCancel();
/// Для каждой реплики получить оставшиеся пакеты при отмене запроса.
void drainResidualPackets();
/// Для каждой реплики получить оставшиеся пакеты после отмена запроса.
Connection::Packet drain();
/// Получить адреса всех реплик в виде строки.
/// Получить адреса реплик в виде строки.
std::string dumpAddresses() const;
/// Возвращает количесто реплик.
size_t size() const
{
return replica_hash.size();
}
/// Отправить ко всем репликам всё содержимое внешних таблиц.
void sendExternalTablesData(std::vector<ExternalTablesData> & data);
size_t size() const { return replica_hash.size(); }
private:
/// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах.
/// Возвращает соединение на реплику, с которой можно прочитать данные, если такая есть.
Connection * waitForReadEvent();
Connection ** waitForReadEvent();
private:
/// Реплики хэшированные по id сокета
@ -58,5 +55,8 @@ namespace DB
private:
const Settings & settings;
ReplicaHash replica_hash;
size_t active_connection_count = 0;
bool sent_query = false;
bool cancelled = false;
};
}

View File

@ -158,9 +158,10 @@ protected:
{
auto entries = pool->getMany(&settings);
if (entries.size() > 1)
shard_replicas.reset(new ShardReplicas(entries, settings));
else if (entries.size() == 1)
shard_replicas = ext::make_unique<ShardReplicas>(entries, settings);
else
{
/// NOTE IConnectionPool::getMany() всегда возвращает как минимум одно соединение.
use_many_replicas = false;
connection = &*entries[0];
}
@ -272,7 +273,22 @@ protected:
}
if (use_many_replicas)
shard_replicas->drainResidualPackets();
{
Connection::Packet packet = shard_replicas->drain();
switch (packet.type)
{
case Protocol::Server::EndOfStream:
break;
case Protocol::Server::Exception:
got_exception_from_server = true;
packet.exception->rethrow();
break;
default:
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
else
{
/// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединении с сервером.

View File

@ -3,7 +3,8 @@
namespace DB
{
ShardReplicas::ShardReplicas(std::vector<ConnectionPool::Entry> & entries, const Settings & settings_) :
settings(settings_)
settings(settings_),
active_connection_count(entries.size())
{
replica_hash.reserve(entries.size());
@ -14,43 +15,29 @@ namespace DB
}
}
Connection * ShardReplicas::waitForReadEvent()
void ShardReplicas::sendExternalTablesData(std::vector<ExternalTablesData> & data)
{
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
if (sent_query)
throw Exception("Cannot send external tables data: query already sent.");
Poco::Net::Socket::SocketList read_list;
read_list.reserve(replica_hash.size());
if (data.size() < active_connection_count)
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
auto it = data.begin();
for (auto & e : replica_hash)
{
Connection * connection = e.second;
read_list.push_back(connection->socket);
if (connection != nullptr)
connection->sendExternalTablesData(*it);
++it;
}
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.poll_interval * 1000000);
if (n == 0)
return nullptr;
auto & socket = read_list[rand() % n];
auto it = replica_hash.find(socket.impl()->sockfd());
if (it == replica_hash.end())
throw Exception("Unexpected replica", ErrorCodes::UNEXPECTED_REPLICA);
return it->second;
}
Connection::Packet ShardReplicas::receivePacket()
{
Connection * connection = waitForReadEvent();
if (connection == nullptr)
throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
Connection::Packet packet = connection->receivePacket();
return packet;
}
void ShardReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data)
{
if (sent_query)
throw Exception("Query already sent.");
Settings query_settings = settings;
query_settings.parallel_replicas_count = replica_hash.size();
UInt64 offset = 0;
@ -58,42 +45,27 @@ namespace DB
for (auto & e : replica_hash)
{
Connection * connection = e.second;
if (connection != nullptr)
{
connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
query_settings.parallel_replica_offset = offset;
++offset;
}
}
void ShardReplicas::disconnect()
{
for (auto & e : replica_hash)
{
Connection * connection = e.second;
connection->disconnect();
}
sent_query = true;
}
void ShardReplicas::sendCancel()
Connection::Packet ShardReplicas::receivePacket()
{
for (auto & e : replica_hash)
{
Connection * connection = e.second;
connection->sendCancel();
}
}
if (!sent_query)
throw Exception("Cannot receive packets: no query sent.");
void ShardReplicas::drainResidualPackets()
{
bool caught_exceptions = false;
Connection ** connection = waitForReadEvent();
if (connection == nullptr)
throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
for (auto & e : replica_hash)
{
Connection * connection = e.second;
bool again = true;
while (again)
{
Connection::Packet packet = connection->receivePacket();
Connection::Packet packet = (*connection)->receivePacket();
switch (packet.type)
{
@ -105,28 +77,92 @@ namespace DB
break;
case Protocol::Server::EndOfStream:
again = false;
continue;
*connection = nullptr;
--active_connection_count;
if (active_connection_count > 0)
{
Connection::Packet empty_packet;
empty_packet.type = Protocol::Server::Data;
return empty_packet;
}
break;
case Protocol::Server::Exception:
// XXX Что делать?
caught_exceptions = true;
again = false;
continue;
default:
// XXX Что делать?
caught_exceptions = true;
again = false;
continue;
*connection = nullptr;
--active_connection_count;
if (!cancelled)
{
sendCancel();
(void) drain();
}
break;
}
return packet;
}
void ShardReplicas::disconnect()
{
for (auto & e : replica_hash)
{
Connection * & connection = e.second;
if (connection != nullptr)
{
connection->disconnect();
connection = nullptr;
--active_connection_count;
}
}
}
if (caught_exceptions)
void ShardReplicas::sendCancel()
{
// XXX Что выкидываем?
if (!sent_query || cancelled)
throw Exception("Cannot cancel. Either no query sent or already cancelled.");
for (auto & e : replica_hash)
{
Connection * connection = e.second;
if (connection != nullptr)
connection->sendCancel();
}
cancelled = true;
}
Connection::Packet ShardReplicas::drain()
{
if (!cancelled)
throw Exception("Cannot drain connections: cancel first.");
Connection::Packet res;
res.type = Protocol::Server::EndOfStream;
while (active_connection_count > 0)
{
Connection::Packet packet = receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
break;
case Protocol::Server::EndOfStream:
return res;
case Protocol::Server::Exception:
default:
res = packet;
break;
}
}
return res;
}
std::string ShardReplicas::dumpAddresses() const
@ -136,25 +172,40 @@ namespace DB
{
char prefix = '\0';
const Connection * connection = e.second;
if (connection != nullptr)
{
os << prefix << connection->getServerAddress();
if (prefix == '\0')
prefix = ';';
}
}
return os.str();
}
void ShardReplicas::sendExternalTablesData(std::vector<ExternalTablesData> & data)
Connection ** ShardReplicas::waitForReadEvent()
{
if (data.size() != replica_hash.size())
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
Poco::Net::Socket::SocketList read_list;
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
read_list.reserve(active_connection_count);
auto it = data.begin();
for (auto & e : replica_hash)
{
Connection * connection = e.second;
connection->sendExternalTablesData(*it);
++it;
if (connection != nullptr)
read_list.push_back(connection->socket);
}
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.poll_interval * 1000000);
if (n == 0)
return nullptr;
auto & socket = read_list[rand() % n];
auto it = replica_hash.find(socket.impl()->sockfd());
if (it == replica_hash.end())
throw Exception("Unexpected replica", ErrorCodes::UNEXPECTED_REPLICA);
return &(it->second);
}
}