ClickHouse/dbms/include/DB/Client/MultiplexedConnections.h

170 lines
7.6 KiB
C++
Raw Normal View History

#pragma once
#include <DB/Common/Throttler.h>
#include <DB/Client/Connection.h>
#include <DB/Client/ConnectionPool.h>
#include <Poco/ScopedLock.h>
#include <mutex>
namespace DB
{
2015-11-06 17:44:01 +00:00
/** Для получения данных сразу из нескольких реплик (соединений) из одного или нексольких шардов
* в рамках одного потока. В качестве вырожденного случая, может также работать с одним соединением.
* Предполагается, что все функции кроме sendCancel всегда выполняются в одном потоке.
*
* Интерфейс почти совпадает с Connection.
*/
2015-11-06 17:44:01 +00:00
class MultiplexedConnections final : private boost::noncopyable
{
public:
/// Принимает готовое соединение.
2015-11-06 17:44:01 +00:00
MultiplexedConnections(Connection * connection_, const Settings * settings_, ThrottlerPtr throttler_);
2015-10-12 14:53:16 +00:00
/** Принимает пул, из которого нужно будет достать одно или несколько соединений.
* Если флаг append_extra_info установлен, к каждому полученному блоку прилагается
* дополнительная информация.
* Если флаг get_all_replicas установлен, достаются все соединения.
*/
2015-11-06 17:44:01 +00:00
MultiplexedConnections(IConnectionPool * pool_, const Settings * settings_, ThrottlerPtr throttler_,
2016-03-01 17:47:53 +00:00
bool append_extra_info = false, PoolMode pool_mode_ = PoolMode::GET_MANY);
2015-11-06 17:44:01 +00:00
/** Принимает пулы, один для каждого шарда, из которих нужно будет достать одно или несколько
* соединений.
* Если флаг append_extra_info установлен, к каждому полученному блоку прилагается
* дополнительная информация.
* Если флаг do_broadcast установлен, достаются все соединения.
*/
MultiplexedConnections(ConnectionPools & pools_, const Settings * settings_, ThrottlerPtr throttler_,
2016-03-01 17:47:53 +00:00
bool append_extra_info = false, PoolMode pool_mode_ = PoolMode::GET_MANY);
/// Отправить на реплики всё содержимое внешних таблиц.
void sendExternalTablesData(std::vector<ExternalTablesData> & data);
/// Отправить запрос на реплики.
void sendQuery(
const String & query,
const String & query_id = "",
UInt64 stage = QueryProcessingStage::Complete,
const ClientInfo * client_info = nullptr,
bool with_pending_data = false);
/// Получить пакет от какой-нибудь реплики.
Connection::Packet receivePacket();
2015-10-12 14:53:16 +00:00
/// Получить информацию про последний полученный пакет.
BlockExtraInfo getBlockExtraInfo() const;
/// Разорвать все действующие соединения.
void disconnect();
2015-03-06 01:06:11 +00:00
/// Отправить на реплики просьбу отменить выполнение запроса
void sendCancel();
/** На каждой реплике читать и пропускать все пакеты до EndOfStream или Exception.
* Возвращает EndOfStream, если не было получено никакого исключения. В противном
* случае возвращает последний полученный пакет типа Exception.
*/
Connection::Packet drain();
/// Получить адреса реплик в виде строки.
std::string dumpAddresses() const;
/// Возвращает количесто реплик.
/// Без блокировки, потому что sendCancel() не меняет это количество.
size_t size() const { return replica_map.size(); }
/// Проверить, есть ли действительные реплики.
/// Без блокировки, потому что sendCancel() не меняет состояние реплик.
2015-11-06 17:44:01 +00:00
bool hasActiveConnections() const { return active_connection_total_count > 0; }
private:
2015-11-06 17:44:01 +00:00
/// Соединения 1-го шарда, затем соединения 2-го шарда, и т.д.
using Connections = std::vector<Connection *>;
/// Состояние соединений одного шарда.
struct ShardState
{
/// Количество выделенных соединений, т.е. реплик, для этого шарда.
size_t allocated_connection_count;
/// Текущее количество действительных соединений к репликам этого шарда.
size_t active_connection_count;
};
/// Описание одной реплики.
struct ReplicaState
{
/// Индекс соединения.
size_t connection_index;
/// Владелец этой реплики.
ShardState * shard_state;
};
/// Реплики хэшированные по id сокета.
using ReplicaMap = std::unordered_map<int, ReplicaState>;
/// Состояние каждого шарда.
using ShardStates = std::vector<ShardState>;
private:
2015-11-06 17:44:01 +00:00
void initFromShard(IConnectionPool * pool);
/// Зарегистрировать шарды.
void registerShards();
/// Зарегистрировать реплики одного шарда.
void registerReplicas(size_t index_begin, size_t index_end, ShardState & shard_state);
/// Внутренняя версия функции receivePacket без блокировки.
Connection::Packet receivePacketUnlocked();
2015-09-17 20:00:19 +00:00
/// Внутренняя версия функции dumpAddresses без блокировки.
std::string dumpAddressesUnlocked() const;
/// Получить реплику, на которой можно прочитать данные.
ReplicaMap::iterator getReplicaForReading();
/** Проверить, есть ли данные, которые можно прочитать на каких-нибудь репликах.
* Возвращает одну такую реплику, если она найдётся.
*/
ReplicaMap::iterator waitForReadEvent();
/// Пометить реплику как недействительную.
void invalidateReplica(ReplicaMap::iterator it);
private:
const Settings * settings;
2015-11-06 17:44:01 +00:00
Connections connections;
ReplicaMap replica_map;
2015-11-06 17:44:01 +00:00
ShardStates shard_states;
/// Если не nullptr, то используется, чтобы ограничить сетевой трафик.
ThrottlerPtr throttler;
std::vector<ConnectionPool::Entry> pool_entries;
2015-10-12 14:53:16 +00:00
/// Соединение, c которого был получен последний блок.
Connection * current_connection;
/// Информация про последний полученный блок, если поддерживается.
std::unique_ptr<BlockExtraInfo> block_extra_info;
/// Текущее количество действительных соединений к репликам.
2015-11-06 17:44:01 +00:00
size_t active_connection_total_count = 0;
/// Запрос выполняется параллельно на нескольких репликах.
bool supports_parallel_execution;
/// Отправили запрос
bool sent_query = false;
/// Отменили запрос
bool cancelled = false;
2016-03-01 17:47:53 +00:00
PoolMode pool_mode = PoolMode::GET_MANY;
2015-11-06 17:44:01 +00:00
/// Мьютекс для того, чтобы функция sendCancel могла выполняться безопасно
/// в отдельном потоке.
mutable std::mutex cancel_mutex;
};
}