dbms: Server: queries with several replicas: fixes [#METR-14410]

This commit is contained in:
Alexey Arno 2015-02-07 20:12:29 +03:00
parent 7d5f75a804
commit ba96a87097
3 changed files with 50 additions and 45 deletions

View File

@ -36,8 +36,9 @@ namespace DB
/// Отменить запросы к репликам
void sendCancel();
/// Для каждой реплики получить оставшиеся пакеты после отмена запроса.
/// Возвращает либо последнее полученное исключение либо пакет EndOfStream.
/// На каждой реплике читать и пропускать все пакеты до EndOfStream или Exception.
/// Возвращает EndOfStream, если не было получено никакого исключения. В противном
/// случае возвращает последний полученный пакет типа Exception.
Connection::Packet drain();
/// Получить адреса реплик в виде строки.
@ -46,26 +47,26 @@ namespace DB
/// Возвращает количесто реплик.
size_t size() const { return replica_map.size(); }
/// Проверить, есть ли действительные соединения к репликам.
bool hasActiveConnections() const { return active_connection_count > 0; }
/// Проверить, есть ли действительные реплики.
bool hasActiveReplicas() const { return active_replica_count > 0; }
private:
/// Реплики хэшированные по id сокета
using ReplicaMap = std::unordered_map<int, Connection *>;
private:
/// Зарегистрировать соединение к реплике.
void addConnection(Connection * connection);
/// Зарегистрировать реплику.
void registerReplica(Connection * connection);
/// Получить соединение к реплике, на которой можно прочитать данные.
ReplicaMap::iterator getConnection();
/// Получить реплику, на которой можно прочитать данные.
ReplicaMap::iterator getReplicaForReading();
/// Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах.
/// Возвращает соединение к такой реплике, если оно найдётся.
/// Возвращает одну такую реплику, если она найдётся.
ReplicaMap::iterator waitForReadEvent();
// Пометить соединение как недействительное.
void invalidateConnection(ReplicaMap::iterator it);
// Пометить реплику как недействительную.
void invalidateReplica(ReplicaMap::iterator it);
private:
Settings * settings;
@ -75,7 +76,7 @@ namespace DB
ConnectionPool::Entry pool_entry;
/// Текущее количество действительных соединений к репликам.
size_t active_connection_count = 0;
size_t active_replica_count = 0;
/// Запрос выполняется параллельно на нескольких репликах.
bool supports_parallel_execution;
/// Отправили запрос

View File

@ -14,7 +14,7 @@
namespace DB
{
/** Позволяет выполнить запрос (SELECT) на удалённом сервере и получить результат.
/** Позволяет выполнить запрос (SELECT) на удалённых репликах одного шарда и получить результат.
*/
class RemoteBlockInputStream : public IProfilingBlockInputStream
{
@ -87,7 +87,7 @@ public:
std::string addresses = parallel_replicas->dumpAddresses();
LOG_TRACE(log, "(" + addresses + ") Cancelling query");
/// Если запрошено прервать запрос - попросим удалённый сервер тоже прервать запрос.
/// Если запрошено прервать запрос - попросим удалённые реплики тоже прервать запрос.
parallel_replicas->sendCancel();
was_cancelled = true;
}
@ -96,15 +96,15 @@ public:
~RemoteBlockInputStream() override
{
/** Если прервались в середине цикла общения с сервером, то закрываем соединение,
* чтобы оно не осталось висеть в рассихронизированном состоянии.
/** Если прервались в середине цикла общения с репликами, то закрываем соединения,
* чтобы они не остались висеть в рассихронизированном состоянии.
*/
if (sent_query && !finished)
parallel_replicas->disconnect();
}
protected:
/// Отправить на удаленные сервера все временные таблицы
/// Отправить на удаленные реплики все временные таблицы
void sendExternalTables()
{
size_t count = parallel_replicas->size();
@ -161,7 +161,7 @@ protected:
break;
case Protocol::Server::EndOfStream:
if (!parallel_replicas->hasActiveConnections())
if (!parallel_replicas->hasActiveReplicas())
{
finished = true;
return Block();
@ -226,7 +226,7 @@ protected:
parallel_replicas->sendCancel();
}
/// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединении с сервером.
/// Получим оставшиеся пакеты, чтобы не было рассинхронизации в соединениях с репликами.
Connection::Packet packet = parallel_replicas->drain();
switch (packet.type)
{
@ -244,6 +244,7 @@ protected:
}
}
/// Создать структуру для общения с репликами одного шарда, на которых должен выполниться запрос.
void createParallelReplicas()
{
Settings * parallel_replicas_settings = send_settings ? &settings : nullptr;
@ -253,6 +254,9 @@ protected:
parallel_replicas = ext::make_unique<ParallelReplicas>(pool, parallel_replicas_settings);
}
/// Если был получен пакет типа Exception или неизвестного типа, отменить запросы на всех
/// репликах. Читать и пропускать все оставшиеся пакеты до EndOfStream или Exception, чтобы
/// не было рассинхронизации в соединениях с репликами.
void abort()
{
std::string addresses = parallel_replicas->dumpAddresses();
@ -280,20 +284,20 @@ private:
/// Отправили запрос (это делается перед получением первого блока).
bool sent_query = false;
/** Получили все данные от сервера, до пакета EndOfStream.
/** Получили все данные от всех реплик, до пакета EndOfStream.
* Если при уничтожении объекта, ещё не все данные считаны,
* то для того, чтобы не было рассинхронизации, на сервер отправляется просьба прервать выполнение запроса,
* то для того, чтобы не было рассинхронизации, на реплики отправляются просьбы прервать выполнение запроса,
* и после этого считываются все пакеты до EndOfStream.
*/
bool finished = false;
/** На сервер была отправлена просьба прервать выполенение запроса, так как данные больше не нужны.
/** На каждую реплику была отправлена просьба прервать выполенение запроса, так как данные больше не нужны.
* Это может быть из-за того, что данных достаточно (например, при использовании LIMIT),
* или если на стороне клиента произошло исключение.
*/
bool was_cancelled = false;
/// С сервера было получено исключение. В этом случае получать больше пакетов или просить прервать запрос не нужно.
/// С одной репилки было получено исключение. В этом случае получать больше пакетов или просить прервать запрос не нужно.
bool got_exception_from_server = false;
Logger * log = &Logger::get("RemoteBlockInputStream");

View File

@ -5,10 +5,10 @@ namespace DB
{
ParallelReplicas::ParallelReplicas(Connection * connection_, Settings * settings_)
: settings(settings_),
active_connection_count(1),
active_replica_count(1),
supports_parallel_execution(false)
{
addConnection(connection_);
registerReplica(connection_);
}
ParallelReplicas::ParallelReplicas(IConnectionPool * pool_, Settings * settings_)
@ -21,23 +21,23 @@ namespace DB
if (has_many_replicas)
{
pool_entries = pool_->getMany(settings);
active_connection_count = pool_entries.size();
supports_parallel_execution = (active_connection_count > 1);
active_replica_count = pool_entries.size();
supports_parallel_execution = (active_replica_count > 1);
if (active_connection_count == 0)
if (active_replica_count == 0)
throw Exception("No connection available", ErrorCodes::LOGICAL_ERROR);
replica_map.reserve(active_connection_count);
replica_map.reserve(active_replica_count);
for (auto & entry : pool_entries)
addConnection(&*entry);
registerReplica(&*entry);
}
else
{
active_connection_count = 1;
active_replica_count = 1;
supports_parallel_execution = false;
pool_entry = pool_->get(settings);
addConnection(&*pool_entry);
registerReplica(&*pool_entry);
}
}
@ -46,7 +46,7 @@ namespace DB
if (!sent_query)
throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);
if (data.size() < active_connection_count)
if (data.size() < active_replica_count)
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
auto it = data.begin();
@ -67,7 +67,7 @@ namespace DB
if (supports_parallel_execution)
{
Settings query_settings = *settings;
query_settings.parallel_replicas_count = active_connection_count;
query_settings.parallel_replicas_count = active_replica_count;
UInt64 offset = 0;
for (auto & e : replica_map)
@ -100,10 +100,10 @@ namespace DB
{
if (!sent_query)
throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR);
if (!hasActiveConnections())
if (!hasActiveReplicas())
throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR);
auto it = getConnection();
auto it = getReplicaForReading();
if (it == replica_map.end())
throw Exception("No available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
@ -120,13 +120,13 @@ namespace DB
break;
case Protocol::Server::EndOfStream:
invalidateConnection(it);
invalidateReplica(it);
break;
case Protocol::Server::Exception:
default:
connection->disconnect();
invalidateConnection(it);
invalidateReplica(it);
break;
}
@ -141,7 +141,7 @@ namespace DB
if (connection != nullptr)
{
connection->disconnect();
invalidateConnection(it);
invalidateReplica(it);
}
}
}
@ -169,7 +169,7 @@ namespace DB
Connection::Packet res;
res.type = Protocol::Server::EndOfStream;
while (hasActiveConnections())
while (hasActiveReplicas())
{
Connection::Packet packet = receivePacket();
@ -211,7 +211,7 @@ namespace DB
return os.str();
}
void ParallelReplicas::addConnection(Connection * connection)
void ParallelReplicas::registerReplica(Connection * connection)
{
if (connection == nullptr)
throw Exception("Invalid connection specified in parameter.", ErrorCodes::LOGICAL_ERROR);
@ -220,7 +220,7 @@ namespace DB
throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR);
}
ParallelReplicas::ReplicaMap::iterator ParallelReplicas::getConnection()
ParallelReplicas::ReplicaMap::iterator ParallelReplicas::getReplicaForReading()
{
ReplicaMap::iterator it;
@ -239,7 +239,7 @@ namespace DB
ParallelReplicas::ReplicaMap::iterator ParallelReplicas::waitForReadEvent()
{
Poco::Net::Socket::SocketList read_list;
read_list.reserve(active_connection_count);
read_list.reserve(active_replica_count);
/// Сначала проверяем, есть ли данные, которые уже лежат в буфере
/// хоть одного соединения.
@ -272,9 +272,9 @@ namespace DB
return replica_map.find(socket.impl()->sockfd());
}
void ParallelReplicas::invalidateConnection(ParallelReplicas::ReplicaMap::iterator it)
void ParallelReplicas::invalidateReplica(ParallelReplicas::ReplicaMap::iterator it)
{
it->second = nullptr;
--active_connection_count;
--active_replica_count;
}
}