ClickHouse/dbms/include/DB/Interpreters/Cluster.h

162 lines
5.6 KiB
C++
Raw Normal View History

2014-02-22 18:53:42 +00:00
#pragma once
2013-12-07 16:51:29 +00:00
#include <map>
#include <DB/Interpreters/Settings.h>
#include <DB/Client/ConnectionPool.h>
#include <DB/Client/ConnectionPoolWithFailover.h>
#include <Poco/Net/SocketAddress.h>
namespace DB
{
2015-05-28 21:41:28 +00:00
2013-12-07 16:51:29 +00:00
/// Cluster содержит пулы соединений до каждого из узлов
/// С локальными узлами соединение не устанавливается, а выполяется запрос напрямую.
/// Поэтому храним только количество локальных узлов
/// В конфиге кластер включает в себя узлы <node> или <shard>
class Cluster
2013-12-07 16:51:29 +00:00
{
public:
Cluster(const Settings & settings, const String & cluster_name);
2013-12-07 16:51:29 +00:00
2015-01-13 00:56:43 +00:00
/// Построить кластер по именам шардов и реплик. Локальные обрабатываются так же как удаленные.
Cluster(const Settings & settings, const std::vector<std::vector<String>> & names,
const String & username, const String & password);
Cluster(const Cluster &) = delete;
Cluster & operator=(const Cluster &) = delete;
2013-12-07 16:51:29 +00:00
2015-04-30 12:43:16 +00:00
/// используеться для выставления ограничения на размер таймаута
static Poco::Timespan saturate(const Poco::Timespan & v, const Poco::Timespan & limit);
public:
2013-12-07 16:51:29 +00:00
struct Address
{
/** In configuration file,
* addresses are located either in <node> elements:
2013-12-07 16:51:29 +00:00
* <node>
* <host>example01-01-1</host>
* <port>9000</port>
* <!-- <user>, <password>, <default_database> if needed -->
2013-12-07 16:51:29 +00:00
* </node>
* ...
* or in <shard> and inside in <replica> elements:
2013-12-07 16:51:29 +00:00
* <shard>
* <replica>
* <host>example01-01-1</host>
* <port>9000</port>
* <!-- <user>, <password>, <default_database> if needed -->
2013-12-10 09:35:30 +00:00
* </replica>
2013-12-07 16:51:29 +00:00
* </shard>
*/
2015-05-28 21:41:28 +00:00
Poco::Net::SocketAddress resolved_address;
String host_name;
2015-05-28 21:41:28 +00:00
UInt16 port;
2013-12-07 16:51:29 +00:00
String user;
String password;
String default_database; /// this database is selected when no database is specified for Distributed table
2015-04-30 12:43:16 +00:00
UInt32 replica_num;
2013-12-07 16:51:29 +00:00
Address(const String & config_prefix);
Address(const String & host_port_, const String & user_, const String & password_);
2013-12-07 16:51:29 +00:00
};
using Addresses = std::vector<Address>;
using AddressesWithFailover = std::vector<Addresses>;
struct ShardInfo
{
public:
bool isLocal() const { return !local_addresses.empty(); }
bool hasRemoteConnections() const { return pool.get() != nullptr; }
size_t getLocalNodeCount() const { return local_addresses.size(); }
public:
/// contains names of directories for asynchronous write to StorageDistributed
std::vector<std::string> dir_names;
UInt32 shard_num; /// Номер шарда, начиная с 1.
int weight;
Addresses local_addresses;
mutable ConnectionPoolPtr pool;
};
using ShardsInfo = std::vector<ShardInfo>;
2013-12-07 16:51:29 +00:00
String getHashOfAddresses() const { return hash_of_addresses; }
const ShardsInfo & getShardsInfo() const { return shards_info; }
const Addresses & getShardsAddresses() const { return addresses; }
const AddressesWithFailover & getShardsWithFailoverAddresses() const { return addresses_with_failover; }
2013-12-07 16:51:29 +00:00
const ShardInfo & getAnyShardInfo() const
{
if (shards_info.empty())
throw Exception("Cluster is empty", ErrorCodes::LOGICAL_ERROR);
return shards_info.front();
}
/// Количество удалённых шардов.
size_t getRemoteShardCount() const { return remote_shard_count; }
2013-12-07 16:51:29 +00:00
/// Количество узлов clickhouse сервера, расположенных локально
/// к локальным узлам обращаемся напрямую.
size_t getLocalShardCount() const { return local_shard_count; }
/// Количество всех шардов.
size_t getShardCount() const { return shards_info.size(); }
/// Получить подкластер, состоящий из одного шарда - index по счёту (с нуля) шарда данного кластера.
std::unique_ptr<Cluster> getClusterWithSingleShard(size_t index) const;
private:
using SlotToShard = std::vector<size_t>;
SlotToShard slot_to_shard;
public:
const SlotToShard & getSlotToShard() const { return slot_to_shard; }
2015-04-30 12:43:16 +00:00
private:
void initMisc();
/// Hash list of addresses and ports.
/// We need it in order to be able to perform resharding requests
/// on tables that have the distributed engine.
void calculateHashOfAddresses();
/// Для реализации getClusterWithSingleShard.
Cluster(const Cluster & from, size_t index);
String hash_of_addresses;
/// Описание шардов кластера.
ShardsInfo shards_info;
/// Любой удалённый шард.
ShardInfo * any_remote_shard_info = nullptr;
/// Непустым является либо addresses, либо addresses_with_failover.
/// Размер и порядок элементов в соответствующем массиве соответствует shards_info.
/// Массив шардов. Каждый шард - адреса одного сервера.
2013-12-07 16:51:29 +00:00
Addresses addresses;
/// Массив шардов. Для каждого шарда - массив адресов реплик (серверов, считающихся идентичными).
2013-12-07 16:51:29 +00:00
AddressesWithFailover addresses_with_failover;
size_t remote_shard_count = 0;
size_t local_shard_count = 0;
2013-12-07 16:51:29 +00:00
};
2016-03-04 02:40:48 +00:00
2016-03-01 17:47:53 +00:00
class Clusters
2013-12-07 16:51:29 +00:00
{
2016-03-01 17:47:53 +00:00
public:
Clusters(const Settings & settings, const String & config_name = "remote_servers");
2016-03-01 17:47:53 +00:00
Clusters(const Clusters &) = delete;
Clusters & operator=(const Clusters &) = delete;
public:
using Impl = std::map<String, Cluster>;
public:
Impl impl;
2013-12-07 16:51:29 +00:00
};
2013-12-07 16:51:29 +00:00
}