#pragma once #include #include #include #include #include namespace DB { /// Cluster contains connection pools to each node /// With the local nodes, the connection is not established, but the request is executed directly. /// Therefore we store only the number of local nodes /// In the config, the cluster includes nodes or class Cluster { public: Cluster(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & cluster_name); /// Construct a cluster by the names of shards and replicas. Local are treated as well as remote ones. /// 'clickhouse_port' - port that this server instance listen for queries. /// This parameter is needed only to check that some address is local (points to ourself). Cluster(const Settings & settings, const std::vector> & names, const String & username, const String & password, UInt16 clickhouse_port); Cluster(const Cluster &) = delete; Cluster & operator=(const Cluster &) = delete; /// is used to set a limit on the size of the timeout static Poco::Timespan saturate(const Poco::Timespan & v, const Poco::Timespan & limit); public: struct Address { /** In configuration file, * addresses are located either in elements: * * example01-01-1 * 9000 * * * ... * or in and inside in elements: * * * example01-01-1 * 9000 * * * */ Poco::Net::SocketAddress resolved_address; String host_name; UInt16 port; String user; String password; String default_database; /// this database is selected when no database is specified for Distributed table UInt32 replica_num; bool is_local; Address() = default; Address(Poco::Util::AbstractConfiguration & config, const String & config_prefix); Address(const String & host_port_, const String & user_, const String & password_, UInt16 clickhouse_port); /// Returns 'escaped_host_name:port' String toString() const; /// Returns 'host_name:port' String readableString() const; static String toString(const String & host_name, UInt16 port); static void fromString(const String & host_port_string, String & host_name, UInt16 & port); /// Retrurns escaped user:password@resolved_host_address:resolved_host_port#default_database String toStringFull() const; }; using Addresses = std::vector
; using AddressesWithFailover = std::vector; struct ShardInfo { public: bool isLocal() const { return !local_addresses.empty(); } bool hasRemoteConnections() const { return pool != nullptr; } size_t getLocalNodeCount() const { return local_addresses.size(); } bool hasInternalReplication() const { return has_internal_replication; } public: /// Name of directory for asynchronous write to StorageDistributed if has_internal_replication std::string dir_name_for_internal_replication; /// Number of the shard, the indexation begins with 1 UInt32 shard_num; UInt32 weight; Addresses local_addresses; /// nullptr if there are no remote addresses ConnectionPoolWithFailoverPtr pool; bool has_internal_replication; }; using ShardsInfo = std::vector; String getHashOfAddresses() const { return hash_of_addresses; } const ShardsInfo & getShardsInfo() const { return shards_info; } const AddressesWithFailover & getShardsAddresses() const { return addresses_with_failover; } const ShardInfo & getAnyShardInfo() const { if (shards_info.empty()) throw Exception("Cluster is empty", ErrorCodes::LOGICAL_ERROR); return shards_info.front(); } /// The number of remote shards. size_t getRemoteShardCount() const { return remote_shard_count; } /// The number of clickhouse nodes located locally /// we access the local nodes directly. size_t getLocalShardCount() const { return local_shard_count; } /// The number of all shards. size_t getShardCount() const { return shards_info.size(); } /// Get a subcluster consisting of one shard - index by count (from 0) of the shard of this cluster. std::unique_ptr getClusterWithSingleShard(size_t index) const; private: using SlotToShard = std::vector; SlotToShard slot_to_shard; public: const SlotToShard & getSlotToShard() const { return slot_to_shard; } private: void initMisc(); /// Hash list of addresses and ports. /// We need it in order to be able to perform resharding requests /// on tables that have the distributed engine. void calculateHashOfAddresses(); /// For getClusterWithSingleShard implementation. Cluster(const Cluster & from, size_t index); String hash_of_addresses; /// Description of the cluster shards. ShardsInfo shards_info; /// Any remote shard. ShardInfo * any_remote_shard_info = nullptr; /// Non-empty is either addresses or addresses_with_failover. /// The size and order of the elements in the corresponding array corresponds to shards_info. /// An array of shards. For each shard, an array of replica addresses (servers that are considered identical). AddressesWithFailover addresses_with_failover; size_t remote_shard_count = 0; size_t local_shard_count = 0; }; using ClusterPtr = std::shared_ptr; class Clusters { public: Clusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name = "remote_servers"); Clusters(const Clusters &) = delete; Clusters & operator=(const Clusters &) = delete; ClusterPtr getCluster(const std::string & cluster_name) const; void updateClusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name); public: using Impl = std::map; Impl getContainer() const; protected: Impl impl; mutable std::mutex mutex; }; using ClustersPtr = std::shared_ptr; }