dbms: Server: queries with several replicas: development [#METR-14410]

This commit is contained in:
Alexey Arno 2015-01-15 19:55:50 +03:00
parent fe2e42920a
commit 866e733991
2 changed files with 111 additions and 88 deletions

View File

@ -5,59 +5,84 @@
namespace DB namespace DB
{ {
class IConnectionPool; class IConnectionPool;
class ReplicasConnections
{
public:
struct ConnectionInfo
{
ConnectionInfo(Connection * connection_) : connection(connection_) {}
Connection * connection;
int packet_number = 0;
bool can_read = false;
bool is_valid = true;
};
/**
* Множество реплик одного шарда.
*/
class ReplicasConnections final
{
public: public:
ReplicasConnections(IConnectionPool * pool_, Settings * settings_); ReplicasConnections(IConnectionPool * pool_, Settings * settings_);
~ReplicasConnections() = default; ~ReplicasConnections() = default;
ReplicasConnections(const ReplicasConnections &) = delete; ReplicasConnections(const ReplicasConnections &) = delete;
ReplicasConnections & operator=(const ReplicasConnections &) = delete; ReplicasConnections & operator=(const ReplicasConnections &) = delete;
int waitForReadEvent(); /// Получить пакет от какой-нибудь реплики.
ConnectionInfo & pickConnection();
Connection::Packet receivePacket(); Connection::Packet receivePacket();
/// Отправить запрос ко всем репликам.
void sendQuery(const String & query, const String & query_id = "", UInt64 stage = QueryProcessingStage::Complete, void sendQuery(const String & query, const String & query_id = "", UInt64 stage = QueryProcessingStage::Complete,
const Settings * settings_ = nullptr, bool with_pending_data = false); const Settings * settings_ = nullptr, bool with_pending_data = false);
/// Разорвать соединения ко всем репликам
void disconnect(); void disconnect();
/// Отменить запросы у всех реплик
void sendCancel(); void sendCancel();
/// Для каждой реплики получить оставшиеся пакеты при отмене запроса.
void drainResidualPackets(); void drainResidualPackets();
/// Получить адреса всех реплик в виде строки.
std::string dumpAddresses() const; std::string dumpAddresses() const;
size_t size() const; /// Возвращает количесто реплик.
size_t size() const
{
return replica_hash.size();
}
/// Отправить ко всем репликам всё содержимое внешних таблиц.
void sendExternalTablesData(std::vector<ExternalTablesData> & data); void sendExternalTablesData(std::vector<ExternalTablesData> & data);
private: private:
using ConnectionHash = std::unordered_map<int, ConnectionInfo>; /// Описание реплики.
struct Replica
{
Replica(Connection * connection_) : connection(connection_) {}
/// Соединение к реплике
Connection * connection;
/// Номер следующего ожиданного пакета.
int next_packet_number = 0;
/// Есть ли данные, которые можно прочитать?
bool can_read = false;
/// Является ли реплика валидной для чтения?
bool is_valid = true;
};
/// Реплики хэшированные по id сокета
using ReplicaHash = std::unordered_map<int, Replica>;
private:
/// Выбрать реплику, на которой можно прочитать данные.
Replica & pickConnection();
/// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах.
int waitForReadEvent();
private: private:
Settings * settings; Settings * settings;
Poco::Timespan select_timeout;
ConnectionHash connection_hash; ReplicaHash replica_hash;
size_t valid_connections_count; size_t valid_replicas_count;
/// Номер следующего ожиданного пакета.
int next_packet_number = 0; int next_packet_number = 0;
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
}; };
} }

View File

@ -4,65 +4,67 @@
namespace DB namespace DB
{ {
ReplicasConnections::ReplicasConnections(IConnectionPool * pool_, Settings * settings_) : ReplicasConnections::ReplicasConnections(IConnectionPool * pool_, Settings * settings_) :
settings(settings_), settings(settings_)
select_timeout(settings->poll_interval * 1000000)
{ {
auto entries = pool_->getMany(settings); auto entries = pool_->getMany(settings);
valid_connections_count = entries.size(); valid_replicas_count = entries.size();
connection_hash.reserve(valid_connections_count); replica_hash.reserve(valid_replicas_count);
for (auto & entry : entries) for (auto & entry : entries)
{ {
Connection * connection = &*entry; Connection * connection = &*entry;
connection_hash.insert(std::make_pair(connection->socket.impl()->sockfd(), ConnectionInfo(connection))); replica_hash.insert(std::make_pair(connection->socket.impl()->sockfd(), Replica(connection)));
} }
} }
int ReplicasConnections::waitForReadEvent() int ReplicasConnections::waitForReadEvent()
{ {
if (valid_connections_count == 0) if (valid_replicas_count == 0)
return 0; return 0;
Poco::Net::Socket::SocketList read_list; Poco::Net::Socket::SocketList write_list;
read_list.reserve(valid_connections_count); Poco::Net::Socket::SocketList except_list;
for (auto & e : connection_hash) Poco::Net::Socket::SocketList read_list;
read_list.reserve(valid_replicas_count);
for (auto & e : replica_hash)
{ {
ConnectionInfo & info = e.second; Replica & replica = e.second;
info.can_read = false; replica.can_read = false;
if (info.is_valid) if (replica.is_valid)
read_list.push_back(info.connection->socket); read_list.push_back(replica.connection->socket);
} }
int n = Poco::Net::Socket::select(read_list, write_list, except_list, select_timeout); int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->poll_interval * 1000000);
for (const auto & socket : read_list) for (const auto & socket : read_list)
{ {
auto it = connection_hash.find(socket.impl()->sockfd()); auto it = replica_hash.find(socket.impl()->sockfd());
if (it == connection_hash.end()) if (it == replica_hash.end())
throw Exception("Unexpected replica", ErrorCodes::UNEXPECTED_REPLICA); throw Exception("Unexpected replica", ErrorCodes::UNEXPECTED_REPLICA);
ConnectionInfo & info = it->second; Replica & replica = it->second;
info.can_read = true; replica.can_read = true;
} }
return n; return n;
} }
ReplicasConnections::ConnectionInfo & ReplicasConnections::pickConnection() ReplicasConnections::Replica & ReplicasConnections::pickConnection()
{ {
ConnectionInfo * res = nullptr; Replica * res = nullptr;
int n = waitForReadEvent(); int n = waitForReadEvent();
if (n > 0) if (n > 0)
{ {
int max_packet_number = -1; int max_packet_number = -1;
for (auto & e : connection_hash) for (auto & e : replica_hash)
{ {
ConnectionInfo & info = e.second; Replica & replica = e.second;
if (info.can_read && (info.packet_number > max_packet_number)) if (replica.can_read && (replica.next_packet_number > max_packet_number))
{ {
max_packet_number = info.packet_number; max_packet_number = replica.next_packet_number;
res = &info; res = &replica;
} }
} }
} }
@ -77,12 +79,12 @@ namespace DB
{ {
while (true) while (true)
{ {
ConnectionInfo & info = pickConnection(); Replica & replica = pickConnection();
bool retry = false; bool retry = false;
while (info.is_valid) while (replica.is_valid)
{ {
Connection::Packet packet = info.connection->receivePacket(); Connection::Packet packet = replica.connection->receivePacket();
switch (packet.type) switch (packet.type)
{ {
@ -95,9 +97,9 @@ namespace DB
case Protocol::Server::EndOfStream: case Protocol::Server::EndOfStream:
case Protocol::Server::Exception: case Protocol::Server::Exception:
info.is_valid = false; replica.is_valid = false;
--valid_connections_count; --valid_replicas_count;
/// Больше ничего не читаем. Закрываем все оставшиеся валидные соединения, /// Больше ничего не читаем. Отменяем выполнение всех оставшихся запросов,
/// затем получаем оставшиеся пакеты, чтобы не было рассинхронизации с /// затем получаем оставшиеся пакеты, чтобы не было рассинхронизации с
/// репликами. /// репликами.
sendCancel(); sendCancel();
@ -107,22 +109,22 @@ namespace DB
default: default:
/// Мы получили инвалидный пакет от реплики. Повторим попытку /// Мы получили инвалидный пакет от реплики. Повторим попытку
/// c другой реплики, если такая найдется. /// c другой реплики, если такая найдется.
info.is_valid = false; replica.is_valid = false;
--valid_connections_count; --valid_replicas_count;
if (valid_connections_count > 0) if (valid_replicas_count > 0)
retry = true; retry = true;
break; break;
} }
if ((info.packet_number == next_packet_number) && !retry) if ((replica.next_packet_number == next_packet_number) && !retry)
{ {
++info.packet_number; ++replica.next_packet_number;
++next_packet_number; ++next_packet_number;
return packet; return packet;
} }
else else
{ {
++info.packet_number; ++replica.next_packet_number;
retry = false; retry = false;
} }
} }
@ -132,7 +134,7 @@ namespace DB
void ReplicasConnections::sendQuery(const String & query, const String & query_id, UInt64 stage, void ReplicasConnections::sendQuery(const String & query, const String & query_id, UInt64 stage,
const Settings * settings_, bool with_pending_data) const Settings * settings_, bool with_pending_data)
{ {
for (auto & e : connection_hash) for (auto & e : replica_hash)
{ {
Connection * connection = e.second.connection; Connection * connection = e.second.connection;
connection->sendQuery(query, query_id, stage, settings_, with_pending_data); connection->sendQuery(query, query_id, stage, settings_, with_pending_data);
@ -141,12 +143,12 @@ namespace DB
void ReplicasConnections::disconnect() void ReplicasConnections::disconnect()
{ {
for (auto & e : connection_hash) for (auto & e : replica_hash)
{ {
ConnectionInfo & info = e.second; Replica & replica = e.second;
if (info.is_valid) if (replica.is_valid)
{ {
Connection * connection = info.connection; Connection * connection = replica.connection;
connection->disconnect(); connection->disconnect();
} }
} }
@ -154,12 +156,12 @@ namespace DB
void ReplicasConnections::sendCancel() void ReplicasConnections::sendCancel()
{ {
for (auto & e : connection_hash) for (auto & e : replica_hash)
{ {
ConnectionInfo & info = e.second; Replica & replica = e.second;
if (info.is_valid) if (replica.is_valid)
{ {
Connection * connection = info.connection; Connection * connection = replica.connection;
connection->sendCancel(); connection->sendCancel();
} }
} }
@ -169,12 +171,12 @@ namespace DB
{ {
bool caught_exceptions = false; bool caught_exceptions = false;
for (auto & e : connection_hash) for (auto & e : replica_hash)
{ {
ConnectionInfo & info = e.second; Replica & replica = e.second;
if (info.is_valid) if (replica.is_valid)
{ {
Connection * connection = info.connection; Connection * connection = replica.connection;
bool again = true; bool again = true;
while (again) while (again)
@ -195,13 +197,13 @@ namespace DB
continue; continue;
case Protocol::Server::Exception: case Protocol::Server::Exception:
// Accumulate info from packet.exception // Accumulate replica from packet.exception
caught_exceptions = true; caught_exceptions = true;
again = false; again = false;
continue; continue;
default: default:
// Accumulate info (server address) // Accumulate replica (server address)
caught_exceptions = true; caught_exceptions = true;
again = false; again = false;
continue; continue;
@ -212,22 +214,23 @@ namespace DB
if (caught_exceptions) if (caught_exceptions)
{ {
// XXX Что выкидываем?
} }
} }
std::string ReplicasConnections::dumpAddresses() const std::string ReplicasConnections::dumpAddresses() const
{ {
if (valid_connections_count == 0) if (valid_replicas_count == 0)
return ""; return "";
std::ostringstream os; std::ostringstream os;
for (auto & e : connection_hash) for (auto & e : replica_hash)
{ {
char prefix = '\0'; char prefix = '\0';
const ConnectionInfo & info = e.second; const Replica & replica = e.second;
if (info.is_valid) if (replica.is_valid)
{ {
const Connection * connection = info.connection; const Connection * connection = replica.connection;
os << prefix << connection->getServerAddress(); os << prefix << connection->getServerAddress();
if (prefix == '\0') if (prefix == '\0')
prefix = ';'; prefix = ';';
@ -237,18 +240,13 @@ namespace DB
return os.str(); return os.str();
} }
size_t ReplicasConnections::size() const
{
return connection_hash.size();
}
void ReplicasConnections::sendExternalTablesData(std::vector<ExternalTablesData> & data) void ReplicasConnections::sendExternalTablesData(std::vector<ExternalTablesData> & data)
{ {
if (data.size() != connection_hash.size()) if (data.size() != replica_hash.size())
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES); throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
auto it = data.begin(); auto it = data.begin();
for (auto & e : connection_hash) for (auto & e : replica_hash)
{ {
Connection * connection = e.second.connection; Connection * connection = e.second.connection;
connection->sendExternalTablesData(*it); connection->sendExternalTablesData(*it);