dbms: Server: fixed race conditions in ParallelReplicas [#METR-14410]

This commit is contained in:
Alexey Arno 2015-02-25 20:15:31 +03:00
parent 08eb0d0769
commit 30eee9b656
2 changed files with 119 additions and 94 deletions

View File

@ -3,6 +3,8 @@
#include <DB/Common/Throttler.h>
#include <DB/Client/Connection.h>
#include <DB/Client/ConnectionPool.h>
#include <Poco/ScopedLock.h>
#include <Poco/Mutex.h>
namespace DB
@ -49,19 +51,24 @@ public:
std::string dumpAddresses() const;
/// Returns the number of replicas.
/// No locking is done here, because sendCancel() does not change this number.
size_t size() const
{
    return replica_map.size();
}
/// Checks whether there are any valid replicas.
/// No locking is done here, because sendCancel() does not change the state of the replicas.
bool hasActiveReplicas() const
{
    return active_replica_count > 0;
}
private:
/// Replicas hashed by socket id.
/// A null mapped pointer is skipped by the code that iterates over this map.
using ReplicaMap = std::unordered_map<int, Connection *>;
private:
/// Register a replica.
void registerReplica(Connection * connection);
/// Internal version of receivePacket() that does not take the lock.
Connection::Packet receivePacketUnlocked();
/// Get a replica from which data can be read.
ReplicaMap::iterator getReplicaForReading();
@ -73,7 +80,7 @@ private:
/// Mark a replica as invalid.
void invalidateReplica(ReplicaMap::iterator it);
private:
Settings * settings;
ReplicaMap replica_map;
@ -91,6 +98,8 @@ private:
bool sent_query = false;
/// The query has been cancelled.
bool cancelled = false;
/// Mutex taken by sendExternalTablesData(), sendQuery(), receivePacket(), disconnect(),
/// sendCancel(), drain() and dumpAddresses(). It is mutable so that the const method
/// dumpAddresses() can lock it.
mutable Poco::FastMutex cancel_mutex;
};
}

View File

@ -43,6 +43,8 @@ ParallelReplicas::ParallelReplicas(IConnectionPool * pool_, Settings * settings_
void ParallelReplicas::sendExternalTablesData(std::vector<ExternalTablesData> & data)
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
if (!sent_query)
throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);
@ -61,6 +63,8 @@ void ParallelReplicas::sendExternalTablesData(std::vector<ExternalTablesData> &
void ParallelReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data)
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
if (sent_query)
throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR);
@ -97,6 +101,110 @@ void ParallelReplicas::sendQuery(const String & query, const String & query_id,
}
/// Locking wrapper around receivePacketUnlocked(): cancel_mutex is held
/// for the whole duration of the call.
Connection::Packet ParallelReplicas::receivePacket()
{
    Poco::ScopedLock<Poco::FastMutex> guard(cancel_mutex);

    Connection::Packet received = receivePacketUnlocked();
    return received;
}
void ParallelReplicas::disconnect()
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
for (auto it = replica_map.begin(); it != replica_map.end(); ++it)
{
Connection * connection = it->second;
if (connection != nullptr)
{
connection->disconnect();
invalidateReplica(it);
}
}
}
void ParallelReplicas::sendCancel()
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
if (!sent_query || cancelled)
throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR);
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
connection->sendCancel();
}
cancelled = true;
}
/// Reads packets until no active replicas remain. Runs under cancel_mutex,
/// so packets are read via receivePacketUnlocked().
/// Returns the last exception or unknown packet that was received, or a packet
/// of type EndOfStream if there was none.
/// Throws LOGICAL_ERROR if the query has not been cancelled first.
Connection::Packet ParallelReplicas::drain()
{
    Poco::ScopedLock<Poco::FastMutex> guard(cancel_mutex);

    if (!cancelled)
        throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR);

    Connection::Packet last_unexpected;
    last_unexpected.type = Protocol::Server::EndOfStream;

    while (hasActiveReplicas())
    {
        Connection::Packet incoming = receivePacketUnlocked();

        switch (incoming.type)
        {
            case Protocol::Server::Data:
            case Protocol::Server::Progress:
            case Protocol::Server::ProfileInfo:
            case Protocol::Server::Totals:
            case Protocol::Server::Extremes:
            case Protocol::Server::EndOfStream:
                /// Packets of these types are simply dropped.
                continue;

            default:
                break;
        }

        /// If we received an exception or an unknown packet, keep it.
        last_unexpected = incoming;
    }

    return last_unexpected;
}
std::string ParallelReplicas::dumpAddresses() const
{
Poco::ScopedLock<Poco::FastMutex> lock(cancel_mutex);
bool is_first = true;
std::ostringstream os;
for (auto & e : replica_map)
{
const Connection * connection = e.second;
if (connection != nullptr)
{
os << (is_first ? "" : "; ") << connection->getServerAddress();
if (is_first) { is_first = false; }
}
}
return os.str();
}
/// Adds a connection to replica_map, keyed by the descriptor of its socket,
/// and attaches the throttler to it if one is set.
/// Throws LOGICAL_ERROR if `connection` is null or if a connection with the
/// same socket descriptor is already registered.
void ParallelReplicas::registerReplica(Connection * connection)
{
    if (connection == nullptr)
        throw Exception("Invalid connection specified in parameter.", ErrorCodes::LOGICAL_ERROR);

    const bool inserted = replica_map.insert(
        std::make_pair(connection->socket.impl()->sockfd(), connection)).second;
    if (!inserted)
        throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR);

    if (throttler)
        connection->setThrottler(throttler);
}
Connection::Packet ParallelReplicas::receivePacketUnlocked()
{
if (!sent_query)
throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR);
@ -133,98 +241,6 @@ Connection::Packet ParallelReplicas::receivePacket()
return packet;
}
void ParallelReplicas::disconnect()
{
for (auto it = replica_map.begin(); it != replica_map.end(); ++it)
{
Connection * connection = it->second;
if (connection != nullptr)
{
connection->disconnect();
invalidateReplica(it);
}
}
}
void ParallelReplicas::sendCancel()
{
if (!sent_query || cancelled)
throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR);
for (auto & e : replica_map)
{
Connection * connection = e.second;
if (connection != nullptr)
connection->sendCancel();
}
cancelled = true;
}
/// Reads packets via receivePacket() until no active replicas remain.
/// Returns the last exception or unknown packet that was received, or a packet
/// of type EndOfStream if there was none.
/// Throws LOGICAL_ERROR if the query has not been cancelled first.
Connection::Packet ParallelReplicas::drain()
{
    if (!cancelled)
        throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR);

    Connection::Packet last_unexpected;
    last_unexpected.type = Protocol::Server::EndOfStream;

    while (hasActiveReplicas())
    {
        Connection::Packet incoming = receivePacket();

        switch (incoming.type)
        {
            case Protocol::Server::Data:
            case Protocol::Server::Progress:
            case Protocol::Server::ProfileInfo:
            case Protocol::Server::Totals:
            case Protocol::Server::Extremes:
            case Protocol::Server::EndOfStream:
                /// Packets of these types are simply dropped.
                continue;

            default:
                break;
        }

        /// If we received an exception or an unknown packet, keep it.
        last_unexpected = incoming;
    }

    return last_unexpected;
}
std::string ParallelReplicas::dumpAddresses() const
{
bool is_first = true;
std::ostringstream os;
for (auto & e : replica_map)
{
const Connection * connection = e.second;
if (connection != nullptr)
{
os << (is_first ? "" : "; ") << connection->getServerAddress();
if (is_first) { is_first = false; }
}
}
return os.str();
}
/// Adds a connection to replica_map, keyed by the descriptor of its socket,
/// and attaches the throttler to it if one is set.
/// Throws LOGICAL_ERROR if `connection` is null or if a connection with the
/// same socket descriptor is already registered.
void ParallelReplicas::registerReplica(Connection * connection)
{
    if (connection == nullptr)
        throw Exception("Invalid connection specified in parameter.", ErrorCodes::LOGICAL_ERROR);

    const bool inserted = replica_map.insert(
        std::make_pair(connection->socket.impl()->sockfd(), connection)).second;
    if (!inserted)
        throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR);

    if (throttler)
        connection->setThrottler(throttler);
}
ParallelReplicas::ReplicaMap::iterator ParallelReplicas::getReplicaForReading()
{
ReplicaMap::iterator it;