ClickHouse/dbms/src/Interpreters/Cluster.h

212 lines
7.6 KiB
C++
Raw Normal View History

2014-02-22 18:53:42 +00:00
#pragma once
2013-12-07 16:51:29 +00:00
#include <map>
#include <Interpreters/Settings.h>
#include <Client/ConnectionPool.h>
#include <Client/ConnectionPoolWithFailover.h>
2013-12-07 16:51:29 +00:00
#include <Poco/Net/SocketAddress.h>
namespace DB
{
2015-05-28 21:41:28 +00:00
2017-06-02 21:37:28 +00:00
/// Cluster contains connection pools to each node
/// With the local nodes, the connection is not established, but the request is executed directly.
/// Therefore we store only the number of local nodes
/// In the config, the cluster includes nodes <node> or <shard>
class Cluster
2013-12-07 16:51:29 +00:00
{
public:
Cluster(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name);
2013-12-07 16:51:29 +00:00
2017-12-01 17:13:14 +00:00
/// Construct a cluster by the names of shards and replicas.
/// Local are treated as well as remote ones if treat_local_as_remote is true.
2017-09-07 17:55:02 +00:00
/// 'clickhouse_port' - port that this server instance listen for queries.
/// This parameter is needed only to check that some address is local (points to ourself).
Cluster(const Settings & settings, const std::vector<std::vector<String>> & names,
2017-12-01 17:13:14 +00:00
const String & username, const String & password,
2019-01-17 17:55:44 +00:00
UInt16 clickhouse_port, bool treat_local_as_remote, bool secure = false);
Cluster(const Cluster &) = delete;
Cluster & operator=(const Cluster &) = delete;
2013-12-07 16:51:29 +00:00
2017-06-02 21:37:28 +00:00
/// is used to set a limit on the size of the timeout
static Poco::Timespan saturate(const Poco::Timespan & v, const Poco::Timespan & limit);
2015-04-30 12:43:16 +00:00
public:
struct Address
{
/** In configuration file,
* addresses are located either in <node> elements:
* <node>
* <host>example01-01-1</host>
* <port>9000</port>
* <!-- <user>, <password>, <default_database> if needed -->
* </node>
* ...
* or in <shard> and inside in <replica> elements:
* <shard>
* <replica>
* <host>example01-01-1</host>
* <port>9000</port>
* <!-- <user>, <password>, <default_database>. <secure> if needed -->
* </replica>
* </shard>
*/
String host_name;
UInt16 port;
String user;
String password;
/// This database is selected when no database is specified for Distributed table
String default_database;
/// The locality is determined at the initialization, and is not changed even if DNS is changed
bool is_local;
2018-12-28 17:11:52 +00:00
bool user_specified = false;
Protocol::Compression compression = Protocol::Compression::Enable;
Protocol::Secure secure = Protocol::Secure::Disable;
Address() = default;
Address(const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
2019-01-17 17:55:44 +00:00
Address(const String & host_port_, const String & user_, const String & password_, UInt16 clickhouse_port, bool secure_ = false);
2017-07-28 16:14:49 +00:00
/// Returns 'escaped_host_name:port'
String toString() const;
2017-07-28 16:14:49 +00:00
/// Returns 'host_name:port'
String readableString() const;
static String toString(const String & host_name, UInt16 port);
static void fromString(const String & host_port_string, String & host_name, UInt16 & port);
/// Retrurns escaped user:password@resolved_host_address:resolved_host_port#default_database
String toStringFull() const;
2018-12-02 02:17:08 +00:00
static void fromFullString(const String & address_full_string, Address & address);
/// Returns initially resolved address
Poco::Net::SocketAddress getResolvedAddress() const
{
return initially_resolved_address;
}
2018-12-02 02:17:08 +00:00
bool operator==(const Address & other) const;
private:
Poco::Net::SocketAddress initially_resolved_address;
};
using Addresses = std::vector<Address>;
using AddressesWithFailover = std::vector<Addresses>;
struct ShardInfo
{
public:
bool isLocal() const { return !local_addresses.empty(); }
bool hasRemoteConnections() const { return local_addresses.size() != per_replica_pools.size(); }
size_t getLocalNodeCount() const { return local_addresses.size(); }
bool hasInternalReplication() const { return has_internal_replication; }
public:
/// Name of directory for asynchronous write to StorageDistributed if has_internal_replication
std::string dir_name_for_internal_replication;
2017-04-25 15:21:03 +00:00
/// Number of the shard, the indexation begins with 1
UInt32 shard_num;
UInt32 weight;
Addresses local_addresses;
/// nullptr if there are no remote addresses
ConnectionPoolWithFailoverPtr pool;
/// Connection pool for each replica, contains nullptr for local replicas
ConnectionPoolPtrs per_replica_pools;
2018-09-01 20:28:46 +00:00
bool has_internal_replication = false;
};
using ShardsInfo = std::vector<ShardInfo>;
String getHashOfAddresses() const { return hash_of_addresses; }
const ShardsInfo & getShardsInfo() const { return shards_info; }
const AddressesWithFailover & getShardsAddresses() const { return addresses_with_failover; }
const ShardInfo & getAnyShardInfo() const
{
if (shards_info.empty())
throw Exception("Cluster is empty", ErrorCodes::LOGICAL_ERROR);
return shards_info.front();
}
2017-06-02 21:37:28 +00:00
/// The number of remote shards.
size_t getRemoteShardCount() const { return remote_shard_count; }
2017-06-02 21:37:28 +00:00
/// The number of clickhouse nodes located locally
/// we access the local nodes directly.
size_t getLocalShardCount() const { return local_shard_count; }
2017-06-02 21:37:28 +00:00
/// The number of all shards.
size_t getShardCount() const { return shards_info.size(); }
2017-06-02 21:37:28 +00:00
/// Get a subcluster consisting of one shard - index by count (from 0) of the shard of this cluster.
std::unique_ptr<Cluster> getClusterWithSingleShard(size_t index) const;
/// Get a subcluster consisting of one or multiple shards - indexes by count (from 0) of the shard of this cluster.
2018-11-21 04:04:24 +00:00
std::unique_ptr<Cluster> getClusterWithMultipleShards(const std::vector<size_t> & indices) const;
private:
using SlotToShard = std::vector<UInt64>;
SlotToShard slot_to_shard;
public:
const SlotToShard & getSlotToShard() const { return slot_to_shard; }
2015-04-30 12:43:16 +00:00
private:
void initMisc();
/// For getClusterWithMultipleShards implementation.
2018-11-21 04:04:53 +00:00
Cluster(const Cluster & from, const std::vector<size_t> & indices);
String hash_of_addresses;
2017-06-02 21:37:28 +00:00
/// Description of the cluster shards.
ShardsInfo shards_info;
2017-06-02 21:37:28 +00:00
/// Any remote shard.
ShardInfo * any_remote_shard_info = nullptr;
2017-06-02 21:37:28 +00:00
/// Non-empty is either addresses or addresses_with_failover.
/// The size and order of the elements in the corresponding array corresponds to shards_info.
2017-06-02 21:37:28 +00:00
/// An array of shards. For each shard, an array of replica addresses (servers that are considered identical).
AddressesWithFailover addresses_with_failover;
2013-12-07 16:51:29 +00:00
size_t remote_shard_count = 0;
size_t local_shard_count = 0;
2013-12-07 16:51:29 +00:00
};
using ClusterPtr = std::shared_ptr<Cluster>;
2016-03-04 02:40:48 +00:00
2016-03-01 17:47:53 +00:00
class Clusters
2013-12-07 16:51:29 +00:00
{
2016-03-01 17:47:53 +00:00
public:
Clusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name = "remote_servers");
2016-03-01 17:47:53 +00:00
Clusters(const Clusters &) = delete;
Clusters & operator=(const Clusters &) = delete;
2016-03-01 17:47:53 +00:00
ClusterPtr getCluster(const std::string & cluster_name) const;
void setCluster(const String & cluster_name, const ClusterPtr & cluster);
void updateClusters(const Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name);
2016-03-01 17:47:53 +00:00
public:
using Impl = std::map<String, ClusterPtr>;
Impl getContainer() const;
protected:
Impl impl;
mutable std::mutex mutex;
2013-12-07 16:51:29 +00:00
};
using ClustersPtr = std::shared_ptr<Clusters>;
2013-12-07 16:51:29 +00:00
}