dbms: Server: queries with several replicas: development [#METR-14410]

This commit is contained in:
Alexey Arno 2015-01-15 21:03:59 +03:00
parent 48fbd5c93a
commit f256bae7cf
4 changed files with 35 additions and 35 deletions

View File

@ -26,7 +26,7 @@ namespace DB
using Poco::SharedPtr; using Poco::SharedPtr;
class ReplicasConnections; class ShardReplicas;
/// Поток блоков читающих из таблицы и ее имя /// Поток блоков читающих из таблицы и ее имя
typedef std::pair<BlockInputStreamPtr, std::string> ExternalTableData; typedef std::pair<BlockInputStreamPtr, std::string> ExternalTableData;
@ -42,7 +42,7 @@ typedef std::vector<ExternalTableData> ExternalTablesData;
*/ */
class Connection : private boost::noncopyable class Connection : private boost::noncopyable
{ {
friend class ReplicasConnections; friend class ShardReplicas;
public: public:
Connection(const String & host_, UInt16 port_, const String & default_database_, Connection(const String & host_, UInt16 port_, const String & default_database_,

View File

@ -9,15 +9,15 @@ namespace DB
/** /**
* Множество реплик одного шарда. * Множество реплик одного шарда.
*/ */
class ReplicasConnections final class ShardReplicas final
{ {
public: public:
ReplicasConnections(IConnectionPool * pool_, Settings * settings_); ShardReplicas(IConnectionPool * pool_, Settings * settings_);
~ReplicasConnections() = default; ~ShardReplicas() = default;
ReplicasConnections(const ReplicasConnections &) = delete; ShardReplicas(const ShardReplicas &) = delete;
ReplicasConnections & operator=(const ReplicasConnections &) = delete; ShardReplicas & operator=(const ShardReplicas &) = delete;
/// Получить пакет от какой-нибудь реплики. /// Получить пакет от какой-нибудь реплики.
Connection::Packet receivePacket(); Connection::Packet receivePacket();
@ -71,7 +71,7 @@ namespace DB
private: private:
/// Выбрать реплику, на которой можно прочитать данные. /// Выбрать реплику, на которой можно прочитать данные.
Replica & pickConnection(); Replica & pickReplica();
/// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах. /// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах.
int waitForReadEvent(); int waitForReadEvent();

View File

@ -8,7 +8,7 @@
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <DB/Client/ConnectionPool.h> #include <DB/Client/ConnectionPool.h>
#include <DB/Client/ReplicasConnections.h> #include <DB/Client/ShardReplicas.h>
namespace DB namespace DB
@ -87,7 +87,7 @@ public:
{ {
std::string addresses; std::string addresses;
if (use_many_replicas) if (use_many_replicas)
addresses = replicas_connections->dumpAddresses(); addresses = shard_replicas->dumpAddresses();
else else
addresses = connection->getServerAddress(); addresses = connection->getServerAddress();
@ -95,7 +95,7 @@ public:
/// Если запрошено прервать запрос - попросим удалённый сервер тоже прервать запрос. /// Если запрошено прервать запрос - попросим удалённый сервер тоже прервать запрос.
if (use_many_replicas) if (use_many_replicas)
replicas_connections->sendCancel(); shard_replicas->sendCancel();
else else
connection->sendCancel(); connection->sendCancel();
@ -112,7 +112,7 @@ public:
if (sent_query && !finished) if (sent_query && !finished)
{ {
if (use_many_replicas) if (use_many_replicas)
replicas_connections->disconnect(); shard_replicas->disconnect();
else else
connection->disconnect(); connection->disconnect();
} }
@ -122,7 +122,7 @@ protected:
/// Отправить на удаленные сервера все временные таблицы /// Отправить на удаленные сервера все временные таблицы
void sendExternalTables() void sendExternalTables()
{ {
size_t count = use_many_replicas ? replicas_connections->size() : 1; size_t count = use_many_replicas ? shard_replicas->size() : 1;
std::vector<ExternalTablesData> instances; std::vector<ExternalTablesData> instances;
instances.reserve(count); instances.reserve(count);
@ -145,7 +145,7 @@ protected:
} }
if (use_many_replicas) if (use_many_replicas)
replicas_connections->sendExternalTablesData(instances); shard_replicas->sendExternalTablesData(instances);
else else
connection->sendExternalTablesData(instances[0]); connection->sendExternalTablesData(instances[0]);
} }
@ -156,8 +156,8 @@ protected:
{ {
if (use_many_replicas) if (use_many_replicas)
{ {
replicas_connections.reset(new ReplicasConnections(pool, &settings)); shard_replicas.reset(new ShardReplicas(pool, &settings));
replicas_connections->sendQuery(query, "", stage, &settings, true); shard_replicas->sendQuery(query, "", stage, &settings, true);
} }
else else
{ {
@ -177,7 +177,7 @@ protected:
while (true) while (true)
{ {
Connection::Packet packet = use_many_replicas ? replicas_connections->receivePacket() : connection->receivePacket(); Connection::Packet packet = use_many_replicas ? shard_replicas->receivePacket() : connection->receivePacket();
switch (packet.type) switch (packet.type)
{ {
@ -248,7 +248,7 @@ protected:
{ {
std::string addresses; std::string addresses;
if (use_many_replicas) if (use_many_replicas)
addresses = replicas_connections->dumpAddresses(); addresses = shard_replicas->dumpAddresses();
else else
addresses = connection->getServerAddress(); addresses = connection->getServerAddress();
@ -257,13 +257,13 @@ protected:
was_cancelled = true; was_cancelled = true;
if (use_many_replicas) if (use_many_replicas)
replicas_connections->sendCancel(); shard_replicas->sendCancel();
else else
connection->sendCancel(); connection->sendCancel();
} }
if (use_many_replicas) if (use_many_replicas)
replicas_connections->drainResidualPackets(); shard_replicas->drainResidualPackets();
else else
{ {
/// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединении с сервером. /// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединении с сервером.
@ -302,7 +302,7 @@ private:
ConnectionPool::Entry pool_entry; ConnectionPool::Entry pool_entry;
Connection * connection = nullptr; Connection * connection = nullptr;
std::unique_ptr<ReplicasConnections> replicas_connections; std::unique_ptr<ShardReplicas> shard_replicas;
const String query; const String query;
bool send_settings; bool send_settings;

View File

@ -1,9 +1,9 @@
#include <DB/Client/ReplicasConnections.h> #include <DB/Client/ShardReplicas.h>
#include <DB/Client/ConnectionPool.h> #include <DB/Client/ConnectionPool.h>
namespace DB namespace DB
{ {
ReplicasConnections::ReplicasConnections(IConnectionPool * pool_, Settings * settings_) : ShardReplicas::ShardReplicas(IConnectionPool * pool_, Settings * settings_) :
settings(settings_) settings(settings_)
{ {
auto entries = pool_->getMany(settings); auto entries = pool_->getMany(settings);
@ -17,7 +17,7 @@ namespace DB
} }
} }
int ReplicasConnections::waitForReadEvent() int ShardReplicas::waitForReadEvent()
{ {
if (valid_replicas_count == 0) if (valid_replicas_count == 0)
return 0; return 0;
@ -50,7 +50,7 @@ namespace DB
return n; return n;
} }
ReplicasConnections::Replica & ReplicasConnections::pickConnection() ShardReplicas::Replica & ShardReplicas::pickReplica()
{ {
Replica * res = nullptr; Replica * res = nullptr;
@ -75,11 +75,11 @@ namespace DB
return *res; return *res;
} }
Connection::Packet ReplicasConnections::receivePacket() Connection::Packet ShardReplicas::receivePacket()
{ {
while (true) while (true)
{ {
Replica & replica = pickConnection(); Replica & replica = pickReplica();
bool retry = false; bool retry = false;
while (replica.is_valid) while (replica.is_valid)
@ -131,7 +131,7 @@ namespace DB
} }
} }
void ReplicasConnections::sendQuery(const String & query, const String & query_id, UInt64 stage, void ShardReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage,
const Settings * settings_, bool with_pending_data) const Settings * settings_, bool with_pending_data)
{ {
for (auto & e : replica_hash) for (auto & e : replica_hash)
@ -141,7 +141,7 @@ namespace DB
} }
} }
void ReplicasConnections::disconnect() void ShardReplicas::disconnect()
{ {
for (auto & e : replica_hash) for (auto & e : replica_hash)
{ {
@ -154,7 +154,7 @@ namespace DB
} }
} }
void ReplicasConnections::sendCancel() void ShardReplicas::sendCancel()
{ {
for (auto & e : replica_hash) for (auto & e : replica_hash)
{ {
@ -167,7 +167,7 @@ namespace DB
} }
} }
void ReplicasConnections::drainResidualPackets() void ShardReplicas::drainResidualPackets()
{ {
bool caught_exceptions = false; bool caught_exceptions = false;
@ -197,13 +197,13 @@ namespace DB
continue; continue;
case Protocol::Server::Exception: case Protocol::Server::Exception:
// Accumulate replica from packet.exception // XXX Что делать?
caught_exceptions = true; caught_exceptions = true;
again = false; again = false;
continue; continue;
default: default:
// Accumulate replica (server address) // XXX Что делать?
caught_exceptions = true; caught_exceptions = true;
again = false; again = false;
continue; continue;
@ -218,7 +218,7 @@ namespace DB
} }
} }
std::string ReplicasConnections::dumpAddresses() const std::string ShardReplicas::dumpAddresses() const
{ {
if (valid_replicas_count == 0) if (valid_replicas_count == 0)
return ""; return "";
@ -240,7 +240,7 @@ namespace DB
return os.str(); return os.str();
} }
void ReplicasConnections::sendExternalTablesData(std::vector<ExternalTablesData> & data) void ShardReplicas::sendExternalTablesData(std::vector<ExternalTablesData> & data)
{ {
if (data.size() != replica_hash.size()) if (data.size() != replica_hash.size())
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES); throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);