diff --git a/dbms/include/DB/Client/ParallelReplicas.h b/dbms/include/DB/Client/ParallelReplicas.h index 2bccdf3833f..cee1df79d60 100644 --- a/dbms/include/DB/Client/ParallelReplicas.h +++ b/dbms/include/DB/Client/ParallelReplicas.h @@ -3,6 +3,8 @@ #include #include #include +#include +#include namespace DB @@ -49,19 +51,24 @@ public: std::string dumpAddresses() const; /// Возвращает количесто реплик. + /// Без блокировки, потому что sendCancel() не меняет это количество. size_t size() const { return replica_map.size(); } /// Проверить, есть ли действительные реплики. + /// Без блокировки, потому что sendCancel() не меняет состояние реплик. bool hasActiveReplicas() const { return active_replica_count > 0; } private: /// Реплики хэшированные по id сокета using ReplicaMap = std::unordered_map; - +private: /// Зарегистрировать реплику. void registerReplica(Connection * connection); + /// Внутренняя версия функции receivePacket без блокировки. + Connection::Packet receivePacketUnlocked(); + /// Получить реплику, на которой можно прочитать данные. ReplicaMap::iterator getReplicaForReading(); @@ -73,7 +80,7 @@ private: /// Пометить реплику как недействительную. void invalidateReplica(ReplicaMap::iterator it); - +private: Settings * settings; ReplicaMap replica_map; @@ -91,6 +98,8 @@ private: bool sent_query = false; /// Отменили запрос bool cancelled = false; + + mutable Poco::FastMutex cancel_mutex; }; } diff --git a/dbms/src/Client/ParallelReplicas.cpp b/dbms/src/Client/ParallelReplicas.cpp index 1fdc53b5314..7f923bf8f45 100644 --- a/dbms/src/Client/ParallelReplicas.cpp +++ b/dbms/src/Client/ParallelReplicas.cpp @@ -43,6 +43,8 @@ ParallelReplicas::ParallelReplicas(IConnectionPool * pool_, Settings * settings_ void ParallelReplicas::sendExternalTablesData(std::vector & data) { + Poco::ScopedLock lock(cancel_mutex); + if (!sent_query) throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR); @@ -61,6 +63,8 @@ void ParallelReplicas::sendExternalTablesData(std::vector & void ParallelReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data) { + Poco::ScopedLock lock(cancel_mutex); + if (sent_query) throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR); @@ -97,6 +101,110 @@ void ParallelReplicas::sendQuery(const String & query, const String & query_id, } Connection::Packet ParallelReplicas::receivePacket() +{ + Poco::ScopedLock lock(cancel_mutex); + return receivePacketUnlocked(); +} + +void ParallelReplicas::disconnect() +{ + Poco::ScopedLock lock(cancel_mutex); + + for (auto it = replica_map.begin(); it != replica_map.end(); ++it) + { + Connection * connection = it->second; + if (connection != nullptr) + { + connection->disconnect(); + invalidateReplica(it); + } + } +} + +void ParallelReplicas::sendCancel() +{ + Poco::ScopedLock lock(cancel_mutex); + + if (!sent_query || cancelled) + throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR); + + for (auto & e : replica_map) + { + Connection * connection = e.second; + if (connection != nullptr) + connection->sendCancel(); + } + + cancelled = true; +} + +Connection::Packet ParallelReplicas::drain() +{ + Poco::ScopedLock lock(cancel_mutex); + + if (!cancelled) + throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR); + + Connection::Packet res; + res.type = Protocol::Server::EndOfStream; + + while (hasActiveReplicas()) + { + Connection::Packet packet = receivePacketUnlocked(); + + switch (packet.type) + { + case Protocol::Server::Data: + case Protocol::Server::Progress: + case Protocol::Server::ProfileInfo: + case Protocol::Server::Totals: + case Protocol::Server::Extremes: + case Protocol::Server::EndOfStream: + break; + + case Protocol::Server::Exception: + default: + /// Если мы получили исключение или неизвестный пакет, сохраняем его. + res = packet; + break; + } + } + + return res; +} + +std::string ParallelReplicas::dumpAddresses() const +{ + Poco::ScopedLock lock(cancel_mutex); + + bool is_first = true; + std::ostringstream os; + for (auto & e : replica_map) + { + const Connection * connection = e.second; + if (connection != nullptr) + { + os << (is_first ? "" : "; ") << connection->getServerAddress(); + if (is_first) { is_first = false; } + } + } + + return os.str(); +} + +void ParallelReplicas::registerReplica(Connection * connection) +{ + if (connection == nullptr) + throw Exception("Invalid connection specified in parameter.", ErrorCodes::LOGICAL_ERROR); + auto res = replica_map.insert(std::make_pair(connection->socket.impl()->sockfd(), connection)); + if (!res.second) + throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR); + + if (throttler) + connection->setThrottler(throttler); +} + +Connection::Packet ParallelReplicas::receivePacketUnlocked() { if (!sent_query) throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR); @@ -133,98 +241,6 @@ Connection::Packet ParallelReplicas::receivePacket() return packet; } -void ParallelReplicas::disconnect() -{ - for (auto it = replica_map.begin(); it != replica_map.end(); ++it) - { - Connection * connection = it->second; - if (connection != nullptr) - { - connection->disconnect(); - invalidateReplica(it); - } - } -} - -void ParallelReplicas::sendCancel() -{ - if (!sent_query || cancelled) - throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR); - - for (auto & e : replica_map) - { - Connection * connection = e.second; - if (connection != nullptr) - connection->sendCancel(); - } - - cancelled = true; -} - -Connection::Packet ParallelReplicas::drain() -{ - if (!cancelled) - throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR); - - Connection::Packet res; - res.type = Protocol::Server::EndOfStream; - - while (hasActiveReplicas()) - { - Connection::Packet packet = receivePacket(); - - switch (packet.type) - { - case Protocol::Server::Data: - case Protocol::Server::Progress: - case Protocol::Server::ProfileInfo: - case Protocol::Server::Totals: - case Protocol::Server::Extremes: - case Protocol::Server::EndOfStream: - break; - - case Protocol::Server::Exception: - default: - /// Если мы получили исключение или неизвестный пакет, сохраняем его. - res = packet; - break; - } - } - - return res; -} - -std::string ParallelReplicas::dumpAddresses() const -{ - bool is_first = true; - std::ostringstream os; - for (auto & e : replica_map) - { - const Connection * connection = e.second; - if (connection != nullptr) - { - os << (is_first ? "" : "; ") << connection->getServerAddress(); - if (is_first) { is_first = false; } - } - } - - return os.str(); -} - - -void ParallelReplicas::registerReplica(Connection * connection) -{ - if (connection == nullptr) - throw Exception("Invalid connection specified in parameter.", ErrorCodes::LOGICAL_ERROR); - auto res = replica_map.insert(std::make_pair(connection->socket.impl()->sockfd(), connection)); - if (!res.second) - throw Exception("Invalid set of connections.", ErrorCodes::LOGICAL_ERROR); - - if (throttler) - connection->setThrottler(throttler); -} - - ParallelReplicas::ReplicaMap::iterator ParallelReplicas::getReplicaForReading() { ReplicaMap::iterator it;