dbms: Server: queries with several replicas: development [#METR-14410]

This commit is contained in:
Alexey Arno 2015-01-18 18:52:09 +03:00
parent 15eb461e16
commit a2f93f6326
2 changed files with 85 additions and 209 deletions

View File

@ -47,41 +47,16 @@ namespace DB
void sendExternalTablesData(std::vector<ExternalTablesData> & data); void sendExternalTablesData(std::vector<ExternalTablesData> & data);
private: private:
/// Описание реплики. /// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах.
struct Replica /// Возвращает соединение на реплику, с которой можно прочитать данные, если такая есть.
{ Connection * waitForReadEvent();
Replica(Connection * connection_) : connection(connection_) {}
/// Соединение к реплике
Connection * connection;
/// Номер следующего ожидаемого пакета.
int next_packet_number = 0;
/// Есть ли данные, которые можно прочитать?
bool can_read = false;
/// Является ли реплика валидной для чтения?
bool is_valid = true;
};
/// Реплики хэшированные по id сокета
using ReplicaHash = std::unordered_map<int, Replica>;
private: private:
/// Выбрать реплику, на которой можно прочитать данные. /// Реплики хэшированные по id сокета
Replica & pickReplica(); using ReplicaHash = std::unordered_map<int, Connection *>;
/// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах.
int waitForReadEvent();
private: private:
const Settings & settings; const Settings & settings;
ReplicaHash replica_hash; ReplicaHash replica_hash;
size_t valid_replicas_count;
/// Номер следующего ожидаемого пакета.
int next_packet_number = 0;
}; };
} }

View File

@ -5,84 +5,95 @@ namespace DB
ShardReplicas::ShardReplicas(std::vector<ConnectionPool::Entry> & entries, const Settings & settings_) : ShardReplicas::ShardReplicas(std::vector<ConnectionPool::Entry> & entries, const Settings & settings_) :
settings(settings_) settings(settings_)
{ {
valid_replicas_count = entries.size(); replica_hash.reserve(entries.size());
replica_hash.reserve(valid_replicas_count);
for (auto & entry : entries) for (auto & entry : entries)
{ {
Connection * connection = &*entry; Connection * connection = &*entry;
replica_hash.insert(std::make_pair(connection->socket.impl()->sockfd(), Replica(connection))); replica_hash.insert(std::make_pair(connection->socket.impl()->sockfd(), connection));
} }
} }
int ShardReplicas::waitForReadEvent() Connection * ShardReplicas::waitForReadEvent()
{ {
if (valid_replicas_count == 0)
return 0;
Poco::Net::Socket::SocketList write_list; Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list; Poco::Net::Socket::SocketList except_list;
Poco::Net::Socket::SocketList read_list; Poco::Net::Socket::SocketList read_list;
read_list.reserve(valid_replicas_count); read_list.reserve(replica_hash.size());
for (auto & e : replica_hash) for (auto & e : replica_hash)
{ {
Replica & replica = e.second; Connection * connection = e.second;
replica.can_read = false; read_list.push_back(connection->socket);
if (replica.is_valid)
read_list.push_back(replica.connection->socket);
} }
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.poll_interval * 1000000); int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.poll_interval * 1000000);
if (n == 0)
return nullptr;
for (const auto & socket : read_list) auto & socket = read_list[rand() % n];
{ auto it = replica_hash.find(socket.impl()->sockfd());
auto it = replica_hash.find(socket.impl()->sockfd()); if (it == replica_hash.end())
if (it == replica_hash.end()) throw Exception("Unexpected replica", ErrorCodes::UNEXPECTED_REPLICA);
throw Exception("Unexpected replica", ErrorCodes::UNEXPECTED_REPLICA); return it->second;
Replica & replica = it->second;
replica.can_read = true;
}
return n;
}
ShardReplicas::Replica & ShardReplicas::pickReplica()
{
Replica * res = nullptr;
int n = waitForReadEvent();
if (n > 0)
{
int max_packet_number = -1;
for (auto & e : replica_hash)
{
Replica & replica = e.second;
if (replica.can_read && (replica.next_packet_number > max_packet_number))
{
max_packet_number = replica.next_packet_number;
res = &replica;
}
}
}
if (res == nullptr)
throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
return *res;
} }
Connection::Packet ShardReplicas::receivePacket() Connection::Packet ShardReplicas::receivePacket()
{ {
while (true) Connection * connection = waitForReadEvent();
{ if (connection == nullptr)
Replica & replica = pickReplica(); throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
bool retry = false;
while (replica.is_valid) Connection::Packet packet = connection->receivePacket();
return packet;
}
void ShardReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data)
{
Settings query_settings = settings;
query_settings.parallel_replicas_count = replica_hash.size();
UInt64 offset = 0;
for (auto & e : replica_hash)
{
Connection * connection = e.second;
connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
query_settings.parallel_replica_offset = offset;
++offset;
}
}
void ShardReplicas::disconnect()
{
for (auto & e : replica_hash)
{
Connection * connection = e.second;
connection->disconnect();
}
}
void ShardReplicas::sendCancel()
{
for (auto & e : replica_hash)
{
Connection * connection = e.second;
connection->sendCancel();
}
}
void ShardReplicas::drainResidualPackets()
{
bool caught_exceptions = false;
for (auto & e : replica_hash)
{
Connection * connection = e.second;
bool again = true;
while (again)
{ {
Connection::Packet packet = replica.connection->receivePacket(); Connection::Packet packet = connection->receivePacket();
switch (packet.type) switch (packet.type)
{ {
@ -94,123 +105,20 @@ namespace DB
break; break;
case Protocol::Server::EndOfStream: case Protocol::Server::EndOfStream:
again = false;
continue;
case Protocol::Server::Exception: case Protocol::Server::Exception:
replica.is_valid = false; // XXX Что делать?
--valid_replicas_count; caught_exceptions = true;
/// Больше ничего не читаем. Отменяем выполнение всех оставшихся запросов, again = false;
/// затем получаем оставшиеся пакеты, чтобы не было рассинхронизации с continue;
/// репликами.
sendCancel();
drainResidualPackets();
break;
default: default:
/// Мы получили инвалидный пакет от реплики. Повторим попытку // XXX Что делать?
/// c другой реплики, если такая найдется. caught_exceptions = true;
replica.is_valid = false; again = false;
--valid_replicas_count; continue;
if (valid_replicas_count > 0)
retry = true;
break;
}
if ((replica.next_packet_number == next_packet_number) && !retry)
{
++replica.next_packet_number;
++next_packet_number;
return packet;
}
else
{
++replica.next_packet_number;
retry = false;
}
}
}
}
void ShardReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data)
{
Settings query_settings = settings;
query_settings.parallel_replicas_count = replica_hash.size();
UInt64 offset = 0;
for (auto & e : replica_hash)
{
Connection * connection = e.second.connection;
connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
query_settings.parallel_replica_offset = offset;
++offset;
}
}
void ShardReplicas::disconnect()
{
for (auto & e : replica_hash)
{
Replica & replica = e.second;
if (replica.is_valid)
{
Connection * connection = replica.connection;
connection->disconnect();
}
}
}
void ShardReplicas::sendCancel()
{
for (auto & e : replica_hash)
{
Replica & replica = e.second;
if (replica.is_valid)
{
Connection * connection = replica.connection;
connection->sendCancel();
}
}
}
void ShardReplicas::drainResidualPackets()
{
bool caught_exceptions = false;
for (auto & e : replica_hash)
{
Replica & replica = e.second;
if (replica.is_valid)
{
Connection * connection = replica.connection;
bool again = true;
while (again)
{
Connection::Packet packet = connection->receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
break;
case Protocol::Server::EndOfStream:
again = false;
continue;
case Protocol::Server::Exception:
// XXX Что делать?
caught_exceptions = true;
again = false;
continue;
default:
// XXX Что делать?
caught_exceptions = true;
again = false;
continue;
}
} }
} }
} }
@ -223,21 +131,14 @@ namespace DB
std::string ShardReplicas::dumpAddresses() const std::string ShardReplicas::dumpAddresses() const
{ {
if (valid_replicas_count == 0)
return "";
std::ostringstream os; std::ostringstream os;
for (auto & e : replica_hash) for (auto & e : replica_hash)
{ {
char prefix = '\0'; char prefix = '\0';
const Replica & replica = e.second; const Connection * connection = e.second;
if (replica.is_valid) os << prefix << connection->getServerAddress();
{ if (prefix == '\0')
const Connection * connection = replica.connection; prefix = ';';
os << prefix << connection->getServerAddress();
if (prefix == '\0')
prefix = ';';
}
} }
return os.str(); return os.str();
@ -251,7 +152,7 @@ namespace DB
auto it = data.begin(); auto it = data.begin();
for (auto & e : replica_hash) for (auto & e : replica_hash)
{ {
Connection * connection = e.second.connection; Connection * connection = e.second;
connection->sendExternalTablesData(*it); connection->sendExternalTablesData(*it);
++it; ++it;
} }